In this section we are going to develop the WordCount application that we discussed in the previous section, this time using the old API. The overall concept and development process are the same as in the previous section. Finally, we will look at some of the differences between the new API and the old API.
So, we are going to learn the following concepts:
- MapReduce Class Structure in Old API
- Differences Between New API and Old API
MapReduce Class Structure in Old API
When we develop a MapReduce application using the old API, the Hadoop job class is again a standard Java runnable class that needs to contain a public static void main() method. In this main method, we need to create an instance of the
org.apache.hadoop.mapred.JobConf
class, passing in the Main class / Driver class / Job class as an argument, and then call the
JobClient.runJob()
method, passing in that
org.apache.hadoop.mapred.JobConf
object.
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
public class MyJobClass{
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(MyJobClass.class);
//Configure the job object
JobClient.runJob(conf);
}
}
Using the old API as well, when configuring the JobConf object we need to provide at least three things: where to find the map() method, where to find the reduce() method, and the configuration parameters the job needs, such as which data is being used and what format that data is in. We can set many more configuration parameters, as we will see in the subsequent chapters, but these are the minimum requirements:
- Where the map method can be found.
- Where the reduce method can be found.
- Configuration parameters
The map() and reduce() methods need to live in their own classes, so the Job class/Master class tells Hadoop which class contains the map() method and which class contains the reduce() method.
//Map Class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
public static class MyMapClass extends MapReduceBase implements
Mapper<Text, Text, Text, Text> {
@Override
public void map(param){
// Do the map-side processing i.e., write out key and value pairs
}
}
//Reduce Class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Reducer;
public static class MyReduceClass extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
@Override
public void reduce(param){
// Do the reduce-side processing i.e., write out the key and value pairs
}
}
Thus, for a Hadoop job we need a minimum of three classes: an overall Job Control class, a Map class and a Reduce class. Using the old API, for a class to be a Map class it must extend the
MapReduceBase
class of the
org.apache.hadoop.mapred
package and implement the
Mapper
interface of the
org.apache.hadoop.mapred
package. Similarly, for a class to be a Reduce class, it must extend the
MapReduceBase
class of the
org.apache.hadoop.mapred
package and implement the
Reducer
interface of the
org.apache.hadoop.mapred
package. The classes above are simplified versions of the Map, Reduce and Job classes; you will see the full versions below in this section.
When we put together these classes, the final Job class structure will look like the following:
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.Reducer;
public class MyJobClass{ //Job Class
//Map Class
public static class MyMapper extends MapReduceBase implements
Mapper<Text, Text, Text, Text> {
@Override
public void map(param){
// Do the map-side processing i.e., write out key and value pairs
}
}
//Reduce Class
public static class MyReducer extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
@Override
public void reduce(param){
// Do the reduce-side processing i.e., write out the key and value pairs
}
}
//main method for configuration parameters
public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(MyJobClass.class);
//Configure the job object
JobClient.runJob(conf);
}
}
I am not going to introduce a template for the MapReduce job using the old API, because we are going to use the new API throughout this course. My intention is only to make you aware of the old API and some of its classes and interfaces that I have used in this program.
Now create a new class with the name WordCountApp2 in the same project, FirstHadoopProject, that we created in the last section. Erase the default generated code, then copy and paste the code given below and save it. You don't need to build your project differently for the older API, because newer Hadoop releases still ship the old API classes alongside the new ones.
/**
* A MapReduce WordCount Program in Hadoop using old API
*/
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
/**
* @author Dv Analytics.com
*
*/
public class WordCountApp2{
public static class WordCountMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter r)throws IOException {
//Do the map-side processing i.e., write out key and value pairs
String line = value.toString();
String body = line.replaceAll("\\p{Punct}","").toLowerCase();
String[] words = body.split(" ");
for(String word : words){
output.collect(new Text(word), new IntWritable(1));
}
}
}
public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter r)throws IOException {
//Do the reduce-side processing i.e., write out the key and value pairs
// here Reducer receives key, value pairs as: word, [1,1,1,...,1]
int count = 0;
while(values.hasNext()){
count += values.next().get();
}
output.collect(key, new IntWritable(count));
}
}
public static void main(String[] args) throws Exception{
Path in = new Path(args[0]); //create Path variable for input data
Path out = new Path(args[1]); //Create Path variable for output data
JobConf conf = new JobConf(WordCountApp2.class); //Create an instance of JobConf Class using Job Control class as argument
conf.setMapperClass(WordCountMapper.class); //set the Map Class
conf.setReducerClass(WordCountReducer.class); //set the Reduce Class
//conf.setCombinerClass(WordCountReducer.class); //Set the Combiner Class if applicable.
conf.setOutputKeyClass(Text.class); //set output key class for map and reduce
conf.setOutputValueClass(IntWritable.class); //set output value class for the map and reduce
FileInputFormat.setInputPaths(conf, in); //Set the input path the job is going to process
FileOutputFormat.setOutputPath(conf, out); //Set the output path the job will write to after execution
conf.setJobName("WordCount"); //set an appropriate name for your job
JobClient.runJob(conf); //Submit the configured job to JobTracker
}
}
Map Class
Let’s discuss the Map class and the map() method in the old API in more detail:
public static class WordCountMapper extends MapReduceBase implements Mapper<LongWritable,Text,Text,IntWritable>{
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter r)throws IOException {
//Do the map-side processing i.e., write out key and value pairs
String line = value.toString();
String body = line.replaceAll("\\p{Punct}","").toLowerCase();
String[] words = body.split(" ");
for(String word : words){
output.collect(new Text(word), new IntWritable(1));
}
}
}
The Map class extends the
MapReduceBase
class and implements the
Mapper
interface. In the definition,
WordCountMapper extends MapReduceBase implements Mapper< LongWritable, Text, Text, IntWritable >
, we have provided four datatypes: two for the input key and input value to the Mapper, and two for the output key and output value from the Mapper, namely LongWritable, Text, Text, IntWritable. This means the Mapper takes input key/value pairs as LongWritable, Text and outputs key/value pairs as Text, IntWritable.
The map() method:
map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter r)
, which belongs to the Mapper interface, again takes a key and a value from the input data, together with an OutputCollector object that collects the output data (i.e., key/value pairs) and is very similar to the Context object we saw in the previous section. Like the Context, the OutputCollector is used with both the map() and reduce() methods to collect output data. A Reporter reports progress and updates counters and status information. We can watch the Reporter at work on the console while the job is running, for example the percentage of map and reduce tasks completed, and after the job finishes we can see the different counters. We write the output using the collect() method on the OutputCollector object, specifying our output key and output value. The total number of map() invocations depends on the total number of records in the input file, i.e., map() is executed once per record. The processing logic here is the same as in the previous section.
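For example, here is a minimal sketch (not part of the WordCountApp2 program above, and reusing its imports) of how the Reporter could be used inside an old-API map() method to update a custom counter and the task status; the counter group and name are made up purely for illustration:
@Override
public void map(LongWritable key, Text value,
        OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    String[] words = value.toString().replaceAll("\\p{Punct}", "").toLowerCase().split(" ");
    for (String word : words) {
        if (word.isEmpty()) {
            // hypothetical counter group/name, only to illustrate incrCounter()
            reporter.incrCounter("WordCountApp2", "EMPTY_TOKENS", 1);
            continue;
        }
        output.collect(new Text(word), new IntWritable(1));
    }
    // update the task status string shown by the framework
    reporter.setStatus("Processed record at byte offset " + key.get());
}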
Reduce Class
Let’s discuss the Reduce class and the reduce() method in the old API in more detail:
public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter r)throws IOException {
//Do the reduce-side processing i.e., write out the key and value pairs
// here Reducer receives key, value pairs as: word, [1,1,1,...,1]
int count = 0;
while(values.hasNext()){
count += values.next().get();
}
output.collect(key, new IntWritable(count));
}
}
The Reduce class extends the
MapReduceBase
class and implements the
Reducer
interface. In the definition,
WordCountReducer extends MapReduceBase implements Reducer< Text, IntWritable, Text, IntWritable >
, we have provided four datatypes: two for the input key and input value to the Reducer, and two for the output key and output value from the Reducer, namely Text, IntWritable, Text, IntWritable. This means the Reducer takes input key/value pairs as Text, IntWritable and outputs key/value pairs as Text, IntWritable.
The reduce() method:
void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter r)
takes a key and an Iterator of values for that key as input data. The input key/value types of reduce() must be the same as the output key/value types of the Mapper, which in this example are Text and IntWritable. The OutputCollector object is the collector for the output data. A Reporter reports progress and updates counters and status information. We write the records using the collect() method on the OutputCollector object, specifying our output key and output value. The total number of reduce() invocations depends on the total number of keys supplied as input; more precisely, reduce() is executed once for every key and its iterator of values. The processing logic is the same as the reduce part of the previous section.
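One point worth noting is that the old-API Iterator can be traversed only once, so any per-key logic has to be done in a single pass. The following is only a sketch (not part of the program above) of a variant reducer that sums the counts and emits a word only if its total reaches an illustrative threshold:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class FrequentWordReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {
    private static final int THRESHOLD = 5; // illustrative cutoff, not from the original program

    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter r) throws IOException {
        int count = 0;
        // single pass over the Iterator: it cannot be rewound
        while (values.hasNext()) {
            count += values.next().get();
        }
        if (count >= THRESHOLD) {
            output.collect(key, new IntWritable(count));
        }
    }
}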
Job Configuration
Now let’s look at the main() method. When we execute a Hadoop job, we pass some command-line arguments that specify the input data for our job and the output location for its results. So the first thing we do in the main method is retrieve the input and output arguments and create new objects from them. These are Hadoop Path objects, which allow Hadoop to read from and write to the relevant places on either HDFS or the local drive, depending on the configuration.
Path in = new Path(args[0]);
Path out = new Path(args[1]);
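A common optional refinement is to validate the arguments and remove a stale output directory before submitting the job, so the job does not fail on an existing path. The following is only a sketch of how that could be done; it assumes the output path may safely be deleted and would need additional imports of org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.FileSystem:
if (args.length != 2) {
    System.err.println("Usage: WordCountApp2 <input path> <output path>");
    System.exit(-1);
}
Path in = new Path(args[0]);
Path out = new Path(args[1]);
// remove a stale output directory so the job does not fail on an existing path
FileSystem fs = out.getFileSystem(new Configuration());
if (fs.exists(out)) {
    fs.delete(out, true);
}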
The next line creates an instance of the JobConf class, passing in the main class as a parameter.
JobConf conf = new JobConf(WordCountApp2.class);
Having created an instance of our Hadoop job, we next need to tell Hadoop what the output data from our Map and Reduce classes will look like. The two lines below set the types of the final (reduce) output, and Hadoop assumes by default that the map output uses the same key/value types; if the map output types differed, they would be declared separately, as sketched just after these lines.
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
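The following sketch is purely illustrative (WordCount does not need it): if the map output types ever differed from the final reduce output types, the old-API JobConf lets us declare them separately. The LongWritable map value type here is a made-up assumption:
conf.setMapOutputKeyClass(Text.class);            // key type emitted by the Mapper
conf.setMapOutputValueClass(LongWritable.class);  // hypothetical: Mapper emits LongWritable values
conf.setOutputKeyClass(Text.class);               // final output key type from the Reducer
conf.setOutputValueClass(IntWritable.class);      // final output value type from the Reducer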
Now we need to tell Hadoop where to find the Map and Reduce classes. Remember that when we invoke Hadoop we only specify the main class on the command line; for this example that is WordCountApp2. WordCountApp2 creates a job, and this job tells Hadoop which class is the Map class and which is the Reduce class.
conf.setMapperClass(WordCountMapper.class);
conf.setReducerClass(WordCountReducer.class);
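Because summing word counts is associative and the combiner's input/output types would match the reducer's, the same reducer class could also be registered as a combiner to cut down the intermediate data shuffled across the network. This optional call is only a sketch; the program above leaves it commented out:
conf.setCombinerClass(WordCountReducer.class); // optional: run the reduce logic on map-side output first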
Then we can specify the format of the input and output data, which is optional. We haven’t used such configuration here, but by default Hadoop treats it as:
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
Then we tell Hadoop where the input and output data are, using the Path variables we created at the beginning of the main method.
FileInputFormat.setInputPaths(conf, in);
FileOutputFormat.setOutputPath(conf, out);
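If the job ever needed to read from more than one input directory, the old-API FileInputFormat also provides addInputPath() for adding paths one at a time; the extra path below is a made-up example, shown only as a sketch:
FileInputFormat.setInputPaths(conf, in);                                  // first input path
FileInputFormat.addInputPath(conf, new Path("/user/hadoop/more-input"));  // hypothetical additional path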
Then we need to tell Hadoop which class to use for locating the job’s JAR file once we package the overall project. In our program this is already taken care of by passing WordCountApp2.class to the JobConf constructor, but it can also be set explicitly:
conf.setJarByClass(WordCountApp2.class);
Then we can set a name for our job; this is an optional step in the job configuration.
conf.setJobName("WordCount");
Finally, we call the runJob() method, passing in the JobConf object, to submit our job to Hadoop; Hadoop then starts executing the job.
JobClient.runJob(conf);
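runJob() blocks until the job finishes and returns a RunningJob handle, so if you wanted to inspect the result programmatically you could do something like the following sketch (it would also need an import of org.apache.hadoop.mapred.RunningJob):
RunningJob running = JobClient.runJob(conf);   // blocks until the job completes
if (!running.isSuccessful()) {
    System.err.println("Job " + running.getJobName() + " failed");
    System.exit(1);
}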
So that’s all we need to run this Hadoop application using the old API. You can now create the JAR file and run the job in the same way we discussed in the previous section.
You also need to know about the output generated by the job when we use the old API. When you run this job in stand-alone or pseudo-distributed mode, you will get two files in the output directory:
part-00000 and _SUCCESS
Notice that with the old API we get the same output file name whether the job is map-only or has both map and reduce phases.
Differences Between Old and New API
Difference at the Map Class level:
Old API | New API |
---|---|
Map class extends the MapReduceBase class and implements the Mapper interface from the org.apache.hadoop.mapred package. | Map class extends the Mapper abstract class from the org.apache.hadoop.mapreduce package. |
OutputCollector is used to collect the output from the Mapper, and Reporter is used to report the progress of the job to the JobTracker. | Context takes care of the functions of both OutputCollector and Reporter. |
The collect(key, value) method is called on the OutputCollector object to write out the output. | The write(key, value) method is called on the Context object to write out the output. |
IdentityMapper is defined in the org.apache.hadoop.mapred.lib package. | There is no separate IdentityMapper class; the base Mapper class acts as an identity mapper by default. |
InverseMapper is defined in the org.apache.hadoop.mapred.lib package. | InverseMapper is defined in the org.apache.hadoop.mapreduce.lib.map package. |
RegexMapper is defined in the org.apache.hadoop.mapred.lib package. | RegexMapper belongs to the org.apache.hadoop.mapreduce.lib.map package. |
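To make the Map-level differences concrete, here is a hedged sketch of what the equivalent WordCount Mapper could look like in the new API; it is only an illustration, not the program we will build in this course:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewApiWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Context replaces both OutputCollector and Reporter from the old API
        for (String token : value.toString().replaceAll("\\p{Punct}", "").toLowerCase().split(" ")) {
            word.set(token);
            context.write(word, ONE);
        }
    }
}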
Difference at the Reduce Class level:
Old API | New API |
---|---|
Reduce class extends the MapReduceBase class and implements the Reducer interface from the org.apache.hadoop.mapred package. | Reduce class extends the Reducer abstract class from the org.apache.hadoop.mapreduce package. |
OutputCollector is used to collect the output from the Reducer, and Reporter is used to report the progress of the job to the JobTracker. | Context takes care of the functions of both OutputCollector and Reporter. |
The collect(key, value) method is called on the OutputCollector object to write out the output. | The write(key, value) method is called on the Context object to write out the output. |
The reduce() method receives its values as a java.util.Iterator. | The reduce() method receives its values as a java.lang.Iterable. |
IdentityReducer is defined in the org.apache.hadoop.mapred.lib package. | There is no separate IdentityReducer class; the base Reducer class acts as an identity reducer by default. |
LongSumReducer belongs to the org.apache.hadoop.mapred.lib package. | LongSumReducer belongs to the org.apache.hadoop.mapreduce.lib.reduce package. |
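Similarly, a hedged sketch of the equivalent Reducer in the new API, where the values arrive as an Iterable and can be consumed with an enhanced for loop:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NewApiWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {   // Iterable instead of Iterator
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}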
Difference at the Main Class level:
Old API | New API |
---|---|
An instance of the org.apache.hadoop.mapred.JobConf class, with its many setter methods, is used to configure the job. | An instance of the org.apache.hadoop.mapreduce.Job class, with its many setter methods, is used to configure the job. |
The Tool interface and ToolRunner class are commonly used with old-API drivers. | The same Tool interface and ToolRunner class (from the org.apache.hadoop.util package) can be used with new-API drivers as well. |
The runJob() method is used as JobClient.runJob(conf) to run the job. | The submit() or waitForCompletion(boolean) method is called on the org.apache.hadoop.mapreduce.Job object to run the job. |
Output is generated as part-00000 for both map-only jobs and jobs with both a Mapper and a Reducer. | For a map-only job the output is generated as part-m-00000, and for a job with both a Mapper and a Reducer it is generated as part-r-00000; here m stands for mapper and r stands for reducer. |
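And at the main-class level, here is a hedged sketch of the equivalent new-API driver, assuming a Hadoop release where Job.getInstance() is available and reusing the sketched NewApiWordCountMapper and NewApiWordCountReducer classes from above:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiWordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "WordCount");
        job.setJarByClass(NewApiWordCountDriver.class);
        job.setMapperClass(NewApiWordCountMapper.class);
        job.setReducerClass(NewApiWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // waitForCompletion() replaces JobClient.runJob(conf) from the old API
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}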
Important Points:
- In the old API, all the interfaces are in the org.apache.hadoop.mapred package.
- In the new API, all the abstract classes are in the org.apache.hadoop.mapreduce package.
- Most of the interfaces from the old API are provided as abstract classes in the new API.
- When you implement an interface you must define all of its methods, but with an abstract class you only need to override the methods you want. So the new API has reduced the complexity of programming.
- IdentityMapper is the default Mapper class provided by Hadoop, and it is picked automatically when no mapper is specified in the MapReduce driver class. The IdentityMapper class implements the identity function, which writes all of its input key/value pairs directly to the output. It is a generic mapper class and can be used with any key/value data types; when using it, the map input key/value types must match its output key/value types.
- InverseMapper is a generic mapper class that simply reverses (or swaps) its input (key, value) pairs into (value, key) pairs in the output.
- RegexMapper is a mapper class that extracts text matching the given regular expression.
- In the WordCount job given above, we can replace the reference to WordCountReducer.class with the built-in LongSumReducer.class, which provides the same summing functionality: conf.setReducerClass(LongSumReducer.class); note that the old-API LongSumReducer sums LongWritable values, so the Mapper would need to emit LongWritable counts; see the sketch below.
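A hedged sketch of that substitution in the old-API program above: because LongSumReducer works with LongWritable values, the output value class and the value emitted by the Mapper would change as well.
// additional imports needed in WordCountApp2:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapred.lib.LongSumReducer;

// in main(), instead of WordCountReducer:
conf.setReducerClass(LongSumReducer.class);
conf.setOutputValueClass(LongWritable.class);

// and in WordCountMapper.map(), emit LongWritable counts:
// output.collect(new Text(word), new LongWritable(1));
// (the Mapper's generic value type changes from IntWritable to LongWritable accordingly)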