Custom Input formatter for Map-Reduce
Input file
1929143,Cloud,137970,23,3/7/1989,Female,LBEIDY,3009,19|1764937,FSI,106975,40,15/11/1972,Male,VBFMLI,4531,20
Schema of input
employeeNo_Position=0;
unit_Position=1;
salary_Position=2;
age_Position=3;
dOB_Position=4;
gender_Position=5;
projectCode_Position=6;
billingRevenue_Position=7;
noOfBillableDays_Position=8;
delimiter=",";
Requirement
For the given input file, find the total revenue of each unit. The twist is that the records in the input file are pipe-delimited: the data before the '|' must be taken as the first record and the data after the '|' as the second. By default, the input format of the MapReduce framework is TextInputFormat, which reads text files and treats each '\n'-separated line as one record when building input splits. To make the records pipe-separated instead, we have to override that behaviour.
Default TextInputFormat
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        // By default, textinputformat.record.delimiter is '\n' (set in the configuration file)
        String delimiter = context.getConfiguration().get(
                "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes();
        return new LineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
Custom Input formatter
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

// Custom input format: splits records on '|' instead of '\n'.
// Named differently from Hadoop's TextInputFormat so the two classes cannot be confused.
public class PipeDelimitedTextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) {
        // Hard-coding this value as "|".
        String delimiter = "|";
        byte[] recordDelimiterBytes = null;
        if (null != delimiter)
            recordDelimiterBytes = delimiter.getBytes();
        return new LineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        return codec == null;
    }
}
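As a side note, overriding createRecordReader is not the only option: the default TextInputFormat shown above already reads its delimiter from the textinputformat.record.delimiter property, so on Hadoop versions that honour this property the same result can be had by setting it in the driver and keeping the stock input format. A minimal sketch of that alternative, assuming it sits inside the driver's run() method shown later:

// Alternative sketch: keep org.apache.hadoop.mapreduce.lib.input.TextInputFormat
// and change only the record delimiter through the job configuration.
Configuration conf = getConf();
conf.set("textinputformat.record.delimiter", "|");   // split records on '|' instead of '\n'
Job job = new Job(conf, "RevenueCalculator");
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);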
Map Reduce Explanation
Every MapReduce program needs a mapper as well as a reducer. The configuration is set up inside a driver class. The inputs and outputs of both the mapper and the reducer are key-value pairs.
Mapper -
In our scenario the input key to the mapper is the byte offset of each record and the input value is the record itself. The output key is set to the unit and the output value is set to the revenue read from the input record; for the sample input above, the mapper emits (Cloud, 3009) and (FSI, 4531). The mapper output goes through a shuffle and sort and is then given to the reducer.
Reducer -
The input key to the reducer is a Text value holding the unit, and the values are an Iterable of IntWritable values (the revenue figures). We sum the revenue for each key and thereby obtain the unit-wise revenue as the reducer output.
Java Implementation
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 *
 * @author Amal_Babu
 *
 */
public class Driver extends Configured implements Tool {

    private static Logger log = Logger.getLogger(Driver.class);

    public static void main(String[] args) {
        try {
            int res = ToolRunner.run(new Configuration(), new Driver(), args);
            System.exit(res);
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        // getting the configuration object and setting the job name
        Configuration conf = getConf();
        Job job = new Job(conf, "RevenueCalculator");
        // setting the class names
        job.setJarByClass(Driver.class);
        job.setMapperClass(RecordSelector.class);
        job.setReducerClass(RevenuePerUnitCalculator.class);
        // setting the output data type classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(1);
        // use the custom pipe-delimited input format instead of the default TextInputFormat
        job.setInputFormatClass(PipeDelimitedTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // summing is associative and commutative, so the reducer can safely be reused as the combiner
        job.setCombinerClass(RevenuePerUnitCalculator.class);
        // to accept the HDFS input and output directories at run time
        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Mapper class
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

/**
 *
 * @author Amal_Babu
 *
 * @Description RecordSelector is used to set the key-value pairs emitted by the map phase
 *
 */
public class RecordSelector extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Logger log = Logger.getLogger(RecordSelector.class);

    @Override
    public void map(LongWritable key, Text value, Context context) {
        Text word = new Text();
        String line = value.toString();
        TextFilePositions tokeniser = new TextFilePositions();
        // extract the unit and the billing revenue from the comma-separated record
        String unit = tokeniser.getElementByPosition(line, TextFilePositions.unit_Position);
        String revenueval = tokeniser.getElementByPosition(line, TextFilePositions.billingRevenue_Position);
        if (revenueval != null) {
            IntWritable revenue = new IntWritable(Integer.parseInt(revenueval));
            word.set(unit);
            try {
                log.info("Emitting record from Mapper: word-->" + word + " value-->" + revenue);
                context.write(word, revenue);
            } catch (IOException e) {
                log.error("IOException caught when emitting the record from RecordSelector(Mapper)", e);
            } catch (InterruptedException e) {
                log.error("InterruptedException caught when emitting the record from RecordSelector(Mapper)", e);
            }
        }
    }
}
Tokenizer class
import java.util.StringTokenizer;

/**
 *
 * @author Amal_Babu
 *
 * @Description used to assign values to constants
 *
 */
public class TextFilePositions {

    public static final int employeeNo_Position = 0;
    public static final int unit_Position = 1;
    public static final int salary_Position = 2;
    public static final int age_Position = 3;
    public static final int dOB_Position = 4;
    public static final int gender_Position = 5;
    public static final int projectCode_Position = 6;
    public static final int billingRevenue_Position = 7;
    public static final int noOfBillableDays_Position = 8;
    public static final String delimiter = ",";

    // returns the field found at the given position in a comma-separated record
    public String getElementByPosition(String element, int position) {
        String fieldToReturn = null;
        StringTokenizer strToken = new StringTokenizer(element, delimiter);
        for (int i = 0; i <= position && strToken.hasMoreElements(); i++)
            fieldToReturn = strToken.nextToken();
        return fieldToReturn;
    }
}
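To make the field positions concrete, here is a small, hypothetical demo class (not part of the original project) that uses getElementByPosition on the first sample record:

// hypothetical demo class, only to illustrate getElementByPosition
public class TextFilePositionsDemo {

    public static void main(String[] args) {
        TextFilePositions positions = new TextFilePositions();
        String record = "1929143,Cloud,137970,23,3/7/1989,Female,LBEIDY,3009,19";
        // prints "Cloud" - the field at index unit_Position (1)
        System.out.println(positions.getElementByPosition(record, TextFilePositions.unit_Position));
        // prints "3009" - the field at index billingRevenue_Position (7)
        System.out.println(positions.getElementByPosition(record, TextFilePositions.billingRevenue_Position));
    }
}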
Reducer class
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

/**
 *
 * @author Amal_Babu
 * @Description RevenuePerUnitCalculator is used to get the revenue per unit
 *
 */
public class RevenuePerUnitCalculator extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Logger log = Logger.getLogger(RevenuePerUnitCalculator.class);

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) {
        // sum the revenue values received for this unit
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        try {
            context.write(key, new IntWritable(sum));
        } catch (IOException e) {
            log.error("IOException caught when emitting the record from reducer", e);
        } catch (InterruptedException e) {
            log.error("InterruptedException caught when emitting the record from reducer", e);
        }
    }
}
How to execute?
1. Export the project as a jar file.
2. Place the input file in HDFS.
3. Execute the command given below:
hadoop jar <jar_name> <input_hdfs_path> <output_hdfs_path>
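For example, assuming the jar was exported as revenuecalculator.jar with Driver set as its main class, and the input file was copied to /user/hadoop/revenue/input (the jar name and HDFS paths here are only placeholders):

hadoop jar revenuecalculator.jar /user/hadoop/revenue/input /user/hadoop/revenue/output

For the two sample records shown at the top, the part file in the output directory should contain the unit-wise totals Cloud 3009 and FSI 4531, written as tab-separated key-value pairs by TextOutputFormat.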