Saturday, June 1, 2013

Passing parameters to a Map-Reduce program

There might be a requirement to pass additional parameters to the mappers and reducers, besides the inputs which they process.

Setting the parameter:

1. Use the -D command line option to set the parameter while running the job (this works when the driver is run through ToolRunner/GenericOptionsParser; see the driver sketch after this list). OR

2. Before launching the job using the old MR API
JobConf job = (JobConf) getConf();
job.set("Amal", "myValue");

3. Before launching the job using the new MR API
Configuration conf = new Configuration();
conf.set("Amal", "myValue");
Job job = new Job(conf);
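
For option 1, the -D flag is picked up only when the driver is run through ToolRunner (which uses GenericOptionsParser). Below is a minimal driver sketch, assuming the new MR API; the class name MyDriver and the command line are made up for illustration.

// Run as (hypothetical jar/paths): hadoop jar myjob.jar MyDriver -D Amal=myValue <input> <output>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyDriver extends Configured implements Tool {
    public int run(String[] args) throws Exception {
        // getConf() already carries anything passed with -D on the command line
        Configuration conf = getConf();
        // Or set the parameter programmatically instead
        // conf.set("Amal", "myValue");
        Job job = new Job(conf, "parameter demo");
        job.setJarByClass(MyDriver.class);
        // job.setMapperClass(...), job.setReducerClass(...) etc. omitted for brevity
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MyDriver(), args));
    }
}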

Getting the parameter:

1. Using the old API in the Mapper and Reducer. The JobConfigurable#configure method has to be overridden in the Mapper and Reducer classes.
private static Long N;
public void configure(JobConf job) {
    N = Long.parseLong(job.get("Amal"));
}

The variable N can then be used within the map and reduce functions.
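
As a minimal sketch (hypothetical class name and output types, assuming a text input split), an old-API mapper that reads the parameter in configure() and uses it in map() could look like this:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class MyOldApiMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

    private static Long N;

    // Called once per task, before any calls to map()
    @Override
    public void configure(JobConf job) {
        N = Long.parseLong(job.get("Amal"));
    }

    @Override
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
        // N is available to the map logic; here it is simply emitted with each line
        output.collect(value, new LongWritable(N));
    }
}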

2. Using the new API in the Mapper and Reducer. The context is passed to the setup, map, reduce and cleanup functions.
Configuration conf = context.getConfiguration();
String param = conf.get("Amal");
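
A minimal new-API mapper sketch (hypothetical class name, assuming a TextInputFormat-style input) that reads the parameter once in setup() and uses it in map():

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyNewApiMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String param;

    // Called once per task, before any calls to map()
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        param = conf.get("Amal");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The parameter is available to every map() call
        context.write(value, new Text(param));
    }
}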

Hive and Pig - A practical comparison

Data from external systems can be pushed into HDFS using Sqoop, Flume and in many other ways. For now, let's assume that the user data is already there in HDFS as shown below.


/user/training/user/part-m-00000
1,Praveen,20,India,M
2,Prajval,5,India,M
3,Prathibha,15,India,F
/user/training/clickstream.txt
1,www.bbc.com
1,www.abc.com
1,www.gmail.com
2,www.cnn.com
2,www.eenadu.net
2,www.stackoverflow.com
2,www.businessweek.com
3,www.eenadu.net
3,www.stackoverflow.com
3,www.businessweek.com
Let's create the 'user' table in Hive and map the user data in HDFS to the table:
CREATE TABLE user (
   user_id INT,
   name STRING,
   age INT,
   country STRING,
   gender STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA INPATH '/user/training/user/part-m-00000' OVERWRITE INTO TABLE user;
Similarly, let's create the 'clickstream' table in Hive and map the clickstream data in HDFS to the table:
CREATE TABLE clickstream (
   userid INT,
   url STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
LOAD DATA INPATH '/user/training/clickstream.txt' OVERWRITE INTO TABLE clickstream;
Here is the HiveQL query to get the top 3 URLs visited by users whose age is less than 16. The query looks very similar to SQL, which makes it easy to get started with Hive. Hive automatically creates an execution plan for the below query and submits it to the Hadoop cluster as MapReduce jobs. SQL interfaces are being added to many Big Data frameworks to make it easier for those who are familiar with SQL to get started with them. Here is an interesting article from GigaOM on the same.
SELECT url, COUNT(url) c FROM user u JOIN clickstream c ON (u.user_id = c.userid) WHERE u.age < 16 GROUP BY url ORDER BY c DESC LIMIT 3;
Here is the PigLatin code for the same.
Users1 = load '/user/training/user/part-m-00000' using PigStorage(',') as (user_id, name, age:int, country, gender);
Fltrd = filter Users1 by age < 16;
Pages  = load '/user/training/clickstream.txt' using PigStorage(',') as (userid, url);
Jnd = join Fltrd by user_id, Pages by userid;
Grpd = group Jnd by url;
Smmd = foreach Grpd generate group, COUNT(Jnd) as clicks;
Srtd = order Smmd by clicks desc;
Top3 = limit Srtd 3;
dump Top3;
Here is the output after running the HiveQL query. 
MapReduce Jobs Launched:
Job 0: Map: 2  Reduce: 1   Cumulative CPU: 2.4 sec   HDFS Read: 686 HDFS Write: 238 SUCCESS
Job 1: Map: 1  Reduce: 1   Cumulative CPU: 1.35 sec   HDFS Read: 694 HDFS Write: 238 SUCCESS
Job 2: Map: 1  Reduce: 1   Cumulative CPU: 1.32 sec   HDFS Read: 694 HDFS Write: 64 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 70 msec
OK
www.businessweek.com    2
www.eenadu.net    2
www.stackoverflow.com    2
Time taken: 105.076 seconds
Here is the output after running the PigLatin script. 
2013-03-04 21:06:57,654 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2013-03-04 21:06:57,656 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2013-03-04 21:06:57,667 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2013-03-04 21:06:57,667 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(www.eenadu.net,2)
(www.businessweek.com,2)
(www.stackoverflow.com,2)
Just a quick comparison: Hive converted the query into 3 MR jobs and took around 105 seconds to complete, while Pig converted the PigLatin script into 5 MR jobs and took around 210 seconds to complete.

I didn't spend much time looking at why Pig is twice as slow as Hive here. Partitions could have been used in Hive to make the query run even faster.