Map Reduce in Hadoop calculating Average value

Calculate Average value in WordCount MapReduce on Hadoop

The famous Word Count example, which can be found here, shows a simple MapReduce job that counts how many times each word appears.

Here we show a variation that, instead of counting the words, prints the average number of occurrences per distinct word, that is, the total number of words divided by the number of distinct words.

In order to do so, we changed the original program:

  • We separated the combiner from the reducer (in the original program, the same class was used for both purposes)
  • We created a new reducer that accumulates a running average across its reduce calls, so the last value it writes is the final average
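Keeping the summing class as the combiner is what makes this split safe: partial sums from different map tasks can simply be added together, but partial averages cannot be combined into the overall average. The plain-Java check below involves no Hadoop at all (`CombinerAverageDemo`, `countWords` and `average` are hypothetical names). It assumes each of the three sample files used in the Usage section is processed by its own map task, and compares the true average with the mean of the per-file averages.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Plain-Java check (no Hadoop): sums combine across map tasks, averages do not.
public class CombinerAverageDemo {

    // Word counts for one input: lowercased and split on whitespace like the mapper,
    // then summed per word like the IntSumReducer combiner.
    static Map<String, Integer> countWords(String text) {
        Map<String, Integer> counts = new HashMap<>();
        StringTokenizer itr = new StringTokenizer(text.toLowerCase());
        while (itr.hasMoreTokens()) {
            counts.merge(itr.nextToken(), 1, Integer::sum);
        }
        return counts;
    }

    // Total occurrences divided by the number of distinct words.
    static float average(Map<String, Integer> counts) {
        int sum = 0;
        for (int c : counts.values()) {
            sum += c;
        }
        return (float) sum / counts.size();
    }

    public static void main(String[] args) {
        // One string per sample file; each is assumed to be one map task.
        List<String> files = Arrays.asList(
                "Hadoop is an elephant",
                "Hadoop is as Yellow as can be",
                "Oh what a yellow fellow is Hadoop");

        Map<String, Integer> global = new HashMap<>();
        float sumOfFileAverages = 0f;
        for (String file : files) {
            Map<String, Integer> partial = countWords(file);
            // Adding the partial sums reproduces the global counts exactly.
            partial.forEach((word, c) -> global.merge(word, c, Integer::sum));
            sumOfFileAverages += average(partial);
        }

        System.out.println("true average: " + average(global));
        System.out.println("mean of per-file averages: " + sumOfFileAverages / files.size());
    }
}
```

The first line prints 1.5, while the mean of the per-file averages comes out at roughly 1.06, which is why the averaging step stays in the reducer and only summing is done in the combiner.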

The code can be downloaded from here.

Usage

Let’s take three files with words and copy them into HDFS:

$ echo "Hadoop is an elephant" > file0
$ echo "Hadoop is as Yellow as can be" > file1
$ echo "Oh what a yellow fellow is Hadoop" > file2
$ hdfs dfs -mkdir -p /user/ran/wordcount/input
$ hdfs dfs -put file* /user/ran/wordcount/input

Now we run the original WordCount program on these files. Ignoring letter case (the mapper in our program lowercases every word), the result looks like this:

  • a 1
  • an 1
  • as 2
  • be 1
  • can 1
  • elephant 1
  • fellow 1
  • hadoop 3
  • is 3
  • oh 1
  • what 1
  • yellow 2
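To see where these counts and their alphabetical order come from, here is a single-process imitation of the WordCount data flow in plain Java. It is a sketch, not Hadoop code: the `TreeMap` stands in for Hadoop's shuffle, which groups the emitted (word, 1) pairs by key and sorts the keys before the reducer sees them, and `LocalWordCount`, `mapAndShuffle` and `reduce` are hypothetical names. Like the mapper of WordCountAverage, it lowercases every token.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Local, single-process imitation of the WordCount data flow (no Hadoop).
public class LocalWordCount {

    // "map": emit a 1 for each lowercase token; "shuffle": group the 1s by word.
    // The TreeMap keeps keys sorted, as Hadoop sorts keys before the reduce phase.
    static Map<String, List<Integer>> mapAndShuffle(String... lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line.toLowerCase());
            while (itr.hasMoreTokens()) {
                grouped.computeIfAbsent(itr.nextToken(), k -> new ArrayList<>()).add(1);
            }
        }
        return grouped;
    }

    // "reduce": sum the values collected for each word, keeping the sorted order.
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            counts.put(e.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = reduce(mapAndShuffle(
                "Hadoop is an elephant",
                "Hadoop is as Yellow as can be",
                "Oh what a yellow fellow is Hadoop"));
        counts.forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```

Running `main` prints the twelve lines listed above, in the same order. For lowercase ASCII words, `String` ordering matches the byte-wise ordering Hadoop uses for `Text` keys.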

Now let’s run our WordCountAverage program. The new result will look like this:

  • average 1.0
  • average 1.0
  • average 1.3333334
  • average 1.25
  • average 1.2
  • average 1.1666666
  • average 1.1428572
  • average 1.375
  • average 1.5555556
  • average 1.5
  • average 1.4545455
  • average 1.5

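The last line is the true average: 18 words in total divided by 12 distinct words gives 1.5. Each call to reduce() writes an intermediate (running) average, so only the last line is the final result. This works only because every key passes through the same Reduce instance, which is the case when the job runs a single reduce task (Hadoop's default).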
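The intermediate values can be checked by hand: after the k-th word in sorted key order, the reducer has added up the counts of the first k words and divides by k. A plain-Java sketch of that arithmetic, using the counts from the WordCount listing above (`RunningAverage` and `runningAverages` are hypothetical names):

```java
// Recomputes the running averages written by the Reduce class, assuming the
// word counts arrive in sorted key order, as in the WordCount listing.
public class RunningAverage {

    // Value written after each reduce() call: sum of the counts so far
    // divided by the number of distinct words seen so far.
    static float[] runningAverages(int[] countsInKeyOrder) {
        float[] out = new float[countsInKeyOrder.length];
        int sum = 0;
        int count = 0;
        for (int i = 0; i < countsInKeyOrder.length; i++) {
            sum += countsInKeyOrder[i];
            count += 1;
            out[i] = (float) sum / count;
        }
        return out;
    }

    public static void main(String[] args) {
        // Counts for a, an, as, be, can, elephant, fellow, hadoop, is, oh, what, yellow
        int[] counts = {1, 1, 2, 1, 1, 1, 1, 3, 3, 1, 1, 2};
        for (float avg : runningAverages(counts)) {
            System.out.println("average " + avg);
        }
    }
}
```

It should print the same twelve values as the listing above, for example 4/3 = 1.3333334 after the third word (`as`) and 18/12 = 1.5 at the end.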

The code

package com.ransilberman;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountAverage {

    // Mapper: lowercases each input line, splits it on whitespace and emits
    // (word, 1) for every token.
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString().toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

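    // Combiner: sums the partial counts of each word on the map side, so fewer
    // (word, count) pairs are shuffled to the reducer. Its input and output
    // types are both (Text, IntWritable), matching the mapper's output, as
    // Hadoop requires of a combiner.
    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }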

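    // Final reducer: keeps running totals across all reduce() calls of this
    // task. For each distinct word it adds the word's total count to `sum`,
    // increments `count`, and writes the running average sum / count.
    // With a single reduce task, the last line written is the overall average.
    public static class Reduce
            extends Reducer<Text, IntWritable, Text, FloatWritable> {
        private final Text averageKey = new Text("average");
        private final FloatWritable result = new FloatWritable();
        private int sum = 0;    // total occurrences of all words seen so far
        private int count = 0;  // number of distinct words seen so far

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                sum += val.get();
            }
            count += 1;
            result.set((float) sum / count);
            context.write(averageKey, result);
        }
    }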

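    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountAverage.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(Reduce.class);
        // The running average lives in one Reduce instance, so use one reduce task.
        job.setNumReduceTasks(1);
        // The mapper and combiner emit (Text, IntWritable) ...
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // ... but the final reducer emits (Text, FloatWritable).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // args[0]: input directory, args[1]: output directory (must not exist yet)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }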
}
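Because `Reduce` writes one line per word, the output file contains eleven intermediate lines besides the answer. One alternative, sketched here only as a design option and still assuming a single reduce task, is to accumulate in `reduce()` and write once from `cleanup(Context)`, which Hadoop's `Reducer.run()` calls after the last key has been reduced. The code below is a plain-Java imitation of that call order, not Hadoop code; `FinalAverageSketch` is a hypothetical name, and the input is a subset of the sample data (`hadoop`, `is` and `yellow`, with one partial count per input file, assuming one map task per file).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Hadoop) of a reducer that writes only the final average.
// It mimics the call order of Hadoop's Reducer.run(): reduce() once per key,
// then cleanup() once at the end of the task.
public class FinalAverageSketch {

    private int sum = 0;    // total occurrences seen so far
    private int count = 0;  // distinct words seen so far

    // Counterpart of reduce(): accumulate only, write nothing.
    void reduce(String word, List<Integer> values) {
        for (int v : values) {
            sum += v;
        }
        count += 1;
    }

    // Counterpart of cleanup(): called once, after the last key.
    float cleanup() {
        return count == 0 ? 0f : (float) sum / count;
    }

    public static void main(String[] args) {
        // Subset of the sample data: one partial count per file containing the word.
        Map<String, List<Integer>> input = new LinkedHashMap<>();
        input.put("hadoop", Arrays.asList(1, 1, 1));
        input.put("is", Arrays.asList(1, 1, 1));
        input.put("yellow", Arrays.asList(1, 1));

        FinalAverageSketch reducer = new FinalAverageSketch();
        input.forEach(reducer::reduce);
        System.out.println("average " + reducer.cleanup());
    }
}
```

In a real job, the `cleanup(Context context)` override would call `context.write(...)` with the final `FloatWritable`, so the output would contain a single line.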

Author: Ran Silberman

I am a tour guide in Israel with a passion for the Bible. For many years I have worked in the software industry as a software consultant. I blog at http://ransilberman.blog
