Trifork Blog

Introduction to Hadoop

August 4th, 2009

Recently I was playing around with Hadoop, and after a while I really came to appreciate what a great technology it is. Hadoop allows you to write and run your application in a distributed manner and process large amounts of data with it. It consists of a MapReduce implementation and a distributed file system. Personally I did not have any experience with distributed computing beforehand, but I found MapReduce quite easy to understand.

In this blog post I will give an introduction to Hadoop by showing a relatively simple MapReduce application that counts the unique tokens inside text files. With this example I will try to explain how Hadoop works. Before we start creating our example application, we need to know the basics of MapReduce itself.

MapReduce

MapReduce is a programming paradigm designed to handle computations that would normally take a considerable amount of time to complete (mainly due to large datasets and/or expensive algorithms) and finish them in an acceptable time frame. It was first introduced by Google [ref: whitepaper] as one of the measures they took to handle the vast amounts of data they had to process.

In MapReduce your data is modeled into key / value pairs. Modeling your data into this format may sound odd at first, but almost all data can be modeled this way. This simple data structure allows Hadoop to process your data in an efficient manner. The key and value can be anything you choose: strings, integers, complex types, or dummy types to be ignored.

MapReduce word count example

A MapReduce program has two major phases: the map phase and the reduce phase. The map phase applies user-specified logic to the input data. The result of that (a.k.a. the intermediate result) is then fed into the reduce phase so it can be aggregated and written into the final result. The input data, intermediate result and final result are all in key / value pair format.
As you can see in the diagram, during the map and reduce phases multiple map and reduce tasks are executed simultaneously. MapReduce is also usually described with the following functions:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(k3,v3)

Of course there are more things inside MapReduce that make the map and reduce phases really work, but these are more generic functionalities that Hadoop supports out of the box. We will cover them in a moment. It is important to understand that the map phase has to transform the input data into something meaningful that the reduce phase can aggregate on.
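To make the two function signatures concrete, here is a plain-Java sketch of them for word counting. This is not the Hadoop API (the real Mapper and Reducer come later in this post); the names and types are just stand-ins to show what each function consumes and produces:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java stand-ins for the two MapReduce functions; no Hadoop involved.
public class WordCountFunctions {

    // map(k1, v1) → list(k2, v2): the input key is a byte offset,
    // the value a line of text; we emit a (token, 1) pair per token.
    static List<Map.Entry<String, Integer>> map(long offset, String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String token : line.trim().split("\\s+")) {
            pairs.add(Map.entry(token, 1));
        }
        return pairs;
    }

    // reduce(k2, list(v2)) → list(k3, v3): one call per unique key,
    // aggregating all the values emitted for that key.
    static Map.Entry<String, Integer> reduce(String token, List<Integer> counts) {
        int sum = 0;
        for (int c : counts) {
            sum += c;
        }
        return Map.entry(token, sum);
    }

    public static void main(String[] args) {
        System.out.println(map(0L, "to be or not to be"));
        System.out.println(reduce("to", List.of(1, 1)));
    }
}

Note that the map side emits duplicate keys freely; turning those duplicates into one reduce call per key is exactly the machinery Hadoop provides out of the box.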

Hadoop

The Apache Hadoop project is actually an umbrella for a set of projects that are associated with solving problems the MapReduce way. The Hadoop Core project provides the MapReduce implementation and the distributed file system HDFS (Hadoop Distributed File System). Both can be used separately. In this blog we will focus on the Hadoop Core project.

You may ask yourself: 'Why do I need a distributed file system?' To answer this question, ask yourself another one: 'How can I access a large dataset in a fast and concurrent manner?' One way to go is to invest a lot of money in the best hardware you can find. While this takes you a few steps forward, when it comes to really large datasets you soon hit limits again. The other alternative is to distribute your data, and that is why HDFS is used in Hadoop: each process can access each chunk of data simultaneously, while the data may be scattered across the network. That being said, to get our word counting Hadoop application started we do not have to use HDFS. We can also use the local file system.

Implementing the map function

The first thing we need to create is the map function. This map function needs to collect words (tokens) from the input file(s). The input is a body of text that contains many (possibly duplicate) words. The key here is the offset from the start of the input file and the value is a line of text. The map function needs to transform this into an intermediate structure, so that the reduce function can easily count the unique tokens. One simple way to do this is to have the token as key and the number of times it occurred (during the map phase) as the value.

[Diagram: the map phase]

As you can see, the map function is executed three times. The result of each individual map task can even contain duplicates, and between the results there are more duplicates. Actually, the counting in the map function is not as important as the structure that is created. The reduce function can now easily count the unique occurrences.

Now that we know the basic structure of the map phase, let's translate it to actual code. Hadoop provides an API that we can utilize to create MapReduce applications. It provides Mapper and Reducer interfaces that you can implement for your application. Let's implement the Mapper interface for our word count program.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}

As you can see, the work is done inside the map method. It splits every line it encounters into tokens and then passes each token with count one to a collector. Everything that is passed to a collector will end up in the reduce phase. As you can see, there is no counting done in this method; this is done in the Reducer class.

What you also probably noticed are the generic types of the Mapper class. The first two are the key and value types of the input format; the latter two are the map result key and value types (these types will be read by the reduce function).

The last parameter is an interesting one. Although we don't use it here, you can use it to report the status of your job to the Hadoop environment. Hadoop can show this in, for example, the web interface or the console output. Without this feature it is hard to monitor the status of a MapReduce job in a production cluster of many nodes.

Implementing the reducer function

The reducer has to do the last part of the word counting job. The map function has already produced a data structure that the reducer can easily utilize. Note that the key / value input format is slightly different than the one the Mapper produced. Hadoop has grouped every value under a key. Thus for every key there is now a unique group with one or more values (in our case ones). The reduce function can now sum these values without a hassle. The reduce function will output a unique token as key and the total count as value. Grouping is a very important concept in Hadoop and I will cover it in the next part of this blog.
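The grouping that happens between the two phases can be sketched in a few lines of plain Java (again, a hypothetical helper, not the Hadoop API): every value emitted for the same key ends up in one list, so each reduce call sees a key together with all of its values.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the grouping Hadoop performs between map and reduce:
// all values emitted for one key end up in a single list.
public class Grouping {

    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                   .add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("to", 1), Map.entry("be", 1),
                Map.entry("or", 1), Map.entry("not", 1),
                Map.entry("to", 1), Map.entry("be", 1));
        // Prints {to=[1, 1], be=[1, 1], or=[1], not=[1]};
        // the reducer then only has to sum each list.
        System.out.println(group(mapOutput));
    }
}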

[Diagram: the reduce phase]

And now back to the code again. Hadoop provides a Reducer interface that you can implement; let’s take a look at its implementation.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

As you can see it is quite straightforward and has the same elements as the Mapper (i.e. the key and value pairs specified via generics, the OutputCollector and the Reporter).

Gluing it all together

Now that we have created the Mapper and Reducer, we somehow need to let Hadoop do its magic and run them as a MapReduce application. In Hadoop this is done via the JobConf class, which we'll construct in a simple Java main method. Let's take a look at it.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(MapClass.class);
        conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
    }
}

As you can see in the code, you construct a JobConf and set a number of properties. The most obvious ones are of course setting the Mapper and Reducer classes. The last lines set the input and output paths of your JobConf, specified via arguments from the console, and tell Hadoop to run the application.

There are two things that are set implicitly, but are very important for the inner workings of the whole process: the input format and the Partitioner. Both can be changed via the JobConf.

The input format tells Hadoop what kind of data it is dealing with and how to split it into smaller chunks that can be given to individual map tasks. The default input format is TextInputFormat. It expects the data to be in a line-based text format and guarantees that the data is split on line boundaries. When the data is stored on HDFS, it is by default split into chunks of 64MB, which is the default HDFS block size.
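To illustrate what TextInputFormat hands to the mapper, here is a small plain-Java sketch (assuming single-byte characters and '\n' line endings, and ignoring how the file is split into chunks) that computes the (byte offset, line) records for a block of text:

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the records TextInputFormat produces: the key is the byte
// offset of the line within the file, the value is the line itself.
public class LineRecords {

    static Map<Long, String> records(String text) {
        Map<Long, String> out = new LinkedHashMap<>();
        long offset = 0;
        for (String line : text.split("\n", -1)) {
            if (!line.isEmpty()) {
                out.put(offset, line);
            }
            offset += line.length() + 1; // +1 for the newline character
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints {0=hello world, 12=hadoop}
        System.out.println(records("hello world\nhadoop\n"));
    }
}

These offsets are exactly the LongWritable keys our MapClass receives and ignores.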

After an individual map task has completed, the data needs to be stored in such a way that one reduce task has all the values for one key. This process is called shuffling and is done by the Partitioner. Its job is to sort the map output and redirect it to the reduce tasks. The default Partitioner is the HashPartitioner, which distributes the keys (with their values) based on the key's hashcode. In our example this is good enough, as each token (a Java String) always maps to the same partition, but depending on your actual application you may need to provide a custom Partitioner implementation. The shuffling process described above is very important: if it is not done, the reduce phase cannot be executed in parallel, meaning that only one reduce task would run and aggregate the outputs of many map tasks.
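The rule HashPartitioner applies can be sketched in one line of plain Java (the sign bit is masked off so negative hashcodes still map to a valid partition). The point is that the same key always lands on the same reducer, no matter which map task emitted it:

// Sketch of HashPartitioner's partitioning rule, outside of Hadoop.
public class PartitionDemo {

    static int partition(String key, int numReduceTasks) {
        // Mask off the sign bit so a negative hashCode still yields a valid index.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        for (String token : new String[] {"to", "be", "or", "not"}) {
            System.out.println(token + " -> reducer " + partition(token, reducers));
        }
    }
}

Because the assignment is deterministic, every (token, 1) pair for a given token, regardless of which map task produced it, ends up at the same reduce task.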

[Diagram: word count execution overview]

Executing a Hadoop application

Now that we have created the glue code, we can execute our Hadoop application. Compile the classes against the Hadoop libraries and create a jar containing your application.

Hadoop is by default configured to run in standalone mode and we will use that to run our application. Configuring Hadoop to run in a cluster of multiple nodes is outside the scope of this blog. (I will however cover that in the upcoming techmeeting – see details below)

The next thing to do is to find interesting input data for your application. I suggest starting with the relatively small text files that can be found at www.textfiles.com and then using large text files like Wikipedia XML dumps to see the difference in processing time.

Running a Hadoop application in standalone mode is really easy and can be done with the following shell command:
bin/hadoop jar [path-to-jar] WordCount [path-to-input] [path-to-output-directory]

Well, I guess this is it for now. If you're like me, the more you play around with it, the more exciting it gets. I find Hadoop a really cool technology, and it really amazes me to see it in action (especially on a cluster) and watch how it "consumes" the input data and produces really useful output data.

If you want to know more about Hadoop and happen to be in Amsterdam on August 6, you are welcome to join our monthly tech meeting at the JTeam headquarters. Visit our web site for more details.

5 Responses

  1. August 6, 2009 at 11:16 by Toolman

    "It guarantees that the data split line based and by default is split into chunks of 64MB"

    Is it really 64 megabytes? I wonder if it is supposed to be kilobytes?

  2. August 6, 2009 at 11:17 by Toolman

    Or is the intention for the dataset to be huuuge?

  3. August 6, 2009 at 20:29 by Martijn van Groningen

    Hey toolman, how is life in NZ?

    1) The 64MB needs more clarification. It depends on what filesystem you are using. If you are using Hadoop with HDFS (Hadoop Distributed File System) then the chunks are by default 64MB, because the default block size of HDFS is 64MB. However, if you are using Hadoop in local mode (as in this blog entry) then you are not using HDFS but the local filesystem. The chunks are then much smaller (I don't recall the actual size).

    2) Yes, the dataset is intended to be enormous. People have been using it with terabytes of input data, and Hadoop can perfectly handle that; it is designed to do so in an efficient manner.

  4. [...] Javadocs for similarity algorithms. And of course there are classes to run your recommender on Hadoop and to evaluate the quality of your recommender with training sets. In the next few posts I will go [...]

  5. March 1, 2010 at 20:43 by Michael Chang

    Since Hadoop is based on MapReduce semantics, and since MapReduce was invented by Google who uses a BigTable implementation defaulting to 64MB chunks, then it makes sense that Hadoop would also use the same (albeit probably configurable) defaults. 64MB is large, but then again Google processes HUGE amounts of data which end up making 64MB seem rather small in comparison.
