MapReduce in Hadoop: Calculating an Average Value

Calculating an average value in a WordCount MapReduce job on Hadoop

The famous WordCount example, which can be found here, shows a simple MapReduce job that counts words.

Here is an example that, instead of just counting the words, prints out the average number of occurrences per word.

In order to do so, we changed the original program:

  • We separated the combiner from the reducer (the original used the same class for both purposes), as shown in the driver snippet below
  • We created a new reducer that accumulates the average value across iterations and eventually prints the final value

The code can be downloaded from here.
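
The relevant change in the driver is that the combiner and the reducer are now wired to two different classes (excerpted from the full listing below):

job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(Reduce.class);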

Usage

Let’s take three files with words:

$ echo "Hadoop is an elephant" > file0
$ echo "Hadoop is as Yellow as can be" > file1
$ echo "Oh what a yellow fellow is Hadoop" > file2
$ hdfs dfs -put file* /user/ran/wordcount/input
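
Both jobs below are launched the same way, assuming the classes are packaged into a jar (the jar name and output path here are only examples):

$ hadoop jar wordcount.jar com.ransilberman.WordCountAverage /user/ran/wordcount/input /user/ran/wordcount/output
$ hdfs dfs -cat /user/ran/wordcount/output/part-r-00000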

Now we run the original WordCount program on these files. The result will look something like this:

  • a 1
  • an 1
  • as 2
  • be 1
  • can 1
  • elephant 1
  • fellow 1
  • hadoop 3
  • is 3
  • oh 1
  • what 1
  • yellow 2

Now let’s run our WordCountAverage program. The new result will look like this:

  • average 1.0
  • average 1.0
  • average 1.3333334
  • average 1.25
  • average 1.2
  • average 1.1666666
  • average 1.1428572
  • average 1.375
  • average 1.5555556
  • average 1.5
  • average 1.4545455
  • average 1.5

The last line is the true average. Each reduce call prints an intermediate result, but only the last line holds the final value.
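
As a sanity check: the three files contain 18 words in total (4 + 7 + 7) spread over 12 distinct words, and 18 / 12 = 1.5, which matches the last line.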

The code

package com.ransilberman;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountAverage {

    // Mapper: emits (word, 1) for every token in the input, lower-cased
    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString().toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Used only as the combiner: sums the per-word counts locally before the final reducer
    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    // Final reducer: keeps a running total of word occurrences and of distinct words,
    // and writes the running average after each key. Assumes a single reducer, so the
    // last line written is the overall average.
    public static class Reduce
            extends Reducer<Text,IntWritable,Text,FloatWritable> {
        private FloatWritable result = new FloatWritable();
        Float average = 0f;
        Float count = 0f;   // number of distinct words seen so far
        int sum = 0;        // total number of word occurrences seen so far
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {

            Text sumText = new Text("average");
            // add this word's occurrences to the running total
            for (IntWritable val : values) {
                sum += val.get();
            }
            // one more distinct word processed; recompute the running average
            count += 1;
            average = sum/count;
            result.set(average);
            context.write(sumText, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountAverage.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(Reduce.class);
        // the mapper and combiner emit IntWritable counts,
        // but the final reducer emits FloatWritable averages
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

How to Unit Test Kafka

Note: this post was revised on January 20th, 2017 to reflect changes in Kafka.


Background:

When using Apache Kafka, a common concern is how to run unit tests for consumers without having to start a whole Kafka cluster and Zookeeper.

In Kafka 0.9, two mock classes were added: MockProducer and MockConsumer.
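
As an illustration, here is a minimal sketch of how MockConsumer can be used (assuming a recent kafka-clients dependency on the classpath; topic name and values are arbitrary):

import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

public class MockConsumerSketch {
    public static void main(String[] args) {
        // a consumer that never talks to a broker
        MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        TopicPartition tp = new TopicPartition("topic-1", 0);
        consumer.assign(Collections.singletonList(tp));
        consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
        // records are injected directly into the mock
        consumer.addRecord(new ConsumerRecord<>("topic-1", 0, 0L, "key", "value"));
        // poll returns the injected record
        ConsumerRecords<String, String> records = consumer.poll(100);
        System.out.println(records.count()); // prints 1
    }
}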

The problem with these mock classes is that in many cases they are simply unusable. The reason is that we often use frameworks around our Kafka consumers that do not let us inject a mock class in place of the real KafkaConsumer that is used internally.

Still, we want to be able to test our code without having to start a Zookeeper and Kafka server every time.

In this article I suggest an approach that can be helpful when you want to write a unit test for code that uses a Kafka consumer.

Test case

As an example, I take the case of Spark Streaming using a Kafka receiver.

The whole example can be found in this GitHub repository.

In this project, I set up a code example of Spark Streaming using a Kafka receiver to run a word count application. There is code in both Java and Scala.

This is the Word Count Java code to be tested (complete code can be found here):

package com.myspark;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        LoggerTools.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        sparkConf.setMaster("local[2]"); // set master server
        sparkConf.set("com.couchbase.bucket.travel-sample", "");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic: topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
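
Run on its own, the program expects four arguments, <zkQuorum> <group> <topics> <numThreads> (for example localhost:2181 grp-1 topic-1 2). In this post, however, main() is invoked directly from the unit test below.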

And now, the test

In order to test the class above, we perform the following steps:

  1. Start a local Zookeeper server
  2. Start a local Kafka server
  3. Create Kafka Producer
  4. Run the Spark Streaming program
  5. Send some messages through the Kafka Producer

The code below does this (complete code can be found here):

package com.myspark;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.kafka.clients.producer.*;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.junit.*;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;


public class JavaKafkaWordCountTest {
    private static final String TOPIC = "topic-1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String ZKPORT = "2181";

    private String nodeId = "0";
    private String zkConnect = "localhost:" + ZKPORT;
    private KafkaServerStartable server;
    KafkaProducer<Integer, byte[]> producer;


    @Before
    public void setup() throws IOException {
        //zookeeper
        startZK();
        //start kafka
        startKafka();
        // setup producer
        setupProducer();
    }

    @After
    public void tearDown() throws Exception {
        server.shutdown();
        server.awaitShutdown();
    }

    private static void startZK() throws IOException {
        final File zkTmpDir = File.createTempFile("zookeeper", "test");
        zkTmpDir.delete();
        zkTmpDir.mkdir();

        new Thread() {
            @Override
            public void run() {
                ZooKeeperServerMain.main(new String [] {ZKPORT,  zkTmpDir.getAbsolutePath()});
            }
        }.start();

        try {
            // give the embedded ZooKeeper a moment to start before Kafka connects to it
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // ignored; the test will fail later if ZooKeeper never comes up
        }
    }

    private void startKafka() {
        Properties props = new Properties();
        props.put("broker.id", nodeId);
        props.put("port", BROKERPORT);
        props.put("zookeeper.connect", zkConnect);
        props.put("host.name", "127.0.0.1");
        KafkaConfig conf = new KafkaConfig(props);
        server = new KafkaServerStartable(conf);
        server.startup();
    }

    private void setupProducer() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(producerProps);
    }

    @Test
    public void testSparkWordCount() throws Exception {
        // run the Spark Streaming word count job in a background thread
        Thread t = new Thread(() -> {
            String[] args = {"localhost", "grp-1", TOPIC, "2"};
            JavaKafkaWordCount.main(args);
            System.out.println("End Child Thread");
        });
        t.start();

        // send messages through the producer for the streaming job to consume
        for (int i=0; i<1000; i++){
            producer.send(new ProducerRecord<>(TOPIC, 0, 1, ("There are some words here to count -" + Integer.toString(i)).getBytes(Charset.forName("UTF-8"))));
            Thread.sleep(10);
        }
        System.out.println("End Test");
    }

}
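
With a standard Maven or IDE setup, this runs like any other JUnit test; the word counts computed by the streaming job are printed to the console by wordCounts.print().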