Background
In this post I’ll provide a practical example of how to integrate Avro with data flowing from Kafka to Storm.
- Kafka is a highly available, high-throughput messaging system from LinkedIn.
- Storm is a distributed event stream processing system from Twitter.
- Avro is a data serialization system in which the data is described by a schema.
All the above are open source projects.
The challenge
There are two main challenges when trying to make Avro data flow between Kafka and Storm:
- How to serialize and deserialize Avro data over Kafka (see the sketch after this list).
- How to manage the protocol changes that come when the schema evolves.
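To make the first challenge concrete: Kafka carries opaque byte arrays, so the Avro record must be encoded to bytes on the producer side and decoded back on the consumer side. Here is a minimal sketch of that round trip in plain Java (the miniature schema is made up for illustration; the real project uses LPEvent.avsc):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroBytesDemo {
    // Hypothetical miniature schema; the real project uses LPEvent.avsc.
    static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\"," +
        "\"fields\":[{\"name\":\"eventName\",\"type\":\"string\"}]}");

    public static void main(String[] args) throws Exception {
        GenericRecord event = new GenericData.Record(SCHEMA);
        event.put("eventName", "click");

        // Serialize: Avro binary encoding produces the byte[] that Kafka will carry.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(event, encoder);
        encoder.flush();
        byte[] payload = out.toByteArray();

        // Deserialize: the consumer decodes the bytes using the same schema.
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        GenericRecord decoded = new GenericDatumReader<GenericRecord>(SCHEMA).read(null, decoder);
        System.out.println(decoded.get("eventName"));
    }
}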
Code example
Here I provide a simple code sample that demonstrates the use of Avro with Kafka and Storm.
The code can be downloaded from GitHub here: https://github.com/ransilberman/avro-kafka-storm.git
In order to download it, just type:
$ git clone https://github.com/ransilberman/avro-kafka-storm.git
Building from Source
$ mvn install -Dmaven.test.skip=true
Usage:
The project contains two modules: AvroMain, the Kafka producer test application, and AvroStorm, the Storm topology code.
Basic Usage:
LPEvent.avsc
is the Avro schema. It exists in both modules.
MainTest
runs three tests, each of which sends an event to Kafka (the producer side is sketched below). You may need to change the variable ‘zkConnection’ to your own list of ZooKeeper servers that hold the Kafka brokers.
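For orientation, here is a minimal sketch of the producer side: handing Kafka the Avro-encoded bytes. It uses the current Kafka producer API rather than the ZooKeeper-configured producer of the Kafka version the project was written against, and the topic name ‘lpevents’ is hypothetical; ‘payload’ stands for the Avro-encoded byte array from the sketch above:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerSketch {
    // Sends Avro-encoded bytes to Kafka. The topic name "lpevents" is hypothetical,
    // and this uses the modern producer API rather than the project's original one.
    static void send(byte[] payload) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        producer.send(new ProducerRecord<String, byte[]>("lpevents", payload));
        producer.close();
    }
}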
AvroTopology
is the class that runs the Storm topology. You may need to change the ZooKeeper server list that holds the Kafka brokers.
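The consuming side can be pictured as a bolt that decodes the bytes a Kafka spout emits. The sketch below uses the backtype.storm package names of Storm’s pre-Apache releases; the tuple field index and the schema loading are my assumptions, not the project’s exact code:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;

public class AvroDecodeBolt extends BaseBasicBolt {
    private transient Schema schema;

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        try {
            if (schema == null) {
                // Assumption: the schema file is on the classpath of the topology jar.
                schema = new Schema.Parser().parse(
                        getClass().getResourceAsStream("/LPEvent.avsc"));
            }
            byte[] payload = tuple.getBinary(0); // the spout emits the raw Kafka message
            GenericRecord event = new GenericDatumReader<GenericRecord>(schema)
                    .read(null, DecoderFactory.get().binaryDecoder(payload, null));
            System.out.println(event);
        } catch (java.io.IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: no output fields
    }
}

Parsing the schema lazily and marking the field transient keeps the Schema object (not serializable in older Avro versions) out of the serialized bolt that Storm ships to its workers.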
First start the topology, then run the tests; the events will pass through Kafka in Avro format.
First Test sends an event using the GenericRecord class.
Second Test builds the Avro object using generated sources. This test will compile only after the code is generated by the Maven plugin avro-maven-plugin.
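To sketch what the generated-sources path looks like, assuming avro-maven-plugin has generated an LPEvent class from LPEvent.avsc (the builder call and the field name below are hypothetical):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;

public class GeneratedClassSketch {
    static byte[] encode() throws IOException {
        // LPEvent stands for the class avro-maven-plugin generates from LPEvent.avsc;
        // the builder call and the field name are hypothetical.
        LPEvent event = LPEvent.newBuilder()
                .setEventName("click")
                .build();

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new SpecificDatumWriter<LPEvent>(LPEvent.class).write(event, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}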
Third Test serializes the Avro object into a file and then reads it back.
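The file round trip can be sketched with Avro’s container-file classes; ‘schema’ and ‘event’ are as in the earlier sketches, and the file name is hypothetical:

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class AvroFileDemo {
    // Writes one record to an Avro container file and reads it back.
    // 'schema' and 'event' are as in the earlier sketches; the file name is hypothetical.
    static void writeAndRead(Schema schema, GenericRecord event) throws IOException {
        File file = new File("events.avro");

        DataFileWriter<GenericRecord> writer =
                new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
        writer.create(schema, file); // the container file embeds the writer's schema
        writer.append(event);
        writer.close();

        DataFileReader<GenericRecord> reader =
                new DataFileReader<GenericRecord>(file, new GenericDatumReader<GenericRecord>());
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
        reader.close();
    }
}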
Upgrading the Avro schema version:
AvroTopology2
is a second Storm topology that demonstrates how to use a schema that differs between the producer and the consumer. The difference in the code is in this line:

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema, _newSchema);
Note that there are two parameters to the GenericDatumReader constructor: the schema the data was written with and the schema to read it with.
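Here is a minimal sketch of how that resolution works, assuming the reader’s schema adds a field with a default value (both miniature schemas are made up for illustration):

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaEvolutionDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical schemas: the reader's schema adds a field with a default value.
        Schema writerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\"," +
            "\"fields\":[{\"name\":\"eventName\",\"type\":\"string\"}]}");
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\"," +
            "\"fields\":[{\"name\":\"eventName\",\"type\":\"string\"}," +
            "{\"name\":\"source\",\"type\":\"string\",\"default\":\"unknown\"}]}");

        // The producer writes with the old schema...
        GenericRecord oldEvent = new GenericData.Record(writerSchema);
        oldEvent.put("eventName", "click");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(writerSchema).write(oldEvent, encoder);
        encoder.flush();

        // ...and the consumer resolves the bytes against the new schema:
        // Avro fills in the default for the missing field.
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<GenericRecord>(writerSchema, readerSchema);
        GenericRecord resolved = reader.read(null,
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
        System.out.println(resolved.get("source")); // prints "unknown"
    }
}

Because the reader knows both schemas, Avro can skip removed fields and fill in defaults for added ones; this is what lets the producer and the consumer evolve their schemas independently.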