Hadoop on Ubuntu


Running Hadoop on Ubuntu Linux (Single-Node Cluster)

In this tutorial I will describe the required steps for setting up a pseudo-distributed, single-node Hadoop cluster backed by the Hadoop Distributed File System, running on Ubuntu Linux.

Are you looking for the multi-node cluster tutorial? Just head over to the second part of this post below.

Hadoop is a framework written in Java for running applications on large clusters of commodity hardware and incorporates features similar to those of the Google File System (GFS) and of the MapReduce computing paradigm. Hadoop’s HDFS is a highly fault-tolerant distributed file system and, like Hadoop in general, designed to be deployed on low-cost hardware. It provides high throughput access to application data and is suitable for applications that have large data sets.

The main goal of this tutorial is to get a simple Hadoop installation up and running so that you can play around with the software and learn more about it.

This tutorial has been tested with the following software versions:

  • Ubuntu Linux 10.04 LTS (deprecated: 8.10, 8.04 LTS, 7.10, 7.04)
  • Hadoop 1.0.3, released May 2012

Figure 1: Cluster of machines running Hadoop at Yahoo! (Source: Yahoo!)

Prerequisites

Sun Java 6

Hadoop requires a working Java 1.5+ (aka Java 5) installation. However, using Java 1.6 (aka Java 6) is recommended for running Hadoop. For the sake of this tutorial, I will therefore describe the installation of Java 1.6.

Important Note: The apt instructions below are taken from this SuperUser.com thread. I got notified that the previous instructions that I provided no longer work. Please be aware that adding a third-party repository to your Ubuntu configuration is considered a security risk. If you do not want to proceed with the apt instructions below, feel free to install Sun JDK 6 via alternative means (e.g. by downloading the binary package from Oracle) and then continue with the next section in the tutorial.
# Add Ferramosca Roberto's repository to your apt repositories
# See https://launchpad.net/~ferramroberto/
#
$ sudo apt-get install python-software-properties
$ sudo add-apt-repository ppa:ferramroberto/java

# Update the source list
$ sudo apt-get update

# Install Sun Java 6 JDK
$ sudo apt-get install sun-java6-jdk

# Select Sun's Java as the default on your machine.
# See 'sudo update-alternatives --config java' for more information.
#
$ sudo update-java-alternatives -s java-6-sun

The full JDK will be placed in /usr/lib/jvm/java-6-sun (well, this directory is actually a symlink on Ubuntu).

After installation, make a quick check whether Sun’s JDK is correctly set up:

user@ubuntu:~$ java -version
java version "1.6.0_20"
Java(TM) SE Runtime Environment (build 1.6.0_20-b02)
Java HotSpot(TM) Client VM (build 16.3-b01, mixed mode, sharing)

Adding a dedicated Hadoop system user

We will use a dedicated Hadoop user account for running Hadoop. While that’s not required, it is recommended because it helps to separate the Hadoop installation from other software applications and user accounts running on the same machine (think: security, permissions, backups, etc.).

$ sudo addgroup hadoop
$ sudo adduser --ingroup hadoop hduser

This will add the user hduser and the group hadoop to your local machine.
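
A quick sanity check (optional) is to confirm the new account and its group membership:

$ id hduser

The output should show hadoop as the user's primary group.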

Configuring SSH

Hadoop requires SSH access to manage its nodes, i.e. remote machines plus your local machine if you want to use Hadoop on it (which is what we want to do in this short tutorial). For our single-node setup of Hadoop, we therefore need to configure SSH access to localhost for the hduser user we created in the previous section.

I assume that you have SSH up and running on your machine and configured it to allow SSH public key authentication. If not, there are several online guides available.
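
If SSH is not installed yet, the openssh-server package should get you there on Ubuntu (a minimal sketch; the service is normally started automatically after installation):

$ sudo apt-get install openssh-server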

First, we have to generate an SSH key for the hduser user.

user@ubuntu:~$ su - hduser
hduser@ubuntu:~$ ssh-keygen -t rsa -P ""
Generating public/private rsa key pair.
Enter file in which to save the key (/home/hduser/.ssh/id_rsa):
Created directory '/home/hduser/.ssh'.
Your identification has been saved in /home/hduser/.ssh/id_rsa.
Your public key has been saved in /home/hduser/.ssh/id_rsa.pub.
The key fingerprint is:
9b:82:ea:58:b4:e0:35:d7:ff:19:66:a6:ef:ae:0e:d2 hduser@ubuntu
The key's randomart image is:
[...snipp...]
hduser@ubuntu:~$

The second line will create an RSA key pair with an empty password. Generally, using an empty password is not recommended, but in this case it is needed to unlock the key without your interaction (you don’t want to enter the passphrase every time Hadoop interacts with its nodes).

Second, you have to enable SSH access to your local machine with this newly created key.

hduser@ubuntu:~$ cat $HOME/.ssh/id_rsa.pub >> $HOME/.ssh/authorized_keys

The final step is to test the SSH setup by connecting to your local machine with the hduser user. The step is also needed to save your local machine’s host key fingerprint to the hduser user’s known_hosts file. If you have any special SSH configuration for your local machine like a non-standard SSH port, you can define host-specific SSH options in $HOME/.ssh/config (see man ssh_config for more information).
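
For illustration, a host-specific entry for a non-standard SSH port in $HOME/.ssh/config might look like this (the port number 2222 is just an example):

$HOME/.ssh/config
Host localhost
  Port 2222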

hduser@ubuntu:~$ ssh localhost
The authenticity of host 'localhost (::1)' can't be established.
RSA key fingerprint is d7:87:25:47:ae:02:00:eb:1d:75:4f:bb:44:f9:36:26.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'localhost' (RSA) to the list of known hosts.
Linux ubuntu 2.6.32-22-generic #33-Ubuntu SMP Wed Apr 28 13:27:30 UTC 2010 i686 GNU/Linux
Ubuntu 10.04 LTS
[...snipp...]
hduser@ubuntu:~$

If the SSH connection fails, these general tips might help:

  • Enable debugging with ssh -vvv localhost and investigate the error in detail.
  • Check the SSH server configuration in /etc/ssh/sshd_config, in particular the options PubkeyAuthentication (which should be set to yes) and AllowUsers (if this option is active, add the hduser user to it). If you made any changes to the SSH server configuration file, you can force a configuration reload with sudo /etc/init.d/ssh reload.

Disabling IPv6

One problem with IPv6 on Ubuntu is that using 0.0.0.0 for the various networking-related Hadoop configuration options will result in Hadoop binding to the IPv6 addresses of the box. In my case, I realized that there’s no practical point in enabling IPv6 on a box when you are not connected to any IPv6 network. Hence, I simply disabled IPv6 on my Ubuntu machine. Your mileage may vary.

To disable IPv6 on Ubuntu 10.04 LTS, open /etc/sysctl.conf in the editor of your choice and add the following lines to the end of the file:

/etc/sysctl.conf
# disable ipv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1

You have to reboot your machine in order to make the changes take effect.

You can check whether IPv6 is enabled on your machine with the following command:

$ cat /proc/sys/net/ipv6/conf/all/disable_ipv6

A return value of 0 means IPv6 is enabled, a value of 1 means disabled (that’s what we want).
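
If you prefer not to reboot immediately, reloading the settings from /etc/sysctl.conf should also apply them on most systems (a hedged alternative; when in doubt, reboot):

$ sudo sysctl -p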

Alternative

You can also disable IPv6 only for Hadoop as documented in HADOOP-3437. You can do so by adding the following line to conf/hadoop-env.sh:

conf/hadoop-env.sh
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true

Hadoop

Installation

Download Hadoop from the Apache Download Mirrors and extract the contents of the Hadoop package to a location of your choice. I picked /usr/local/hadoop. Make sure to change the owner of all the files to the hduser user and hadoop group, for example:

$ cd /usr/local
$ sudo tar xzf hadoop-1.0.3.tar.gz
$ sudo mv hadoop-1.0.3 hadoop
$ sudo chown -R hduser:hadoop hadoop

(Just to give you the idea, YMMV – personally, I create a symlink from hadoop-1.0.3 to hadoop.)
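
If you want to follow the symlink approach, the commands might look like this:

$ cd /usr/local
$ sudo tar xzf hadoop-1.0.3.tar.gz
$ sudo ln -s hadoop-1.0.3 hadoop
$ sudo chown -R hduser:hadoop hadoop-1.0.3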

Update $HOME/.bashrc

Add the following lines to the end of the $HOME/.bashrc file of user hduser. If you use a shell other than bash, you should of course update its appropriate configuration files instead of .bashrc.

$HOME/.bashrc
# Set Hadoop-related environment variables
export HADOOP_HOME=/usr/local/hadoop

# Set JAVA_HOME (we will also configure JAVA_HOME directly for Hadoop later on)
export JAVA_HOME=/usr/lib/jvm/java-6-sun

# Some convenient aliases and functions for running Hadoop-related commands
unalias fs &> /dev/null
alias fs="hadoop fs"
unalias hls &> /dev/null
alias hls="fs -ls"

# If you have LZO compression enabled in your Hadoop cluster and
# compress job outputs with LZOP (not covered in this tutorial):
# Conveniently inspect an LZOP compressed file from the command
# line; run via:
#
# $ lzohead /hdfs/path/to/lzop/compressed/file.lzo
#
# Requires installed 'lzop' command.
#
lzohead () {
    hadoop fs -cat $1 | lzop -dc | head -1000 | less
}

# Add Hadoop bin/ directory to PATH
export PATH=$PATH:$HADOOP_HOME/bin

You can also repeat this exercise for other users who want to use Hadoop.
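
To make the new settings effective in your current shell and to verify that the hadoop command is found on the PATH, you can run:

$ source $HOME/.bashrc
$ hadoop version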

Excursus: Hadoop Distributed File System (HDFS)

Before we continue let us briefly learn a bit more about Hadoop’s distributed file system.

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. It has many similarities with existing distributed file systems. However, the differences from other distributed file systems are significant. HDFS is highly fault-tolerant and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets. HDFS relaxes a few POSIX requirements to enable streaming access to file system data. HDFS was originally built as infrastructure for the Apache Nutch web search engine project and is now part of the Apache Hadoop project (historically, Hadoop itself started out as a subproject of Apache Lucene).

The following picture gives an overview of the most important HDFS components.

Configuration

Our goal in this tutorial is a single-node setup of Hadoop. More information about what we do in this section is available on the Hadoop Wiki.

hadoop-env.sh

The only required environment variable we have to configure for Hadoop in this tutorial is JAVA_HOME. Open conf/hadoop-env.sh in the editor of your choice (if you used the installation path in this tutorial, the full path is /usr/local/hadoop/conf/hadoop-env.sh) and set the JAVA_HOME environment variable to the Sun JDK/JRE 6 directory.

Change

conf/hadoop-env.sh
# The java implementation to use.  Required.
# export JAVA_HOME=/usr/lib/j2sdk1.5-sun

to

conf/hadoop-env.sh
# The java implementation to use.  Required.
export JAVA_HOME=/usr/lib/jvm/java-6-sun

Note: If you are on a Mac with OS X 10.7 you can use the following line to set up JAVA_HOME in conf/hadoop-env.sh.

conf/hadoop-env.sh (on Mac systems)
# for our Mac users
export JAVA_HOME=`/usr/libexec/java_home`

conf/*-site.xml

In this section, we will configure the directory where Hadoop will store its data files, the network ports it listens to, etc. Our setup will use Hadoop’s Distributed File System, HDFS, even though our little “cluster” only contains our single local machine.

You can leave the settings below “as is” with the exception of the hadoop.tmp.dir parameter – this parameter you must change to a directory of your choice. We will use the directory /app/hadoop/tmp in this tutorial. Hadoop’s default configurations use hadoop.tmp.dir as the base temporary directory both for the local file system and HDFS, so don’t be surprised if you see Hadoop creating the specified directory automatically on HDFS at some later point.

Now we create the directory and set the required ownerships and permissions:

$ sudo mkdir -p /app/hadoop/tmp
$ sudo chown hduser:hadoop /app/hadoop/tmp
# ...and if you want to tighten up security, chmod from 755 to 750...
$ sudo chmod 750 /app/hadoop/tmp

If you forget to set the required ownerships and permissions, you will see a java.io.IOException when you try to format the name node in the next section.
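
A quick way to double-check ownership and permissions before formatting:

$ ls -ld /app/hadoop/tmp

The output should show hduser and hadoop as owner and group, and drwxr-x--- if you applied the chmod 750 above.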

Add the following snippets between the <configuration> ... </configuration> tags in the respective configuration XML file.

In file conf/core-site.xml:

conf/core-site.xml
<property>
  <name>hadoop.tmp.dir</name>
  <value>/app/hadoop/tmp</value>
  <description>A base for other temporary directories.</description>
</property>

<property>
  <name>fs.default.name</name>
  <value>hdfs://localhost:54310</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>

In file conf/mapred-site.xml:

conf/mapred-site.xml
<property>
  <name>mapred.job.tracker</name>
  <value>localhost:54311</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
</property>

In file conf/hdfs-site.xml:

conf/hdfs-site.xml
<property>
  <name>dfs.replication</name>
  <value>1</value>
  <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
  </description>
</property>

See Getting Started with Hadoop and the documentation in Hadoop’s API Overview if you have any questions about Hadoop’s configuration options.

Formatting the HDFS filesystem via the NameNode

The first step to starting up your Hadoop installation is formatting the Hadoop filesystem which is implemented on top of the local filesystem of your “cluster” (which includes only your local machine if you followed this tutorial). You need to do this the first time you set up a Hadoop cluster.

Do not format a running Hadoop filesystem as you will lose all the data currently in the cluster (in HDFS)!

To format the filesystem (which simply initializes the directory specified by the dfs.name.dir variable), run the command

hduser@ubuntu:~$ /usr/local/hadoop/bin/hadoop namenode -format

The output will look like this:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop namenode -format
10/05/08 16:59:56 INFO namenode.NameNode: STARTUP_MSG:
/************************************************************
STARTUP_MSG: Starting NameNode
STARTUP_MSG:   host = ubuntu/127.0.1.1
STARTUP_MSG:   args = [-format]
STARTUP_MSG:   version = 0.20.2
STARTUP_MSG:   build = https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20 -r 911707; compiled by 'chrisdo' on Fri Feb 19 08:07:34 UTC 2010
************************************************************/
10/05/08 16:59:56 INFO namenode.FSNamesystem: fsOwner=hduser,hadoop
10/05/08 16:59:56 INFO namenode.FSNamesystem: supergroup=supergroup
10/05/08 16:59:56 INFO namenode.FSNamesystem: isPermissionEnabled=true
10/05/08 16:59:56 INFO common.Storage: Image file of size 96 saved in 0 seconds.
10/05/08 16:59:57 INFO common.Storage: Storage directory .../hadoop-hduser/dfs/name has been successfully formatted.
10/05/08 16:59:57 INFO namenode.NameNode: SHUTDOWN_MSG:
/************************************************************
SHUTDOWN_MSG: Shutting down NameNode at ubuntu/127.0.1.1
************************************************************/
hduser@ubuntu:/usr/local/hadoop$

Starting your single-node cluster

Run the command:

hduser@ubuntu:~$ /usr/local/hadoop/bin/start-all.sh

This will start up a NameNode, a DataNode, a SecondaryNameNode, a JobTracker and a TaskTracker on your machine.

The output will look like this:

hduser@ubuntu:/usr/local/hadoop$ bin/start-all.sh
starting namenode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-namenode-ubuntu.out
localhost: starting datanode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-datanode-ubuntu.out
localhost: starting secondarynamenode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-secondarynamenode-ubuntu.out
starting jobtracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-jobtracker-ubuntu.out
localhost: starting tasktracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-tasktracker-ubuntu.out
hduser@ubuntu:/usr/local/hadoop$

A nifty tool for checking whether the expected Hadoop processes are running is jps (part of Sun’s Java since v1.5.0). See also How to debug MapReduce programs.

hduser@ubuntu:/usr/local/hadoop$ jps
2287 TaskTracker
2149 JobTracker
1938 DataNode
2085 SecondaryNameNode
2349 Jps
1788 NameNode

You can also check with netstat if Hadoop is listening on the configured ports.

hduser@ubuntu:~$ sudo netstat -plten | grep java
tcp   0  0 0.0.0.0:50070   0.0.0.0:*  LISTEN  1001  9236  2471/java
tcp   0  0 0.0.0.0:50010   0.0.0.0:*  LISTEN  1001  9998  2628/java
tcp   0  0 0.0.0.0:48159   0.0.0.0:*  LISTEN  1001  8496  2628/java
tcp   0  0 0.0.0.0:53121   0.0.0.0:*  LISTEN  1001  9228  2857/java
tcp   0  0 127.0.0.1:54310 0.0.0.0:*  LISTEN  1001  8143  2471/java
tcp   0  0 127.0.0.1:54311 0.0.0.0:*  LISTEN  1001  9230  2857/java
tcp   0  0 0.0.0.0:59305   0.0.0.0:*  LISTEN  1001  8141  2471/java
tcp   0  0 0.0.0.0:50060   0.0.0.0:*  LISTEN  1001  9857  3005/java
tcp   0  0 0.0.0.0:49900   0.0.0.0:*  LISTEN  1001  9037  2785/java
tcp   0  0 0.0.0.0:50030   0.0.0.0:*  LISTEN  1001  9773  2857/java
hduser@ubuntu:~$

If there are any errors, examine the log files in the logs/ directory.
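
For example, to inspect the most recent NameNode log entries (the exact file name contains your user name and host name, so adapt the path accordingly):

$ tail -n 50 /usr/local/hadoop/logs/hadoop-hduser-namenode-ubuntu.log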

Stopping your single-node cluster

Run the command

hduser@ubuntu:~$ /usr/local/hadoop/bin/stop-all.sh

to stop all the daemons running on your machine.

Example output:

hduser@ubuntu:/usr/local/hadoop$ bin/stop-all.sh
stopping jobtracker
localhost: stopping tasktracker
stopping namenode
localhost: stopping datanode
localhost: stopping secondarynamenode
hduser@ubuntu:/usr/local/hadoop$

Running a MapReduce job

We will now run your first Hadoop MapReduce job. We will use the WordCount example job, which reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab. More information about what happens behind the scenes is available at the Hadoop Wiki.

Download example input data

We will use three ebooks from Project Gutenberg for this example:

  • The Outline of Science, Vol. 1 (of 4) by J. Arthur Thomson (pg20417.txt)
  • Ulysses by James Joyce (pg4300.txt)
  • The Notebooks of Leonardo Da Vinci (pg5000.txt)

Download each ebook as a text file in Plain Text UTF-8 encoding and store the files in a local temporary directory of your choice, for example /tmp/gutenberg.
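
If you prefer the command line, something like the following should work, assuming Project Gutenberg's cache URL layout (the URLs are illustrative; verify them on gutenberg.org):

$ mkdir /tmp/gutenberg
$ cd /tmp/gutenberg
$ wget http://www.gutenberg.org/cache/epub/20417/pg20417.txt
$ wget http://www.gutenberg.org/cache/epub/4300/pg4300.txt
$ wget http://www.gutenberg.org/cache/epub/5000/pg5000.txt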

hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Restart the Hadoop cluster

Restart your Hadoop cluster if it’s not running already.

hduser@ubuntu:~$ /usr/local/hadoop/bin/start-all.sh

Copy local example data to HDFS

Before we run the actual MapReduce job, we first have to copy the files from our local file system to Hadoop’s HDFS.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

Now, we actually run the WordCount example job.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount /user/hduser/gutenberg /user/hduser/gutenberg-output

This command will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the result in the HDFS directory /user/hduser/gutenberg-output.

Note: Some people run the command above and get the following error message:

Exception in thread "main" java.io.IOException: Error opening job jar: hadoop*examples*.jar
at org.apache.hadoop.util.RunJar.main (RunJar.java: 90)
Caused by: java.util.zip.ZipException: error in opening zip file

In this case, re-run the command with the full name of the Hadoop Examples JAR file, for example:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop-examples-1.0.3.jar wordcount /user/hduser/gutenberg /user/hduser/gutenberg-output

Example output of the previous command in the console:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount /user/hduser/gutenberg /user/hduser/gutenberg-output
10/05/08 17:43:00 INFO input.FileInputFormat: Total input paths to process : 3
10/05/08 17:43:01 INFO mapred.JobClient: Running job: job_201005081732_0001
10/05/08 17:43:02 INFO mapred.JobClient:  map 0% reduce 0%
10/05/08 17:43:14 INFO mapred.JobClient:  map 66% reduce 0%
10/05/08 17:43:17 INFO mapred.JobClient:  map 100% reduce 0%
10/05/08 17:43:26 INFO mapred.JobClient:  map 100% reduce 100%
10/05/08 17:43:28 INFO mapred.JobClient: Job complete: job_201005081732_0001
10/05/08 17:43:28 INFO mapred.JobClient: Counters: 17
10/05/08 17:43:28 INFO mapred.JobClient:   Job Counters
10/05/08 17:43:28 INFO mapred.JobClient:     Launched reduce tasks=1
10/05/08 17:43:28 INFO mapred.JobClient:     Launched map tasks=3
10/05/08 17:43:28 INFO mapred.JobClient:     Data-local map tasks=3
10/05/08 17:43:28 INFO mapred.JobClient:   FileSystemCounters
10/05/08 17:43:28 INFO mapred.JobClient:     FILE_BYTES_READ=2214026
10/05/08 17:43:28 INFO mapred.JobClient:     HDFS_BYTES_READ=3639512
10/05/08 17:43:28 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=3687918
10/05/08 17:43:28 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=880330
10/05/08 17:43:28 INFO mapred.JobClient:   Map-Reduce Framework
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce input groups=82290
10/05/08 17:43:28 INFO mapred.JobClient:     Combine output records=102286
10/05/08 17:43:28 INFO mapred.JobClient:     Map input records=77934
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce shuffle bytes=1473796
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce output records=82290
10/05/08 17:43:28 INFO mapred.JobClient:     Spilled Records=255874
10/05/08 17:43:28 INFO mapred.JobClient:     Map output bytes=6076267
10/05/08 17:43:28 INFO mapred.JobClient:     Combine input records=629187
10/05/08 17:43:28 INFO mapred.JobClient:     Map output records=629187
10/05/08 17:43:28 INFO mapred.JobClient:     Reduce input records=102286

Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser
Found 2 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:43 /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
Found 2 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:43 /user/hduser/gutenberg-output/_logs
-rw-r--r--   1 hduser supergroup     880802 2010-05-08 17:43 /user/hduser/gutenberg-output/part-r-00000
hduser@ubuntu:/usr/local/hadoop$

If you want to modify some Hadoop settings on the fly like increasing the number of Reduce tasks, you can use the "-D" option:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount -D mapred.reduce.tasks=16 /user/hduser/gutenberg /user/hduser/gutenberg-output

An important note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user-specified mapred.reduce.tasks and doesn’t manipulate that. You cannot force mapred.map.tasks but you can specify mapred.reduce.tasks.

Retrieve the job result from HDFS

To inspect the file, you can copy it from HDFS to the local file system. Alternatively, you can use the command

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-r-00000

to read the file directly from HDFS without copying it to the local file system. In this tutorial, we will copy the results to the local file system though.

hduser@ubuntu:/usr/local/hadoop$ mkdir /tmp/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -getmerge /user/hduser/gutenberg-output /tmp/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$ head /tmp/gutenberg-output/gutenberg-output
"(Lo)cra"       1
"1490   1
"1498," 1
"35"    1
"40,"   1
"A      2
"AS-IS".        1
"A_     1
"Absoluti       1
"Alack! 1
hduser@ubuntu:/usr/local/hadoop$

Note that in this specific output the quote signs (") enclosing the words in the head output above have not been inserted by Hadoop. They are the result of the word tokenizer used in the WordCount example, and in this case they matched the beginning of a quote in the ebook texts. Just inspect the part-r-00000 file further to see it for yourself.

The command fs -getmerge will simply concatenate any files it finds in the directory you specify. This means that the merged file might (and most likely will) not be sorted.
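
If you want the merged output sorted, for example by word count in descending order, you can post-process it locally with standard Unix tools (a small sketch):

$ sort -k2,2 -n -r /tmp/gutenberg-output/gutenberg-output | head -10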

Hadoop Web Interfaces

Hadoop comes with several web interfaces which are by default (see conf/hadoop-default.xml) available at these locations:

  • http://localhost:50070/ – web UI of the NameNode daemon
  • http://localhost:50030/ – web UI of the JobTracker daemon
  • http://localhost:50060/ – web UI of the TaskTracker daemon

These web interfaces provide concise information about what’s happening in your Hadoop cluster. You might want to give them a try.

NameNode Web Interface (HDFS layer)

The name node web UI shows you a cluster summary including information about total/remaining capacity, live and dead nodes. Additionally, it allows you to browse the HDFS namespace and view the contents of its files in the web browser. It also gives access to the local machine’s Hadoop log files.

By default, it’s available at http://localhost:50070/.

JobTracker Web Interface (MapReduce layer)

The JobTracker web UI provides information about general job statistics of the Hadoop cluster, running/completed/failed jobs and a job history log file. It also gives access to the local machine’s Hadoop log files (the machine on which the web UI is running).

By default, it’s available at http://localhost:50030/.

TaskTracker Web Interface (MapReduce layer)

The task tracker web UI shows you running and non-running tasks. It also gives access to the local machine’s Hadoop log files.

By default, it’s available at http://localhost:50060/.

 

Running Hadoop on Ubuntu Linux (Multi-Node Cluster)

In this tutorial I will describe the required steps for setting up a distributed, multi-node Apache Hadoop cluster backed by the Hadoop Distributed File System (HDFS), running on Ubuntu Linux.

Hadoop is a framework written in Java for running applications on large clusters of commodity hardware and incorporates features similar to those of the Google File System (GFS) and of the MapReduce computing paradigm. Hadoop’s HDFS is a highly fault-tolerant distributed file system and, like Hadoop in general, designed to be deployed on low-cost hardware. It provides high throughput access to application data and is suitable for applications that have large data sets.

In a previous tutorial, I described how to set up a Hadoop single-node cluster on an Ubuntu box. The main goal of this tutorial is to get a more sophisticated Hadoop installation up and running, namely building a multi-node cluster using two Ubuntu boxes.

This tutorial has been tested with the following software versions:

  • Ubuntu Linux 10.04 LTS (deprecated: 8.10, 8.04 LTS, 7.10, 7.04)
  • Hadoop 1.0.3, released May 2012

Figure 1: Cluster of machines running Hadoop at Yahoo! (Source: Yahoo!)

Tutorial approach and structure

From two single-node clusters to a multi-node cluster – We will build a multi-node cluster using two Ubuntu boxes in this tutorial. In my humble opinion, the best way to do this for starters is to install, configure and test a “local” Hadoop setup for each of the two Ubuntu boxes, and in a second step to “merge” these two single-node clusters into one multi-node cluster in which one Ubuntu box will become the designated master (but also act as a slave with regard to data storage and processing), and the other box will become only a slave. It’s much easier to track down any problems you might encounter due to the reduced complexity of doing a single-node cluster setup first on each machine.

Figure 2: Tutorial approach and structure

Let’s get started!

Prerequisites

Configuring single-node clusters first

The tutorial approach outlined above means that you should now read my previous tutorial on how to set up a Hadoop single-node cluster and follow the steps described there to build a single-node Hadoop cluster on each of the two Ubuntu boxes. It is recommended that you use the same settings (e.g., installation locations and paths) on both machines, or otherwise you might run into problems later when we migrate the two machines to the final multi-node cluster setup.

Just keep in mind when setting up the single-node clusters that we will later connect and “merge” the two machines, so pick reasonable network settings etc. now for a smooth transition later.

Done? Let’s continue then!

Now that you have two single-node clusters up and running, we will modify the Hadoop configuration to make one Ubuntu box the “master” (which will also act as a slave) and the other Ubuntu box a “slave”.

Note: We will call the designated master machine just the "master" from now on and the slave-only machine the "slave". We will also give the two machines these respective hostnames in their networking setup, most notably in /etc/hosts. If the hostnames of your machines are different (e.g. "node01") then you must adapt the settings in this tutorial as appropriate.

Shutdown each single-node cluster with bin/stop-all.sh before continuing if you haven’t done so already.

Networking

This should hardly come as a surprise, but for the sake of completeness I have to point out that both machines must be able to reach each other over the network. The easiest way is to put both machines in the same network with regard to hardware and software configuration, for example connect both machines via a single hub or switch and configure the network interfaces to use a common network such as 192.168.0.x/24.

To make it simple, we will assign the IP address 192.168.0.1 to the master machine and 192.168.0.2 to the slave machine. Update /etc/hosts on both machines with the following lines:

/etc/hosts (for master AND slave)
192.168.0.1    master
192.168.0.2    slave

SSH access

The hduser user on the master (aka hduser@master) must be able to connect a) to its own user account on the master – i.e. ssh master in this context and not necessarily ssh localhost – and b) to the hduser user account on the slave (aka hduser@slave) via a password-less SSH login. If you followed my single-node cluster tutorial, you just have to add the hduser@master’s public SSH key (which should be in $HOME/.ssh/id_rsa.pub) to the authorized_keys file of hduser@slave (in this user’s $HOME/.ssh/authorized_keys). You can do this manually or use the following SSH command:

Distribute the SSH public key of hduser@master
hduser@master:~$ ssh-copy-id -i $HOME/.ssh/id_rsa.pub hduser@slave

This command will prompt you for the login password for user hduser on slave, then copy the public SSH key for you, creating the correct directory and fixing the permissions as necessary.
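
If ssh-copy-id is not available on your system, the manual equivalent looks roughly like this (it appends master's public key to slave's authorized_keys):

hduser@master:~$ cat $HOME/.ssh/id_rsa.pub | ssh hduser@slave 'mkdir -p $HOME/.ssh && cat >> $HOME/.ssh/authorized_keys'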

The final step is to test the SSH setup by connecting with user hduser from the master to the user account hduser on the slave. The step is also needed to save slave’s host key fingerprint to the hduser@master’s known_hosts file.

So, connecting from master to master

hduser@master:~$ ssh master
The authenticity of host 'master (192.168.0.1)' can't be established.
RSA key fingerprint is 3b:21:b3:c0:21:5c:7c:54:2f:1e:2d:96:79:eb:7f:95.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'master' (RSA) to the list of known hosts.
Linux master 2.6.20-16-386 #2 Thu Jun 7 20:16:13 UTC 2007 i686
...
hduser@master:~$

…and from master to slave.

hduser@master:~$ ssh slave
The authenticity of host 'slave (192.168.0.2)' can't be established.
RSA key fingerprint is 74:d7:61:86:db:86:8f:31:90:9c:68:b0:13:88:52:72.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'slave' (RSA) to the list of known hosts.
Ubuntu 10.04
...
hduser@slave:~$

Hadoop

Cluster Overview (aka the goal)

The next sections will describe how to configure one Ubuntu box as a master node and the other Ubuntu box as a slave node. The master node will also act as a slave because we only have two machines available in our cluster but still want to spread data storage and processing to multiple machines.

Figure 3: What the final multi-node cluster will look like

The master node will run the “master” daemons for each layer: NameNode for the HDFS storage layer, and JobTracker for the MapReduce processing layer. Both machines will run the “slave” daemons: DataNode for the HDFS layer, and TaskTracker for MapReduce processing layer. Basically, the “master” daemons are responsible for coordination and management of the “slave” daemons while the latter will do the actual data storage and data processing work.

Masters vs. Slaves

Typically one machine in the cluster is designated as the NameNode and another machine as the JobTracker, exclusively. These are the actual "master nodes". The rest of the machines in the cluster act as both DataNode and TaskTracker. These are the slaves or "worker nodes".

Configuration

conf/masters (master only)

Despite its name, the conf/masters file defines on which machines Hadoop will start secondary NameNodes in our multi-node cluster. In our case, this is just the master machine. The primary NameNode and the JobTracker will always be the machines on which you run the bin/start-dfs.sh and bin/start-mapred.sh scripts, respectively (the primary NameNode and the JobTracker will be started on the same machine if you run bin/start-all.sh).

Note: You can also start a Hadoop daemon manually on a machine via bin/hadoop-daemon.sh start [namenode | secondarynamenode | datanode | jobtracker | tasktracker], which will not take the conf/masters and conf/slaves files into account.

Here are more details regarding the conf/masters file:

The secondary NameNode merges the fsimage and the edits log files periodically and keeps the edits log size within a limit. It is usually run on a different machine than the primary NameNode since its memory requirements are on the same order as the primary NameNode. The secondary NameNode is started by bin/start-dfs.sh on the nodes specified in the conf/masters file.

Again, the machine on which bin/start-dfs.sh is run will become the primary NameNode.

On master, update conf/masters so that it looks like this:

conf/masters (on master)
master

conf/slaves (master only)

The conf/slaves file lists the hosts, one per line, where the Hadoop slave daemons (DataNodes and TaskTrackers) will be run. We want both the master box and the slave box to act as Hadoop slaves because we want both of them to store and process data.

On master, update conf/slaves so that it looks like this:

conf/slaves (on master)
master
slave

If you have additional slave nodes, just add them to the conf/slaves file, one hostname per line.

conf/slaves (on master)
master
slave
anotherslave01
anotherslave02
anotherslave03

Note: The conf/slaves file on master is used only by scripts like bin/start-dfs.sh or bin/stop-dfs.sh. For example, if you want to add DataNodes on the fly (which is not described in this tutorial yet), you can "manually" start the DataNode daemon on a new slave machine via bin/hadoop-daemon.sh start datanode. Using the conf/slaves file on the master simply helps you to make "full" cluster restarts easier.

conf/*-site.xml (all machines)

You must change the configuration files conf/core-site.xml, conf/mapred-site.xml and conf/hdfs-site.xml on ALL machines as follows.

First, we have to change the fs.default.name parameter (in conf/core-site.xml), which specifies the NameNode (the HDFS master) host and port. In our case, this is the master machine.

conf/core-site.xml (ALL machines)
<property>
  <name>fs.default.name</name>
  <value>hdfs://master:54310</value>
  <description>The name of the default file system.  A URI whose
  scheme and authority determine the FileSystem implementation.  The
  uri's scheme determines the config property (fs.SCHEME.impl) naming
  the FileSystem implementation class.  The uri's authority is used to
  determine the host, port, etc. for a filesystem.</description>
</property>

Second, we have to change the mapred.job.tracker parameter (in conf/mapred-site.xml), which specifies the JobTracker (MapReduce master) host and port. Again, this is the master in our case.

conf/mapred-site.xml (ALL machines)
<property>
  <name>mapred.job.tracker</name>
  <value>master:54311</value>
  <description>The host and port that the MapReduce job tracker runs
  at.  If "local", then jobs are run in-process as a single map
  and reduce task.
  </description>
</property>

Third, we change the dfs.replication parameter (in conf/hdfs-site.xml) which specifies the default block replication. It defines how many machines a single file should be replicated to before it becomes available. If you set this to a value higher than the number of available slave nodes (more precisely, the number of DataNodes), you will start seeing a lot of “(Zero targets found, forbidden1.size=1)” type errors in the log files.

The default value of dfs.replication is 3. However, we have only two nodes available, so we set dfs.replication to 2.

conf/hdfs-site.xml (ALL machines)
<property>
  <name>dfs.replication</name>
  <value>2</value>
  <description>Default block replication.
  The actual number of replications can be specified when the file is created.
  The default is used if replication is not specified in create time.
  </description>
</property>

Additional Settings

There are some other configuration options worth studying. The following information is taken from the Hadoop API Overview.

In file conf/mapred-site.xml:

"mapred.local.dir"
Determines where temporary MapReduce data is written. It also may be a list of directories.
"mapred.map.tasks"
As a rule of thumb, use 10x the number of slaves (i.e., number of TaskTrackers).
"mapred.reduce.tasks"
As a rule of thumb, use num_tasktrackers * num_reduce_slots_per_tasktracker * 0.99. If num_tasktrackers is small (as in the case of this tutorial), use (num_tasktrackers - 1) * num_reduce_slots_per_tasktracker. For example, with two TaskTrackers and two reduce slots per TaskTracker, that suggests (2 - 1) * 2 = 2 reduce tasks.

Formatting the HDFS filesystem via the NameNode

Before we start our new multi-node cluster, we must format Hadoop’s distributed filesystem (HDFS) via the NameNode. You need to do this the first time you set up a Hadoop cluster.

Warning: Do not format a running cluster because this will erase all existing data in the HDFS filesystem!

To format the filesystem (which simply initializes the directory specified by the dfs.name.dir variable on the NameNode), run the command

Format the cluster’s HDFS file system
hduser@master:/usr/local/hadoop$ bin/hadoop namenode -format
... INFO dfs.Storage: Storage directory /app/hadoop/tmp/dfs/name has been successfully formatted.
hduser@master:/usr/local/hadoop$

Background: The HDFS name table is stored on the NameNode’s (here: master) local filesystem in the directory specified by dfs.name.dir. The name table is used by the NameNode to store tracking and coordination information for the DataNodes.

Starting the multi-node cluster

Starting the cluster is performed in two steps.

  1. We begin with starting the HDFS daemons: the NameNode daemon is started on master, and DataNode daemons are started on all slaves (here: master and slave).
  2. Then we start the MapReduce daemons: the JobTracker is started on master, and TaskTracker daemons are started on all slaves (here: master and slave).

HDFS daemons

Run the command bin/start-dfs.sh on the machine you want the (primary) NameNode to run on. This will bring up HDFS with the NameNode running on the machine you ran the previous command on, and DataNodes on the machines listed in the conf/slaves file.

In our case, we will run bin/start-dfs.sh on master:

Start the HDFS layer
hduser@master:/usr/local/hadoop$ bin/start-dfs.sh
starting namenode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-namenode-master.out
slave: Ubuntu 10.04
slave: starting datanode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-datanode-slave.out
master: starting datanode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-datanode-master.out
master: starting secondarynamenode, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-secondarynamenode-master.out
hduser@master:/usr/local/hadoop$

On slave, you can examine the success or failure of this command by inspecting the log file logs/hadoop-hduser-datanode-slave.log.

Example output:

... INFO org.apache.hadoop.dfs.Storage: Storage directory /app/hadoop/tmp/dfs/data is not formatted.
... INFO org.apache.hadoop.dfs.Storage: Formatting ...
... INFO org.apache.hadoop.dfs.DataNode: Opened server at 50010
... INFO org.mortbay.util.Credential: Checking Resource aliases
... INFO org.mortbay.http.HttpServer: Version Jetty/5.1.4
... INFO org.mortbay.util.Container: Started org.mortbay.jetty.servlet.WebApplicationHandler@17a8a02
... INFO org.mortbay.util.Container: Started WebApplicationContext[/,/]
... INFO org.mortbay.util.Container: Started HttpContext[/logs,/logs]
... INFO org.mortbay.util.Container: Started HttpContext[/static,/static]
... INFO org.mortbay.http.SocketListener: Started SocketListener on 0.0.0.0:50075
... INFO org.mortbay.util.Container: Started org.mortbay.jetty.Server@56a499
... INFO org.apache.hadoop.dfs.DataNode: Starting DataNode in: FSDataset{dirpath='/app/hadoop/tmp/dfs/data/current'}
... INFO org.apache.hadoop.dfs.DataNode: using BLOCKREPORT_INTERVAL of 3538203msec

As you can see in slave’s output above, it will automatically format its storage directory (specified by the dfs.data.dir parameter) if it is not formatted already. It will also create the directory if it does not exist yet.

At this point, the following Java processes should run on master

Java processes on master after starting HDFS daemons
hduser@master:/usr/local/hadoop$ jps
14799 NameNode
15314 Jps
14880 DataNode
14977 SecondaryNameNode
hduser@master:/usr/local/hadoop$

(the process IDs don’t matter of course)

…and the following on slave.

Java processes on slave after starting HDFS daemons
hduser@slave:/usr/local/hadoop$ jps
15183 DataNode
15616 Jps
hduser@slave:/usr/local/hadoop$

MapReduce daemons

Run the command bin/start-mapred.sh on the machine you want the JobTracker to run on. This will bring up the MapReduce cluster with the JobTracker running on the machine you ran the previous command on, and TaskTrackers on the machines listed in the conf/slaves file.

In our case, we will run bin/start-mapred.sh on master:

Start the MapReduce layer
hduser@master:/usr/local/hadoop$ bin/start-mapred.sh
starting jobtracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hadoop-jobtracker-master.out
slave: Ubuntu 10.04
slave: starting tasktracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-tasktracker-slave.out
master: starting tasktracker, logging to /usr/local/hadoop/bin/../logs/hadoop-hduser-tasktracker-master.out
hduser@master:/usr/local/hadoop$

On slave, you can examine the success or failure of this command by inspecting the log file logs/hadoop-hduser-tasktracker-slave.log. Example output:

... INFO org.mortbay.util.Credential: Checking Resource aliases
... INFO org.mortbay.http.HttpServer: Version Jetty/5.1.4
... INFO org.mortbay.util.Container: Started org.mortbay.jetty.servlet.WebApplicationHandler@d19bc8
... INFO org.mortbay.util.Container: Started WebApplicationContext[/,/]
... INFO org.mortbay.util.Container: Started HttpContext[/logs,/logs]
... INFO org.mortbay.util.Container: Started HttpContext[/static,/static]
... INFO org.mortbay.http.SocketListener: Started SocketListener on 0.0.0.0:50060
... INFO org.mortbay.util.Container: Started org.mortbay.jetty.Server@1e63e3d
... INFO org.apache.hadoop.ipc.Server: IPC Server listener on 50050: starting
... INFO org.apache.hadoop.ipc.Server: IPC Server handler 0 on 50050: starting
... INFO org.apache.hadoop.mapred.TaskTracker: TaskTracker up at: 50050
... INFO org.apache.hadoop.mapred.TaskTracker: Starting tracker tracker_slave:50050
... INFO org.apache.hadoop.ipc.Server: IPC Server handler 1 on 50050: starting
... INFO org.apache.hadoop.mapred.TaskTracker: Starting thread: Map-events fetcher for all reduce tasks on tracker_slave:50050

At this point, the following Java processes should run on master

Java processes on master after starting MapReduce daemons
hduser@master:/usr/local/hadoop$ jps
16017 Jps
14799 NameNode
15686 TaskTracker
14880 DataNode
15596 JobTracker
14977 SecondaryNameNode
hduser@master:/usr/local/hadoop$

(the process IDs don’t matter of course)

…and the following on slave.

Java processes on slave after starting MapReduce daemons
hduser@slave:/usr/local/hadoop$ jps
15183 DataNode
15897 TaskTracker
16284 Jps
hduser@slave:/usr/local/hadoop$

Stopping the multi-node cluster

Like starting the cluster, stopping it is done in two steps. The workflow however is the opposite of starting.

  1. We begin with stopping the MapReduce daemons: the JobTracker is stopped on master, and TaskTracker daemons are stopped on all slaves (here: master and slave).
  2. Then we stop the HDFS daemons: the NameNode daemon is stopped on master, and DataNode daemons are stopped on all slaves (here: master and slave).

MapReduce daemons

Run the command bin/stop-mapred.sh on the JobTracker machine. This will shut down the MapReduce cluster by stopping the JobTracker daemon running on the machine you ran the previous command on, and TaskTrackers on the machines listed in the conf/slaves file.

In our case, we will run bin/stop-mapred.sh on master:

Stopping the MapReduce layer
hduser@master:/usr/local/hadoop$ bin/stop-mapred.sh
stopping jobtracker
slave: Ubuntu 10.04
master: stopping tasktracker
slave: stopping tasktracker
hduser@master:/usr/local/hadoop$

Note: The output above might suggest that the JobTracker was running and stopped on "slave", but you can be assured that the JobTracker ran on "master".

At this point, the following Java processes should run on master

Java processes on master after stopping MapReduce daemons
hduser@master:/usr/local/hadoop$ jps
14799 NameNode
18386 Jps
14880 DataNode
14977 SecondaryNameNode
hduser@master:/usr/local/hadoop$

…and the following on slave.

Java processes on slave after stopping MapReduce daemons
hduser@slave:/usr/local/hadoop$ jps
15183 DataNode
18636 Jps
hduser@slave:/usr/local/hadoop$

HDFS daemons

Run the command bin/stop-dfs.sh on the NameNode machine. This will shut down HDFS by stopping the NameNode daemon running on the machine you ran the previous command on, and DataNodes on the machines listed in the conf/slaves file.

In our case, we will run bin/stop-dfs.sh on master:

Stopping the HDFS layer
hduser@master:/usr/local/hadoop$ bin/stop-dfs.sh
stopping namenode
slave: Ubuntu 10.04
slave: stopping datanode
master: stopping datanode
master: stopping secondarynamenode
hduser@master:/usr/local/hadoop$

(again, the output above might suggest that the NameNode was running and stopped on slave, but you can be assured that the NameNode ran on master)

At this point, only the following Java processes should run on master

Java processes on master after stopping HDFS daemons
hduser@master:/usr/local/hadoop$ jps
18670 Jps
hduser@master:/usr/local/hadoop$

…and the following on slave.

Java processes on slave after stopping HDFS daemons
hduser@slave:/usr/local/hadoop$ jps
18894 Jps
hduser@slave:/usr/local/hadoop$

Running a MapReduce job

Just follow the steps described in the section Running a MapReduce job of the single-node cluster tutorial.

I recommend however that you use a larger set of input data so that Hadoop will start several Map and Reduce tasks, and in particular, on both master and slave. After all this installation and configuration work, we want to see the job processed by all machines in the cluster, don’t we?

Here’s the example input data I have used for the multi-node cluster setup described in this tutorial. I added four more Project Gutenberg etexts to the initial three documents mentioned in the single-node cluster tutorial. All etexts should be in plain text us-ascii encoding.

Download these etexts, copy them to HDFS, run the WordCount example MapReduce job on master, and retrieve the job result from HDFS to your local filesystem.

Here’s the example output on master… after executing the MapReduce job…

hduser@master:/usr/local/hadoop$ bin/hadoop jar hadoop*examples*.jar wordcount  /user/hduser/gutenberg /user/hduser/gutenberg-output
... INFO mapred.FileInputFormat: Total input paths to process : 7
... INFO mapred.JobClient: Running job: job_0001
... INFO mapred.JobClient:  map 0% reduce 0%
... INFO mapred.JobClient:  map 28% reduce 0%
... INFO mapred.JobClient:  map 57% reduce 0%
... INFO mapred.JobClient:  map 71% reduce 0%
... INFO mapred.JobClient:  map 100% reduce 9%
... INFO mapred.JobClient:  map 100% reduce 68%
... INFO mapred.JobClient:  map 100% reduce 100%
.... INFO mapred.JobClient: Job complete: job_0001
... INFO mapred.JobClient: Counters: 11
... INFO mapred.JobClient:   org.apache.hadoop.examples.WordCount$Counter
... INFO mapred.JobClient:     WORDS=1173099
... INFO mapred.JobClient:     VALUES=1368295
... INFO mapred.JobClient:   Map-Reduce Framework
... INFO mapred.JobClient:     Map input records=136582
... INFO mapred.JobClient:     Map output records=1173099
... INFO mapred.JobClient:     Map input bytes=6925391
... INFO mapred.JobClient:     Map output bytes=11403568
... INFO mapred.JobClient:     Combine input records=1173099
... INFO mapred.JobClient:     Combine output records=195196
... INFO mapred.JobClient:     Reduce input groups=131275
... INFO mapred.JobClient:     Reduce input records=195196
... INFO mapred.JobClient:     Reduce output records=131275
hduser@master:/usr/local/hadoop$

…and the logging output on slave for its DataNode daemon…

logs/hadoop-hduser-datanode-slave.log (on slave)
... INFO org.apache.hadoop.dfs.DataNode: Received block blk_5693969390309798974 from  /192.168.0.1
... INFO org.apache.hadoop.dfs.DataNode: Received block blk_7671491277162757352 from /192.168.0.1
<snipp>
... INFO org.apache.hadoop.dfs.DataNode: Served block blk_-7112133651100166921 to /192.168.0.2
... INFO org.apache.hadoop.dfs.DataNode: Served block blk_-7545080504225510279 to /192.168.0.2
... INFO org.apache.hadoop.dfs.DataNode: Served block blk_-4114464184254609514 to /192.168.0.2
... INFO org.apache.hadoop.dfs.DataNode: Served block blk_-4561652742730019659 to /192.168.0.2
<snipp>
... INFO org.apache.hadoop.dfs.DataNode: Received block blk_-2075170214887808716 from /192.168.0.2 and mirrored to /192.168.0.1:50010
... INFO org.apache.hadoop.dfs.DataNode: Received block blk_1422409522782401364 from /192.168.0.2 and mirrored to /192.168.0.1:50010
... INFO org.apache.hadoop.dfs.DataNode: Deleting block blk_-2942401177672711226 file /app/hadoop/tmp/dfs/data/current/blk_-2942401177672711226
... INFO org.apache.hadoop.dfs.DataNode: Deleting block blk_-3019298164878756077 file /app/hadoop/tmp/dfs/data/current/blk_-3019298164878756077

…and on slave for its TaskTracker daemon.

logs/hadoop-hduser-tasktracker-slave.log (on slave)
... INFO org.apache.hadoop.mapred.TaskTracker: LaunchTaskAction: task_0001_m_000000_0
... INFO org.apache.hadoop.mapred.TaskTracker: LaunchTaskAction: task_0001_m_000001_0
... task_0001_m_000001_0 0.08362164% hdfs://master:54310/user/hduser/gutenberg/ulyss12.txt:0+1561677
... task_0001_m_000000_0 0.07951202% hdfs://master:54310/user/hduser/gutenberg/19699.txt:0+1945731
<snipp>
... task_0001_m_000001_0 0.35611463% hdfs://master:54310/user/hduser/gutenberg/ulyss12.txt:0+1561677
... Task task_0001_m_000001_0 is done.
... task_0001_m_000000_0 1.0% hdfs://master:54310/user/hduser/gutenberg/19699.txt:0+1945731
... LaunchTaskAction: task_0001_m_000006_0
... LaunchTaskAction: task_0001_r_000000_0
... task_0001_m_000000_0 1.0% hdfs://master:54310/user/hduser/gutenberg/19699.txt:0+1945731
... Task task_0001_m_000000_0 is done.
... task_0001_m_000006_0 0.6844295% hdfs://master:54310/user/hduser/gutenberg/132.txt:0+343695
... task_0001_r_000000_0 0.095238104% reduce > copy (2 of 7 at 1.68 MB/s) >
... task_0001_m_000006_0 1.0% hdfs://master:54310/user/hduser/gutenberg/132.txt:0+343695
... Task task_0001_m_000006_0 is done.
... task_0001_r_000000_0 0.14285716% reduce > copy (3 of 7 at 1.02 MB/s) >
<snipp>
... task_0001_r_000000_0 0.14285716% reduce > copy (3 of 7 at 1.02 MB/s) >
... task_0001_r_000000_0 0.23809525% reduce > copy (5 of 7 at 0.32 MB/s) >
... task_0001_r_000000_0 0.6859089% reduce > reduce
... task_0001_r_000000_0 0.7897389% reduce > reduce
... task_0001_r_000000_0 0.86783284% reduce > reduce
... Task task_0001_r_000000_0 is done.
... Received 'KillJobAction' for job: job_0001
... task_0001_r_000000_0 done; removing files.
... task_0001_m_000000_0 done; removing files.
... task_0001_m_000006_0 done; removing files.
... task_0001_m_000001_0 done; removing files.

If you want to inspect the job’s output data, you need to retrieve the job results from HDFS to your local file system (see instructions in the single-node cluster tutorial).

Caveats

java.io.IOException: Incompatible namespaceIDs

If you observe the error "java.io.IOException: Incompatible namespaceIDs" in the logs of a DataNode (logs/hadoop-hduser-datanode-<hostname>.log), chances are you are affected by issue HDFS-107 (formerly known as HADOOP-1212).

The full error looked like this on my machines:

  ... ERROR org.apache.hadoop.dfs.DataNode: java.io.IOException: Incompatible namespaceIDs in /app/hadoop/tmp/dfs/data: namenode namespaceID = 308967713; datanode namespaceID = 113030094
        at org.apache.hadoop.dfs.DataStorage.doTransition(DataStorage.java:281)
        at org.apache.hadoop.dfs.DataStorage.recoverTransitionRead(DataStorage.java:121)
        at org.apache.hadoop.dfs.DataNode.startDataNode(DataNode.java:230)
        at org.apache.hadoop.dfs.DataNode.<init>(DataNode.java:199)
        at org.apache.hadoop.dfs.DataNode.makeInstance(DataNode.java:1202)
        at org.apache.hadoop.dfs.DataNode.run(DataNode.java:1146)
        at org.apache.hadoop.dfs.DataNode.createDataNode(DataNode.java:1167)
        at org.apache.hadoop.dfs.DataNode.main(DataNode.java:1326)

There are basically two solutions to fix this error as I will describe below.

Solution 1: Start from scratch

This step fixes the problem at the cost of erasing all existing data in the cluster’s HDFS file system.

  1. Stop the full cluster, i.e. both MapReduce and HDFS layers.
  2. Delete the data directory on the problematic DataNode: the directory is specified by dfs.data.dir in conf/hdfs-site.xml; if you followed this tutorial, the relevant directory is /app/hadoop/tmp/dfs/data.
  3. Reformat the NameNode. WARNING: all HDFS data is lost during this process!
  4. Restart the cluster.
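
Expressed as shell commands, and assuming the paths used in this tutorial, the procedure looks roughly like this (run the rm step on every problematic DataNode; again, this erases all HDFS data):

hduser@master:/usr/local/hadoop$ bin/stop-all.sh
hduser@master:/usr/local/hadoop$ rm -rf /app/hadoop/tmp/dfs/data
hduser@master:/usr/local/hadoop$ bin/hadoop namenode -format
hduser@master:/usr/local/hadoop$ bin/start-all.sh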

If deleting all the HDFS data and starting from scratch does not sound like a good idea (it might be ok during the initial setup/testing), you might give the second approach a try.

Solution 2: Manually update the namespaceID of problematic DataNodes

Big thanks to Jared Stehler for the following suggestion. This workaround is “minimally invasive” as you only have to edit a single file on the problematic DataNodes:

  1. Stop the problematic DataNode(s).
  2. Edit the value of namespaceID in ${dfs.data.dir}/current/VERSION to match the corresponding value of the current NameNode in ${dfs.name.dir}/current/VERSION.
  3. Restart the fixed DataNode(s).

If you followed the instructions in my tutorials, the full paths of the relevant files are:

  • NameNode: /app/hadoop/tmp/dfs/name/current/VERSION
  • DataNode: /app/hadoop/tmp/dfs/data/current/VERSION (background: dfs.data.dir is by default set to ${hadoop.tmp.dir}/dfs/data, and we set hadoop.tmp.dir in this tutorial to /app/hadoop/tmp).
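
Step 2 can be done with a one-liner, assuming the tutorial's paths and using the NameNode's namespaceID from the error message above as an illustration (replace 308967713 with the value from your own NameNode's VERSION file):

hduser@slave:~$ sed -i 's/namespaceID=.*/namespaceID=308967713/' /app/hadoop/tmp/dfs/data/current/VERSION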

If you wonder what the contents of VERSION look like, here’s one of mine:

contents of current/VERSION
namespaceID=393514426
storageID=DS-1706792599-10.10.10.1-50010-1204306713481
cTime=1215607609074
storageType=DATA_NODE
layoutVersion=-13