Introduction
After spending more and more time working with Scala I found myself attempting to apply the language to more and more domains. One candidate was a long-standing issue with the mechanism used by Hadoop to express map, reduce and/or combine functions. These functions are expressed within the framework via a pair of interfaces, Mapper and Reducer. Jobs then add class references to concrete implementations of these interfaces.
The problem
So what's the problem? These interfaces largely exist to define a single function to perform either a map or reduce operation. Both Mapper and Reducer include setup() and cleanup() methods (as well as a run() method to change runtime behaviour if needed), but the heavy lifting for both is performed within a single method (map() and reduce() respectively). Strip away the overhead and it's clear that in a fair number of cases we really only need an anonymous block of code to implement our map or reduce functions. At least some of the setup work could be removed if this anonymous block of code could somehow "contain" the environment that was present when the function was defined.
It should be obvious that what's being described here is a closure. Scala supports closures, but so do Groovy, JRuby and several other JVM languages. Why prefer Scala to these options? It's all a matter of typing; my belief was that the typing of closures in Scala would provide easier integration with Hadoop. This assumption may prove to be incorrect in the long run (Clojure's strong Java integration might make it a better fit, for example) but it does provide a starting point.
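To illustrate what "containing the environment" buys us, here is a minimal sketch written with Java 8 or later lambda syntax, chosen only so the example stays in Hadoop's own language. Nothing in it is Hadoop API: PrefixTagger, ClosureSketch, tagger and the "tagger.prefix" key are all hypothetical names. The class-based version needs a separate setup() call to read its configuration, while the closure simply captures the value from the scope in which it is defined.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Class-based style: configuration is looked up in a separate setup() step,
// mirroring the general shape of a Mapper. All names here are hypothetical.
class PrefixTagger {
    private String prefix;

    void setup(Map<String, String> conf) {
        prefix = conf.get("tagger.prefix");
    }

    String map(String value) {
        return prefix + value;
    }
}

class ClosureSketch {

    // Closure style: the lambda captures `prefix` from the enclosing scope,
    // so no separate setup step is needed before it can be applied.
    static Function<String, String> tagger(Map<String, String> conf) {
        String prefix = conf.get("tagger.prefix");
        return value -> prefix + value;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("tagger.prefix", "hit:");

        PrefixTagger classBased = new PrefixTagger();
        classBased.setup(conf);
        Function<String, String> closure = tagger(conf);

        // Both calls produce the same tagged string.
        System.out.println(classBased.map("dfs.name.dir"));
        System.out.println(closure.apply("dfs.name.dir"));
    }
}
```

The two versions do the same work; the difference is only in how much scaffolding surrounds the one line that matters.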
Note that defining these functions inline would prevent the kind of re-use provided by defining this functionality in distinct classes. This is only a concern if the map and/or reduce functions are sufficiently complex; if they reduce down to something in the neighbourhood of a one-liner then re-use isn't a large concern.
Finally, it's worth noting that the problem here does not stem from Hadoop itself. Implementing this functionality in discrete classes is a reasonable approach when the language doesn't provide a clear mechanism for expressing anonymous functions.
A sample
To make this more concrete let's analyze the "grep" sample job referenced in the document describing a single-node Hadoop setup. The main work is performed by a single job using a regex-based map implementation and a reducer which does little more than sum up long values. The meat of the mapping class is as follows:
org.apache.hadoop.mapreduce.lib.map.RegexMapper
public class RegexMapper<K> extends Mapper<K, Text, Text, LongWritable> {
    public static String PATTERN = "mapreduce.mapper.regex";
    public static String GROUP = "mapreduce.mapper.regexmapper..group";
    private Pattern pattern;
    private int group;

    public void setup(Context context) {
        Configuration conf = context.getConfiguration();
        pattern = Pattern.compile(conf.get(PATTERN));
        group = conf.getInt(GROUP, 0);
    }

    public void map(K key, Text value,
                    Context context)
            throws IOException, InterruptedException {
        String text = value.toString();
        Matcher matcher = pattern.matcher(text);
        while (matcher.find()) {
            context.write(new Text(matcher.group(group)), new LongWritable(1));
        }
    }
}
The reducer implementation doesn't fare much better:
org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer
public class LongSumReducer<KEY> extends Reducer<KEY, LongWritable,
                                                 KEY, LongWritable> {
    private LongWritable result = new LongWritable();

    public void reduce(KEY key, Iterable<LongWritable> values,
                       Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}
That's a lot of boilerplate code in order to implement a pair of very simple functions. Surely we can do better with a language that offers native support for many elements of functional programming.
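To give a sense of how small the two functions are once the scaffolding is stripped away, here is a rough sketch using Java 8 or later lambdas, kept in Java only to match the excerpts above. It does not use Hadoop at all: InlineGrepSketch, run and grep are hypothetical names, and the in-memory driver is a stand-in for the framework, not a description of how Hadoop schedules work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class InlineGrepSketch {

    // Hypothetical in-memory driver (not Hadoop): applies mapFn to each line,
    // groups the emitted keys, then hands each group's values to reduceFn.
    static Map<String, Long> run(List<String> lines,
                                 Function<String, List<String>> mapFn,
                                 ToLongFunction<List<Long>> reduceFn) {
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String key : mapFn.apply(line)) {
                // Each emitted key counts once, like LongWritable(1) in RegexMapper.
                grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1L);
            }
        }
        Map<String, Long> result = new TreeMap<>();
        grouped.forEach((key, values) -> result.put(key, reduceFn.applyAsLong(values)));
        return result;
    }

    static Map<String, Long> grep(List<String> lines, String regex, int group) {
        Pattern pattern = Pattern.compile(regex);

        // Counterpart of RegexMapper.map(): emit every match found in the line.
        Function<String, List<String>> mapFn = line -> {
            List<String> keys = new ArrayList<>();
            Matcher m = pattern.matcher(line);
            while (m.find()) {
                keys.add(m.group(group));
            }
            return keys;
        };

        // Counterpart of LongSumReducer.reduce(): sum the values for one key.
        ToLongFunction<List<Long>> reduceFn =
            values -> values.stream().mapToLong(Long::longValue).sum();

        return run(lines, mapFn, reduceFn);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("dfs.replication dfs.name.dir", "dfs.replication");
        System.out.println(grep(lines, "dfs[a-z.]+", 0));
    }
}
```

Note that the pattern and group are captured by the map lambda directly, so the work done in setup() above disappears entirely.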
An initial implementation
As a first step we convert the "grep" example into Scala and run it using Hadoop 0.21. The conversion itself was fairly straightforward and the results can be found on GitHub. At present the port isn't much more than a translation of the existing Java code into more idiomatic Scala; the hope is that this code will provide a basis for future work. The code has been verified to run using the following steps:
- Add the Scala runtime to the Hadoop classpath by editing hadoop-env.sh:
  export HADOOP_CLASSPATH=$HOME/local/scala/lib/scala-library.jar
- Replace org.apache.hadoop.examples.Grep in mapred-examples with our Scala implementation:
  jar uvf hadoop-mapred-examples-0.21.0.jar org/apache/hadoop/examples/Grep*
- bin/hdfs namenode -format
- bin/start-dfs.sh
- bin/start-mapred.sh
- bin/hadoop fs -put conf input
- bin/hadoop jar hadoop-mapred-examples-0.21.0.jar grep input output-scala 'dfs[a-z.]+'
- bin/hadoop fs -cat output-scala/*
Future work
In order to get the most out of writing Hadoop jobs in Scala we need to be able to define these functions directly within the job class. This is complicated somewhat by the current Hadoop architecture; jobs are configured to reference mapper and/or reducer classes rather than instances of those classes. Something akin to Groovy's support for using a closure as an implementation of a class with a Single Abstract Method (SAM) might be useful here, but near as I can tell Scala does not offer such support.
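The class-versus-instance complication can be sketched in a few lines of plain Java. This is a simplification under a stated assumption: that the framework is handed a class reference and builds the instance itself through a no-argument constructor. The instantiate method and the UpperCase, AddPrefix and ClassReferenceSketch classes are hypothetical and are not part of Hadoop.

```java
import java.util.Optional;
import java.util.function.Function;

// A function with no captured state: it can be rebuilt from its class alone.
class UpperCase implements Function<String, String> {
    public String apply(String s) {
        return s.toUpperCase();
    }
}

// A function that carries state, as a closure would: the class alone is not
// enough to rebuild it, because the prefix lives in the instance.
class AddPrefix implements Function<String, String> {
    private final String prefix;

    AddPrefix(String prefix) {
        this.prefix = prefix;
    }

    public String apply(String s) {
        return prefix + s;
    }
}

class ClassReferenceSketch {

    // Hypothetical stand-in for a framework that receives a class reference
    // and creates the instance itself via a no-argument constructor.
    static Optional<Function<String, String>> instantiate(
            Class<? extends Function<String, String>> cls) {
        try {
            return Optional.of(cls.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            // No usable no-argument constructor: the class cannot be rebuilt.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // UpperCase has a no-argument constructor, AddPrefix does not.
        System.out.println(instantiate(UpperCase.class).isPresent());
        System.out.println(instantiate(AddPrefix.class).isPresent());
    }
}
```

Any approach that lets a job accept closures has to bridge this gap somehow, either by shipping the captured state alongside the class or by changing what the job configuration refers to.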