How to Unit Test Kafka

Note: this post was revised on January 20th, 2017 to reflect changes in Kafka.



When using Apache Kafka, a common concern is how to run unit tests for consumers without having to start a whole Kafka cluster and Zookeeper.

In Kafka 0.9, two mock classes were added: MockProducer and MockConsumer.

The problem with those mock classes is that in many cases they are simply unusable. The reason is that the frameworks we use for Kafka consumers create the real KafkaConsumer internally and offer no way to substitute a mock class for it.

Still, we want to be able to test our code without having to start a Zookeeper and Kafka server by hand every time.

In this article I suggest a new approach that can help when you want to write a unit test for code that implements a Kafka consumer.

Test case

As an example, I take the case of Spark Streaming using a Kafka receiver.

The whole example can be found in this GitHub repository.

In this project, I put together a code example of a word count application that uses Spark Streaming with a Kafka receiver. There is code in both Java and Scala.
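To make the expected result concrete, here is a minimal plain-Java sketch of the computation the streaming job applies to each batch: split every line on a single space and count the occurrences of each word. It involves neither Spark nor Kafka, and the class and method names (WordCountSketch, countWords) are hypothetical, not taken from the repository.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

final class WordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    private WordCountSketch() {
    }

    // Counts words across all lines, splitting each line on a single space.
    static Map<String, Integer> countWords(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys give a stable print order
        for (String line : lines) {
            for (String word : SPACE.split(line)) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {be=2, not=1, or=1, to=2}
        System.out.println(countWords(Arrays.asList("to be or", "not to be")));
    }
}
```

Logic of this kind can be checked with an ordinary unit test; the harder part, covered in the rest of the post, is exercising the code path that actually reads from Kafka.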

This is the Word Count Java code to be tested (complete code can be found here):

package com.myspark;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        sparkConf.setMaster("local[2]"); // set master server
        sparkConf.set("", "");
        // Create the context with a 2-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Map each topic to the number of receiver threads that consume it
        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic : topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        // Keep only the message value of each (key, value) pair
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        // Pair each word with 1, then sum the counts per word
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}


And now, the test

In order to test the class above, we perform the following steps:

  1. Start a local Zookeeper server
  2. Start a local Kafka server
  3. Create a Kafka producer
  4. Run the Spark Streaming program
  5. Send some messages through the Kafka Producer

The code below does this (complete code can be found here):

package com.myspark;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.kafka.clients.producer.*;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.junit.*;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.Properties;

public class JavaKafkaWordCountTest {
    private static final String TOPIC = "topic-1";
    private static final String BROKERHOST = "";
    private static final String BROKERPORT = "9092";
    private static final String ZKPORT = "2181";

    private String nodeId = "0";
    private String zkConnect = "localhost:" + ZKPORT;
    private KafkaServerStartable server;
    KafkaProducer<Integer, byte[]> producer;

    @Before
    public void setup() throws IOException {
        // start Zookeeper first, because the broker registers with it
        startZK();
        // start kafka
        startKafka();
        // setup producer
        setupProducer();
    }

    @After
    public void tearDown() throws Exception {
        // assumed cleanup: close the producer, then stop the broker
        producer.close();
        server.shutdown();
    }

    private static void startZK() throws IOException {
        // Zookeeper expects a data directory, so create a temp directory rather than a temp file
        final File zkTmpDir = Files.createTempDirectory("zookeeper-test").toFile();

        new Thread() {
            public void run() {
                ZooKeeperServerMain.main(new String[] {ZKPORT, zkTmpDir.getAbsolutePath()});
            }
        }.start();

        try {
            // give Zookeeper time to come up; the one-second delay is an assumed value
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void startKafka() {
        Properties props = new Properties();
        props.put("", nodeId);
        props.put("port", BROKERPORT);
        props.put("zookeeper.connect", zkConnect);
        props.put("", "");
        KafkaConfig conf = new KafkaConfig(props);
        server = new KafkaServerStartable(conf);
        server.startup();
    }

    private void setupProducer() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        // keys are Integers, so the producer also needs a key serializer
        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(producerProps);
    }

    @Test
    public void testSparkWordCount() throws Exception {
        // Run the streaming job in a child thread, because main() blocks until the job terminates
        Thread t = new Thread(() -> {
            String[] args = {"localhost", "grp-1", TOPIC, "2"};
            JavaKafkaWordCount.main(args);
            System.out.println("End Child Thread");
        });
        t.start();

        // Send 1000 messages to partition 0 with key 1
        for (int i = 0; i < 1000; i++) {
            producer.send(new ProducerRecord<>(TOPIC, 0, 1, ("There are some words here to count -" + Integer.toString(i)).getBytes(Charset.forName("UTF-8"))));
        }
        System.out.println("End Test");
    }
}
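
Because Zookeeper is started on a background thread, the test has to wait until it accepts connections before the Kafka broker can be started. One way to do this without relying on a fixed delay is to poll the port. The following is a minimal sketch under that assumption; PortWaiter and waitForPort are hypothetical names and are not part of the repository.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortWaiter {
    private PortWaiter() {
    }

    // Polls host:port until a TCP connection succeeds or timeoutMillis elapses.
    // Returns true if the port accepted a connection in time, false otherwise.
    static boolean waitForPort(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), 200);
                return true;
            } catch (IOException e) {
                Thread.sleep(100); // nothing is listening yet; retry shortly
            }
        }
        return false;
    }
}
```

In the test above, a call such as PortWaiter.waitForPort("localhost", Integer.parseInt(ZKPORT), 10000) could be made right after the Zookeeper thread is started, and the same check could be repeated for the broker port before creating the producer.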




How to use Avro with Kafka and Storm


In this post I’ll provide a practical example of how to integrate Avro with data flowing from Kafka to Storm.

  • Kafka is a highly available high-throughput messaging system from LinkedIn.
  • Storm is a distributed event stream processing system from Twitter.
  • Avro is a schema-based data serialization system.

All the above are open source projects.

The challenge

There are two main challenges when trying to get Avro data to flow between Kafka and Storm:

  1. How to serialize and deserialize Avro data that passes through Kafka.
  2. How to manage the protocol changes that come as the schema evolves.

Code example

Here I provide a simple code sample that demonstrates the use of Avro with Kafka and Storm.

The code can be downloaded from GitHub here:

In order to download it, just type:

$ git clone

Building from Source

$ mvn install -Dmaven.test.skip=true


The project contains two modules. The first is AvroMain, which is the Kafka producer test application. The second is AvroStorm, which contains the Storm topology code.

Basic Usage:

LPEvent.avsc is the Avro Schema. It exists in both projects.

MainTest runs three tests, each of which sends an event to Kafka. You may need to change the variable ‘zkConnection’ to the list of your own Zookeeper servers that the Kafka brokers are registered with.

AvroTopology is the class that runs the Storm topology. Here too, you may need to change the Zookeeper server list to the servers that your Kafka brokers are registered with.

First start the topology, then run the tests. The events will pass through Kafka in Avro format.

First test: sends an event using the GenericRecord class.

Second test: builds an Avro object using the generated resources. This test compiles only after the code has been generated with the Maven plugin avro-maven-plugin.

Third test: serializes the Avro objects into a file and then reads the file back.

Upgrading the Avro schema version:

AvroTopology2 is a second Storm topology that demonstrates how to use a schema that differs between producer and consumer. The difference in the code is in this line:

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema, _newSchema);

Note that the GenericDatumReader constructor takes two parameters here: the writer's schema (the one the producer used to encode the data) followed by the reader's schema (the one the consumer expects).