Using the Hadoop MapReduce API

Using the Hadoop mapreduce API

Hadoop’s MapReduce subsystem runs tasks, where each task may be composed of one or more jobs. A job is a MapReduce run of a single map phase and a single reduce phase. A large amount of the complexity in Hadoop is the (very poorly documented) set up and configuration for jobs and tasks.

ToolRunner and Tool

The ToolRunner is a helper class which contains code to parse the command line of a MapReduce job started via the hadoop jar command. It parses “standard” Hadoop command line options, and can use them to modify the configuration of the task (see “Configuration”, below).

The ToolRunner interface is used to run classes implementing the Tool interface, which consists of a run() method that you need to supply. It is in the run() method that you should place the code to set up your MapReduce jobs.

Your class should inherit from Configured, which simply gives a location to store a Configuration object (see below).

You also need to write a stub main() method which uses the ToolRunner interface to parse the common options and set up the configuration:

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyClass extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MyClass(), args);
        System.exit(res);
    }
    public int run(String[] args) throws Exception {
        // Set up Job(s) and run them
    }
}

Configuration

A Configuration is a key/value store which is used to hold configuration details. This includes all of the Hadoop configuration, read in when the tool starts up. You can also add your own configuration keys to it; these will be available to mappers and reducers, and appears to be the best way of passing parameters to them. Within your tool, you can get hold of the Configuration object through the Configured superclass’s getConf() method:

        Configuration conf = getConf();
        conf.set("speedtrap.junctions.file", args[0]);

Job

The Job is the fundamental unit of Hadoop tasks. It represents a MapReduce run with a single map phase followed by a single reduce phase. You can safely ignore most if not all of the online documentation about the Job class, as it is either outdated (new Job() is deprecated) or simply wrong (Hadoop Javadoc, I'm looking at you).

Create a new Job with the Job.getInstance() method:

        Job getTrips = Job.getInstance(conf, "Speed Trap");

Note that we pass the Configuration object to the job -- so the objects and methods (mappers, reducers) in the job also get access to any configuration you may want to pass around. The job also has a name set on it, which is used to identify the job type in things like the logs.

The Job class defines a large number of methods for setting job parameters. These include things like the data types used for output from the mapper and reducer, the actual mapper and reducer (and combiner) classes to use, sort keys for sorting and grouping, and input and output file formats. It also defines methods for running the job and monitoring it as it executes.

Jar file

A Job needs to know (so that it can distribute the code to the compute nodes) which jar file the code for the job lives in. You can either specify this explicitly with the setJar() method, or, much easier, have it determined automatically by Java's classloader, with the setJarByClass() method:

        getTrips.setJarByClass(SpeedTrap.class);

Job functions: map, combine, reduce

The next most important piece of configuration for a Job is to set the functions which will be used for the map, combine and reduce phases. These are actually classes, encapsulating setup, tear-down and operation functions in a single class (see below for how to write one of these classes). The Job can take one of each of mapper, combiner and reducer. If these are not set, a simple "pass-through" is the default, returning the same output data as its input.

        getTrips.setMapperClass(GTMapper.class);
        getTrips.setCombinerClass(GTCombiner.class);
        getTrips.setReducerClass(GTReducer.class);

Note that in the configuration above, we are passing the class (with .class) to the configuration functions. Sometimes, particularly with the Hadoop library classes, you need to specify a class with a particular set of generic parameters. In this case, you generally need to create a throwaway instance of the class first:

        LongSumReducer<JctPair> lsr = new LongSumReducer<JctPair>();
        getTrips.setReducerClass(lsr.getClass());

Output data types

By default, Hadoop assumes that the input and the output for every process has a LongWritable key and a Text value. If this is not the case, then you need to specify the output format for the relevant phase. The output from the mapper is specified with the setMapOutputKeyClass() and setMapOutputValueClass() methods:

        getTrips.setMapOutputKeyClass(IdDate.class); // Defaults to LongWritable.class
        getTrips.setMapOutputValueClass(Trap.class); // Defaults to Text.class

Both the input and output of any combiner should match these key/value types as well.

The input to the reducer should also match these mapper output types. The output from the reducer can, again, be any key/value types, and is specified by setOutputKeyClass() and setOutputValueClass():

        getTrips.setOutputKeyClass(Text.class);   // Defaults to LongWritable.class
        getTrips.setOutputValueClass(Trip.class); // Defaults to Text.class

If you do not set these, and your output types are not LongWritable and Text, then you will get runtime errors. What errors?

Sorting and grouping

If you want to sort or group your data differently (e.g. use only part of a key to decide which reducer process to send a value to, or using a secondary sort technique to ensure that the reducer receives its data in a given order), then you can control this by specifying the comparator to use for grouping and sorting. A comparator is a class which implements the Comparator interface, and defines a method (compare()) which returns values less than, equal to, or greater than zero, depending on whether its first parameter is less than, equal to, or greater than its second parameter. These comparators can be used to override the default compareTo() method on any class.

For grouping, all records with keys that are equal according to the comparator will be sent to the same reducer process. For sorting, the comparator is used to impose the sort order on the records.

        getTrips.setGroupingComparatorClass(IdDate.CompareId.class);
        getTrips.setSortComparatorClass(IdDate.CompareIdDate.class);

If a sort or grouping comparator is not defined, then the default compareTo() method of the key is used (see WritableComparable, below). To implement a secondary sort, the grouping comparator should operate on a subset of the fields that the sort comparator uses.

Note that when using a secondary sort, in order for the secondary sort criterion to be visible, it must be copied in both the key and the value. For example, to group temperature by year, and sort by year and month, you should use the following configuration:

key: <year, month>
value: <month, temperature>

sort comparator: compare on year first, and then on month if the year is equal
grouping comparator: compare on year

The reducer will receive a single <year, month> key, and a list of <month, temperature> values. The month from the key should be ignored, as it will only show the first month in the list; if you need to use the month value for each record during processing, you should use the month from each value in the list as you read it.

Partitioning

File locations and formats

You will need to specify where your job reads its data from, and where to write its output to. The configuration for this has a slightly peculiar API, in that the format to use is passed the Job it should be used for, rather than telling the job what format it should be using:

        FileInputFormat.addInputPath(getTrips, new Path(args[0]));
        FileOutputFormat.setOutputPath(getTrips, new Path(args[1]));

Note the difference between input and output: input adds an input file to the sequence (allowing a simple concatenation of input files). Output sets an output file (just one). Also see that we're configuring the job (getTrips) by calling static methods on the input/output format.

Hadoop can read and write data in many different formats. By default, plain tab-separated text files are expected to be read and written. However, this causes overhead on pipelines of jobs, in that each job has to parse its input data as a string, and then serialise it to a string on output. It is possible to use binary output formats, or alternative text output formats.

Most of the pre-built input formats are in the org.apache.hadoop.mapreduce.lib.input package, and most of the output formats are in org.apache.hadoop.mapreduce.lib.output. There are other formats elsewhere in the org.apache.hadoop.mapreduce.lib... packages too.

By default, the I/O classes are FileInputFormat and FileOutputFormat, which expect to have a single Text input (with a LongWritable key on input, holding the line number), and plain Text key and value on output -- this means that you have to parse and serialise any structured data multiple times within a processing pipeline. If you have several jobs in an internal pipeline, you could end up spending much of your processing time converting to and from strings (and much of your developer time writing the code to do so).

Instead, you could use sequence files, which are simply files using Hadoop's (fast) internal serialisation mechanisms [Avro]. You need to do two things to use a sequence file: tell the job which I/O class it should be using, with setInputFormatClass() and setOutputFormatClass, and tell the relevant class where it should be reading from or writing to:

        getTrips.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.addInputPath(getTrips, new Path(args[0]));

There is a SequenceFileAsBinaryInputFormat (and output format); it is not clear what the immediate difference with the non-binary version is. Both forms write binary data to the file.

Other formats are also available, such as TextInputFormat and TextOutputFormat, which are similar [identical?] to the default File formats:

        getTrips.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(getTrips, new Path(args[0]));

Compression can also be applied to the I/O classes:

Fixme: SequenceFileOutputFormat.setOutputCompressorClass()

Other formats can be used to read from or write to SQL databases, or multiple files with different semantics (e.g. one stream of data to one location; a different stream to somewhere else). See the "Advanced Usage" section below for more on these.

Data types

Hadoop uses its own internal data types, derived from the Writable class. These have the property of fast value-setting for reuse of objects (Java's object creation is very slow, and should be avoided in performance-sensitive areas), and of fast serialisation via the Avro library. Most of the built-in Hadoop data types are defined in the org.apache.hadoop.io package.

Custom data structures

Often, you will need to use a composite data type as a key or a value. Writing these composite types is highly formulaic, and can largely be automated (see the mkmrstruct tool). The main constraint is that the data structure should implement the Writable interface, or the WritableComparable interface if it is going to be a key. When writing a composite data type, you should provide at minimum a default constructor, accessor methods (get*() and put*() for each element), the readFields() and write() methods for the Writable interface, and a toString() method so that it will render properly when serialised to a TextOutputFormat. If you are only going to be serialising to SequenceFileOutputFormat, you can omit the toString() method.

Or... just use the mkmrstruct tool.

Writables

Comparables and Comparators

WritableComparables

Writing a function

The core concepts in MapReduce are the mapper and the reducer: these are functions which operate on subsets of the input data, receiving input records one at a time, and producing output records as a result. For any particular MapReduce Job, a mapper and a reducer function must be supplied by the user, through the setMapperClass() and setReducerClass() methods of the Job. In addition, a combiner class can be supplied, which acts as a reducer for a subset of the records output by a mapper.

Since Java does not have support for functions as first class objects, these functions must be wrapped up in a class. Two base classes are provided: Mapper and Reducer. In both cases, the class has four generic parameters: <KEYIN, VALUEIN, KEYOUT, VALUEOUT>, which give, respectively, the key and value data types for the input and output records.

The KEYOUT and VALUEOUT types for the mapper must match the types set by the setMapOutputKeyClass() and setMapOutputValueClass() methods of the job. Similarly, the KEYOUT and VALUEOUT types for the reducer must match the types set by the setOutputKeyClass() and setOutputValueClass() methods of the job. If these do not match, you will get runtime errors.

A typical mapper definition might look something like this:

[...]
       // Job set-up
        getTrips.setMapperClass(GT.Map.class);
        getTrips.setMapOutputKeyClass(IdDate.class);
        getTrips.setMapOutputValueClass(Trap.class);
[...]

    public static class Map
        extends Mapper<LongWritable, Text, IdDate, Trap> {
        private IdDate key;
        private Trap value;
        private DateFormat date_parser;
        private ParsePosition pos;

        public void setup(Context context) {
            key = new IdDate();
            value = new Trap();
            date_parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
            pos = new ParsePosition(0);
        }

        public void map(LongWritable lineno, Text text, Context context)
            throws IOException, InterruptedException
        {
            // Process the input record into an output key and value
[...]
            // Output the parsed record
[...]
        }
    }
Fixme: make this a nicer example

Mappers and reducers share some other common features: Context@s, and @setup() and cleanup() methods.

The setup() and cleanup() methods can be used to perform one-time initialisation and clean-up of a mapper or reducer. This can be used, for example, to load side-data from a file into structures in memory (e.g. to perform fast lookups in small data sets), or to perform other kinds of initialisation. Each process will have its setup() method called once, and then the map() or reduce() method called multiple times, and finally the cleanup() method called once at the end.

The other important feature shared between mappers and reducers is the Context object. This is passed as a parameter to all of the methods of the class, and, as the name suggests, holds the complete context of the computation. The fundamental use of the Context object is for output. It has a write() method, typically taking two parameters, the key and the value of the record to output. Each call to the write() method outputs a single record; within one call to the map() or reduce() method, the write() method may be called once, many times, or not at all, depending on how many records need to be generated.

The Context object also includes the Configuration that was used for the job (accessible with the getConfiguration() method), all of the Job parameters (many get*() methods), and the capability to retrieve user-defined job counters (getCounter() -- see below).

The interfaces for the Context classes can be found in the documentation under MapContext and ReduceContext.

Mappers

Library mappers

Combiners and reducers

Library reducers

Running a Job

With a set of jobs fully configured in your code, the only remaining thing is to run them. The main Job interface can run jobs either synchronously or asynchronously -- allowing for sequential or parallel execution.

Sequential jobs

The simplest approach is just to submit each job and wait for it to complete before submitting the next. This is the synchronous interface, and consists simply of the waitForCompletion() method of the Job. For simple MapReduce tasks, this is probably the easiest approach to take.

        getTrips.waitForCompletion(true);

Parallel jobs

Sometimes, you will have several jobs which can be performed in parallel, in any order. In this instance, you can use the asynchronous interface. This consists of a submit() method, which starts the job and returns immediately. After submitting a job, the job's progress can be tracked with the setupProgress(), mapProgress() and reduceProgress() methods, and its overall status with the isComplete(), isRetired() and isSuccessful() methods.

        jctStats.submit();
        totDist.submit();
        spdList.submit();

        while(!jctStats.isComplete()
              || !totDist.isComplete()
              || !spdList.isComplete()) {
            Thread.sleep(1000);
            System.out.println(
                "jctStats map: " + (int)(jctStats.mapProgress()*100)
                + "% red: " + (int)(jctStats.reduceProgress()*100)
                + "% totDist map: " + (int)(totDist.mapProgress()*100)
                + "% red: " + (int)(totDist.reduceProgress()*100)
                + "% spdList map: " + (int)(spdList.mapProgress()*100)
                + "% red: " + (int)(spdList.reduceProgress()*100)
                + "%");
        }

Job dependencies

If you have a complex set of jobs with many dependencies between them, you can manage the jobs and ensure that they are started in the correct order using the JobControl class. Fixme: fill this in in detail

Advanced usage

RecordReaders

Database interface

Multiple input/output files