Thursday, January 30, 2014

Secondary sorting

Sometimes we would like the values coming into the Reducer of a Hadoop Map/Reduce (M/R) Job to arrive in sorted order. Hadoop does not sort values directly, but you can sort them indirectly by combining the following implementations.
  1. Use a composite key.
  2. Extend org.apache.hadoop.mapreduce.Partitioner.
  3. Extend org.apache.hadoop.io.WritableComparator (once for sorting and once for grouping).
Other tutorials explain this approach to sorting the values going into a Reducer. In this blog, I summarize what I have learned from them and also provide a self-contained example. The main difference between this blog and those tutorials is that I show how to do this using the new M/R API (i.e. org.apache.hadoop.mapreduce.*).

The problem

Imagine we have stock data that looks like the following. Each line represents the value of a stock at a particular time, and the values in a line are delimited by commas. The first value is the stock symbol (e.g. GOOG), the second value is the timestamp (i.e. the number of milliseconds since January 1, 1970, 00:00:00 GMT), and the third value is the stock’s price. The data below is a toy data set. As you can see, there are 3 stock symbols: a, b, and c. The timestamps are also simple: 1, 2, 3, and 4. The prices are fake as well: 1.0, 2.0, 3.0, and 4.0.

a, 1, 1.0
b, 1, 1.0
c, 1, 1.0
a, 2, 2.0
b, 2, 2.0
c, 2, 2.0
a, 3, 3.0
b, 3, 3.0
c, 3, 3.0
a, 4, 4.0
b, 4, 4.0
c, 4, 4.0
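To make the input format concrete, here is a minimal plain-Java sketch of how one line of the toy data could be split into its three fields, which is the parsing a mapper has to do. The StockRecord class and its parse method are hypothetical names used only for illustration. Note the trim() calls: the toy data has a space after each comma.

```java
// Hypothetical helper: parses one "symbol, timestamp, price" line of the toy data.
final class StockRecord {
    final String symbol;
    final long timestamp; // milliseconds since the epoch in real data
    final double price;

    StockRecord(String symbol, long timestamp, double price) {
        this.symbol = symbol;
        this.timestamp = timestamp;
        this.price = price;
    }

    static StockRecord parse(String line) {
        // Split on commas; each field after the first carries a leading space ("a, 1, 1.0").
        String[] parts = line.split(",");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 fields: " + line);
        }
        return new StockRecord(
                parts[0].trim(),
                Long.parseLong(parts[1].trim()),
                Double.parseDouble(parts[2].trim()));
    }
}
```

For example, StockRecord.parse("b, 3, 3.0") yields the symbol "b", the timestamp 3, and the price 3.0.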

Let’s say that, for each stock symbol (the reducer input key, or equivalently, the mapper output key), we want the values to arrive at the reducer ordered by timestamp in descending order. How do we sort the values by timestamp? This problem is known as secondary sorting. Hadoop’s M/R platform sorts the keys, but not the values. (Note that Google’s M/R implementation explicitly supports secondary sorting; see Lin and Dyer 2010.)

A solution for secondary sorting

Use a composite key

A solution for secondary sorting involves several pieces. First, instead of simply emitting the stock symbol as the key from the mapper, we need to emit a composite key, a key that has multiple parts. Our key will contain the stock symbol and the timestamp. Recall that an M/R Job proceeds as follows.
  • (K1,V1) –> Map –> (K2,V2)
  • (K2,List[V2]) –> Reduce –> (K3,V3)
The notation here is a little different from what you may be accustomed to. Instead of angle brackets (i.e. < and >), I use parentheses to describe a key-value pair. Also, I use List[] for the sequence of values; in the new API, the reducer actually receives an Iterable.
In the toy data above, K1 is of type LongWritable (the byte offset of the line, as produced by TextInputFormat), and V1 is of type Text (the line itself). Without secondary sorting, K2 would be of type Text and V2 of type DoubleWritable (we would simply emit the stock symbol and price from the mapper to the reducer). So K2=symbol and V2=price, or (K2,V2) = (symbol,price). However, if we emit such an intermediate key-value pair, secondary sorting is not possible, because the timestamp is not part of the key and Hadoop sorts only keys. We have to emit a composite key, K2={symbol,timestamp}, so the intermediate key-value pair is (K2,V2) = ({symbol,timestamp},price). Note that composite data structures, such as the composite key, are written within curly braces. Our reducer simply outputs a K3 of type Text and a V3 of type Text; (K3,V3) = (symbol,price). The complete M/R job with the new composite key is shown below.
  • (LongWritable,Text) –> Map –> ({symbol,timestamp},price)
  • ({symbol,timestamp},List[price]) –> Reduce –> (symbol,price)
K2 is a composite key; its symbol component is referred to as the “natural” key. It is the key by which values will be grouped.
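The composite key class itself is not shown in this post. Below is a minimal sketch of what StockKey could look like, assuming a String symbol and a Long timestamp (the types implied by the getSymbol() and getTimestamp() calls in the comparators that follow). So that it compiles without Hadoop on the classpath, it implements java.lang.Comparable and declares write/readFields with the same signatures as Hadoop’s Writable; in a real job the class would instead declare implements WritableComparable<StockKey> from org.apache.hadoop.io. The field names and serialization format are assumptions.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Hypothetical composite key {symbol, timestamp}. In a real job, declare
// "implements org.apache.hadoop.io.WritableComparable<StockKey>" instead;
// only java.io is used here so the sketch compiles without Hadoop.
class StockKey implements Comparable<StockKey> {
    private String symbol = "";   // the "natural" key
    private Long timestamp = 0L;  // the secondary sort field

    // Hadoop instantiates keys reflectively, so a no-arg constructor is required.
    public StockKey() { }

    public StockKey(String symbol, Long timestamp) {
        this.symbol = symbol;
        this.timestamp = timestamp;
    }

    public String getSymbol() { return symbol; }
    public Long getTimestamp() { return timestamp; }

    // Same signature as Writable.write: serialize the fields in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeLong(timestamp);
    }

    // Same signature as Writable.readFields: read the fields back in the same order.
    public void readFields(DataInput in) throws IOException {
        symbol = in.readUTF();
        timestamp = in.readLong();
    }

    // Default ordering: symbol ascending, then timestamp ascending. The job
    // replaces this with CompositeKeyComparator for the shuffle sort.
    @Override
    public int compareTo(StockKey other) {
        int result = symbol.compareTo(other.symbol);
        return result != 0 ? result : timestamp.compareTo(other.timestamp);
    }

    // Keep equals() and hashCode() consistent with each other.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StockKey)) return false;
        StockKey k = (StockKey) o;
        return symbol.equals(k.symbol) && timestamp.equals(k.timestamp);
    }

    @Override
    public int hashCode() {
        return 31 * symbol.hashCode() + timestamp.hashCode();
    }

    @Override
    public String toString() {
        return "{" + symbol + "," + timestamp + "}";
    }
}
```

The key’s own compareTo barely matters here, because the job sets an explicit sort comparator; what matters is that write and readFields agree on field order.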

Use a composite key comparator

The composite key comparator is where the secondary sorting takes place. It compares composite keys by symbol in ascending order and then by timestamp in descending order, as shown below. Notice that here we sort on both symbol and timestamp: all the components of the composite key are considered.

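import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sort comparator: orders composite keys by symbol ascending, then timestamp descending.
public class CompositeKeyComparator extends WritableComparator {
    protected CompositeKeyComparator() {
        // true: create StockKey instances so serialized keys can be deserialized and compared
        super(StockKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey) w1;
        StockKey k2 = (StockKey) w2;

        // Primary order: the natural key (symbol), ascending.
        int result = k1.getSymbol().compareTo(k2.getSymbol());
        if (0 == result) {
            // Secondary order: timestamp, descending (negated ascending comparison).
            result = -1 * k1.getTimestamp().compareTo(k2.getTimestamp());
        }
        return result;
    }
}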
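Because the comparator’s logic does not depend on Hadoop, it can be checked in isolation. This is a minimal plain-Java sketch that applies the same rule (symbol ascending, then timestamp descending) using java.util.Comparator. The Key class and the CompositeOrder.SORT_ORDER constant are hypothetical stand-ins for StockKey and CompositeKeyComparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for StockKey, used only to exercise the sort rule.
final class Key {
    final String symbol;
    final long timestamp;

    Key(String symbol, long timestamp) {
        this.symbol = symbol;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "{" + symbol + "," + timestamp + "}";
    }
}

final class CompositeOrder {
    // Same rule as CompositeKeyComparator: symbol ascending, then timestamp descending.
    static final Comparator<Key> SORT_ORDER =
            Comparator.comparing((Key k) -> k.symbol)
                      .thenComparing(Comparator.comparingLong((Key k) -> k.timestamp).reversed());

    // Returns a sorted copy, leaving the input list untouched.
    static List<Key> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(SORT_ORDER);
        return copy;
    }
}
```

Sorting the keys {b,1}, {a,1}, {a,3} this way gives {a,3}, {a,1}, {b,1}. Reversing only the timestamp comparison has the same effect as the -1 multiplication in CompositeKeyComparator; swapping the operands (k2 before k1) would be another equivalent way to write it.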

Use a natural key grouping comparator

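The natural key grouping comparator “groups” values together according to the natural key: it decides which consecutive keys in the sorted stream are passed to a single reduce() call. Without this component, each distinct K2={symbol,timestamp} would form its own group, so reduce() would be called once per symbol-timestamp pair instead of once per symbol. (Sending all pairs for a symbol to the same reducer is the partitioner’s job, described next.) Notice that here we only consider the “natural” key.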

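import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Grouping comparator: keys with the same symbol are handed to one reduce() call.
public class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() {
        super(StockKey.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey) w1;
        StockKey k2 = (StockKey) w2;

        // Compare only the natural key; the timestamp is deliberately ignored.
        return k1.getSymbol().compareTo(k2.getSymbol());
    }
}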
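The grouping step can also be modelled outside Hadoop. This plain-Java sketch (the GroupingDemo class is hypothetical) walks a list of symbols and prices that is already in composite-key order and starts a new group whenever the natural key changes, which is the role the grouping comparator plays on the reduce side.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of reduce-side grouping: walk the already-sorted stream and
// start a new group (one reduce() call) whenever the natural key changes.
final class GroupingDemo {
    // symbols and prices are parallel lists, already in composite-key order.
    static List<Map.Entry<String, List<Double>>> group(List<String> symbols, List<Double> prices) {
        List<Map.Entry<String, List<Double>>> groups = new ArrayList<>();
        for (int i = 0; i < symbols.size(); i++) {
            String symbol = symbols.get(i);
            // Compare natural keys only, as NaturalKeyGroupingComparator does.
            boolean sameGroup = !groups.isEmpty()
                    && groups.get(groups.size() - 1).getKey().compareTo(symbol) == 0;
            if (!sameGroup) {
                List<Double> values = new ArrayList<>();
                groups.add(new AbstractMap.SimpleEntry<String, List<Double>>(symbol, values));
            }
            groups.get(groups.size() - 1).getValue().add(prices.get(i));
        }
        return groups;
    }
}
```

If the comparison used the full {symbol,timestamp} key instead, every pair in the toy data would start its own group.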

Use a natural key partitioner

The natural key partitioner uses the natural key to decide which reducer receives each intermediate key-value pair, so that all pairs for a given symbol reach the same reducer. Again, note that here we only consider the “natural” key.

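import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Partitioner: routes every key with the same symbol to the same reducer.
public class NaturalKeyPartitioner extends Partitioner<StockKey, DoubleWritable> {

    @Override
    public int getPartition(StockKey key, DoubleWritable val, int numPartitions) {
        // Hash only the natural key. Mask the sign bit because String.hashCode()
        // can be negative, and a negative partition number is invalid.
        int hash = key.getSymbol().hashCode();
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

}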
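A hash-based partitioner has to guard against negative hash codes: String.hashCode() can be negative, and Java’s % operator keeps the sign of the dividend, so hash % numPartitions on its own can produce a negative partition number, which Hadoop treats as an illegal partition. Masking off the sign bit, as Hadoop’s HashPartitioner does, keeps the result in range. This plain-Java sketch (the PartitionMath class is hypothetical) isolates that arithmetic so it can be checked without a cluster.

```java
// Hypothetical plain-Java check of hash-partition arithmetic.
final class PartitionMath {
    // Unsafe: a negative hash gives a negative result, since % keeps the dividend's sign.
    static int naivePartition(int hash, int numPartitions) {
        return hash % numPartitions;
    }

    // Safe: clearing the sign bit keeps the result in [0, numPartitions).
    static int safePartition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    // Partition by the natural key (symbol) only.
    static int partitionFor(String symbol, int numPartitions) {
        return safePartition(symbol.hashCode(), numPartitions);
    }
}
```

Math.abs(hash) is not a safe substitute, because Math.abs(Integer.MIN_VALUE) is still negative.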

The M/R Job

Once we have defined the Mapper, Reducer, natural key grouping comparator, natural key partitioner, composite key comparator, and composite key, we can configure the Job with Hadoop’s new M/R API as follows.

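import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SsJob extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        // Propagate the job's status as the process exit code.
        System.exit(ToolRunner.run(new Configuration(), new SsJob(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // On Hadoop 2.x, Job.getInstance(conf, "secondary sort") is the non-deprecated form.
        Job job = new Job(conf, "secondary sort");

        job.setJarByClass(SsJob.class);
        // Secondary sort: partition and group by symbol, sort by {symbol, timestamp}.
        job.setPartitionerClass(NaturalKeyPartitioner.class);
        job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
        job.setSortComparatorClass(CompositeKeyComparator.class);

        // Intermediate (K2,V2) types differ from the final (K3,V3) types.
        job.setMapOutputKeyClass(StockKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Input and output locations, taken here from the first two arguments.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SsMapper.class);
        job.setReducerClass(SsReducer.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}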
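To see what the job configuration buys us, here is a hypothetical, Hadoop-free simulation of the shuffle on the toy data: each parsed pair is routed by symbol (partitioner), each reducer’s input is sorted by {symbol asc, timestamp desc} (sort comparator), and consecutive equal symbols are collected into one group (grouping comparator). The ShuffleSimulation class is an illustration only; it mirrors the roles of the three plug-ins, not Hadoop’s actual merge machinery.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the shuffle configured by SsJob (no Hadoop classes).
final class ShuffleSimulation {
    // One intermediate pair: composite key {symbol, timestamp} plus the price value.
    static final class Emitted {
        final String symbol;
        final long timestamp;
        final double price;

        Emitted(String symbol, long timestamp, double price) {
            this.symbol = symbol;
            this.timestamp = timestamp;
            this.price = price;
        }
    }

    // Same rule as CompositeKeyComparator: symbol ascending, timestamp descending.
    static final Comparator<Emitted> SORT_ORDER =
            Comparator.comparing((Emitted e) -> e.symbol)
                      .thenComparing(Comparator.comparingLong((Emitted e) -> e.timestamp).reversed());

    // Returns one map per reducer: symbol -> prices, in the order reduce() would see them.
    static List<Map<String, List<Double>>> run(List<String> lines, int numReducers) {
        List<List<Emitted>> partitions = new ArrayList<>();
        for (int i = 0; i < numReducers; i++) {
            partitions.add(new ArrayList<>());
        }

        // Map phase + partitioner: parse each line and route it by symbol only.
        for (String line : lines) {
            String[] f = line.split(",");
            Emitted e = new Emitted(f[0].trim(), Long.parseLong(f[1].trim()),
                    Double.parseDouble(f[2].trim()));
            int partition = (e.symbol.hashCode() & Integer.MAX_VALUE) % numReducers;
            partitions.get(partition).add(e);
        }

        List<Map<String, List<Double>>> result = new ArrayList<>();
        for (List<Emitted> partition : partitions) {
            // Sort comparator: order each reducer's input by the full composite key.
            partition.sort(SORT_ORDER);
            // Grouping comparator: after sorting, equal symbols are adjacent, so
            // collecting by symbol yields one group per reduce() call.
            Map<String, List<Double>> groups = new LinkedHashMap<>();
            for (Emitted e : partition) {
                groups.computeIfAbsent(e.symbol, s -> new ArrayList<>()).add(e.price);
            }
            result.add(groups);
        }
        return result;
    }
}
```

With a single reducer and the twelve toy lines, the group for symbol a arrives as [4.0, 3.0, 2.0, 1.0], i.e. by timestamp in descending order, which is exactly the secondary sort we wanted.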

Summary and conclusion

Implementing secondary sorting in Hadoop’s M/R programming paradigm is not trivial. As you can see, you need at least 4 new classes. The composite key class holds the natural key and the other data you will sort on. The composite key comparator sorts the keys (and thus the values). The natural key grouping comparator groups values based on the natural key. The natural key partitioner sends values with the same natural key to the same reducer.
