
Solved Problems in MapReduce

 

First, let me introduce another template before you move on to the exercise problems. This template is an upgraded version of the first one: it has an extra method that deletes the existing output directory. With this template, you don't have to delete the output directory by hand every time you re-execute your job, which saves time and effort.

GenericMapReduceTemplate - Version 2:


//A generic template for a MapReduce program that deletes the previously generated output directory.


import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GenericMapReduceDeleteOutput {
        public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {

            public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //Do the map-side processing i.e., write out key and value pairs
            //(TextInputFormat supplies the byte offset as a LongWritable key and the line as a Text value)
                context.write(new Text(""), new Text(""));
            }
        }

        public static class MyReducer extends Reducer<Text, Text, Text, Text> {

            public void reduce(Text key, Iterable<Text> values,
                Context context) throws IOException, InterruptedException {
                //Do the reduce-side processing i.e., write out the key and value pairs
                Iterator<Text> it = values.iterator();
                while(it.hasNext()){
                    Text val = it.next();    //always consume the next value to move the iterator forward
                    //Do some calculation with val
                }
                context.write(new Text(""), new Text(""));
            }
        }

        public static void deletePreviousOutput(Configuration conf, Path path){

            try{
                FileSystem hdfs = FileSystem.get(conf);
                hdfs.delete(path,true);
            }
            catch(IOException e){
                //ignore any exception    
            }
        }

        public static void main(String[] args) throws Exception {

        /**Configure the job**/
            Path in = new Path(args[0]);                //Create Path variable for input data
            Path out = new Path(args[1]);                //Create Path variable for output data

            Configuration conf = new Configuration();    //Create an instance of the Configuration class
            Job job = Job.getInstance(conf);            //Create an instance of Job class using the getInstance()

            deletePreviousOutput(conf,out);            //Delete the previously created output directory

            job.setOutputKeyClass(Text.class);            //Set the "key" output class for the map and reduce methods
            job.setOutputValueClass(Text.class);        //Set the "value" output class for the map and reduce methods     

            job.setMapperClass(MyMapper.class);                 //Set the Mapper Class
            job.setReducerClass(MyReducer.class);            //Set the Reducer Class
            //job.setCombinerClass(MyReducer.class);        //Set the Combiner Class if applicable.

            job.setInputFormatClass(TextInputFormat.class);            //Set the InputFormat Class
            job.setOutputFormatClass(TextOutputFormat.class);        //Set the OutputFormat Class

            FileInputFormat.addInputPath(job, in);    //Specify the InputPath to the Job to read input data
            FileOutputFormat.setOutputPath(job, out);    //Specify the OutputPath to the Job to write output data

            job.setJarByClass(GenericMapReduceDeleteOutput.class);     //Set the driver class so Hadoop can locate the JAR file
            job.setJobName("MyJob");                     //Set a name for your job

            job.submit();                                 //submit your job to Hadoop for execution

        }
}
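
If you would rather not issue a delete call for a directory that may not exist yet, a minimal variation of the deletePreviousOutput() method (a sketch that uses the same FileSystem API and the same imports as the template above) first checks whether the path is present:

    public static void deletePreviousOutput(Configuration conf, Path path){

        try{
            FileSystem hdfs = FileSystem.get(conf);
            if(hdfs.exists(path)){            //only delete when the output directory is really there
                hdfs.delete(path, true);      //"true" removes the directory and its contents recursively
            }
        }
        catch(IOException e){
            //ignore any exception; the job will fail later anyway if the output path cannot be prepared
        }
    }

Either version works; the check simply avoids an unnecessary delete call on the very first run.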

Problem 1: Vowel Analysis


Problem Statement:

We are given one or more text files and we have to calculate the total number of occurrences of each vowel in the file(s).


File:

This file is the Bible in plain-text format, available in the public domain; you can download it from http://www.gutenberg.org. The file contains mostly text along with some numeric data. A few lines of the file are given below:

The Old Testament of the King James Bible
The First Book of Moses:  Called Genesis
1:1 In the beginning God created the heaven and the earth.
1:2 And the earth was without form, and void; and darkness was upon
the face of the deep. And the Spirit of God moved upon the face of the waters.
1:3 And God said, Let there be light: and there was light.
1:4 And God saw the light, that it was good: and God divided the light from the darkness.

Algorithm:


Map Side:

As you can observe, the Mapper can use the default input key supplied by Hadoop, a LongWritable holding the file offset, and the default input value, a Text holding one line of the file. For the output key and output value we can use Text and LongWritable. Inside the map() method we convert the Text value to a String, remove all punctuation, convert it to lower case, and turn it into a character array. Finally, we loop through each character of the array and, for every vowel, write out the vowel as the key and a LongWritable 1 as the value. Note that all five vowel cases in the switch statement fall through to a single context.write() call, while non-vowel characters match no case and are simply skipped.

So the Map Class code goes like this:

    public static class VowelAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

            public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //Do the map-side processing i.e., write out key and value pairs

                //Remove all punctuation and place in lower case
                String line = value.toString().replaceAll("\\p{Punct}","").toLowerCase();
                char[] chars = line.toCharArray();

                for(char letter : chars){
                    switch(letter){
                        case 'a':
                        case 'e':
                        case 'i':
                        case 'o':
                        case 'u':
                            context.write(new Text(String.valueOf(letter)), new LongWritable(1));
                    }
                }
        }
    }


Reduce Side:

The reducer will receive the input key as Text and the input value as an iterable list of LongWritable. In the reduce() method, for every key we loop through its list of values and sum them up. Finally, we write out the same input key as the output key and the sum as the output value, so the key/value pair emitted by the Reducer is again Text and LongWritable. So the Reduce Class code goes like this:

    public static class VowelAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

            public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            //Do the reduce-side processing i.e., write out the key and value pairs
            Iterator<LongWritable> it = values.iterator();
            long sum = 0;
            while(it.hasNext()){
            //Do some calculation
                sum += it.next().get();

            }
            context.write(key, new LongWritable(sum));
        }
    }

The full functional code is given below. You can go through the steps discussed in the Developing The First MapReduce Application section to develop, compile and run this job using Eclipse.

/**
 * A MapReduce VowelAnalysis Job in Hadoop using the new API.
 * Problem Statement: We are given one or more text files and we have to calculate the total number of occurrences of each
 * vowel in the file(s).
 */
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


/**
 * @author course.dvanalyticsmds.com
 *
 */
public class VowelAnalysis{
        public static class VowelAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

            public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            //Do the map-side processing i.e., write out key and value pairs

                //Remove all punctuation and place in lower case
                String line = value.toString().replaceAll("\\p{Punct}","").toLowerCase();
                char[] chars = line.toCharArray();

                for(char letter : chars){
                    switch(letter){
                        case 'a':
                        case 'e':
                        case 'i':
                        case 'o':
                        case 'u':
                            context.write(new Text(String.valueOf(letter)), new LongWritable(1));
                    }
                }
        }
    }

        public static class VowelAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

            public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
            //Do the reduce-side processing i.e., write out the key and value pairs
            Iterator<LongWritable> it = values.iterator();
            long sum = 0;
            while(it.hasNext()){
            //Do some calculation
                sum += it.next().get();

            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void deletePreviousOutput(Configuration conf, Path path){

            try{
                FileSystem hdfs = FileSystem.get(conf);
                hdfs.delete(path,true);
            }
            catch(IOException e){
                //ignore any exception    
            }
        }

    public static void main(String[] args) throws Exception {

        /**Configure the job**/
        Path in = new Path(args[0]);            //Create Path variable for input data
        Path out = new Path(args[1]);            //Create Path variable for output data


        Configuration conf = new Configuration();            //Create an instance of the Configuration class
        Job job = Job.getInstance(conf);                    //Create an instance of Job class using the getInstance()

        deletePreviousOutput(conf,out);                //Delete the previously created output directory

        job.setOutputKeyClass(Text.class);                    //Set the "key" output class for the map and reduce methods
        job.setOutputValueClass(LongWritable.class);        //Set the "value" output class for the map and reduce methods

        job.setMapperClass(VowelAnalysisMapper.class);             //Set the Mapper Class
        job.setReducerClass(VowelAnalysisReducer.class);        //Set the Reducer Class
        //job.setCombinerClass(MyReduceClass.class);        //Set the Combiner Class if applicable.

        job.setInputFormatClass(TextInputFormat.class);        //Set the InputFormat Class
        job.setOutputFormatClass(TextOutputFormat.class);    //Set the OutputFormat Class

        FileInputFormat.addInputPath(job, in);            //Specify the InputPath to the Job
        FileOutputFormat.setOutputPath(job, out);        //Specify the OutputPath to the Job

        job.setJarByClass(VowelAnalysis.class);             //Set the driver class so Hadoop can locate the JAR file
        job.setJobName("VowelsCount");                     //Set a name for your job

        job.submit();                                     //submit your job to Hadoop for execution


    }
}
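
Because VowelAnalysisReducer only sums LongWritable counts, and addition is associative and commutative, the same class can also act as a combiner to pre-aggregate counts on the map side and cut down the data shuffled to the reducers. If you want to try this, the commented-out combiner line in main() can be pointed at the reducer; this is an optional tweak, not part of the original listing:

        job.setCombinerClass(VowelAnalysisReducer.class);    //Optional: pre-sum vowel counts on the map side before the shuffle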

After the successful execution of this job on the Bible file, it would produce the following output:

a    274728
e    410413
i    192841
o    241852
u    82970
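
If you want to sanity-check the map-side logic without starting Hadoop, a small plain-Java sketch like the one below (not part of the job; the class and variable names are only for illustration) applies the same per-line processing to a single verse and counts the vowels in an ordinary HashMap, which plays the role of the reducer's sum:

import java.util.HashMap;
import java.util.Map;

public class VowelCheck {
    public static void main(String[] args) {
        //Same cleanup as the mapper: strip punctuation and lower-case the line
        String line = "1:3 And God said, Let there be light: and there was light."
                .replaceAll("\\p{Punct}", "").toLowerCase();

        Map<Character, Long> counts = new HashMap<>();
        for (char letter : line.toCharArray()) {
            if ("aeiou".indexOf(letter) >= 0) {          //keep only vowels, as in the switch statement
                counts.merge(letter, 1L, Long::sum);     //local equivalent of the reducer's sum
            }
        }
        System.out.println(counts);                      //prints the vowel counts for this one line
    }
}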

Problem 2: Accidents Analysis


Problem Statement:

We are given file(s) containing the numbers of different types of accidents that happened in different cities of India during the years 2011-2015. We need to calculate the total number of accidents that happened in each of these cities.


File:

The file contains two columns: the name of the city and the count for the different types of accidents (Total Number of Fatal Accidents, All Accidents, Persons Killed, Persons Injured) that happened in India from 2011-2015. The file has been edited to suit the needs of this tutorial; you can download the original files from https://data.gov.in/catalogs/sector/Transport-9383. The original files had various NA values for the different cities and accident types, and I have replaced those values with 0 for ease of calculation. A few lines of the file are given below:

Agra,336
Ahmedabad,222
Asansol-Durgapur,229
Aurangabad,161
Bengaluru,689
Bhopal,275
Coimbatore,253
Delhi,2007
Dhanbad,74
Faridabad,229
Ghaziabad,495

Algorithm:


Map Side:

As you can observe, the Mapper can again use the default input key supplied by Hadoop, a LongWritable holding the file offset, and the default input value, a Text holding one line of the file. For the output key and output value we can use Text and LongWritable. In the map() method we convert the Text value into a String, split it on the "," delimiter, and store the pieces in a String array. Then we simply write out the first entry of the array (the city name) as the key and the second entry (the accident count) as the value. Hence the output key/value pair from the Mapper is Text and LongWritable. So the Map Class code goes like this:

    public static class AccidentAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {


            /*Do the map-side processing i.e., write out key and value pairs
             Map will receive key and value pairs like: LongWritableOffset and Agra,336
                                                        LongWritableOffset and Ahmedabad,222
                                                        ....................................
             So we need to convert the text value into a string and split this new string value using the "," delimiter and store them in a string array. Then we can simply write out the first entry of the array as key and the second entry as value.
            */
                String line = value.toString();
                String[] words = line.split(",");
                context.write(new Text(words[0]), new LongWritable(Long.valueOf(words[1])));
            }
        }
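
The sample file shown above is already cleaned up, but if you point the job at the raw files from data.gov.in, a header row or a non-numeric count would make Long.valueOf(words[1]) throw a NumberFormatException and fail the map task. A small standalone sketch of a more defensive parse is shown below (the class name and the sample strings are hypothetical, for illustration only); the same guard can be copied into the map() method:

import java.util.Arrays;
import java.util.List;

public class AccidentLineCheck {
    public static void main(String[] args) {
        //A clean line, a line with a non-numeric count and a line with no count at all
        List<String> samples = Arrays.asList("Agra,336", "SomeCity,NA", "NameOfCity");
        for (String line : samples) {
            String[] words = line.split(",");
            if (words.length < 2) {
                continue;                                     //no count column: skip the record
            }
            try {
                long count = Long.parseLong(words[1].trim()); //same conversion as the mapper
                System.out.println(words[0] + "\t" + count);
            } catch (NumberFormatException e) {
                //non-numeric count such as "NA": skip the record instead of failing the task
            }
        }
    }
}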


Reduce Side:

The reducer will receive the input key as Text and the input value as an iterable list of LongWritable. In the reduce() method, for every key we loop through its list of values and sum them up. Finally, we write out the same input key as the output key and the sum as the output value, so the key/value pair emitted by the Reducer is again Text and LongWritable. So the Reduce Class code goes like this:

    public static class AccidentAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
                /*Do the reduce-side processing i.e., write out the key and value pairs
                 Here Reducer receives key, value pairs as: cityname, [some long value, some long value, some long value, ....]
                 So we will iterate through the list of each key and sum up the value. Finally we need to write out the cityname
                 as key and sum as the value.

                */
                Iterator<LongWritable> it = values.iterator();
                long sum = 0L;
                while(it.hasNext()){
                    sum += it.next().get();
                }
                context.write(key, new LongWritable(sum));
            }
        }

The full functional code is given below. You can go through the steps discussed in the Developing The First MapReduce Application section to develop, compile and run this job using Eclipse.


/**
 * A MapReduce AccidentAnalysis Program in Hadoop using the new API.
 * Problem Statement: We are given file(s) containing the numbers of different types of accidents that happened in different
 * cities of India during the years 2011-2015. We need to calculate the total number of accidents that happened in each of
 * these cities.
 */

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * @author Dv Analytics.com
 *
 */

public class AccidentAnalysis {
        public static class AccidentAnalysisMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {


            /*Do the map-side processing i.e., write out key and value pairs
             Map will receive key and value pairs like: LongWritableOffset and Agra,336
                                                        LongWritableOffset and Ahmedabad,222
                                                        ....................................
             So we need to convert the text value into a string and split this new string value using the "," delimiter and
             store them in a string array. Then we can simply write out the first entry of the array as key and the second
             entry as value.
            */
                String line = value.toString();
                String[] words = line.split(",");
                context.write(new Text(words[0]), new LongWritable(Long.valueOf(words[1])));
            }
        }

        public static class AccidentAnalysisReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            public void reduce(Text key, Iterable<LongWritable> values,
                Context context) throws IOException, InterruptedException {
                /*Do the reduce-side processing i.e., write out the key and value pairs
                 Here Reducer receives key, value pairs as: cityname, [some long value, some long value, some long value, ....]
                 So we will iterate through the list of each key and sum up the value. Finally we need to write out the cityname
                 as key and sum as the value.

                */
                Iterator<LongWritable> it = values.iterator();
                long sum = 0L;
                while(it.hasNext()){
                    sum += it.next().get();
                }
                context.write(key, new LongWritable(sum));
            }
        }

        public static void deletePreviousOutput(Configuration conf, Path path){

            try{
                FileSystem hdfs = FileSystem.get(conf);
                hdfs.delete(path,true);
            }
            catch(IOException e){
                //ignore any exception    
            }
        }

        public static void main(String[] args) throws Exception {

            /**Configure the job**/
            Path in = new Path(args[0]);            //Create Path variable for input data
            Path out = new Path(args[1]);            //Create Path variable for output data

            Configuration conf = new Configuration();            //Create an instance of the Configuration class
            Job job = Job.getInstance(conf);                    //Create an instance of Job class using the getInstance()

            deletePreviousOutput(conf,out);                //Delete the previously created output directory

            job.setOutputKeyClass(Text.class);                //Set the "key" output class for the map and reduce methods
            job.setOutputValueClass(LongWritable.class);        //Set the "value" output class for the map and reduce methods

            job.setMapperClass(AccidentAnalysisMapper.class);             //Set the Mapper Class
            job.setReducerClass(AccidentAnalysisReducer.class);        //Set the Reducer Class
            //job.setCombinerClass(MyReduceClass.class);        //Set the Combiner Class if applicable.

            job.setInputFormatClass(TextInputFormat.class);        //Set the InputFormat Class
            job.setOutputFormatClass(TextOutputFormat.class);    //Set the OutputFormat Class

            FileInputFormat.addInputPath(job, in);            //Specify the InputPath to the Job
            FileOutputFormat.setOutputPath(job, out);        //Specify the OutputPath to the Job

            job.setJarByClass(AccidentAnalysis.class);             //Set the driver class so Hadoop can locate the JAR file
            job.setJobName("AccidentAnalysis");                     //Set a name for your job

            job.submit();                                     //submit your job to Hadoop for execution


        }
}

After the successful execution of this job on the accidents file, it would produce the following output:

Agra    13829
Ahmedabad    20858
Allahabad    10618
Amritsar    1719
Asansol-Durgapur    5603
Aurangabad    8069
Bengaluru    55739
Bhopal    34041
Chandigarh    3967
Chennai    79485
Coimbatore    15294
Delhi    93852
Dhanbad    2991
Faridabad    8548
Ghaziabad    12004
Gwalior    20239
Hyderabad    30078
Indore    54235
Jabalpur    32711
Jaipur    21938
Jamshedpur    4889
Jodhpur    8197
Kannur    7704
Kanpur    13704
Khozikode    11967
Kochi    24503
Kolkata    40916
Kollam    19191
Kota    8198
Lucknow    15349
Ludhiana    6551
Madurai    9175
Mallapuram    33909
Meerut    12355
Mumbai    146719
Nagpur    14341
Nashik    8109
Patna    12804
Pune    15644
Raipur    17714
Rajkot    9979
Srinagar    5214
Surat    11658
Thiruvanthapuram    19135
Thrissur    14892
Tiruchirapalli    7358
Vadodra    12922
Varanasi    4463
Vijaywada City    15235
Visakhapatnam    3658
Vizaq    14085
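
Notice that the cities appear in alphabetical order: keys are sorted during the shuffle phase before they reach the reducer, so with a single reducer (the default in a plain configuration) the output file comes out ordered by key. The same is true of the vowel counts in Problem 1.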

 
