Apache Spark with Maven template

I provide hereby a template for starting a Spark project with Maven. Code examples are in both Java and Scala. It provides a template for Spark Core, spark SQL and Spark Streaming.

This is a good starting point for any Spark project that you want to start.

The project template can be found in this GitHub repository.

project structure

Project structure is of a parent pom and a child module.

See picture below:

screenshot-from-2017-01-20-14-59-42

pom.xml explained

All dependencies except for the Spark libraries are defined in <dependencyManagement> tag in the parent pom.

The Spark libraries are defined in two profiles: spark-prod and spark-dev.

‘spark-prod’ contains all spark libraries in scope provided. This provides a fat jar that does not contain the spark libraries in it and hence can be deployed in a cluster using the command spark-submit.

‘spark-dev’ contains all spark libraries in scope compile. This allows debugging it in IntelliJ or eclipse in local mode.

In order to debug the project one needs to set the profile to -Pspark-dev or set it in the IDE profile. For example, in IntelliJ you set it like this:

screenshot-from-2017-01-20-15-14-49

Source templates

There is one Java source template for Spark core called HelloWorldJava.java.

There are three Scala source templates for Spark core, Spark SQL and Spark Streaming respectively:

  • HelloWorldScala.scala
  • HelloWorldSqlScala.scala
  • HelloWorldStreaming.scala

There is one test class Hello_Test.scala. This test uses the testing framework ScalaTest

Trouble Shooting

If you get the following error while trying to run any of the classes:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/SparkConf
 at com.tikalk.HelloWorldScala$.main(HelloWorldScala.scala:13)
 at com.tikalk.HelloWorldScala.main(HelloWorldScala.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

It is due to forgetting to set the profile to ‘spark-dev’.

Set the profile to ‘spark-dev’ as explained in the section: pom.xml explained.


The project template can be found in this GitHub repository.

Streaming your data

So, you want to process data that flows and you want to show information based on this data in real-time, right?

What are the available options you have?

Background

First, let’s cover the components that you will need:

Figure 1. Flow of Data

Data Streaming & Processing

As seen in figure 1, the Data Streaming & Processing is only one part in the architecture.

In this post I will focus only on this part, but will suggest what are the available tools that may fill in for the rest of the components:

Data Source: This is any source of data that may exist. Known tools to send the data into the Processing component are Apache Kafka or Amazon Kinesis.

Analytical Database: Any database that applies you need for aggregating the information for the UI. Some suggestions may be:  Cassandra, HBase, Elasticsearch, Couchbase, etc.

Front End: This will generally be a web server that serves a web application for your use.

Data Streaming and Processing engine

Now we are at the heart of this post.

What are the available Data Streaming and Processing engines that exist and what is the difference between them?

Requirement:

Those are the expected abilities of the Stream processor:

  1. Distributed system with ability to scale linearly.
  2. Fault Tolerance and Durability (Consistency)
  3. Fast!
  4. Resource Management and rebalance resources
  5. Mature enough

Note: For item #1 and #2, according to the CAP theorem, it may be difficult to gain both of them. Therefore, a system that will allow to choose between the two based on the expected SLA will be a good catch!

Two approaches

We can find today two different approaches that can be adapted

  1. Real-time streaming infrastructures. Data streams between processing units via network (Apache Storm, Apache Flink)
  2. Micro-batch infrastructure data is persisted between the hops (Spark Streaming)

There is also a third approach that combines some features from each approach. Apache Samza streams data in real-time, but keeps the intermediates between the hops persisted.

In this post, I will cover the following tools:

Apache Storm

Storm was the first real-time event processing framework that was release ans that addressed most items in our requirements list. It was developed by Nathan Marz and BackType team and then acquired and open sourced in Twitter.

It is also the most adopted system as can be seen here.

Let’s check if Storm addresses all our needs:

Distributed System: Yes. Storm is certainly a distributed system that can scale. I will not go into the architecture of Storm here.  I will only mention that Storm is built on small runnable components that are called Spouts and Bolts. Tuples are passed between the Spouts and Bolts via ZeroMQ/Netty. An application can grow in scale by setting more bolts or by setting more parallelism. For more information please refer to Storm home page.

Fault Tolerance Vs. Fast streaming: Here, there was a lot of work done to allow using a more durable application or a faster application, with a penalty of latency.

So on one side, Storm provided a very good configurable way to address your SLA.

On the other side, when working with Storm you may reach points of not durable enough and trying to be more durable will have a latency cost.

In General, it needs a lot of tuning to adjust the best configuration for your application.

Resource Management: Here again, Storm does not make it easy for you. As real-time data may have daily and seasonal peaks, you may need to adjust your resources to be sure you do not lag behind.

There is however a good initiative of Mesos for Storm that make your like much easier in this regard!

Spark Streaming

Spark was initiated by Matei Zaharia at UC Berkeley. Originally this project meant to address some issues in Hadoop like latency and to improve machine learning capabilities. Later on it was taken by Databricks and added to the Apache project.

As Spark idea grew with Hadoop in  mind, the batch processing was the backbone of it. The core of Spark is Resilient Distributed Dataset (RDD), which is is a logical collection of data partitioned across machines.

Spark streaming was added after the project started to grow significantly.

This went very well with the Lambda Architecture. A concept that was spoken a lot but was not implemented very much. With Spark, you can gain both offline batch processing and real-time streaming in one platform.

So with spark, that is based on batch processing, streaming is in fact still batch processing and not real time streaming. The idea is that if you shorten your batches very much until they become micro-batches, you get a latency close to what real-time streaming suggest.

As latency may be a problem, the durability of batch  is superior over real-time  processing. Fault tolerance is easier to maintain and resource utilization is more efficient.

Apache Samza

Samza was developed in LinkedIn and was aimed to address the pitfalls of Storm.

There is some similarity between Samza and Storm, but some important differences.

In general, the architecture is similar: A distributed system that can scale horizontally; Streams and Tasks replace Spouts and Bolts.

Fault Tolerance Vs. Fast streaming: In this aspect, Samza took the Consistency approach over fast streaming. the messages that are sent are always persisted to disk. Therefore the issue of bottlenecks that may occur due to temporarily high throughput is solved. But the whole throughput may take a bit longer, and it is dependent on the number of hops in the way of the message.

A good comparison between Storm and Samza can be found here.

Samza comes with Yarn integration, so the Resource Management should be well addressed.

Choosing Streaming framework

When coming to decide which streaming framework to choose for a new project, there may be some guidelines.

First: define your expected SLA (Service Level Agreement).

A distributed system is not easy to manage and to monitor, and there are trade-offs that you need to decide about

Low Latency

If you need a very low latency system that can get you responses in milliseconds or few seconds, then storm is preferred. There is also a good pro for Storm, that it lets you manage your data flow in the system in a visual topology map.

High Throughput

Choose Spark Streaming if you can compromise the low latency to few seconds or even minutes, but to know that you can sustain high throughput and unexpected spikes. Another benefit is that you can use the same infrastructure for both batch processing and streaming.

 Conclusion

We saw that there are several good available Cluster base, Open Source, Streaming frameworks today to choose from.

Each one took a different technology path but they all address the same goal: providing an infrastructure that is fault tolerant and can run on clusters, on which you can deploy small processing units that process streaming data.

Storm is quite mature and proved very good performance results.

On the other hand, Spark Streaming has an emerging community and lets you develop durable data frameworks

Samza took a kind of chimeric approach between the two and did not gain lot of popularity yet.

Go Streaming!

 

How to Unit Test Kafka

Note: this post was revised in January 20th, 2017 to reflect changes in Kafka

1534695_589830264418901_705675027_o

Background:

When using Apache Kafka, one concern raised is how to run unit tests for the consumers without the need to start a whole Kafka cluster and Zookeeper.

In Kafka 0.9 two Mock classes was added: MockProducer and MockConsumer.

The problem with those mock classes is that in many cases they are just unusable. The reason is that we use frameworks for Kafka consumers that do not allow to implement a mock class instead of the real KafkaConsumer that is internally used.

But still, we want to be able to test our code somehow without the need to start Zookeeper and Kafka server always.

In this article I suggest a new approach that can be helpful in case you want to write a unit-test for your code that implements Kafka Consumer.

Test case

I take for example the case of Spark Streaming using Kafka Receiver.

The whole example can be found in this GitHub repository.

In this project, I set a code example for Spark Streaming using Kafka receiver to perform word count application. There is code in both Java and Scala.

This is the Word Count Java code to be tested (complete code can be found here):

package com.myspark;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public final class JavaKafkaWordCount {
 private static final Pattern SPACE = Pattern.compile(" ");

 private JavaKafkaWordCount() {
 }

 public static void main(String[] args) {
 if (args.length < 4) {
 System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
 System.exit(1);
 }

 LoggerTools.setStreamingLogLevels();
 SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
 sparkConf.setMaster("local[2]"); //set master server
 sparkConf.set("com.couchbase.bucket.travel-sample", "");
 // Create the context with 2 seconds batch size
 JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

 int numThreads = Integer.parseInt(args[3]);
 Map<String, Integer> topicMap = new HashMap<String, Integer>();
 String[] topics = args[2].split(",");
 for (String topic: topics) {
 topicMap.put(topic, numThreads);
 }

 JavaPairReceiverInputDStream<String, String> messages =
 KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

 JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
 @Override
 public String call(Tuple2<String, String> tuple2) {
 return tuple2._2();
 }
 });

 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
 @Override
 public Iterable<String> call(String x) {
 return Lists.newArrayList(SPACE.split(x));
 }
 });

 JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
 new PairFunction<String, String, Integer>() {
 @Override
 public Tuple2<String, Integer> call(String s) {
 return new Tuple2<String, Integer>(s, 1);
 }
 }).reduceByKey(new Function2<Integer, Integer, Integer>() {
 @Override
 public Integer call(Integer i1, Integer i2) {
 return i1 + i2;
 }
 });

 wordCounts.print();
 jssc.start();
 jssc.awaitTermination();
 }
}

And now –  the test

In order to test the class above, we perform the following steps:

  1. Start a local Zookeeper server
  2. Start a local Kafka server
  3. Create Kafka Producer
  4. Run the Spark Streaming program
  5. Send some messages through the Kafka Producer

The code below does this (complete code can be found here):

package com.myspark;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.kafka.clients.producer.*;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.junit.*;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;


public class JavaKafkaWordCountTest {
    private static final String TOPIC = "topic-1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String ZKPORT = "2181";

    private String nodeId = "0";
    private String zkConnect = "localhost:" + ZKPORT;
    private KafkaServerStartable server;
    KafkaProducer<Integer, byte[]> producer;


    @Before
    public void setup() throws IOException {
        //zookeeper
        startZK();
        //start kafka
        startKafka();
        // setup producer
        setupProducer();
    }

    @After
    public void tearDown() throws Exception {
        server.shutdown();
        server.awaitShutdown();
    }

    private static void startZK() throws IOException {
        final File zkTmpDir = File.createTempFile("zookeeper", "test");
        zkTmpDir.delete();
        zkTmpDir.mkdir();

        new Thread() {
            @Override
            public void run() {
                ZooKeeperServerMain.main(new String [] {ZKPORT,  zkTmpDir.getAbsolutePath()});
            }
        }.start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
    }

    private void startKafka() {
        Properties props = new Properties();
        props.put("broker.id", nodeId);
        props.put("port", BROKERPORT);
        props.put("zookeeper.connect", zkConnect);
        props.put("host.name", "127.0.0.1");
        KafkaConfig conf = new KafkaConfig(props);
        server = new KafkaServerStartable(conf);
        server.startup();
    }

    private void setupProducer() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(producerProps);
    }

    @Test
    public void testSparkWordCount() throws Exception {
        Thread t =  new Thread(() -> {
            String[] args = {"localhost", "grp-1", TOPIC, "2"};
            JavaKafkaWordCount.main(args);
            System.out.println("End Child Thread");
        });
        t.start();

        for (int i=0; i<1000; i++){
            producer.send(new ProducerRecord<>(TOPIC, 0, 1, ("There are some words here to count -" + Integer.toString(i)).getBytes(Charset.forName("UTF-8"))));
            Thread.sleep(10);
        }
        System.out.println("End Test");
    }

}