Sunday, May 26, 2013

HDFS Read/Write/Delete/MakingDirectory - Java API


Using HDFS in java (0.20.0)

Below is a code sample of how to read from and write to HDFS in java.

1. Creating a configuration object: To be able to read from or write to HDFS, you need to create a Configuration object and pass configuration parameter to it using hadoop configuration files.
 
    // Conf object will read the HDFS configuration parameters from these XML
    // files. You may specify the parameters for your own if you want.


    Configuration conf = new Configuration();
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
    conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

    If you do not assign the configurations to conf object (using hadoop xml file) your HDFS operation will be performed on the local file system and not on the HDFS.

2. Adding file to HDFS:
 Create a FileSystem object and use a file stream to add a file.

    FileSystem fileSystem = FileSystem.get(conf);
   
    // Check if the file already exists

    Path path = new Path("/path/to/file.ext");
    if (fileSystem.exists(path)) {
        System.out.println("File " + dest + " already exists");
        return;
    }

    // Create a new file and write data to it.
    FSDataOutputStream out = fileSystem.create(path);
    InputStream in = new BufferedInputStream(new FileInputStream(
        new File(source)));


    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    // Close all the file descripters
    in.close();
    out.close();
    fileSystem.close();
3. Reading file from HDFS: Create a file stream object to a file in HDFS and read it.

    FileSystem fileSystem = FileSystem.get(conf);
    Path path = new Path("/path/to/file.ext");

    if (!fileSystem.exists(path)) {
        System.out.println("File does not exists");
        return;
    }

    FSDataInputStream in = fileSystem.open(path);


    String filename = file.substring(file.lastIndexOf('/') + 1,
        file.length());


    OutputStream out = new BufferedOutputStream(new FileOutputStream(
        new File(filename)));


    byte[] b = new byte[1024];
    int numBytes = 0;
    while ((numBytes = in.read(b)) > 0) {
        out.write(b, 0, numBytes);
    }

    in.close();
    out.close();
    fileSystem.close();

3. Deleting file from HDFS: Create a file stream object to a file in HDFS and delete it.

    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path("/path/to/file.ext");
    if (!fileSystem.exists(path)) {
        System.out.println("File does not exists");
        return;
    }

    // Delete file
    fileSystem.delete(new Path(file), true);


    fileSystem.close();

3. Create dir in HDFS: Create a file stream object to a file in HDFS and read it.

    FileSystem fileSystem = FileSystem.get(conf);

    Path path = new Path(dir);
    if (fileSystem.exists(path)) {
        System.out.println("Dir " + dir + " already not exists");
        return;
    }

    // Create directories
    fileSystem.mkdirs(path);


    fileSystem.close();

Code:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSClient {
    public HDFSClient() {

    }

    public void addFile(String source, String dest) throws IOException {
        Configuration conf = new Configuration();

        // Conf object will read the HDFS configuration parameters from these
        // XML files.
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/hdfs-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        // Get the filename out of the file path
        String filename = source.substring(source.lastIndexOf('/') + 1,
            source.length());


        // Create the destination path including the filename.
        if (dest.charAt(dest.length() - 1) != '/') {
            dest = dest + "/" + filename;
        } else {
            dest = dest + filename;
        }

        // System.out.println("Adding file to " + destination);

        // Check if the file already exists
        Path path = new Path(dest);
        if (fileSystem.exists(path)) {
            System.out.println("File " + dest + " already exists");
            return;
        }

        // Create a new file and write data to it.
        FSDataOutputStream out = fileSystem.create(path);
        InputStream in = new BufferedInputStream(new FileInputStream(
            new File(source)));


        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        // Close all the file descripters
        in.close();
        out.close();
        fileSystem.close();
    }

    public void readFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exists");
            return;
        }

        FSDataInputStream in = fileSystem.open(path);

        String filename = file.substring(file.lastIndexOf('/') + 1,
            file.length());


        OutputStream out = new BufferedOutputStream(new FileOutputStream(
            new File(filename)));


        byte[] b = new byte[1024];
        int numBytes = 0;
        while ((numBytes = in.read(b)) > 0) {
            out.write(b, 0, numBytes);
        }

        in.close();
        out.close();
        fileSystem.close();
    }

    public void deleteFile(String file) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exists");
            return;
        }

        fileSystem.delete(new Path(file), true);

        fileSystem.close();
    }

    public void mkdir(String dir) throws IOException {
        Configuration conf = new Configuration();
        conf.addResource(new Path("/opt/hadoop-0.20.0/conf/core-site.xml"));

        FileSystem fileSystem = FileSystem.get(conf);

        Path path = new Path(dir);
        if (fileSystem.exists(path)) {
            System.out.println("Dir " + dir + " already not exists");
            return;
        }

        fileSystem.mkdirs(path);

        fileSystem.close();
    }

    public static void main(String[] args) throws IOException {

        if (args.length < 1) {
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");

            System.exit(1);
        }

        HDFSClient client = new HDFSClient();
        if (args[0].equals("add")) {
            if (args.length < 3) {
                System.out.println("Usage: hdfsclient add <local_path> " +
                "<hdfs_path>");

                System.exit(1);
            }

            client.addFile(args[1], args[2]);
        } else if (args[0].equals("read")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient read <hdfs_path>");
                System.exit(1);
            }

            client.readFile(args[1]);
        } else if (args[0].equals("delete")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient delete <hdfs_path>");
                System.exit(1);
            }

            client.deleteFile(args[1]);
        } else if (args[0].equals("mkdir")) {
            if (args.length < 2) {
                System.out.println("Usage: hdfsclient mkdir <hdfs_path>");
                System.exit(1);
            }

            client.mkdir(args[1]);
        } else {   
            System.out.println("Usage: hdfsclient add/read/delete/mkdir" +
                " [<local_path> <hdfs_path>]");
            System.exit(1);

        }

        System.out.println("Done!");
    }
}

Word Count Example - Hadoop Map Reduce


Word Count - Hadoop Map Reduce Example


                Word count is a typical example where Hadoop map reduce developers start their hands on with. This sample map reduce is intended to count the no of occurrences of each word  in the provided input files.

What are the minimum requirements?
1.       Input text files – any text file
2.       Cloudera test VM
3.       The mapper, reducer and driver classes to process the input files

 How it works
                The word count operation takes place in two stages a mapper phase and a reducer phase. In mapper phase first the test is tokenized into words then we form a key value pair with these words where the key being the word itself and value ‘1’. For example consider the sentence
“tring tring the phone rings”
In map phase the sentence would be split as words and form the initial key value pair as
<tring,1>
<tring,1>
<the,1>
<phone,1>
<rings,1>

In the reduce phase the keys are grouped together and the values for similar keys are added. So here there are only one pair of similar keys ‘tring’ the values for these keys would be added so the out put key value pairs would be
<tring,2>
<the,1>
<phone,1>
<rings,1>
This would give the number of occurrence of each word in the input. Thus reduce forms an aggregation phase for keys.

The point to be noted here is that first the mapper class executes completely on the entire data set splitting the words and forming the initial key value pairs. Only after this entire process is completed the reducer starts. Say if we have a total of 10 lines in our input files combined together, first the 10 lines are tokenized and key value pairs are formed in parallel, only after this the aggregation/ reducer would start its operation.

The figure below would throw more light to your understanding

Now coming to the practical side of implementation we need our input file and map reduce program jar to do the process job. In a common map reduce process two methods do the key job namely the map and reduce , the main method would trigger the map and reduce methods. For convenience and readability it is better to include the map , reduce and main methods in 3 different class files . We’d look at the 3 files we require to perform the word count job

Word Count Mapper

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCountMapper extends MapReduceBase implementsMapper<LongWritable, Text, Text, IntWritable>
{
      //hadoop supported data types
      private final static IntWritable one = new IntWritable(1);
      private Text word = new Text();
     
      //map method that performs the tokenizer job and framing the initial key value pairs
      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException
      {
            //taking one line at a time and tokenizing the same
            String line = value.toString();
          StringTokenizer tokenizer = new StringTokenizer(line);
         
          //iterating through all the words available in that line and forming the key value pair
            while (tokenizer.hasMoreTokens())
            {
               word.set(tokenizer.nextToken());
               //sending to output collector which inturn passes the same to reducer
                 output.collect(wordone);
            }
       }
}



Let us dive in details of this source code we can see the usage of a few deprecated classes and interfaces; this is because the code has been written to be compliant with Hadoop versions 0.18 and later. From Hadoop version 0.20 some of the methods are deprecated by still supported.

Lets now focus on the class definition part
implements Mapper<LongWritable, Text, Text, IntWritable>
What does this Mapper<LongWritable, Text, Text, IntWritable> stand for?
The data types provided here are Hadoop specific data types designed for operational efficiency suited for massive parallel and lightning fast read write operations. All these data types are based out of java data types itself, for example LongWritable is the equivalent for long in java, IntWritable for int and Text for String.
When we use it as Mapper<LongWritable, Text, Text, IntWritable> , it refers to the data type of input and output key value pairs specific to the mapper or rateher the map method, ie Mapper<Input Key Type, Input Value Type, Output Key Type, Output Value Type>. In our example the input to a mapper is a single line, so this Text (one input line) forms the input value. The input key would a long value assigned in default based on the position of Text in input file. Our output from the mapper is of the format “Word, 1“ hence the data type of our output key value pair is <Text(String),  IntWritable(int)>

The next key component out here is the map method
map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
We’d now look into each of the input parameters in detail. The first and second parameter refers to the Data type of the input Key and Value to the mapper. The third parameter is the output collector which does the job of taking the  output data either from the mapper or reducer, with the output collector we need to specify the Data Types of the output Key and Value from the mapper. The fourth parameter, the reporter is used to report the task status internally in Hadoop environment to avoid time outs.

The functionality of the map method is as follows
1.       Create a IntWritable variable ‘one’ with value as 1
2.       Convert the input line in Text type to a String
3.       Use a tokenizer to split the line into words
4.       Iterate through each word and a form key value pairs as
a.       Assign each work from the tokenizer(of String type) to a Text ‘word
b.      Form key value pairs for each word as <word,one> and push it to the output collector

Word Count Reducer

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

public class WordCountReducer extends MapReduceBase implementsReducer<Text, IntWritable, Text, IntWritable>
{
      //reduce method accepts the Key Value pairs from mappers, do the aggregation based on keys and produce the final out put
      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)throws IOException
      {
            int sum = 0;
            /*iterates through all the values available with a key and add them together and give the
            final result as the key and sum of its values*/
          while (values.hasNext())
          {
               sum += values.next().get();
          }
          output.collect(key, new IntWritable(sum));
      }
}


Here like for the mapper the reducer implements
Reducer<Text, IntWritable, Text, IntWritable>
The first two refers to data type of Input Key and Value to the reducer and the last two refers to data type of output key and value. Our mapper emits output as <apple,1> , <grapes,1> , <apple,1> etc. This is the input for reducer so here the data types of key and value in java would be String and int, the equivalent in Hadoop would be Text and IntWritable. Also we get the output as<word, no of occurrences> so the data type of output Key Value would be <Text, IntWritable>

Now the key component here, the reduce method.
The input to reduce method from the mapper after the sort and shuffle phase would be the key with the list of associated values with it. For example here we have multiple values for a single key from our mapper like <apple,1> , <apple,1> , <apple,1> , <apple,1> . This key values would be fed into the reducer as < apple, {1,1,1,1} > .
Now let us evaluate our reduce method
reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter)
Here all the input parameters are hold the same functionality as that of a mapper, the only diference is with the input Key Value. As mentioned earlier the input to a reducer instance is a key and list of values hence  ‘Text key, Iterator<IntWritable> values’ . The next parameter denotes the output collector of the reducer with the data type of output Key and Value.

The functionality of the reduce method is as follows
1.       Initaize a variable ‘sum’ as 0
2.       Iterate through all the values with respect to a key and sum up all of them
3.       Push to the output collector the Key and the obtained sum as value

Driver Class
The last class file is the driver class. This driver class is responsible for triggering the map reduce job in Hadoop, it is in this driver class we provide the name of our job, output key value data types and the mapper and reducer classes. The source code for the same is as follows

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;


public class WordCount extends Configured implements Tool{
      public int run(String[] args) throws Exception
      {
            //creating a JobConf object and assigning a job name for identification purposes
            JobConf conf = new JobConf(getConf(), WordCount.class);
            conf.setJobName("WordCount");

            //Setting configuration object with the Data Type of output Key and Value
            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(IntWritable.class);

            //Providing the mapper and reducer class names
            conf.setMapperClass(WordCountMapper.class);
            conf.setReducerClass(WordCountReducer.class);

            //the hdfs input and output directory to be fetched from the command line
            FileInputFormat.addInputPath(conf, newPath(args[0]));
            FileOutputFormat.setOutputPath(conf, newPath(args[1]));

            JobClient.runJob(conf);
            return 0;
      }
     
      public static void main(String[] args) throws Exception
      {
            int res = ToolRunner.run(new Configuration(), newWordCount(),args);
            System.exit(res);
      }
}


Create all the three java files in your project. Now you’d be having compilation errors just get the latest release of Hadoop and add the jars on to your class path. Once free from compilation errors we have to package them to a jar. If you are using eclipse then right click on the project and use the export utility. While packing  the jar it is better not to give the main class, because in future when you have multiple map reduce and multiple drivers for the same project we should leave an option to choose the main class file  during run time through the command line. 

Follow the steps to execute the job
1.       Copy the jar to a location in LFS (/home/training/usecase/wordcount/wordcount.jar)
2.       Copy the input files from windows to LFS(/home/training/usecase/wordcount/input/)
3.       Create an input directory in HDFS
hadoop fs –mkdir /projects/wordcount/input/
4.       Copy the input files from LFS to HDFS
Hadoop fs –copyFromLocal /home/training/usecase/wordcount/input/* /projects/wordcount/input/
5.       Execute the jar
hadoop jar /home/training/usecase/wordcount/wordcount.jar com.bejoy.samples.wordcount.WordCount /projects/wordcount/input/ /projects/wordcount/output/

We’d just look at the command in detail with each parameter
/home/training/usecase/wordcount/wordcount.jar -> full path of the jar file in LFS
com.bejoy.samples.wordcount.WordCount  -> full package name of the Driver Class
/projects/wordcount/input/  -> input files location in HDFS
/projects/wordcount/output/  -> a directory in HDFS where we need the output files

NOTE: In Hadoop the map reduce process creates the output directory in hdfs and store the output files on to the same. If the output directory already exists in Hadoop then the m/r job wont execute, in that case either you need to change the output directory or delete the provided output directory in HDFS before running the jar again
6.       Once the job shows a success status we can see the output file in the output directory(part-00000)
Hadoop fs –ls /projects/wordcount/output/
7.       For any further investigation of output file we can retrieve the data from hdfs to LFS and from there to the desired location
hadoop fs –copyToLocal /projects/wordcount/output/ /home/training/usecase/wordcount/output/

Some better practices
                In our current example with the configuration parameters or during runtime we are not specifying the number of reducers. In default Hadoop map reduce jobs have the default no of reducers as one, hence one only one reducer instance is used to process the result set from all the mappers and therefore greater the load a single reducer instance and slower the whole process. We are not exploiting parallelism here, to exploit the same we have to assign the no of reducers explicitly. In runtime we can specify the no of reducers as
hadoop jar /home/training/usecase/wordcount/wordcount.jar com.bejoy.samples.wordcount.WordCount -D mapred.reduce.tasks=15 /projects/wordcount/input/ /projects/wordcount/output/

The key point to be noted here is that the no of output files is same as the no of reducers used as every reducer would produce its own output file. All these output files would be available in the hdfs output directory we assigned in the run command. It would be a cumbersome job to combine all these files manually to obtain the result set. For that Hadoop has provided a get merge command

hadoop fs –getmerge /projects/wordcount/output/ /home/training/usecase/wordcount/output/WordCount.txt

This command would combine the contents of all the files available directly within the /projects/wordcount/output/ hdfs directory and write the same to /home/training/usecase/wordcount/output/WordCount.txt file in LFS

Thanks Bejoy KS for this wonderful explanation

Thursday, May 23, 2013

Mongo DB connection - Java code



Mongo DB connector – Java code
                MongoDB (from "humongous") is an open source document-oriented database system developed and supported by 10gen. It is part of the NoSQLfamily of database systems. Instead of storing data in tables as is done in a "classical" relational database, MongoDB stores structured data as JSON-like documents with dynamic schemas (MongoDB calls the format BSON), making the integration of data in certain types of applications easier and faster.
Here is the Java code for making connection with Mongo DB and creating documents over there.
/**
 *
 * @author Amal_Babu

 *
 */
package com.examples;

import java.util.ArrayList;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.Mongo;
import com.mongodb.MongoException;

public class DBManager {

       private static DB database;

       public static void setDatabase(String hostName, int port, String DBName)
                     throws Exception {

              try {
                     Mongo mongo = new Mongo(hostName, port);
                     database = mongo.getDB(DBName);
                     System.out.println(database);
                     DBCollection collName = database.getCollection("testData");
                     System.out.println(collName);
                     ArrayList<String> arrayList_bid = null;
                     for (int i = 0; i < 10000; i++) {
                           arrayList_bid = new ArrayList<String>();
                           arrayList_bid.add("pos1_" + i + 10);
                           arrayList_bid.add("pos2_" + i + 20);
                           arrayList_bid.add("pos3_" + i + 30);
                           BasicDBObject doc1 = new BasicDBObject("bid", arrayList_bid);
                           collName.insert(doc1);
                     }

                     System.out.println(collName.findOne());

              } catch (MongoException ex) {
                     ex.printStackTrace();
              }
       }

       public static void main(String[] args) throws Exception {
              setDatabase("172.22.91.32", 27017, "myDB");
       }

}