Module 5: Advanced MapReduce Features
Posted by Superadmin on November 26 2015 03:12:13
Custom Data Types

Hadoop MapReduce uses typed data at all times when it interacts with user-provided Mappers and Reducers: data read from files into Mappers, emitted by mappers to reducers, and emitted by reducers into output files is all stored in Java objects.

Writable Types

Objects which can be marshaled to or from files and across the network must obey a particular interface, called Writable, which allows Hadoop to read and write the data in a serialized form for transmission. Hadoop provides several stock classes which implement Writable: Text (which stores String data), IntWritable, LongWritable, FloatWritable, BooleanWritable, and several others. The entire list is in the org.apache.hadoop.io package of the Hadoop source (see the API reference).

In addition to these types, you are free to define your own classes which implement Writable. You can organize a structure of virtually any layout to fit your data and be transmitted by Hadoop. As a motivating example, consider a mapper which emits key-value pairs where the key is the name of an object, and the value is its coordinates in some 3-dimensional space. The key is some string-based data, and the value is a structure of the form:

struct point3d {
float x;
float y;
float z;
}
The key can be represented as a Text object, but what about the value? How do we build a Point3D class which Hadoop can transmit? The answer is to implement the Writable interface, which requires two methods:

public interface Writable {
void readFields(DataInput in) throws IOException;
void write(DataOutput out) throws IOException;
}
The first of these methods initializes all of the fields of the object based on data contained in the binary stream in. The latter writes all the information needed to reconstruct the object to the binary stream out. The DataInput and DataOutput interfaces (part of java.io) contain methods to serialize most basic types of data; the important contract between your readFields() and write() methods is that they read and write the data from and to the binary stream in the same order. The following code implements a Point3D class usable by Hadoop:

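import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class Point3D implements Writable {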
public float x;
public float y;
public float z;

public Point3D(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
}

public Point3D() {
this(0.0f, 0.0f, 0.0f);
}

public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}

public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
}

public String toString() {
return Float.toString(x) + ", "
+ Float.toString(y) + ", "
+ Float.toString(z);
}
}
Listing 5.1: A Point class which implements Writable
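
The ordering contract between write() and readFields() can be checked without a cluster by round-tripping an object through an in-memory byte stream. The following is a minimal sketch that uses only java.io; the Point class is a stand-in for Point3D with the same method bodies, and the PointRoundTrip, serialize() and deserialize() names are hypothetical helpers introduced for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

public class PointRoundTrip {
  /** Stand-in for Point3D: same write()/readFields() bodies, no Hadoop dependency. */
  static class Point {
    float x, y, z;

    void write(DataOutput out) throws IOException {
      out.writeFloat(x);
      out.writeFloat(y);
      out.writeFloat(z);
    }

    void readFields(DataInput in) throws IOException {
      // Fields are read in exactly the order they were written.
      x = in.readFloat();
      y = in.readFloat();
      z = in.readFloat();
    }
  }

  /** Writes p into a byte array, as it would be written to a file or socket. */
  static byte[] serialize(Point p) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    p.write(new DataOutputStream(bytes));
    return bytes.toByteArray();
  }

  /** Rebuilds a Point from bytes produced by serialize(). */
  static Point deserialize(byte[] data) throws IOException {
    Point p = new Point();
    p.readFields(new DataInputStream(new ByteArrayInputStream(data)));
    return p;
  }

  public static void main(String[] args) throws IOException {
    Point original = new Point();
    original.x = 3.5f;
    original.y = 12.7f;
    original.z = 9.0f;

    byte[] data = serialize(original);
    Point copy = deserialize(data);
    System.out.println(data.length + " bytes: " + copy.x + ", " + copy.y + ", " + copy.z);
  }
}
```

Each float occupies four bytes, so the serialized form is twelve bytes long. If readFields() read the fields in a different order than write() emitted them, the copy would come back with its coordinates swapped and no error would be raised.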

Custom Key Types

As written, the Point3D type will work as a value type like we require for the mapper problem described above. But what if we want to emit Point3D objects as keys too? In Hadoop MapReduce, if (key, value) pairs sent to a single reduce task include multiple keys, the reducer will process the keys in sorted order. So key types must implement a stricter interface, WritableComparable. In addition to being Writable so they can be transmitted over the network, they also obey Java's Comparable interface. The following code listing extends Point3D to meet this interface:

public class Point3D implements WritableComparable<Point3D> {
public float x;
public float y;
public float z;

public Point3D(float x, float y, float z) {
this.x = x;
this.y = y;
this.z = z;
}

public Point3D() {
this(0.0f, 0.0f, 0.0f);
}

public void write(DataOutput out) throws IOException {
out.writeFloat(x);
out.writeFloat(y);
out.writeFloat(z);
}

public void readFields(DataInput in) throws IOException {
x = in.readFloat();
y = in.readFloat();
z = in.readFloat();
}

public String toString() {
return Float.toString(x) + ", "
+ Float.toString(y) + ", "
+ Float.toString(z);
}

/** return the Euclidean distance from (0, 0, 0) */
public float distanceFromOrigin() {
return (float)Math.sqrt(x*x + y*y + z*z);
}

public int compareTo(Point3D other) {
float myDistance = distanceFromOrigin();
float otherDistance = other.distanceFromOrigin();

return Float.compare(myDistance, otherDistance);
}

public boolean equals(Object o) {
if (!(o instanceof Point3D)) {
return false;
}

Point3D other = (Point3D)o;
return this.x == other.x && this.y == other.y
&& this.z == other.z;
}

public int hashCode() {
return Float.floatToIntBits(x)
^ Float.floatToIntBits(y)
^ Float.floatToIntBits(z);
}
}
Listing 5.2: A WritableComparable version of Point3D

It is important for key types to implement hashCode() as well; the section on Partitioners later in this module explains why. The methods hashCode() and equals() have been provided in this version of the class as well.

Using Custom Types

Now that you have created a custom data type, Hadoop must be told to use it. You can control the output key or value data type for a job by using the setOutputKeyClass() and setOutputValueClass() methods of the JobConf object that defines your job. By default, this will set the types expected as output from both the map and reduce phases. If your Mapper emits different types than the Reducer, you can set the types emitted by the mapper with the JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods. These implicitly set the input types expected by the Reducer. The types delivered as input to the Mapper are governed by the InputFormat used; see the next section of this module for more details.

Faster Comparison Operations

The default sorting process for keys will read instances of the key type in from a stream, parsing the byte stream with the readFields() method of the key class, and then call the compareTo() method of the key class on the two objects. For faster performance, it may be possible to decide on an ordering between two keys just by looking at the byte streams and without parsing all of the data contained therein. For example, consider comparing strings of text. If characters are read in sequentially, then a decision can be made on their ordering as soon as a character position is found where the two strings differ. Even if all of the bytes for the object must be read in, the object itself does not necessarily need to be instantiated around those bytes. To support this higher-speed sorting mechanism, you can extend the WritableComparator class with a comparator specific to your own data type. In particular, the method which should be overridden is public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2).

The default implementation is in the class org.apache.hadoop.io.WritableComparator. The relevant method has been reproduced here:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
buffer.reset(b1, s1, l1); // parse key1
key1.readFields(buffer);

buffer.reset(b2, s2, l2); // parse key2
key2.readFields(buffer);

} catch (IOException e) {
throw new RuntimeException(e);
}

return compare(key1, key2); // compare them
}
Its operation is exactly as described above; it performs the straightforward comparison of the two objects after they have been individually deserialized from their separate byte streams (the b variables), which each have their own start offset (s) and length (l) attributes. Both objects must be fully constructed and deserialized before comparison can occur. The Text class, on the other hand, allows incremental comparison via its own implementation of this method. The code from org.apache.hadoop.io.Text is shown here:

/** A WritableComparator optimized for Text keys. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(Text.class);
}

public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int n1 = WritableUtils.decodeVIntSize(b1[s1]);
int n2 = WritableUtils.decodeVIntSize(b2[s2]);
return compareBytes(b1, s1+n1, l1-n1, b2, s2+n2, l2-n2);
}
}
The Text object is serialized by first writing its length field to the byte stream, followed by the UTF-8 encoded string. The method decodeVIntSize determines the length of the integer describing the length of the byte stream. The comparator then skips these bytes, directly comparing the UTF-8 encoded bytes of the actual string portion of the stream in the compareBytes() method. As soon as it finds a byte position at which the two streams differ, it returns a result without examining the rest of the strings.

Note that you do not need to manually specify this comparator's use in your Hadoop programs. Hadoop automatically uses this special comparator implementation for Text data due to the following code being added to Text's static initialization:

static {
// register this comparator
WritableComparator.define(Text.class, new Comparator());
}
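The same idea can be applied to a key such as Point3D. The following is a minimal sketch, not Hadoop code: it assumes the serialized layout produced by Point3D.write() (three consecutive 4-byte floats) and orders two keys by distance from the origin directly from their bytes, without constructing any objects. The RawPointCompare class and its serialize() helper are hypothetical names; in a real job this logic would live in a WritableComparator subclass registered for the key class.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class RawPointCompare {
  /** Distance from the origin, computed from three serialized floats starting at offset start. */
  static float distanceFromOrigin(byte[] b, int start) {
    // ByteBuffer is big-endian by default, matching DataOutput.writeFloat().
    ByteBuffer buf = ByteBuffer.wrap(b, start, 12);
    float x = buf.getFloat();
    float y = buf.getFloat();
    float z = buf.getFloat();
    // Same arithmetic as Point3D.distanceFromOrigin(), so both orderings agree.
    return (float) Math.sqrt(x * x + y * y + z * z);
  }

  /** Same parameter shape as the raw compare(): two buffers, each with an offset and a length. */
  static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // The lengths are not needed here because every key is exactly 12 bytes.
    return Float.compare(distanceFromOrigin(b1, s1), distanceFromOrigin(b2, s2));
  }

  /** Hypothetical helper: produces the bytes that Point3D.write() would emit. */
  static byte[] serialize(float x, float y, float z) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeFloat(x);
    out.writeFloat(y);
    out.writeFloat(z);
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] near = serialize(1f, 2f, 2f); // distance 3
    byte[] far = serialize(0f, 0f, 5f);  // distance 5
    System.out.println(Integer.signum(compare(near, 0, near.length, far, 0, far.length)));
  }
}
```

Whatever shortcut a raw comparator takes, it must produce the same ordering as the key's compareTo() method; otherwise keys would sort differently depending on which code path is used.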
Final Writable Notes

Defining custom writable types allows you to intelligently use Hadoop to manipulate higher-level data structures, without needing to use toString() to convert all your data types to text for sending over the network. If you will be using a type in a lot of MapReduce jobs, or you must process a very large volume of them (as is usually the case in Hadoop), defining your own data type classes will provide a significant performance benefit.

Exercise: Assume that we have a mapper which emits line segments as keys and values. A line segment is defined by its endpoints. For our purposes, line segments can be ordered by their lengths. Implement a LineSegment class which implements WritableComparable. Hint: make use of Point3D objects.


Input Formats

The InputFormat defines how to read data from a file into the Mapper instances. Hadoop comes with several implementations of InputFormat; some work with text files and describe different ways in which the text files can be interpreted. Others, like SequenceFileInputFormat, are purpose-built for reading particular binary file formats. These types are described in more detail in Module 4.

More powerfully, you can define your own InputFormat implementations to format the input to your programs however you want. For example, the default TextInputFormat reads lines of text files. The key it emits for each record is the byte offset of the line read (as a LongWritable), and the value is the contents of the line up to the terminating '\n' character (as a Text object). If you have multi-line records each separated by a $ character, you could write your own InputFormat that parses files into records split on this character instead.

Another important job of the InputFormat is to divide the input data sources (e.g., input files) into fragments that make up the inputs to individual map tasks. These fragments are called "splits" and are encapsulated in instances of the InputSplit interface. Most files, for example, are split up on the boundaries of the underlying blocks in HDFS, and are represented by instances of the FileSplit class. Other files may be unsplittable, depending on application-specific data. Dividing up other data sources (e.g., tables from a database) into splits would be performed in a different, application-specific fashion. When dividing the data into input splits, it is important that this process be quick and cheap. The data itself should not need to be accessed to perform this process (as it is all done by a single machine at the start of the MapReduce job).

The TextInputFormat divides files into splits strictly by byte offsets. It then reads individual lines of the files from the split in as record inputs to the Mapper. The RecordReader associated with TextInputFormat must be robust enough to handle the fact that the splits do not necessarily correspond neatly to line-ending boundaries. In fact, the RecordReader will read past the theoretical end of a split to the end of a line in one record. The reader associated with the next split in the file will scan for the first full line in the split to begin processing that fragment. All RecordReader implementations must use some similar logic to ensure that they do not miss records that span InputSplit boundaries.
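
The boundary handling described above can be illustrated with a small stand-alone sketch. This is not the LineRecordReader source. It is a simplified model that works on an in-memory byte array and applies one rule: a line belongs to the split that contains its first byte. The SplitLineReader class and readSplit() method are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SplitLineReader {
  /** Returns the lines whose first byte falls inside the byte range [start, end) of data. */
  static List<String> readSplit(byte[] data, int start, int end) {
    int pos = start;
    if (start > 0) {
      // Back up one byte and skip through the next '\n'. If the previous byte
      // is itself '\n', a line begins exactly at start and is kept.
      pos = start - 1;
      while (pos < data.length && data[pos] != '\n') {
        pos++;
      }
      pos++; // step past the newline
    }

    List<String> lines = new ArrayList<>();
    while (pos < end && pos < data.length) {
      int lineEnd = pos;
      // Read to the end of the line, even if that runs past the end of the split.
      while (lineEnd < data.length && data[lineEnd] != '\n') {
        lineEnd++;
      }
      lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
      pos = lineEnd + 1;
    }
    return lines;
  }

  public static void main(String[] args) {
    byte[] data = "ball, 3.5\ncar, 15\ndevice, 0.0\n".getBytes(StandardCharsets.UTF_8);
    // The boundary at byte 15 falls in the middle of the "car" line.
    System.out.println(readSplit(data, 0, 15));
    System.out.println(readSplit(data, 15, data.length));
  }
}
```

In this example the first reader returns the "ball" and "car" lines, reading past its nominal end to finish the second one, and the second reader skips the tail of the "car" line and returns only the "device" line. Every line is therefore delivered exactly once.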

Custom File Formats

In this section we will describe how to develop a custom InputFormat that reads files of a particular format.

Rather than implement InputFormat directly, it is usually best to subclass the FileInputFormat. This abstract class provides much of the basic handling necessary to manipulate files. If we want to parse the file in a particular way, then we must override the getRecordReader() method, which returns an instance of RecordReader: an object that can read from the input source. To motivate this discussion with concrete code, we will develop an InputFormat and RecordReader implementation which can read lists of objects and positions from files. We assume that we are reading text files where each line contains the name of an object and then its coordinates as a set of three comma-separated floating-point values. For instance, some sample data may look like the following:

ball, 3.5, 12.7, 9.0
car, 15, 23.76, 42.23
device, 0.0, 12.4, -67.1
We must read individual lines of the file, separate the key (Text) from the three floats, and then read those into a Point3D object as we developed earlier.

The ObjectPositionInputFormat class itself is very straightforward. Since it will be reading from files, all we need to do is define a factory method for RecordReader implementations:

public class ObjectPositionInputFormat extends
FileInputFormat<Text, Point3D> {

public RecordReader<Text, Point3D> getRecordReader(
InputSplit input, JobConf job, Reporter reporter)
throws IOException {

reporter.setStatus(input.toString());
return new ObjPosRecordReader(job, (FileSplit)input);
}
}
Listing 5.3: InputFormat for object-position files

Note that we define the types of the keys and values emitted by the InputFormat in its definition; these must match the types read in as input by the Mapper in its class definition.

The RecordReader implementation is where the actual file information is read and parsed. We will implement this by making use of the LineRecordReader class; this is the RecordReader implementation used by TextInputFormat to read individual lines from files and return them unparsed. We will wrap the LineRecordReader with our own implementation which converts the values to the expected types. By using LineRecordReader, we do not need to worry about what happens if a record spans an InputSplit boundary, since this underlying record reader already has logic to take care of this fact.

class ObjPosRecordReader implements RecordReader<Text, Point3D> {

private LineRecordReader lineReader;
private LongWritable lineKey;
private Text lineValue;

public ObjPosRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new LineRecordReader(job, split);

lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}

public boolean next(Text key, Point3D value) throws IOException {
// get the next line
if (!lineReader.next(lineKey, lineValue)) {
return false;
}

// parse the lineValue which is in the format:
// objName, x, y, z
String [] pieces = lineValue.toString().split(",");
if (pieces.length != 4) {
throw new IOException("Invalid record received");
}

// try to parse floating point components of value
float fx, fy, fz;
try {
fx = Float.parseFloat(pieces[1].trim());
fy = Float.parseFloat(pieces[2].trim());
fz = Float.parseFloat(pieces[3].trim());
} catch (NumberFormatException nfe) {
throw new IOException("Error parsing floating point value in record");
}

// now that we know we'll succeed, overwrite the output objects

key.set(pieces[0].trim()); // objName is the output key.

value.x = fx;
value.y = fy;
value.z = fz;

return true;
}

public Text createKey() {
return new Text("");
}

public Point3D createValue() {
return new Point3D();
}

public long getPos() throws IOException {
return lineReader.getPos();
}

public void close() throws IOException {
lineReader.close();
}

public float getProgress() throws IOException {
return lineReader.getProgress();
}
}
Listing 5.4: RecordReader for object-position files

You can control the InputFormat used by your MapReduce job with the JobConf.setInputFormat() method.

Exercise: Write an InputFormat and RecordReader that read strings of text separated by '$' characters instead of newlines.

Alternate Data Sources

An InputFormat describes both how to present the data to the Mapper and where the data originates from. Most implementations descend from FileInputFormat, which reads from files on the local machine or HDFS. If your data does not come from a source like this, you can write an InputFormat implementation that reads from an alternate source. For example, HBase (a distributed database system) provides a TableInputFormat that reads records from a database table. You could imagine a system where data is streamed to each machine over the network on a particular port; the InputFormat reads data from the port and parses it into individual records for mapping.

Output Formats

The InputFormat and RecordReader interfaces define how data is read into a MapReduce program. By analogy, the OutputFormat and RecordWriter interfaces dictate how to write the results of a job back to the underlying permanent storage. Several useful OutputFormat implementations are described in Module 4. The default format (TextOutputFormat) will write (key, value) pairs as strings to individual lines of an output file (using the toString() methods of the keys and values). The SequenceFileOutputFormat will keep the data in binary, so it can be later read quickly by the SequenceFileInputFormat. These classes make use of the write() and readFields() methods of the specific Writable classes used by your MapReduce pass.

You can define your own OutputFormat implementation that will write data to an underlying medium in the format that you control. If you want to write to output files on the local system or in HDFS, you should extend the FileOutputFormat abstract class. When you want to use a different output format, you can control this with the JobConf.setOutputFormat() method.

Why might we want to define our own OutputFormat? A custom OutputFormat allows you to exactly control what data is put into a file, and how it is laid out. Suppose another process you use has a custom input file format. Your MapReduce job is supposed to generate inputs compatible with this program. You may develop an OutputFormat implementation which will produce the correct type of file to work with this subsequent process in your tool chain. As an example of how to write an OutputFormat, we will walk through the implementation of a simple XML-based format developed for this tutorial, XmlOutputFormat. Given a set of (key, value) pairs from the Reducer, (e.g., (k1, v1), (k2, v2), etc...) this will generate a file laid out like so:


<results>
  <k1>v1</k1>
  <k2>v2</k2>
  ...
</results>

The code to generate these files is presented below:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;

public class XmlOutputFormat<K, V> extends FileOutputFormat<K, V> {

protected static class XmlRecordWriter<K, V> implements RecordWriter<K, V> {
private static final String utf8 = "UTF-8";

private DataOutputStream out;

public XmlRecordWriter(DataOutputStream out) throws IOException {
this.out = out;
out.writeBytes("<results>\n");
}

/**
* Write the object to the byte stream, handling Text as a special case.
*
* @param o
* the object to print
* @throws IOException
* if the write throws, we pass it on
*/
private void writeObject(Object o) throws IOException {
if (o instanceof Text) {
Text to = (Text) o;
out.write(to.getBytes(), 0, to.getLength());
} else {
out.write(o.toString().getBytes(utf8));
}
}

private void writeKey(Object o, boolean closing) throws IOException {
out.writeBytes("<");
if (closing) {
out.writeBytes("/");
}
writeObject(o);
out.writeBytes(">");
if (closing) {
out.writeBytes("\n");
}
}

public synchronized void write(K key, V value) throws IOException {

boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;

if (nullKey && nullValue) {
return;
}

Object keyObj = key;

if (nullKey) {
keyObj = "value";
}

writeKey(keyObj, false);

if (!nullValue) {
writeObject(value);
}

writeKey(keyObj, true);
}

public synchronized void close(Reporter reporter) throws IOException {
try {
out.writeBytes("</results>\n");
} finally {
// even if writeBytes() fails, make sure we close the stream
out.close();
}
}
}

public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
String name, Progressable progress) throws IOException {
Path file = FileOutputFormat.getTaskOutputPath(job, name);
FileSystem fs = file.getFileSystem(job);
FSDataOutputStream fileOut = fs.create(file, progress);
return new XmlRecordWriter<K, V>(fileOut);
}
}
The FileOutputFormat which XmlOutputFormat subclasses will handle most of the heavy lifting. The only method directly implemented in XmlOutputFormat is getRecordWriter(), which is a factory method for the RecordWriter object which will actually write the file. The inner class XmlRecordWriter is the implementation which generates files in the format shown above. The RecordWriter is initialized with an output stream connected to a file in the output file system. At the same time, the XML prologue is written into the output file. The particular output file system and filename associated with this output stream are determined based on the current job configuration. The XmlRecordWriter's write() method is then called each time a (key, value) pair is provided to the OutputCollector by the Reducer. When the Reducer finishes, the close() method of the XmlRecordWriter will write the XML epilogue and close the underlying stream.

Partitioning Data

"Partitioning" is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must determine for all of its output (key, value) pairs which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same. If the key "cat" is generated in two separate (key, value) pairs, they must both be reduced together. It is also important for performance reasons that the mappers be able to partition data independently -- they should never need to exchange information with one another to determine the partition for a particular key.

Hadoop uses an interface called Partitioner to determine which partition a (key, value) pair will go to. A single partition refers to all (key, value) pairs which will be sent to a single reduce task. Hadoop MapReduce determines when the job starts how many partitions it will divide the data into. If twenty reduce tasks are to be run (controlled by the JobConf.setNumReduceTasks() method), then twenty partitions must be filled.

The Partitioner interface defines one method which must be implemented:

public interface Partitioner<K, V> extends JobConfigurable {
int getPartition(K key, V value, int numPartitions);
}
The getPartition() method receives a key and a value and the number of partitions to split the data across; a number in the range [0, numPartitions) must be returned by this method, indicating which partition to send the key and value to. For any two keys k1 and k2, k1.equals(k2) implies getPartition(k1, *, n) == getPartition(k2, *, n).

The default Partitioner implementation is called HashPartitioner. It uses the hashCode() method of the key objects modulo the number of partitions total to determine which partition to send a given (key, value) pair to.
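
The arithmetic behind hash partitioning is short enough to show on its own. The sketch below mirrors the approach described above in plain Java, with no Hadoop classes; HashPartitionSketch is a hypothetical name, and the value argument is omitted because it plays no part in the calculation.

```java
public class HashPartitionSketch {
  /** Maps a key to a partition number in the range [0, numPartitions). */
  static int getPartition(Object key, int numPartitions) {
    // Clear the sign bit first: hashCode() may be negative, and a negative
    // operand to % would give a negative (invalid) partition number.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 20;
    for (String key : new String[] {"cat", "dog", "cat"}) {
      System.out.println(key + " -> partition " + getPartition(key, numPartitions));
    }
  }
}
```

Because the result depends only on the key's hashCode() and the partition count, every mapper computes the same partition for "cat" without consulting any other mapper. This is also why a key type needs a hashCode() that returns equal values for equal keys.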

For most randomly-distributed data, this should result in all partitions being of roughly equal size. If the data in your data set is skewed in some way, however, this might not produce good results. For example, if you know that the key 0 will appear much more frequently than any other key, then you may want to send all the 0-keyed data to one partition, and distribute the other keys over all other partitions by their hashCode(). Also, if the hashCode() method for your data type does not provide uniformly-distributed values over its range, then data may not be sent to reducers evenly. Poor partitioning of data means that some reducers will have more data input than others, which usually means they'll have more work to do than the other reducers. Thus the entire job will wait for one reducer to finish its extra-large share of the load, when it might have been possible to spread that across many different reducers.

If you are dissatisfied with the performance of HashPartitioner, you are free to write your own Partitioner implementation. It can be general-purpose, or tailored to the specific data types or values that you expect to use in your application. After implementing the Partitioner interface, use the JobConf.setPartitionerClass() method to tell Hadoop to use it for your job.
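
As an illustration of a tailored scheme, the following sketch handles the skewed case mentioned earlier, where key 0 is far more frequent than any other key. It is plain Java rather than an implementation of Hadoop's Partitioner interface, it assumes integer keys, and the SkewAwarePartitionSketch name is hypothetical. Key 0 gets partition 0 to itself, and all other keys are hashed across the remaining partitions.

```java
public class SkewAwarePartitionSketch {
  /** Sends key 0 to partition 0 and spreads every other key over partitions 1..numPartitions-1. */
  static int getPartition(int key, int numPartitions) {
    if (numPartitions == 1 || key == 0) {
      // With a single partition there is nothing to spread across.
      return 0;
    }
    int hash = Integer.hashCode(key) & Integer.MAX_VALUE; // non-negative hash
    return 1 + hash % (numPartitions - 1);
  }

  public static void main(String[] args) {
    int numPartitions = 4;
    for (int key : new int[] {0, 1, 2, 3, 4, -5}) {
      System.out.println(key + " -> partition " + getPartition(key, numPartitions));
    }
  }
}
```

The method is still a pure function of the key and the partition count, so it keeps the guarantee that equal keys always reach the same reducer.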

Reporting Custom Metrics

The Hadoop system records a set of metric counters for each job that it runs, such as the number of input records mapped and the number of bytes read from or written to HDFS. To profile your applications, you may wish to record other values as well. For example, if the records sent into your mappers fall into two categories (call them "A" and "B"), you may wish to count the total number of A-records seen vs. the total number of B-records.

The Reporter object passed in to your Mapper and Reducer classes can be used to update counters. The same set of counter variables can be contributed to by all Mapper and Reducer instances across your cluster. The values are aggregated by the master node of the cluster, so they are "thread-safe" in this manner.

Counters are incremented through the Reporter.incrCounter() method. The names of the counters are defined as Java enums. The following example demonstrates how to count the number of "A" vs. "B" records seen by the mapper:

public class MyMapper extends MapReduceBase implements
Mapper<Text, Text, Text, Text> {
// the output key and value types are shown as Text for illustration only

static enum RecordCounters { TYPE_A, TYPE_B, TYPE_UNKNOWN };

// actual definitions elided
public boolean isTypeARecord(Text input) { ... }
public boolean isTypeBRecord(Text input) { ... }

public void map(Text key, Text val, OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {

if (isTypeARecord(key)) {
reporter.incrCounter(RecordCounters.TYPE_A, 1);
} else if (isTypeBRecord(key)) {
reporter.incrCounter(RecordCounters.TYPE_B, 1);
} else {
reporter.incrCounter(RecordCounters.TYPE_UNKNOWN, 1);
}

// actually process the record here, call
// output.collect( .. ), etc.
}
}
If you launch your job with JobClient.runJob(), the diagnostic information printed to stdout when the job completes will contain the values of all the counters. Both runJob() and submitJob() will return a RunningJob object that refers to the job in question. The RunningJob.getCounters() method will return a Counters object that contains the values of all the counters so that you can query them programmatically. The Counters.getCounter(Enum key) method returns the value of a particular counter.
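
As a rough mental model of how per-task counts become the job-wide totals, the sketch below merges the tallies of two simulated tasks. It is a stand-alone simulation built on java.util.EnumMap and does not use Hadoop's Reporter or Counters classes; the CounterAggregationSketch class and its merge() method are hypothetical names.

```java
import java.util.EnumMap;
import java.util.Map;

public class CounterAggregationSketch {
  enum RecordCounters { TYPE_A, TYPE_B, TYPE_UNKNOWN }

  /** Adds one task's counter values into the job-wide totals. */
  static void merge(Map<RecordCounters, Long> totals, Map<RecordCounters, Long> task) {
    for (Map.Entry<RecordCounters, Long> entry : task.entrySet()) {
      totals.merge(entry.getKey(), entry.getValue(), Long::sum);
    }
  }

  public static void main(String[] args) {
    // Counts as two independent map tasks might have recorded them.
    Map<RecordCounters, Long> task1 = new EnumMap<>(RecordCounters.class);
    task1.put(RecordCounters.TYPE_A, 3L);
    task1.put(RecordCounters.TYPE_B, 1L);

    Map<RecordCounters, Long> task2 = new EnumMap<>(RecordCounters.class);
    task2.put(RecordCounters.TYPE_A, 2L);
    task2.put(RecordCounters.TYPE_UNKNOWN, 4L);

    Map<RecordCounters, Long> totals = new EnumMap<>(RecordCounters.class);
    merge(totals, task1);
    merge(totals, task2);
    System.out.println(totals);
  }
}
```

Each task only ever adds to its own tallies, and the summing happens in one place, which is why mappers and reducers never need to coordinate with one another when they increment a counter.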

Distributing Auxiliary Job Data

The bulk of the data that you process in a MapReduce job will probably be stored in large files spread across the HDFS. You can reliably store petabytes of information in HDFS and individual jobs can process several terabytes at a time. The HDFS access model, however, assumes that the data from a file should be read into a single mapper. The individual files stored in HDFS are very large and can possibly be broken into different chunks for processing in parallel.

Sometimes it is necessary for every Mapper to read a single file; for example, a distributed spell-check application would require every Mapper to read in a copy of the dictionary before processing documents. The dictionary will be small (only a few megabytes), but needs to be widely available so that all nodes can reach it.

Hadoop provides a mechanism specifically for this purpose, called the distributed cache. The distributed cache can contain small data files needed for initialization or libraries of code that may need to be accessed on all nodes in the cluster.

To use the distributed cache to disseminate files, call the static methods of the DistributedCache class when setting up your job. Use the DistributedCache.addCacheFile() method to add names of files which should be sent to all nodes on the system. The file names are specified as URI objects; unless qualified otherwise, they assume that the file is present on the HDFS in the path indicated. You can copy local files to HDFS with the FileSystem.copyFromLocalFile() method.

When you want to retrieve files from the distributed cache (e.g., when the mapper is in its configure() step and wants to load config data like the dictionary mentioned above), use the DistributedCache.getLocalCacheFiles() method to retrieve the list of paths local to the current node for the cached files. These are copies of all cached files, placed in the local file system of each worker machine. (They will be in a subdirectory of mapred.local.dir.) Each of the paths returned by getLocalCacheFiles() can be accessed via regular Java file I/O mechanisms, such as java.io.FileInputStream.

As a cautionary note: If you use the local JobRunner in Hadoop (i.e., what happens if you call JobClient.runJob() in a program with no or an empty hadoop-conf.xml accessible), then no local data directory is created; the getLocalCacheFiles() call will return an empty set of results. Unit test code should take this into account.

Suppose that we were writing an inverted index builder. We do not want to include very common words such as "the," "a," "and," etc. These so-called stop words might all be listed in a file. All the mappers should read the stop word list when they are initialized, and then filter the index they generate against this list. We can disseminate a list of stop words to all the Mappers with the following code. The first listing will put the stop-words file into the distributed cache:

public static final String LOCAL_STOPWORD_LIST =
"/home/aaron/stop_words.txt";

public static final String HDFS_STOPWORD_LIST = "/data/stop_words.txt";

void cacheStopWordList(JobConf conf) throws IOException {
FileSystem fs = FileSystem.get(conf);
Path hdfsPath = new Path(HDFS_STOPWORD_LIST);

// upload the file to hdfs. Overwrite any existing copy.
fs.copyFromLocalFile(false, true, new Path(LOCAL_STOPWORD_LIST),
hdfsPath);

DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
}

This code copies the local stop_words.txt file into HDFS, and then tells the distributed cache to send the HDFS copy to all nodes in the system. The next listing actually uses the file in the mapper:

class IndexMapperExample implements Mapper {
// stop words loaded in configure() and consulted by map()
private Set<String> stopWords;

public void configure(JobConf conf) {
try {
String stopwordCacheName = new Path(HDFS_STOPWORD_LIST).getName();
Path [] cacheFiles = DistributedCache.getLocalCacheFiles(conf);
if (null != cacheFiles && cacheFiles.length > 0) {
for (Path cachePath : cacheFiles) {
if (cachePath.getName().equals(stopwordCacheName)) {
loadStopWords(cachePath);
break;
}
}
}
} catch (IOException ioe) {
System.err.println("IOException reading from distributed cache");
System.err.println(ioe.toString());
}
}

void loadStopWords(Path cachePath) throws IOException {
// note use of regular java.io methods here - this is a local file now
BufferedReader wordReader = new BufferedReader(
new FileReader(cachePath.toString()));
try {
String line;
this.stopWords = new HashSet<String>();
while ((line = wordReader.readLine()) != null) {
this.stopWords.add(line);
}
} finally {
wordReader.close();
}
}

/* actual map() method, etc go here */
}
The code above belongs in the Mapper instance associated with the index generation process. We retrieve the list of files cached in the distributed cache. We then compare the basename of each file (using Path.getName()) with the one we expect for our stop word list. Once we find this file, we read the words, one per line, into a Set instance that we will consult during the mapping process.
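The filtering step itself can be sketched without any Hadoop plumbing. The class below is a hypothetical helper (StopWordFilter and indexableTokens are names made up for illustration, not part of Hadoop) that loads one stop word per line, as loadStopWords() does, and then drops those words from a line of input. Lower-casing the words and splitting on non-word characters are assumptions of this sketch; a real index builder may tokenize differently.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Hadoop: mirrors what loadStopWords()
// and a filtering map() method would do.
class StopWordFilter {
  private final Set<String> stopWords = new HashSet<String>();

  // Read one stop word per line from any character source.
  StopWordFilter(Reader source) throws IOException {
    BufferedReader reader = new BufferedReader(source);
    try {
      String line;
      while ((line = reader.readLine()) != null) {
        // Assumption: stop words are matched case-insensitively.
        String word = line.trim().toLowerCase();
        if (!word.isEmpty()) {
          stopWords.add(word);
        }
      }
    } finally {
      reader.close();
    }
  }

  // Return the tokens of a line that are not stop words, in input order.
  List<String> indexableTokens(String line) {
    List<String> kept = new ArrayList<String>();
    for (String token : line.toLowerCase().split("\\W+")) {
      if (!token.isEmpty() && !stopWords.contains(token)) {
        kept.add(token);
      }
    }
    return kept;
  }

  public static void main(String[] args) throws IOException {
    StopWordFilter filter =
        new StopWordFilter(new StringReader("the\na\nand\n"));
    // prints [cat, dog]
    System.out.println(filter.indexableTokens("The cat and a dog"));
  }
}
```

In a real mapper, the body of indexableTokens() would sit inside map(), emitting each surviving token to the output collector instead of adding it to a list.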

The distributed cache has additional uses too. For instance, you can use the DistributedCache.addArchiveToClassPath() method to send a .jar file to all the nodes. It will be inserted into the classpath as well, so that classes in the archive can be accessed by all the nodes.

Distributing Debug Scripts

Hadoop will generate a large number of log files for a job, distributed across all the nodes that participated in the job's execution. Often, only a subset of these logs will be of interest when debugging failing tasks. MapReduce can help with this by running a user-provided script when either a map or reduce task fails. These scripts are provided the names of files containing the stdout and stderr from the task, as well as the task's Hadoop log and job.xml file (i.e., its complete JobConf in serialized form).

These scripts will be run on whichever node encounters failing tasks. You can use them to automate the work of isolating the failing tasks for inspection: e.g., email the stdout/stderr to an administrator email address, upload the failed task's log files to a common NFS-mounted "debug dump" directory, or preserve local state modifications made by map tasks.

Separate scripts can be provided for map and reduce task failure. They each receive as arguments, in order, the names of files containing the task's stdout, stderr, syslog, and jobconf. Because they are run on the task nodes, and not on the client machine where the job was submitted, these scripts must be sent to the nodes through the distributed cache described above.

The following method will enable failed task scripts on a MapReduce job being prepared. It assumes that you have given it the names of two scripts (e.g., bash scripts) which do your debug actions with the log filenames provided (e.g., copy them to a shared NFS mount). In this listing these are /home/aaron/src/map-fail and /home/aaron/src/reduce-fail.

private static final String FAILED_MAP_SCRIPT_NAME = "map-fail";
private static final String FAILED_REDUCE_SCRIPT_NAME = "reduce-fail";

private static final String HDFS_SCRIPT_DIR = "/debug";

private static final String HDFS_FAILED_MAP_SCRIPT =
HDFS_SCRIPT_DIR + "/" + FAILED_MAP_SCRIPT_NAME;
private static final String HDFS_FAILED_REDUCE_SCRIPT =
HDFS_SCRIPT_DIR + "/" + FAILED_REDUCE_SCRIPT_NAME;
private static final String LOCAL_FAILED_MAP_SCRIPT =
"/home/aaron/src/" + FAILED_MAP_SCRIPT_NAME;
private static final String LOCAL_FAILED_REDUCE_SCRIPT =
"/home/aaron/src/" + FAILED_REDUCE_SCRIPT_NAME;

public static void setupFailedTaskScript(JobConf conf) throws IOException {

// create a directory on HDFS where we'll upload the fail scripts
FileSystem fs = FileSystem.get(conf);
Path debugDir = new Path(HDFS_SCRIPT_DIR);

// who knows what's already in this directory; let's just clear it.
if (fs.exists(debugDir)) {
fs.delete(debugDir, true);
}

// ...and then make sure it exists again
fs.mkdirs(debugDir);

// upload the local scripts into HDFS
fs.copyFromLocalFile(new Path(LOCAL_FAILED_MAP_SCRIPT),
new Path(HDFS_FAILED_MAP_SCRIPT));
fs.copyFromLocalFile(new Path(LOCAL_FAILED_REDUCE_SCRIPT),
new Path(HDFS_FAILED_REDUCE_SCRIPT));

conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
DistributedCache.createSymlink(conf);

URI fsUri = fs.getUri();

String mapUriStr = fsUri.toString() + HDFS_FAILED_MAP_SCRIPT
+ "#" + FAILED_MAP_SCRIPT_NAME;
URI mapUri = null;
try {
mapUri = new URI(mapUriStr);
} catch (URISyntaxException use) {
throw new IOException(use);
}

DistributedCache.addCacheFile(mapUri, conf);

String reduceUriStr = fsUri.toString() + HDFS_FAILED_REDUCE_SCRIPT
+ "#" + FAILED_REDUCE_SCRIPT_NAME;
URI reduceUri = null;
try {
reduceUri = new URI(reduceUriStr);
} catch (URISyntaxException use) {
throw new IOException(use);
}

DistributedCache.addCacheFile(reduceUri, conf);
}
How does this all work? The scripts are, presumably, initially hosted on the client machine that is submitting the job. The client is responsible for injecting them into HDFS and enabling them in the distributed cache. It first creates the HDFS_SCRIPT_DIR and then uploads the local script files into this directory.

It must then set the commands for the TaskTracker to execute to run the scripts. This is accomplished by the lines:

conf.setMapDebugScript("./" + FAILED_MAP_SCRIPT_NAME);
conf.setReduceDebugScript("./" + FAILED_REDUCE_SCRIPT_NAME);
DistributedCache.createSymlink(conf);
The distributed cache copies the files to the mapred.local.dir on each task node. The TaskTracker will then execute the scripts if necessary. But the TaskTracker does not run with its working directory set to mapred.local.dir. Fortunately, the distributed cache can be told to create symlinks in the working directory for files in the distributed cache. The third line of the snippet above enables this functionality. Now "./" + FAILED_MAP_SCRIPT_NAME (that is, ./map-fail) will point to the copy of the map script in the local cache directory, and the script can be run.

But before that can happen, we must add the files themselves to the distributed cache. (As of now they are only in HDFS.) Ordinarily, we could just call DistributedCache.addCacheFile(new Path("hdfs_path_to_some_file").toUri(), conf) on a filename and that would be sufficient. But since we need to create symlinks, we must provide the distributed cache with information as to how the symlink should be created--what filename it should take in the working directory. This is provided as the URI "anchor" (fragment) part following the "#" in the URI. A subtlety of Hadoop's Path class is that if you put a '#' in the path string, it will URL-encode it and treat it as part of the filename. Therefore, we use some extra code to construct our URIs manually to ensure that the '#' remains unescaped.
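The difference between an escaped and an unescaped '#' can be seen with java.net.URI alone. The following is a minimal sketch: CacheUriExample and withSymlinkName are hypothetical names, and hdfs://namenode:9000 is an assumed NameNode address standing in for whatever fs.getUri() returns on your cluster. It builds the URI by string concatenation, as the listing above does, and contrasts that with handing the same text to a URI constructor as a path component, which quotes the '#'.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, not part of Hadoop.
class CacheUriExample {
  // Build "<fs-uri><hdfs-path>#<link-name>" by concatenation so the
  // '#' is parsed as the start of the URI fragment.
  static URI withSymlinkName(URI fsUri, String hdfsPath, String linkName)
      throws URISyntaxException {
    return new URI(fsUri.toString() + hdfsPath + "#" + linkName);
  }

  public static void main(String[] args) throws URISyntaxException {
    URI fsUri = new URI("hdfs://namenode:9000"); // assumed address

    URI good = withSymlinkName(fsUri, "/debug/map-fail", "map-fail");
    System.out.println(good.getPath());     // /debug/map-fail
    System.out.println(good.getFragment()); // map-fail

    // Supplying the same text as a path component quotes the '#',
    // so it becomes part of the file name and no fragment is set.
    URI quoted = new URI("hdfs", "namenode:9000",
        "/debug/map-fail#map-fail", null, null);
    System.out.println(quoted); // hdfs://namenode:9000/debug/map-fail%23map-fail
    System.out.println(quoted.getFragment()); // null
  }
}
```

Only the first form gives the distributed cache a fragment to use as the symlink name.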

Using Amazon Web Services

Hadoop's power comes from its ability to perform work on a large number of machines simultaneously. What if you want to experiment with Hadoop, but do not have many machines? While operations on a two or four-node cluster are functionally equivalent to those on a 40 or 100-node cluster, processing larger volumes of data will require a larger number of nodes.

Amazon provides machines for rent on demand through their Elastic Compute Cloud (a.k.a. EC2) service. EC2 is part of a broader set of services collectively called the Amazon Web Services, or AWS. EC2 allows you to request a set of nodes ("instances" in their parlance) for as long as you need them. You pay by the instance-hour, plus costs for bandwidth. You can use EC2 instances to run a Hadoop cluster. Hadoop comes with a set of scripts which will provision EC2 instances.

The first step in this process is to visit the EC2 web site (link above) and click "Sign Up For This Web Service". You will need to create an account and provide billing information. Then follow the instructions in the Getting Started guide to set up your account and configure your system to run the AWS tools.

Once you have done so, follow the instructions in the Hadoop wiki specific to running Hadoop on Amazon EC2. While more details are available in the above document, the shortest steps to provisioning a cluster are:

Edit src/contrib/ec2/bin/hadoop-ec2-env.sh to contain your Amazon account information and parameters about the desired cluster size.
Execute src/contrib/ec2/bin/hadoop-ec2 launch-cluster.
After the cluster has been started, you can log in to the head node over ssh with the bin/hadoop-ec2 login script, and perform your MapReduce computation. When you are done, log out and type bin/hadoop-ec2 terminate-cluster to release the EC2 instances. The contents of the virtual hard drives on the instances will disappear, so be sure to copy off any important data with scp or another tool first!

A very thorough introduction to configuring Hadoop on EC2 and running a test job is provided in this article on the Amazon Web Services Developer Connection site.