How to use Avro with Kafka and Storm


In this post I’ll provide a practical example of how to integrate Avro with data flowing from Kafka to Storm.

  • Kafka is a highly available, high-throughput messaging system from LinkedIn.
  • Storm is a distributed event stream processing system from Twitter.
  • Avro is a schema-based data serialization system.

All the above are open source projects.

The challenge

There are two main challenges in getting Avro data to flow between Kafka and Storm:

  1. How to serialize and deserialize Avro data that travels through Kafka.
  2. How to manage the protocol changes that arise as the schema evolves.
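To make the first challenge concrete, here is a minimal sketch of turning an Avro GenericRecord into the byte array that would travel as a Kafka message, and back again. It is not taken from the project; the class name AvroBytesSketch and the two-field schema (id, ip) are assumptions made for illustration, and the real LPEvent.avsc may look different.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytesSketch {
    // Hypothetical schema, for illustration only; the real LPEvent.avsc may differ.
    public static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LPEvent\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"ip\",\"type\":\"string\"}]}");

    // Encode a record with Avro's binary encoding (no schema is embedded in the bytes).
    public static byte[] serialize(GenericRecord record) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Decode bytes that were written with the given schema.
    public static GenericRecord deserialize(byte[] payload, Schema schema) throws IOException {
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord event = new GenericData.Record(SCHEMA);
        event.put("id", 42L);
        event.put("ip", "10.0.0.1");

        byte[] payload = serialize(event);                     // what the producer would send
        GenericRecord decoded = deserialize(payload, SCHEMA);  // what a consumer would do
        System.out.println(decoded.get("id") + " " + decoded.get("ip"));
    }
}
```

Because the binary encoding carries no schema, the consumer has to know which schema the producer used. That is exactly where the second challenge, schema evolution, comes from.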

Code example

Here I provide a simple code sample that demonstrates the use of Avro with Kafka and Storm.

The code can be downloaded from GitHub here:

In order to download it, just type:

$ git clone

Building from Source

$ mvn install -Dmaven.test.skip=true


The project contains two modules: AvroMain, the Kafka producer test application, and AvroStorm, the Storm topology code.

Basic Usage:

LPEvent.avsc is the Avro Schema. It exists in both projects.

MainTest runs three tests, each of which sends an event to Kafka. You may need to change the variable ‘zkConnection’ to the list of ZooKeeper servers used by your own Kafka brokers.

AvroTopology is the class that runs the Storm topology. You may need to change its host list to the ZooKeeper servers used by your Kafka brokers.

First start the topology, then run the tests; the events will pass through Kafka in Avro format.

First Test: Sends an event using the GenericRecord class.

Second Test: Builds an Avro object using the generated classes. This test compiles only after the code has been generated with the Maven plugin avro-maven-plugin.

Third Test: Serializes Avro objects into a file and then reads the file back.
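As a rough illustration of the file-based test, the following sketch writes records to an Avro container file and reads them back. It is not the project's test code; the class AvroFileSketch, the helper newEvent, and the two-field schema are assumptions made for this example.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroFileSketch {
    // Hypothetical schema, for illustration only; the real LPEvent.avsc may differ.
    public static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LPEvent\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"ip\",\"type\":\"string\"}]}");

    // Hypothetical helper that builds one event record.
    public static GenericRecord newEvent(long id, String ip) {
        GenericRecord event = new GenericData.Record(SCHEMA);
        event.put("id", id);
        event.put("ip", ip);
        return event;
    }

    // Write records to a container file; the schema is stored in the file header.
    public static void writeFile(File file, List<GenericRecord> records) throws IOException {
        try (DataFileWriter<GenericRecord> writer =
                 new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(SCHEMA))) {
            writer.create(SCHEMA, file);
            for (GenericRecord record : records) {
                writer.append(record);
            }
        }
    }

    // Read all records back; the reader picks up the schema from the file header.
    public static List<GenericRecord> readFile(File file) throws IOException {
        List<GenericRecord> result = new ArrayList<GenericRecord>();
        try (DataFileReader<GenericRecord> reader =
                 new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>())) {
            for (GenericRecord record : reader) {
                result.add(record);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        File file = File.createTempFile("lpevent", ".avro");
        List<GenericRecord> events = new ArrayList<GenericRecord>();
        events.add(newEvent(1L, "10.0.0.1"));
        events.add(newEvent(2L, "10.0.0.2"));

        writeFile(file, events);
        for (GenericRecord record : readFile(file)) {
            System.out.println(record);
        }
    }
}
```

Unlike a raw binary-encoded message, a container file embeds the writer's schema, so the reader does not need to be given one up front.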

Upgrading the Avro schema version:

AvroTopology2 is a second Storm topology that demonstrates how to use a schema that differs between producer and consumer. The difference in the code is in this line:

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema, _newSchema);

Note that the GenericDatumReader constructor takes two parameters: the first is the schema the data was written with, and the second is the schema the reader expects.
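To see what the two-schema constructor does, consider the following self-contained sketch. It is not the code of AvroTopology2; the class AvroEvolutionSketch, both schemas, and the added field "country" with its default value are assumptions made for illustration. The producer writes with the old schema, and the consumer reads with a newer schema that adds a field with a default.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroEvolutionSketch {
    // Hypothetical "old" schema used by the producer.
    public static final Schema WRITER_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LPEvent\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"ip\",\"type\":\"string\"}]}");

    // Hypothetical "new" schema used by the consumer: adds a field with a default.
    // A separate Parser is used because one parser cannot define the same name twice.
    public static final Schema READER_SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"LPEvent\",\"fields\":["
        + "{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"ip\",\"type\":\"string\"},"
        + "{\"name\":\"country\",\"type\":\"string\",\"default\":\"unknown\"}]}");

    // Producer side: encode an event with the old schema.
    public static byte[] writeWithOldSchema(long id, String ip) throws IOException {
        GenericRecord event = new GenericData.Record(WRITER_SCHEMA);
        event.put("id", id);
        event.put("ip", ip);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER_SCHEMA).write(event, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Consumer side: decode with (writer schema, reader schema) so Avro resolves the difference.
    public static GenericRecord readWithNewSchema(byte[] payload) throws IOException {
        DatumReader<GenericRecord> reader =
            new GenericDatumReader<GenericRecord>(WRITER_SCHEMA, READER_SCHEMA);
        return reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    }

    public static void main(String[] args) throws IOException {
        GenericRecord decoded = readWithNewSchema(writeWithOldSchema(42L, "10.0.0.1"));
        // The field missing from the written data is filled from the reader schema's default.
        System.out.println(decoded.get("country"));
    }
}
```

The resolution only works when the change is compatible. A field added to the reader's schema needs a default, otherwise reading data written with the old schema fails.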