Sometimes we would like to sort the values coming into the reducer of a Hadoop Map/Reduce (M/R) job. Hadoop does not sort values directly, but you can sort them indirectly by combining several implementations. They are as follows.
- Use a composite key.
- Extend org.apache.hadoop.mapreduce.Partitioner.
- Extend org.apache.hadoop.io.WritableComparator.
The following articles describe this technique in more detail:
- http://www.cloudera.com/blog/2011/04/simple-moving-average-secondary-sort-and-mapreduce-part-3/
- http://pkghosh.wordpress.com/2011/04/13/map-reduce-secondary-sort-does-it-all/
- http://www.riccomini.name/Topics/DistributedComputing/Hadoop/SortByValue/
The problem
Imagine we have stock data that looks like the following. Each line represents the value of a stock at a particular time, and the values in a line are delimited by commas. The first value is the stock symbol (e.g. GOOG), the second value is the timestamp (the number of milliseconds since January 1, 1970, 00:00:00 GMT), and the third value is the stock’s price. The data below is a toy data set. As you can see, there are 3 stock symbols: a, b, and c. The timestamps are also simple: 1, 2, 3, 4. The prices are fake as well: 1.0, 2.0, 3.0, and 4.0.

a, 1, 1.0
b, 1, 1.0
c, 1, 1.0
a, 2, 2.0
b, 2, 2.0
c, 2, 2.0
a, 3, 3.0
b, 3, 3.0
c, 3, 3.0
a, 4, 4.0
b, 4, 4.0
c, 4, 4.0
Let’s say that for each stock symbol (the reducer’s input key, or equivalently, the mapper’s output key), we want the values to arrive at the reducer ordered by timestamp in descending order. For example, for symbol a in the toy data, the prices should arrive in the order 4.0, 3.0, 2.0, 1.0. How do we sort the timestamps in descending order? This problem is known as secondary sorting. Hadoop’s M/R platform sorts the keys, but not the values. (Note that Google’s M/R platform explicitly supports secondary sorting; see Lin and Dyer 2010.)
A solution for secondary sorting
Use a composite key
A solution for secondary sorting involves doing multiple things. First, instead of simply emitting the stock symbol as the key from the mapper, we need to emit a composite key, a key that has multiple parts. The key will have the stock symbol and timestamp. If you remember, the process for an M/R job is as follows.

- (K1,V1) –> Map –> (K2,V2)
- (K2,List[V2]) –> Reduce –> (K3,V3)
In the toy data above, K1 will be of type LongWritable and V1 will be of type Text. Without secondary sorting, K2 would be of type Text and V2 of type DoubleWritable (we would simply emit the stock symbol and price from the mapper to the reducer). So K2=symbol and V2=price, or (K2,V2) = (symbol,price). However, if we emit such an intermediate key-value pair, secondary sorting is not possible. We have to emit a composite key, K2={symbol,timestamp}, so the intermediate key-value pair is (K2,V2) = ({symbol,timestamp},price). Note that composite data structures, such as the composite key, are written within curly braces. Our reducer simply outputs a K3 of type Text and a V3 of type Text: (K3,V3) = (symbol,price). The complete M/R job with the new composite key is shown below, followed by a sketch of the composite key class itself.
- (LongWritable,Text) –> Map –> ({symbol,timestamp},price)
- ({symbol,timestamp},List[price]) –> Reduce –> (symbol,price)
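The original post does not show the composite key class itself. Below is a minimal sketch of what StockKey might look like, assuming a Text symbol and a LongWritable timestamp; the getters are the ones the comparators and partitioner below rely on, while the rest of the implementation is an assumption.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class StockKey implements WritableComparable<StockKey> {

    private Text symbol;            // the natural key
    private LongWritable timestamp; // the secondary sort field

    public StockKey() {
        this.symbol = new Text();
        this.timestamp = new LongWritable();
    }

    public StockKey(String symbol, long timestamp) {
        this.symbol = new Text(symbol);
        this.timestamp = new LongWritable(timestamp);
    }

    public Text getSymbol() {
        return symbol;
    }

    public LongWritable getTimestamp() {
        return timestamp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        symbol.write(out);
        timestamp.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        symbol.readFields(in);
        timestamp.readFields(in);
    }

    // default ordering; the job overrides this with CompositeKeyComparator
    @Override
    public int compareTo(StockKey other) {
        int result = symbol.compareTo(other.symbol);
        if (0 == result) {
            result = timestamp.compareTo(other.timestamp);
        }
        return result;
    }

    @Override
    public int hashCode() {
        return symbol.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StockKey)) {
            return false;
        }
        StockKey other = (StockKey) o;
        return symbol.equals(other.symbol) && timestamp.equals(other.timestamp);
    }
}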
Use a composite key comparator
The composite key comparator is where the secondary sorting takes place. It compares composite keys by symbol in ascending order and by timestamp in descending order. It is shown below. Notice that we sort on both symbol and timestamp; all the components of the composite key are considered.

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(StockKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey) w1;
        StockKey k2 = (StockKey) w2;

        // sort symbols in ascending order
        int result = k1.getSymbol().compareTo(k2.getSymbol());
        if (0 == result) {
            // negating the comparison sorts timestamps in descending order
            result = -1 * k1.getTimestamp().compareTo(k2.getTimestamp());
        }
        return result;
    }
}
Use a natural key grouping comparator
The natural key grouping comparator “groups” values together according to the natural key. Without this component, each distinct K2={symbol,timestamp} would be treated as its own group, so the prices for a given symbol would be spread across separate reduce calls instead of arriving together. Notice that here we only consider the “natural” key.

public class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(StockKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey) w1;
        StockKey k2 = (StockKey) w2;

        // group solely on the natural key (the symbol)
        return k1.getSymbol().compareTo(k2.getSymbol());
    }
}
Use a natural key partitioner
The natural key partitioner uses the natural key to partition the data among the reducer(s). Again, note that here we only consider the “natural” key.

public class NaturalKeyPartitioner extends Partitioner<StockKey, DoubleWritable> {

    @Override
    public int getPartition(StockKey key, DoubleWritable val, int numPartitions) {
        // mask off the sign bit so the partition index is never negative
        int hash = key.getSymbol().hashCode() & Integer.MAX_VALUE;
        return hash % numPartitions;
    }
}
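The job configuration below references SsMapper and SsReducer, which the original post does not show. The sketches below are consistent with the key and value types declared in the job setup; the comma-splitting and trimming details are assumptions based on the input format described above. (Each class would live in its own source file.)

// SsMapper.java
// Parses "symbol, timestamp, price" lines into ({symbol,timestamp}, price) pairs.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SsMapper extends Mapper<LongWritable, Text, StockKey, DoubleWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] tokens = value.toString().split(",");
        if (tokens.length != 3) {
            return; // skip malformed lines
        }
        String symbol = tokens[0].trim();
        long timestamp = Long.parseLong(tokens[1].trim());
        double price = Double.parseDouble(tokens[2].trim());
        context.write(new StockKey(symbol, timestamp), new DoubleWritable(price));
    }
}

// SsReducer.java
// Emits one (symbol, price) pair per value; the values arrive sorted by
// descending timestamp thanks to the composite key comparator.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SsReducer extends Reducer<StockKey, DoubleWritable, Text, Text> {

    @Override
    protected void reduce(StockKey key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        for (DoubleWritable value : values) {
            context.write(new Text(key.getSymbol().toString()),
                          new Text(value.toString()));
        }
    }
}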
The M/R Job
Once we define the Mapper, Reducer, natural key grouping comparator, natural key partitioner, composite key comparator, and composite key, we may configure the Job in Hadoop’s new M/R API as follows.

public class SsJob extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new SsJob(), args);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "secondary sort");
        job.setJarByClass(SsJob.class);

        // wire in the secondary-sort components
        job.setPartitionerClass(NaturalKeyPartitioner.class);
        job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
        job.setSortComparatorClass(CompositeKeyComparator.class);

        job.setMapOutputKeyClass(StockKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(SsMapper.class);
        job.setReducerClass(SsReducer.class);

        // input/output paths assumed to be the first two command-line
        // arguments (not shown in the original post)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
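With the toy data above, each symbol’s prices should now reach the reducer in descending timestamp order, so the output for symbol a would be (a, 4.0), (a, 3.0), (a, 2.0), (a, 1.0), and similarly for b and c.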