Monday, May 13, 2013

Custom Input Formatter for a Map-Reduce Programme




Input file
1929143,Cloud,137970,23,3/7/1989,Female,LBEIDY,3009,19|1764937,FSI,106975,40,15/11/1972,Male,VBFMLI,4531,20

Schema of input
       employeeNo_Position=0;
       unit_Position=1;
       salary_Position=2;
       age_Position=3;
       dOB_Position=4;
       gender_Position=5;
       projectCode_Position=6;
       billingRevenue_Position=7;
       noOfBillableDays_Position=8;

       delimiter=",";
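
To make the schema concrete, here is a minimal sketch in plain Java (no Hadoop needed) that looks up two fields of the first sample record by position. The class name SchemaDemo and the helper field() are illustrative assumptions and are not part of the job code shown later.

```java
import java.util.regex.Pattern;

// Illustrative only: SchemaDemo and field() are hypothetical names, not part of the job.
class SchemaDemo {
    static final int UNIT_POSITION = 1;
    static final int BILLING_REVENUE_POSITION = 7;
    static final String DELIMITER = ",";

    // Returns the field at the given zero-based position,
    // or null if the record has fewer fields than that.
    static String field(String record, int position) {
        // limit -1 keeps empty fields, so positions never shift
        String[] fields = record.split(Pattern.quote(DELIMITER), -1);
        return (position >= 0 && position < fields.length) ? fields[position] : null;
    }

    public static void main(String[] args) {
        String record = "1929143,Cloud,137970,23,3/7/1989,Female,LBEIDY,3009,19";
        System.out.println("unit = " + field(record, UNIT_POSITION));
        System.out.println("billingRevenue = " + field(record, BILLING_REVENUE_POSITION));
    }
}
```

For the first sample record this prints the unit Cloud and the billing revenue 3009.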

Requirement
For the given input file, find the total revenue of each unit.
Here, however, the records in the input file are separated by a pipe ('|') rather than a newline, i.e. the data before '|' must be taken as the first input record and the data after '|' as the second input record.
By default, the input format of the MapReduce framework is TextInputFormat, which reads text files and treats each '\n'-terminated line as one record (one call to the mapper). To make the records pipe-separated, we have to override the createRecordReader method so that it uses '|' as the record delimiter.
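
The effect of a pipe record delimiter can be sketched without Hadoop. The snippet below is only an illustration of the idea, not Hadoop's LineRecordReader: it cuts a byte array on '|' and pairs each record with the byte offset at which it starts, which is the kind of (key, value) pair the mapper receives. PipeRecordDemo and readRecords are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics the (offset, record) pairs a record reader hands to the mapper.
class PipeRecordDemo {

    // Splits the data on a single-byte delimiter.
    // Keys are the byte offsets at which the records start.
    static Map<Long, String> readRecords(byte[] data, byte delimiter) {
        Map<Long, String> records = new LinkedHashMap<>();
        int start = 0;
        for (int i = 0; i < data.length; i++) {
            if (data[i] == delimiter) {
                records.put((long) start,
                        new String(data, start, i - start, StandardCharsets.UTF_8));
                start = i + 1;
            }
        }
        // Whatever follows the last delimiter is the final record
        if (start < data.length) {
            records.put((long) start,
                    new String(data, start, data.length - start, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        String input = "1929143,Cloud,137970,23,3/7/1989,Female,LBEIDY,3009,19"
                + "|1764937,FSI,106975,40,15/11/1972,Male,VBFMLI,4531,20";
        byte[] data = input.getBytes(StandardCharsets.UTF_8);
        for (Map.Entry<Long, String> e : readRecords(data, (byte) '|').entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

With the sample input this yields two records, one per employee, instead of a single line containing both.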

Default Text input formatter
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    // If textinputformat.record.delimiter is not set in the configuration, delimiter stays
    // null and LineRecordReader falls back to line endings ('\n', '\r' or "\r\n").
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes();
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }

}

Custom Input formatter
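import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Defined in the job's own package. It reuses the name TextInputFormat, so the driver
// must not import org.apache.hadoop.mapreduce.lib.input.TextInputFormat.
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

  @Override
  public RecordReader<LongWritable, Text>
    createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
    // Hardcoding the record delimiter as "|".
    String delimiter = "|";
    byte[] recordDelimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    // Only uncompressed files are split
    CompressionCodec codec =
      new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    return codec == null;
  }

}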

Map Reduce Explanation
      Every MapReduce program needs a mapper and, usually, a reducer. The job configuration is set inside a driver class. Both the input and the output of the mapper and the reducer are key-value pairs.
Mapper -
 In our scenario the input key to the mapper is the byte offset of each record in the file and the input value is the record itself. The output key is the unit and the output value is the billing revenue read from that record.
                The mapper output is shuffled and sorted by key and then passed to the reducer.
Reducer -
The input key to the reducer is a Text value holding the unit, and the input values are an Iterable of IntWritable values (the revenues emitted for that unit).
The reducer sums the revenues for each key, so its output is the total revenue per unit.
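
The flow described above can be simulated in memory before looking at the Hadoop classes. This is a rough sketch under simplifying assumptions (a single process, no real shuffle, no Writable types). RevenueFlowDemo and revenuePerUnit are hypothetical names used only for this illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: simulates map, shuffle/sort and reduce in memory, without Hadoop.
class RevenueFlowDemo {

    static Map<String, Integer> revenuePerUnit(String input) {
        // A TreeMap keeps keys sorted, similar to the sort phase before the reducer
        Map<String, Integer> totals = new TreeMap<>();
        for (String record : input.split("\\|")) {          // one record per pipe-delimited chunk
            String[] fields = record.trim().split(",", -1);
            if (fields.length <= 7) {
                continue;                                   // skip incomplete records
            }
            String unit = fields[1];                        // map output key
            int revenue = Integer.parseInt(fields[7].trim()); // map output value
            totals.merge(unit, revenue, Integer::sum);      // reduce: sum the values per key
        }
        return totals;
    }

    public static void main(String[] args) {
        String input = "1929143,Cloud,137970,23,3/7/1989,Female,LBEIDY,3009,19"
                + "|1764937,FSI,106975,40,15/11/1972,Male,VBFMLI,4531,20";
        revenuePerUnit(input).forEach((unit, total) -> System.out.println(unit + "\t" + total));
    }
}
```

For the sample input each unit occurs once, so the totals are simply 3009 for Cloud and 4531 for FSI. With more records per unit, the values would be added up per key.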

Java Implementation

Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
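// No import of org.apache.hadoop.mapreduce.lib.input.TextInputFormat here: the job must use
// the custom TextInputFormat shown above, which lives in the same package as this driver.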
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 *
 * @author Amal_Babu
 *
 */
public class Driver extends Configured implements Tool {

       private static Logger log = Logger.getLogger(Driver.class);

       public static void main(String[] args) {

              int res;
              try {
                     res = ToolRunner.run(new Configuration(), new Driver(), args);
              } catch (Exception e1) {
                     // report the failure and exit with a non-zero status
                     log.error("Job failed", e1);
                     res = 1;
              }
              System.exit(res);

       }

       @Override
       public int run(String[] arg0) throws Exception {
              // getting configuration object and setting job name
              Configuration conf = getConf();
              Job job = new Job(conf, "RevenueCalculater");

              // setting the class names
              job.setJarByClass(Driver.class);
              job.setMapperClass(RecordSelector.class);
              job.setReducerClass(RevenuePerUnitCalculator.class);

              // setting the output data type classes
              job.setOutputKeyClass(Text.class);
              job.setOutputValueClass(IntWritable.class);

              job.setNumReduceTasks(1);
              job.setInputFormatClass(TextInputFormat.class);
              job.setOutputFormatClass(TextOutputFormat.class);
              job.setCombinerClass(RevenuePerUnitCalculator.class);

              // to accept the HDFS input and output dirs at run time
              FileInputFormat.addInputPath(job, new Path(arg0[0]));
              FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

              return job.waitForCompletion(true) ? 0 : 1;
       }

}

Mapper class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
/**
 *
 * @author Amal_Babu
 *
 * @Description RecordSelector emits a (unit, billing revenue) pair for each input record
 *
 */
public class RecordSelector extends Mapper<LongWritable, Text, Text, IntWritable>{

       private Logger log = Logger.getLogger(RecordSelector.class);
       // Reused across calls to avoid allocating new objects for every record
       private final TextFilePositions tokeniser = new TextFilePositions();
       private final Text word = new Text();
       private final IntWritable revenue = new IntWritable();

       @Override
       public void map(LongWritable key, Text value, Context context)
                     throws IOException, InterruptedException {
              // trim() drops a trailing newline left on the last pipe-delimited record
              String line = value.toString().trim();
              String unit = tokeniser.getElementByPosition(line, TextFilePositions.unit_Position);
              String revenueval = tokeniser.getElementByPosition(line, TextFilePositions.billingRevenue_Position);
              if (unit == null || revenueval == null) {
                     return; // skip incomplete records
              }
              try {
                     revenue.set(Integer.parseInt(revenueval.trim()));
              } catch (NumberFormatException e) {
                     log.warn("Skipping record with non-numeric revenue: " + line, e);
                     return;
              }
              word.set(unit);
              log.debug("Emitting record from mapper: word-->" + word + " revenue-->" + revenue);
              // write failures propagate to the framework instead of being swallowed
              context.write(word, revenue);
       }


}

Tokenizer class

import java.util.regex.Pattern;
/**
 *
 * @author Amal_Babu
 *
 * @Description holds the field positions of the input schema and reads a field by position
 *
 */
public  class TextFilePositions {
       public static final int employeeNo_Position=0;
       public static final int unit_Position=1;
       public static final int salary_Position=2;
       public static final int age_Position=3;
       public static final int dOB_Position=4;
       public static final int gender_Position=5;
       public static final int projectCode_Position=6;
       public static final int billingRevenue_Position=7;
       public static final int noOfBillableDays_Position=8;
       public static final String delimiter=",";

       /**
        * Returns the field at the given zero-based position,
        * or null when the record has fewer fields than that.
        */
       public String getElementByPosition(String element,int position)
       {
              if(element==null || position<0)
                     return null;
              // split with limit -1 keeps empty fields, so positions never shift
              String[] fields=element.split(Pattern.quote(delimiter),-1);
              return position<fields.length ? fields[position] : null;
       }

}

Reducer class
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;
/**
 *
 * @author Amal_Babu
 * @Description RevenuePerUnitCalculator sums the revenue per unit
 *
 */
public class RevenuePerUnitCalculator extends Reducer<Text, IntWritable, Text, IntWritable>{
       private Logger log = Logger.getLogger(RevenuePerUnitCalculator.class);
       @Override
       public void reduce(Text key, Iterable<IntWritable> values, Context context)
                     throws IOException, InterruptedException {
              // add up all revenues emitted for this unit
              int sum = 0;
              for (IntWritable val : values) {
                     sum += val.get();
              }
              // write failures propagate to the framework instead of being swallowed
              context.write(key, new IntWritable(sum));
       }

}

How to execute?
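1. Export the project as a jar file.
2. Place the input file in HDFS.
3. Execute the command given below (Driver is the main class; leave it out if the jar's manifest already sets Main-Class)
                hadoop jar <jar_name> Driver <input_hdfs_path> <output_hdfs_path>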
               
