First, let me introduce another template before you move on to the exercise problems. This template is an upgraded version of the first one: it has an extra method that deletes the existing output directory. With this template you no longer need to delete the output directory by hand every time you re-run a job, which saves time and effort.
GenericMapReduceTemplate - Version 2:
//A generic template for a MapReduce program that deletes the previously generated output directory.
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class GenericMapReduceDeleteOutput {
public static class MyMapper extends Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
//Do the map-side processing i.e., write out key and value pairs
context.write(new Text(""), new Text(""));
}
}
public static class MyReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
//Do the reduce-side processing i.e., write out the key and value pairs
Iterator<Text> it = values.iterator();
while(it.hasNext()){
//Do some calculation
}
context.write(new Text(""), new Text(""));
}
}
public static void deletePreviousOutput(Configuration conf, Path path){
try{
FileSystem hdfs = FileSystem.get(conf);
hdfs.delete(path,true);
}
catch(IOException e){
//ignore any exception
}
}
public static void main(String[] args) throws Exception {
/**Configure the job**/
Path in = new Path(args[0]); //Create Path variable for input data
Path out = new Path(args[1]); //Create Path variable for output data
Configuration conf = new Configuration(); //Create an instance of the Configuration class
Job job = Job.getInstance(conf); //Create a Job instance using getInstance()
deletePreviousOutput(conf, out); //Delete the previously created output directory
job.setOutputKeyClass(Text.class); //Set the "key" output class for the map and reduce methods
job.setOutputValueClass(Text.class); //Set the "value" output class for the map and reduce methods
job.setMapperClass(MyMapper.class); //Set the Mapper class
job.setReducerClass(MyReducer.class); //Set the Reducer class
//job.setCombinerClass(MyReducer.class); //Set the Combiner class if applicable.
job.setInputFormatClass(TextInputFormat.class); //Set the InputFormat Class
job.setOutputFormatClass(TextOutputFormat.class); //Set the OutputFormat Class
FileInputFormat.addInputPath(job, in); //Specify the input path the job reads from
FileOutputFormat.setOutputPath(job, out); //Specify the output path the job writes to
job.setJarByClass(GenericMapReduceDeleteOutput.class); //Tell Hadoop which JAR to ship by naming a class inside it
job.setJobName("MyJob"); //Set a name for your job
job.submit(); //Submit your job to Hadoop for execution
}
}
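Once you have filled in the mapper and reducer logic and packaged the class into a JAR, you can submit the job from the command line with the standard hadoop jar command. The JAR name and HDFS paths below are only illustrative, so adjust them for your own cluster:

hadoop jar genericmr.jar GenericMapReduceDeleteOutput /user/hadoop/input /user/hadoop/output

Because the template deletes the old output directory itself, you can run exactly the same command repeatedly without a manual hdfs dfs -rm -r step in between.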
Problem 1: Vowel Analysis
Problem Statement:
We are given one or more text files and we have to calculate the total number of occurrences of each vowel in the file(s).
File:
This file is The Bible in plain-text format, available in the public domain; you can download it from http://www.gutenberg.org. The file contains mostly text and some numeric data. A few lines of this file are given below:
The Old Testament of the King James Bible
The First Book of Moses: Called Genesis
1:1 In the beginning God created the heaven and the earth.
1:2 And the earth was without form, and void; and darkness was upon
the face of the deep. And the Spirit of God moved upon the face of the waters.
1:3 And God said, Let there be light: and there was light.
1:4 And God saw the light, that it was good: and God divided the light from the darkness.
Algorithm:
Map Side:
As you can observe, the Mapper can use the default input key provided by Hadoop, a LongWritable holding the file offset, and the input value as Text, which is the content of the line. For the output key and output value we can use Text and LongWritable. Inside the map() method we will convert the Text value to a String, remove all punctuation from the string, convert it to lower case, and place it in a character array. Finally, we will loop through each character of this array, and for every vowel we will write out the vowel as the key and a LongWritable 1 as the value.
So the Map Class code goes like this:
public static class VowelAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//Do the map-side processing i.e., write out key and value pairs
//Remove all punctuation and place in lower case
String line = value.toString().replaceAll("\\p{Punct}","").toLowerCase();
char[] chars = line.toCharArray();
for(char letter : chars){
switch(letter){
case 'a':
case 'e':
case 'i':
case 'o':
case 'u':
context.write(new Text(String.valueOf(letter)), new LongWritable(1));
}
}
}
}
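If you want to see exactly what the clean-up line does, the following stand-alone snippet (plain Java, only for illustration) applies the same replaceAll() and toLowerCase() calls to one of the sample lines shown above:

public class CleanupDemo {
public static void main(String[] args) {
String sample = "1:3 And God said, Let there be light: and there was light.";
//\p{Punct} matches ASCII punctuation, so the colons, comma and full stop are removed
String cleaned = sample.replaceAll("\\p{Punct}", "").toLowerCase();
System.out.println(cleaned); //prints: 13 and god said let there be light and there was light
}
}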
Reduce Side:
The reducer will receive the input key as Text and the input value as an iterable list of LongWritable values. In the reduce() method, for every key we will loop through its list of values and sum them up. Finally, we will write out the same input key as the output key and the summed value as the output value, so the output key/value pair from the Reducer is Text and LongWritable. So the Reduce class code goes like this:
public static class VowelAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
//Do the reduce-side processing i.e., write out the key and value pairs
Iterator<LongWritable> it = values.iterator();
long sum = 0;
while(it.hasNext()){
//Do some calculation
sum += it.next().get();
}
context.write(key, new LongWritable(sum));
}
}
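Because the reducer simply sums LongWritable counts, the same class can also serve as a combiner, which pre-aggregates counts on the map side and shrinks the data shuffled to the reducers. If you want to try this, the commented-out combiner line in main() just needs to point at the reducer class, roughly like this:

job.setCombinerClass(VowelAnalysisReducer.class); //Pre-aggregate vowel counts on the map side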
The fully functional code is given below. You can go through the steps discussed in the Developing The First MapReduce Application section to develop, compile, and run this job using Eclipse.
/**
* A MapReduce WordAnalysis Job in Hadoop using new API.
 * Problem Statement: We are given one or more text files and we have to calculate the total number of occurrences of each
 * vowel in the file(s).
*/
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @author course.dvanalyticsmds.com
*
*/
public class VowelAnalysis{
public static class VowelAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//Do the map-side processing i.e., write out key and value pairs
//Remove all punctuation and place in lower case
String line = value.toString().replaceAll("\\p{Punct}","").toLowerCase();
char[] chars = line.toCharArray();
for(char letter : chars){
switch(letter){
case 'a':
case 'e':
case 'i':
case 'o':
case 'u':
context.write(new Text(String.valueOf(letter)), new LongWritable(1));
}
}
}
}
public static class VowelAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
//Do the reduce-side processing i.e., write out the key and value pairs
Iterator<LongWritable> it = values.iterator();
long sum = 0;
while(it.hasNext()){
//Do some calculation
sum += it.next().get();
}
context.write(key, new LongWritable(sum));
}
}
public static void deletePreviousOutput(Configuration conf, Path path){
try{
FileSystem hdfs = FileSystem.get(conf);
hdfs.delete(path,true);
}
catch(IOException e){
//ignore any exception
}
}
public static void main(String[] args) throws Exception {
/**Configure the job**/
Path in = new Path(args[0]); //Create Path variable for input data
Path out = new Path(args[1]); //Create Path variable for output data
Configuration conf = new Configuration(); //Create an instance of the Configuration class
Job job = Job.getInstance(conf); //Create an instance of Job class using getInstance()
deletePreviousOutput(conf,out); //Delete the previously created output directory
job.setOutputKeyClass(Text.class); //Set the "key" output class for the map
job.setOutputValueClass(LongWritable.class); //Set the "value" output class for the map
job.setMapperClass(VowelAnalysisMapper.class); //Set the Mapper Class
job.setReducerClass(VowelAnalysisReducer.class); //Set the Reducer Class
//job.setCombinerClass(MyReduceClass.class); //Set the Combiner Class if applicable.
job.setInputFormatClass(TextInputFormat.class); //Set the InputFormat Class
job.setOutputFormatClass(TextOutputFormat.class); //Set the OutputFormat Class
FileInputFormat.addInputPath(job, in); //Specify the InputPath to the Job
FileOutputFormat.setOutputPath(job, out); //Specify the OutputPath to the Job
job.setJarByClass(VowelAnalysis.class); //Tell Hadoop which JAR to ship by naming a class inside it
job.setJobName("VowelsCount"); //Set a name for your job
job.submit(); //submit your job to Hadoop for execution
}
}
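The template ends main() with job.submit(), which returns immediately after handing the job to Hadoop. If you prefer to block until the job finishes and watch its progress on the console, a common alternative (shown here only as a sketch of the last line of main()) is waitForCompletion():

System.exit(job.waitForCompletion(true) ? 0 : 1); //Block until the job ends; exit 0 on success, 1 on failure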
After the successful execution of this job on the Bible file, it would produce the following output:
a 274728
e 410413
i 192841
o 241852
u 82970
Problem 2: Accidents Analysis
Problem Statement:
We are given file(s) that contain the number of accidents of different types that happened in different cities in India during the years 2011-2015. We need to calculate the total number of accidents that happened in each of these cities.
File:
The file contains two columns: the name of a city and the count of accidents of different types (Total Number of Fatal Accidents, All Accidents, Persons Killed, Persons Injured) that happened in India from 2011-2015. This file has been edited to suit the needs of this tutorial; you can download the original files from https://data.gov.in/catalogs/sector/Transport-9383. The original files had various NA values for the different cities and accident types, and I have replaced those values with 0 for ease of calculation. A few lines of this file are given below:
Agra,336
Ahmedabad,222
Asansol-Durgapur,229
Aurangabad,161
Bengaluru,689
Bhopal,275
Coimbatore,253
Delhi,2007
Dhanbad,74
Faridabad,229
Ghaziabad,495
Algorithm:
Map Side:
As you can observe, the Mapper can use the default input key supplied by Hadoop, a LongWritable holding the file offset, and the input value as Text, which is the content of the line. For the output key and output value we can use Text and LongWritable. In the map() method we need to convert the Text value into a String, split that string on the "," delimiter, and store the pieces in a String array. Then we can simply write out the first entry of the array as the key and the second entry as the value. Hence the output key/value pair from the Mapper is Text and LongWritable. So the Map class code goes like this:
public static class AccidentAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/*Do the map-side processing i.e., write out key and value pairs
Map will receive key and value pairs like: LongWritableOffset and Agra,336
LongWritableOffset and Ahmedabad,222
....................................
So we need to convert the text value into a string, split this new string value using the "," delimiter, and store the pieces in a string array. Then we can simply write out the first entry of the array as key and the second entry as value.
*/
String line = value.toString();
String[] words = line.split(",");
context.write(new Text(words[0]), new LongWritable(Long.valueOf(words[1])));
}
}
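The sample file is clean, but real CSV exports often contain a header row or the occasional malformed line. A slightly more defensive version of the map() body (a sketch, not required for this exercise) could skip anything it cannot parse:

String line = value.toString();
String[] words = line.split(",");
if (words.length >= 2) {
try {
long count = Long.parseLong(words[1].trim());
context.write(new Text(words[0].trim()), new LongWritable(count));
} catch (NumberFormatException e) {
//Skip lines (for example a header row) whose second column is not a number
}
}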
Reduce Side:
The reducer will receive the input key as Text and the input value as an iterable list of LongWritable values. In the reduce() method, for every key we will loop through its list of values and sum them up. Finally, we will write out the same input key as the output key and the summed value as the output value, so the output key/value pair from the Reducer is Text and LongWritable. So the Reduce class code goes like this:
public static class AccidentAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
/*Do the reduce-side processing i.e., write out the key and value pairs
Here the Reducer receives key, value pairs as: cityname, [some long value, some long value, some long value, ....]
So we will iterate through the list of values for each key and sum them up. Finally we need to write out the cityname
as key and the sum as the value.
*/
Iterator<LongWritable> it = values.iterator();
long sum = 0L;
while(it.hasNext()){
sum += it.next().get();
}
context.write(key, new LongWritable(sum));
}
}
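Since values is an Iterable, the explicit Iterator in the reduce() body can also be written as an enhanced for loop; the behaviour is identical, so this is purely a stylistic alternative:

long sum = 0;
for (LongWritable v : values) { //Iterate over all accident counts for this city
sum += v.get();
}
context.write(key, new LongWritable(sum));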
The fully functional code is given below. You can go through the steps discussed in the Developing The First MapReduce Application section to develop, compile, and run this job using Eclipse.
/**
* A MapReduce AccidentAnalysis Program in Hadoop using new API.
 * Problem Statement: We are given file(s) that contain the number of accidents of different types that happened in different cities in India
 * during the years 2011-2015. We need to calculate the total number of accidents that happened in each of these cities.
*
*/
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
/**
* @author Dv Analytics.com
*
*/
public class AccidentAnalysis {
public static class AccidentAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
/*Do the map-side processing i.e., write out key and value pairs
Map will receive key and value pairs like: LongWritableOffset and Agra,336
LongWritableOffset and Ahmedabad,222
....................................
So we need to convert the text value into a string, split this new string value using the "," delimiter, and
store the pieces in a string array. Then we can simply write out the first entry of the array as key and the
second entry as value.
*/
String line = value.toString();
String[] words = line.split(",");
context.write(new Text(words[0]), new LongWritable(Long.valueOf(words[1])));
}
}
public static class AccidentAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
/*Do the reduce-side processing i.e., write out the key and value pairs
Here the Reducer receives key, value pairs as: cityname, [some long value, some long value, some long value, ....]
So we will iterate through the list of values for each key and sum them up. Finally we need to write out the cityname
as key and the sum as the value.
*/
Iterator<LongWritable> it = values.iterator();
long sum = 0L;
while(it.hasNext()){
sum += it.next().get();
}
context.write(key, new LongWritable(sum));
}
}
public static void deletePreviousOutput(Configuration conf, Path path){
try{
FileSystem hdfs = FileSystem.get(conf);
hdfs.delete(path,true);
}
catch(IOException e){
//ignore any exception
}
}
public static void main(String[] args) throws Exception {
/**Configure the job**/
Path in = new Path(args[0]); //Create Path variable for input data
Path out = new Path(args[1]); //Create Path variable for output data
Configuration conf = new Configuration(); //Create an instance of the Configuration class
Job job = Job.getInstance(conf); //Create an instance of Job class using getInstance()
deletePreviousOutput(conf,out); //Delete the previously created output directory
job.setOutputKeyClass(Text.class); //Set the "key" output class for the map
job.setOutputValueClass(LongWritable.class); //Set the "value" output class for the map
job.setMapperClass(AccidentAnalysisMapper.class); //Set the Mapper Class
job.setReducerClass(AccidentAnalysisReducer.class); //Set the Reducer Class
//job.setCombinerClass(MyReduceClass.class); //Set the Combiner Class if applicable.
job.setInputFormatClass(TextInputFormat.class); //Set the InputFormat Class
job.setOutputFormatClass(TextOutputFormat.class); //Set the OutputFormat Class
FileInputFormat.addInputPath(job, in); //Specify the InputPath to the Job
FileOutputFormat.setOutputPath(job, out); //Specify the OutputPath to the Job
job.setJarByClass(AccidentAnalysis.class); //Tell Hadoop which JAR to ship by naming a class inside it
job.setJobName("AccidentAnalysis"); //Set a name for your job
job.submit(); //submit your job to Hadoop for execution
}
}
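After the run you can inspect the reducer output directly in HDFS; the path below is only an example and should match whatever you passed as the second argument:

hdfs dfs -cat /user/hadoop/accidents-out/part-r-00000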
After the successful execution of this job on the accidents file, it would produce the following output:
Agra 13829
Ahmedabad 20858
Allahabad 10618
Amritsar 1719
Asansol-Durgapur 5603
Aurangabad 8069
Bengaluru 55739
Bhopal 34041
Chandigarh 3967
Chennai 79485
Coimbatore 15294
Delhi 93852
Dhanbad 2991
Faridabad 8548
Ghaziabad 12004
Gwalior 20239
Hyderabad 30078
Indore 54235
Jabalpur 32711
Jaipur 21938
Jamshedpur 4889
Jodhpur 8197
Kannur 7704
Kanpur 13704
Khozikode 11967
Kochi 24503
Kolkata 40916
Kollam 19191
Kota 8198
Lucknow 15349
Ludhiana 6551
Madurai 9175
Mallapuram 33909
Meerut 12355
Mumbai 146719
Nagpur 14341
Nashik 8109
Patna 12804
Pune 15644
Raipur 17714
Rajkot 9979
Srinagar 5214
Surat 11658
Thiruvanthapuram 19135
Thrissur 14892
Tiruchirapalli 7358
Vadodra 12922
Varanasi 4463
Vijaywada City 15235
Visakhapatnam 3658
Vizaq 14085