Thursday, January 30, 2014

Pig - Scripting Interface to Map Reduce

Hive Vs Pig

Feature                          Hive                Pig
-------------------------------  ------------------  ------------------
Language                         SQL-like            Pig Latin
Schemas/Types                    Yes (explicit)      Yes (implicit)
Partitions                       Yes                 No
Server                           Optional (Thrift)   No
User Defined Functions (UDF)     Yes (Java)          Yes (Java)
Custom Serializer/Deserializer   Yes                 Yes
DFS Direct Access                Yes (implicit)      Yes (explicit)
Join/Order/Sort                  Yes                 Yes
Shell                            Yes                 Yes
Streaming                        Yes                 Yes
Web Interface                    Yes                 No
JDBC/ODBC                        Yes (limited)       No

Apache Pig and Hive are two projects that layer on top of Hadoop and provide a higher-level language for using Hadoop's MapReduce library. Apache Pig provides a scripting language for describing operations like reading, filtering, transforming, joining, and writing data -- exactly the operations that MapReduce was originally designed for. Rather than expressing these operations in thousands of lines of Java code that use MapReduce directly, Pig lets users express them in a language not unlike a Bash or Perl script. Pig is excellent for prototyping and rapidly developing MapReduce-based jobs, as opposed to coding MapReduce jobs in Java itself.
If Pig is "scripting for Hadoop", then Hive is "SQL queries for Hadoop". Apache Hive offers an even more specific, higher-level language for querying data by running Hadoop jobs, rather than directly scripting the step-by-step operation of several MapReduce jobs. The language is, by design, extremely SQL-like. Hive is still intended for long-running, batch-oriented queries over massive data; it is not "real-time" in any sense. Hive is an excellent tool for analysts and business development types who are accustomed to SQL-like queries and Business Intelligence systems; it lets them easily leverage a Hadoop cluster to perform ad-hoc queries or generate report data.

 

WORD COUNT EXAMPLE - PIG SCRIPT

Q) How do you find the number of occurrences of each word in a file using a Pig script?

You can find the famous word count example written as a MapReduce program on the Apache Hadoop website. Here we will write a simple Pig script for the word count problem.

The following Pig script finds the number of times each word is repeated in a file:

Word Count Example Using Pig Script:

lines = LOAD '/user/hadoop/HDFS_File.txt' AS (line:chararray);
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) as word;
grouped = GROUP words BY word;
wordcount = FOREACH grouped GENERATE group, COUNT(words);
DUMP wordcount;

The above Pig script first splits each line into words using the TOKENIZE function, which produces a bag of words. The FLATTEN operator un-nests the bag so that each word becomes a separate record. In the third statement, the records are grouped by word, and in the fourth statement the count is computed for each group.

You can see that with just 5 lines of Pig, we have solved the word count problem very easily.
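Each statement in the script above has a direct analogue in ordinary collection processing. The sketch below mirrors the same dataflow in plain Java streams, purely as an illustration of the semantics (it is not MapReduce code; the class name and the whitespace split approximating TOKENIZE are my own choices):

```java
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountSketch {

    // Mirrors the pig script: tokenize each line, flatten the words,
    // group by word, and count the records in each group.
    public static Map<String, Long> wordCount(Stream<String> lines) {
        return lines
                .flatMap(line -> Arrays.stream(line.split("\\s+"))) // TOKENIZE + FLATTEN
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(                     // GROUP words BY word
                        Function.identity(),
                        Collectors.counting()));                    // COUNT(words)
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Stream.of("hello world", "hello pig")));
    }
}
```

The one-liner per operation is the same economy the Pig script gives you, except that Pig additionally parallelizes each step across the cluster.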

 

HOW TO FILTER RECORDS - PIG TUTORIAL EXAMPLES

Pig allows you to remove unwanted records based on a condition. The FILTER functionality is similar to the WHERE clause in SQL: the FILTER operator in Pig removes unwanted records from the data. The syntax of the FILTER operator is shown below:
<new relation> = FILTER <relation> BY <condition>

Here, relation is the data set on which the filter is applied, condition is the filter condition, and new relation is the relation created after filtering the rows.

Pig Filter Examples: 

Let's consider the sales data set below as an example:
year,product,quantity
---------------------
2000, iphone, 1000
2001, iphone, 1500 
2002, iphone, 2000
2000, nokia,  1200
2001, nokia,  1500
2002, nokia,  900

1. select products whose quantity is greater than or equal to 1000. 
grunt> A = LOAD '/user/hadoop/sales' USING PigStorage(',') AS (year:int,product:chararray,quantity:int);
grunt> B = FILTER A BY quantity >= 1000;
grunt> DUMP B;
(2000,iphone,1000)
(2001,iphone,1500)
(2002,iphone,2000)
(2000,nokia,1200)
(2001,nokia,1500)

2. select products whose quantity is greater than 1000 and year is 2001 
grunt> C = FILTER A BY quantity > 1000 AND year == 2001;
grunt> DUMP C;
(2001,iphone,1500)
(2001,nokia,1500)

3. select products with a year other than 2000 
grunt> D = FILTER A BY year != 2000;
grunt> DUMP D;
(2001,iphone,1500)
(2002,iphone,2000)
(2001,nokia,1500)
(2002,nokia,900)

You can use all the logical operators (NOT, AND, OR) and relational operators (<, >, ==, !=, >=, <=) in filter conditions.

 

CREATING SCHEMA, READING AND WRITING DATA - PIG TUTORIAL

The first step in processing a data set using Pig is to define a schema for the data set. A schema is a representation of the data set in terms of fields. Let's see how to define a schema with an example.

Consider the following products data set in Hadoop as an example: 
10, iphone,  1000
20, samsung, 2000
30, nokia,   3000

Here the first field is the product id, the second field is the product name, and the third field is the product price.

Defining Schema: 

The LOAD operator is used to define a schema for a data set. Let's look at different usages of the LOAD operator for defining a schema for the above dataset.

1. Creating a schema without specifying any fields.

In this method, we don't specify any field names when creating the schema. An example is shown below:
grunt> A = LOAD '/user/hadoop/products';

Pig is a data flow language. Each operational statement in Pig consists of a relation and an operation: the left side of the statement is the relation, and the right side is the operation. Pig statements must be terminated with a semicolon. Here A is a relation, and /user/hadoop/products is a file in HDFS.

To view the schema of a relation, use the describe statement which is shown below: 
grunt> describe A;
Schema for A unknown.

Since no fields are defined, the above describe statement on A shows "Schema for A unknown." To display the contents on the console, use the DUMP operator.
grunt> DUMP A;
(10,iphone,1000)
(20,samsung,2000)
(30,nokia,3000)

To write the data set into HDFS, use the STORE operator as shown below:
grunt> STORE A INTO 'hadoop directory name';

2. Defining a schema without specifying any data types.

We can create a schema by specifying just the field names, without any data types. An example is shown below:
grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id, product_name, price);

grunt> describe A;
A: {id: bytearray,product_name: bytearray,price: bytearray}

grunt> STORE A into '/user/hadoop/products' USING PigStorage('|'); --Writes data with pipe as delimiter into hdfs product directory.

PigStorage is used to specify the field delimiter. The default field delimiter is tab; if your data is tab-separated, you can omit the USING PigStorage clause. In the STORE operation, you can likewise use PigStorage to specify the output separator.

You specify the field names in the AS clause. Since we didn't specify any data types, Pig assigned the default bytearray data type to each field.

3. Defining a schema with field names and data types.

To specify a data type, use a colon after the field name. Take a look at the example below:
grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id:int, product_name:chararray, price:int);

grunt> describe A;
A: {id: int,product_name: chararray,price: int}

Accessing the Fields: 

So far, we have seen how to define a schema, how to print the contents of the data on the console and how to write data to hdfs. Now we will see how to access the fields. 

The fields can be accessed in two ways: 

  • Field names: We can specify the field name to access the values of that particular field.
  • Positional parameters: Field positions start at 0; $0 refers to the first field, $1 to the second, and so on.

Example:
grunt> A = LOAD '/user/hadoop/products' USING PigStorage(',') AS (id:int, product_name:chararray, price:int);
grunt> B = FOREACH A GENERATE id;
grunt> C = FOREACH A GENERATE $1,$2;
grunt> DUMP B;
(10)
(20)
(30)
grunt> DUMP C;
(iphone,1000)
(samsung,2000)
(nokia,3000)

FOREACH is like a for loop, used to iterate over the records of a relation. The GENERATE keyword specifies what to produce from each record. In the above example, GENERATE is used to project fields from relation A.

Note: It is always good practice to check the schema of a relation with the describe statement before performing an operation. Knowing the schema tells you how to access the fields in it.

 

PIG DATA TYPES - PRIMITIVE AND COMPLEX

Pig has a fairly limited set of data types, which are classified into two categories:
  • Primitive
  • Complex

Primitive Data Types: The primitive data types are also called simple data types. The simple data types that Pig supports are: 
  • int : a signed 32-bit integer, similar to Java's Integer.
  • long : a signed 64-bit integer, similar to Java's Long.
  • float : a 32-bit floating point number, similar to Java's Float.
  • double : a 64-bit floating point number, similar to Java's Double.
  • chararray : a character array in Unicode UTF-8 format; corresponds to Java's String.
  • bytearray : used to represent bytes. It is the default data type: if you don't specify a data type for a field, bytearray is assigned.
  • boolean : represents true/false values.

Complex Types: Pig supports three complex data types. They are listed below: 
  • Tuple : An ordered set of fields. A tuple is represented by parentheses. Example: (1,2)
  • Bag : A set of tuples is called a bag. A bag is represented by curly braces. Example: {(1,2),(3,4)}
  • Map : A set of key-value pairs. A map is represented by square brackets, with # separating the key and the value. Example: [key#value]

Pig allows nesting of complex data structures. For example, you can nest a tuple inside another tuple, a bag, or a map.

Null: Null is not a data type; it represents an undefined or corrupted value. For example, suppose you declared a field as int but the field contains character values. When reading data from this field, Pig converts those (corrupted) character values into nulls. Any operation on a null results in null. Null in Pig is similar to NULL in SQL.

 

RELATIONS, BAGS, TUPLES, FIELDS - PIG TUTORIAL

In this article, we will see what a relation, a bag, a tuple, and a field are. Let's look at each of these in detail.

Let's consider the following products dataset as an example:

Id, product_name
-----------------------
10, iphone
20, samsung
30, Nokia

  • Field: A field is a piece of data. In the above data set, product_name is a field. 
  • Tuple: A tuple is a set of fields. Here Id and product_name together form a tuple. Tuples are represented by parentheses. Example: (10, iphone). 
  • Bag: A bag is a collection of tuples. Bags are represented by curly braces. Example: {(10,iphone),(20,samsung),(30,Nokia)}. 
  • Relation: A relation represents the complete data set. A relation is a bag; to be precise, it is an outer bag. We can call a relation a bag of tuples.
To compare with an RDBMS: a relation is a table, and the tuples in the bag correspond to the rows in the table. Note, however, that unlike rows in a table, the tuples in a Pig relation are not required to contain the same number of fields, and fields in the same position need not have the same data type.

 

HOW TO RUN PIG PROGRAMS - EXAMPLES

Pig programs can be run in three ways, all of which work in both local and MapReduce mode. They are:

  • Script Mode
  • Grunt Mode
  • Embedded Mode
Let's see each mode in detail.

Script Mode or Batch Mode: In script mode, Pig runs the commands specified in a script file. The following example shows how to run a Pig program from a script file:
> cat scriptfile.pig
A = LOAD 'script_file';
DUMP A;
> pig scriptfile.pig

(pig script mode example)
(pig runs on top of hadoop)

Grunt Mode or Interactive Mode: The grunt mode can also be called interactive mode. Grunt is Pig's interactive shell; it is started when no file is specified for Pig to run.
> pig
grunt> A = LOAD 'grunt_file';
grunt> DUMP A;

(pig grunt or interactive mode example)
(pig runs on top of hadoop)

You can also run pig scripts from grunt using run and exec commands. 
grunt> run scriptfile.pig
grunt> exec scriptfile.pig

Embedded Mode: You can embed Pig programs in Java and run them from Java (for example, via the PigServer class).

Secondary sorting

 

Sometimes, we would like to sort the values coming into the reducer of a Hadoop MapReduce (MR) job. You can indirectly sort the values by using a combination of three implementations, listed below.
  1. Use a composite key.
  2. Extend org.apache.hadoop.mapreduce.Partitioner.
  3. Extend org.apache.hadoop.io.WritableComparator.
Other tutorials that explain this approach to sorting values going into a reducer are linked below. In this post, I summarize what I have learned from them and provide a self-contained example. The main difference between this post and those tutorials is that I show how to do this using the new MR API (i.e. org.apache.hadoop.mapreduce.*).

The problem

Imagine we have stock data that looks like the following. Each line represents the value of a stock at a particular time. Each value in a line is delimited by a comma. The first value is the stock symbol (e.g. GOOG), the second value is the timestamp (i.e. the number of milliseconds since January 1, 1970, 00:00:00 GMT), and the third value is the stock's price. The data below is a toy data set. As you can see, there are 3 stock symbols: a, b, and c. The timestamps are also simple: 1, 2, 3, 4. The values are fake as well: 1.0, 2.0, 3.0, and 4.0.

a, 1, 1.0
b, 1, 1.0
c, 1, 1.0
a, 2, 2.0
b, 2, 2.0
c, 2, 2.0
a, 3, 3.0
b, 3, 3.0
c, 3, 3.0
a, 4, 4.0
b, 4, 4.0
c, 4, 4.0
 


Let's say that, for each stock symbol (the reducer's input key, or equivalently, the mapper's output key), we want the values to be ordered descending by timestamp when they come into the reducer. How do we sort the timestamps descendingly? This problem is known as secondary sorting. Hadoop's MR platform sorts the keys, but not the values. (Note that Google's MR platform explicitly supports secondary sorting; see Lin and Dyer 2010.)
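The ordering we are after can be sketched in plain Java, independently of Hadoop: sort the records by symbol ascending and timestamp descending, and within each symbol the values then appear in descending time order. This only illustrates the goal; the rest of the article shows how to make Hadoop's shuffle produce the same effect (the class and record names here are my own):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SecondarySortSketch {

    public record Record(String symbol, long timestamp, double price) {}

    // Order records the way we want them to reach the reducer:
    // symbol ascending, then timestamp descending.
    public static List<Record> sortForReducer(List<Record> input) {
        List<Record> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing(Record::symbol)
                .thenComparing(Comparator.comparingLong(Record::timestamp).reversed()));
        return sorted;
    }

    public static void main(String[] args) {
        List<Record> sorted = sortForReducer(List.of(
                new Record("a", 1, 1.0), new Record("b", 2, 2.0),
                new Record("a", 3, 3.0), new Record("b", 1, 1.0)));
        // Grouping the sorted list by symbol yields timestamps in
        // descending order within each group: a -> 3,1 and b -> 2,1.
        sorted.forEach(System.out::println);
    }
}
```

In a single JVM this is trivial; the challenge in MapReduce is that the sorting, partitioning, and grouping are performed by the framework, which is why the composite key and the three comparator/partitioner classes below are needed.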

A solution for secondary sorting

Use a composite key

A solution for secondary sorting involves doing multiple things. First, instead of simply emitting the stock symbol as the key from the mapper, we need to emit a composite key, a key that has multiple parts. The key will have the stock symbol and timestamp. If you remember, the process for a M/R Job is as follows.
  • (K1,V1) –> Map –> (K2,V2)
  • (K2,List[V2]) –> Reduce –> (K3,V3)
The notation here is a little different from what you may be accustomed to. Instead of angle brackets (i.e. < and >), I use parentheses to describe a key-value pair. Also, for an array, I use List[]; in actuality, it's an Iterable.
In the toy data above, K1 will be of type LongWritable, and V1 will be of type Text. Without secondary sorting, K2 would be of type Text and V2 of type DoubleWritable (we would simply emit the stock symbol and price from the mapper to the reducer). So K2=symbol and V2=price, or (K2,V2) = (symbol,price). However, if we emit such an intermediary key-value pair, secondary sorting is not possible. We have to emit a composite key, K2={symbol,timestamp}. So the intermediary key-value pair is (K2,V2) = ({symbol,timestamp},price). Note that composite data structures, such as the composite key, are written within curly braces. Our reducer simply outputs a K3 of type Text and V3 of type Text; (K3,V3) = (symbol,price). The complete MR job with the new composite key is shown below.
  • (LongWritable,Text) –> Map –> ({symbol,timestamp},price)
  • ({symbol,timestamp},List[price]) –> Reduce –> (symbol,price)
K2 is a composite key; inside it, the symbol part/component is referred to as the "natural" key. It is the key by which values will be grouped.
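The comparators and partitioner below all reference a StockKey class that holds this composite key. Its exact implementation is not shown in this post; in a real job it would implement org.apache.hadoop.io.WritableComparable, with write(DataOutput) and readFields(DataInput) serializing both fields. The standalone sketch below shows only its shape and the accessors the other classes rely on:

```java
public class StockKey {

    private String symbol;   // the "natural" key, used for grouping and partitioning
    private Long timestamp;  // the field we secondary-sort on

    public StockKey() {
        // Hadoop requires a no-arg constructor to deserialize keys.
    }

    public StockKey(String symbol, long timestamp) {
        this.symbol = symbol;
        this.timestamp = timestamp;
    }

    public String getSymbol() {
        return symbol;
    }

    public Long getTimestamp() {
        return timestamp;
    }
}
```

getTimestamp() returns a Long (rather than a primitive long) so that the comparator below can call compareTo on it directly.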

Use a composite key comparator

The composite key comparator is where the secondary sorting takes place. It compares composite keys by symbol ascending and then by timestamp descending. It is shown below. Notice that here we sort on both symbol and timestamp: all components of the composite key are considered.

public class CompositeKeyComparator extends WritableComparator {
    protected CompositeKeyComparator() {
        super(StockKey.class, true);
    }  
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey)w1;
        StockKey k2 = (StockKey)w2;
        
        int result = k1.getSymbol().compareTo(k2.getSymbol());
        if(0 == result) {
            result = -1 * k1.getTimestamp().compareTo(k2.getTimestamp());
        }
        return result;
    }
}

Use a natural key grouping comparator

The natural key grouping comparator "groups" values together according to the natural key. Without this component, each K2={symbol,timestamp} and its associated V2=price would be passed to a separate invocation of the reducer's reduce() method; the grouping comparator ensures that all values sharing the same natural key arrive together in a single call. Notice that here we only consider the "natural" key.

public class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() {
        super(StockKey.class, true);
    }  
    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        StockKey k1 = (StockKey)w1;
        StockKey k2 = (StockKey)w2;
        
        return k1.getSymbol().compareTo(k2.getSymbol());
    }
}

Use a natural key partitioner

The natural key partitioner uses the natural key to partition the data to the reducer(s). Again, note that here, we only consider the “natural” key.

public class NaturalKeyPartitioner extends Partitioner<StockKey, DoubleWritable> {

    @Override
    public int getPartition(StockKey key, DoubleWritable val, int numPartitions) {
        // Mask off the sign bit: hashCode() can be negative, and a negative
        // remainder would be an invalid partition number.
        int hash = key.getSymbol().hashCode() & Integer.MAX_VALUE;
        return hash % numPartitions;
    }
}
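One caveat worth noting: String.hashCode() can return a negative value, and Java's % operator keeps the sign of the dividend, so a raw hash % numPartitions can yield a negative (invalid) partition number. Masking off the sign bit avoids this; a quick standalone check (class and method names here are illustrative):

```java
public class PartitionSignDemo {

    // Mimics choosing a partition from a (possibly negative) hash code.
    // Masking off the sign bit guarantees a non-negative index.
    static int partition(int hash, int numPartitions) {
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(-7 % 3);           // prints -1: Java's % keeps the dividend's sign
        System.out.println(partition(-7, 3)); // a valid, non-negative partition index
    }
}
```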

The M/R Job

Once we define the Mapper, Reducer, natural key grouping comparator, natural key partitioner, composite key comparator, and composite key, in Hadoop’s new M/R API, we may configure the Job as follows.

public class SsJob extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new SsJob(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "secondary sort");

        job.setJarByClass(SsJob.class);
        job.setPartitionerClass(NaturalKeyPartitioner.class);
        job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);
        job.setSortComparatorClass(CompositeKeyComparator.class);

        job.setMapOutputKeyClass(StockKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapperClass(SsMapper.class);
        job.setReducerClass(SsReducer.class);

        // Input and output paths come from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

Summary and conclusion

Implementing secondary sorting in Hadoop's MR programming paradigm is not trivial. As you can see, you need at least 4 new classes. The composite key class holds the natural key plus the other data you will sort on. The composite key comparator performs the sorting of the keys (and thus of the values). The natural key grouping comparator groups values based on the natural key. The natural key partitioner sends values with the same natural key to the same reducer.

How to migrate a hive table from one hive instance to another or between hive databases


Hive has had an EXPORT/IMPORT feature since Hive 0.8. With this feature you can export the metadata as well as the data for a table to a directory in HDFS using the EXPORT command; the table metadata is written out in JSON format alongside the data files. Data exported this way can be imported back into another database or Hive instance using the IMPORT command.
The syntax looks something like this:
EXPORT TABLE table_or_partition TO hdfs_path;
IMPORT [[EXTERNAL] TABLE table_or_partition] FROM hdfs_path [LOCATION [table_location]];
Some sample statements would look like:
EXPORT TABLE <table name> TO 'location in hdfs';
USE test_db;
IMPORT FROM 'location in hdfs';
Export/import can be applied on a per-partition basis as well:
EXPORT TABLE <table name> PARTITION (loc="USA") TO 'location in hdfs';
The below IMPORT command imports into an external table instead of a managed one:
IMPORT EXTERNAL TABLE FROM 'location in hdfs' LOCATION '/location/of/external/table';

How to recover deleted files from hdfs/ Enable trash in hdfs

If you enable trash in HDFS, then when an rmr is issued the file remains available in the trash for some period, so you can recover accidentally deleted files. To enable HDFS trash, set fs.trash.interval to a value greater than 0.

 
This specifies the time interval for which a deleted file remains available in the trash. A second property (fs.trash.checkpoint.interval) specifies the checkpoint interval: the NameNode checks the trash directory at each interval and deletes all files older than fs.trash.interval. For example, if fs.trash.interval is 60 minutes and fs.trash.checkpoint.interval is 30 minutes, then every 30 minutes a check is performed that deletes all files more than 60 minutes old.

fs.trash.checkpoint.interval should be equal to or less than fs.trash.interval

The value of fs.trash.interval  is specified in minutes.
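For example, with the 60/30-minute values from the scenario above, the two properties would be set in core-site.xml like this (the values shown are illustrative):

```xml
<!-- core-site.xml -->
<property>
  <name>fs.trash.interval</name>
  <value>60</value>   <!-- deleted files stay in the trash for 60 minutes -->
</property>
<property>
  <name>fs.trash.checkpoint.interval</name>
  <value>30</value>   <!-- the trash is checkpointed and purged every 30 minutes -->
</property>
```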

fs.trash.interval should be set on the client node as well as on the NameNode. On the NameNode it is needed for checkpointing purposes; based on the value on the client node, it is decided whether an rmr issued from the client removes a file completely from HDFS or moves it to the trash.

The trash directory by default is /user/<username>/.Trash

Mahout Recommendation - AlphaNumeric ItemIds


In real-world data we can't always ensure that the input data supplied for generating recommendations contains only integer values for user and item IDs. If these values, or either one of them, are not integers, the default data models that Mahout provides won't be able to process our data. Here we consider the case where the item IDs are strings, so we define a custom data model. In our data model we need to override a method so that it reads the item ID as a string, converts it into a long, and returns a unique long value.

Data Model Class

import java.io.File;
import java.io.IOException;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;

public class AlphaItemFileDataModel extends FileDataModel {
      private final ItemMemIDMigrator  memIdMigtr = new ItemMemIDMigrator();
     
      public AlphaItemFileDataModel(File dataFile) throws IOException {
            super(dataFile);       
      }

      public AlphaItemFileDataModel(File dataFile, boolean transpose) throws IOException {
            super(dataFile, transpose);
      }

      @Override
      protected long readItemIDFromString(String value) {
            long retValue =  memIdMigtr.toLongID(value);
            if(null == memIdMigtr.toStringID(retValue)){
                  try {
                        memIdMigtr.singleInit(value);
                  } catch (TasteException e) {
                        e.printStackTrace();
                  }
            }
            return retValue;
      }
   
      String getItemIDAsString(long itemId){
            return memIdMigtr.toStringID(itemId);
      }
}

The following class maintains the map from long IDs back to their original string values:

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
import org.apache.mahout.cf.taste.impl.model.AbstractIDMigrator;

      public  class ItemMemIDMigrator extends AbstractIDMigrator {
       
        private final FastByIDMap<String> longToString;
       
        public ItemMemIDMigrator() {
          this.longToString = new FastByIDMap<String>(100);
        }
       
        @Override
        public void storeMapping(long longID, String stringID) {
          synchronized (longToString) {
            longToString.put(longID, stringID);
          }
        }
       
        @Override
        public String toStringID(long longID) {
          synchronized (longToString) {
            return longToString.get(longID);
          }
        }
        public void singleInit(String stringID) throws TasteException {
            storeMapping(toLongID(stringID), stringID);
        }
       
      }

In your recommender implementation you can use this data model class instead of the default file data model to accept input that contains alphanumeric item IDs. Similarly, you can devise a data model that accommodates alphanumeric user IDs as well.
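The core idea behind ItemMemIDMigrator -- derive a long from the string ID and keep a reverse map so the original string can be recovered later -- can be sketched without any Mahout dependencies. Note that Mahout's AbstractIDMigrator derives the long from an MD5-based hash of the string; the plain hashCode() used below is my own simplification, purely for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class StringIdMigratorSketch {

    // Reverse map: long ID back to the original string ID.
    private final Map<Long, String> longToString = new HashMap<>();

    // Derive a deterministic long from the string ID.
    // Illustration only; Mahout uses an MD5-based hash instead.
    public long toLongID(String stringID) {
        return (long) stringID.hashCode();
    }

    // Remember the mapping so recommendation results can be
    // translated back to the original string IDs.
    public long store(String stringID) {
        long id = toLongID(stringID);
        longToString.put(id, stringID);
        return id;
    }

    public String toStringID(long longID) {
        return longToString.get(longID);
    }

    public static void main(String[] args) {
        StringIdMigratorSketch migrator = new StringIdMigratorSketch();
        long id = migrator.store("ITEM-42A");
        System.out.println(migrator.toStringID(id)); // prints "ITEM-42A"
    }
}
```

This is exactly what the data model above does on each call to readItemIDFromString: convert the string to a long, record the mapping if it is new, and hand Mahout the long.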