Apache Spark with Maven template

This post provides a template for starting a Spark project with Maven. It covers Spark Core, Spark SQL and Spark Streaming, with code examples in both Java and Scala.

It is a good starting point for any new Spark project.

The project template can be found in this GitHub repository.

Project structure

The project consists of a parent pom and one child module, as shown in the picture below:

[Screenshot: project structure]

pom.xml explained

All dependencies except the Spark libraries are defined in the <dependencyManagement> tag in the parent pom.

The Spark libraries are defined in two profiles: spark-prod and spark-dev.

‘spark-prod’ declares all Spark libraries with scope provided. The resulting fat jar does not contain the Spark libraries, so it can be deployed to a cluster with the spark-submit command.

‘spark-dev’ declares all Spark libraries with scope compile. This allows you to run and debug the project in IntelliJ or Eclipse in local mode.

To debug the project, activate the spark-dev profile, either with -Pspark-dev on the Maven command line or in the IDE's Maven profile settings. For example, in IntelliJ you set it like this:

[Screenshot: selecting the spark-dev profile in IntelliJ]

Source templates

There is one Java source template for Spark Core, called HelloWorldJava.java.

There are three Scala source templates, for Spark Core, Spark SQL and Spark Streaming respectively:

  • HelloWorldScala.scala
  • HelloWorldSqlScala.scala
  • HelloWorldStreaming.scala

There is one test class, Hello_Test.scala, which uses the ScalaTest testing framework.

Troubleshooting

If you get the following error while trying to run any of the classes:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/SparkConf
 at com.tikalk.HelloWorldScala$.main(HelloWorldScala.scala:13)
 at com.tikalk.HelloWorldScala.main(HelloWorldScala.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

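This happens when the ‘spark-dev’ profile is not active: with ‘spark-prod’ the Spark libraries have scope provided, so they are not on the classpath when you run the class from the IDE.

Set the profile to ‘spark-dev’ as explained in the section pom.xml explained.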
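
A quick way to confirm this diagnosis is to check whether the Spark class can be loaded at all. The following is a minimal sketch in plain Java and is not part of the template; the class name ClasspathCheck and its method are made up for illustration. Run it with the same run configuration as the failing class.

```java
public class ClasspathCheck {

    // Returns true if the named class can be found on the current classpath.
    // The class is located but not initialized.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String sparkConf = "org.apache.spark.SparkConf";
        if (isOnClasspath(sparkConf)) {
            System.out.println(sparkConf + " found: the Spark libraries are on the classpath");
        } else {
            System.out.println(sparkConf + " missing: check that the spark-dev profile is active");
        }
    }
}
```

If the class is reported missing, the run configuration is most likely still using the spark-prod profile.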



Streaming your data

So, you want to process data as it flows in and show information based on it in real time, right?

What options are available?

Background

First, let’s cover the components that you will need:

Figure 1. Flow of Data

Data Streaming & Processing

As seen in Figure 1, Data Streaming & Processing is only one part of the architecture.

In this post I will focus only on this part, but I will suggest tools that can fill in the rest of the components:

Data Source: Any source of data. Well-known tools for sending the data into the processing component are Apache Kafka and Amazon Kinesis.

Analytical Database: Any database that suits your needs for aggregating the information for the UI. Some suggestions are Cassandra, HBase, Elasticsearch and Couchbase.

Front End: Generally a web server that serves a web application to your users.

Data Streaming and Processing engine

Now we are at the heart of this post.

Which Data Streaming and Processing engines are available, and how do they differ?

Requirements:

These are the abilities we expect from a stream processor:

  1. A distributed system that can scale linearly
  2. Fault tolerance and durability (consistency)
  3. Fast!
  4. Resource management and rebalancing of resources
  5. Mature enough

Note: According to the CAP theorem, it may be difficult to get both item #1 and item #2 in full. Therefore, a system that lets you choose between the two based on the expected SLA is a good catch!

Two approaches

Today we can find two different approaches:

  1. Real-time streaming infrastructure, where data streams between processing units over the network (Apache Storm, Apache Flink)
  2. Micro-batch infrastructure, where data is persisted between the hops (Spark Streaming)

There is also a third approach that combines features from both: Apache Samza streams data in real time, but persists the intermediate results between the hops.

In this post, I will cover the following tools:

Apache Storm

Storm was the first real-time event processing framework to be released that addressed most of the items in our requirements list. It was developed by Nathan Marz and the BackType team; Twitter then acquired BackType and open sourced Storm.

It is also the most widely adopted system, as can be seen here.

Let’s check if Storm addresses all our needs:

Distributed System: Yes. Storm is certainly a distributed system that can scale. I will not go into the architecture of Storm here. I will only mention that a Storm application is built from small runnable components called Spouts and Bolts. Tuples are passed between the Spouts and Bolts via ZeroMQ/Netty. An application can scale by adding more bolts or by increasing parallelism. For more information, please refer to the Storm home page.

Fault Tolerance vs. Fast Streaming: A lot of work was done here to let you choose between a more durable application, with a latency penalty, and a faster one.

So on one side, Storm provides a very configurable way to meet your SLA.

On the other side, when working with Storm you may reach a point where it is not durable enough, and making it more durable will cost you latency.

In general, it takes a lot of tuning to find the best configuration for your application.

Resource Management: Here again, Storm does not make it easy for you. As real-time data may have daily and seasonal peaks, you may need to adjust your resources to be sure you do not lag behind.

There is, however, a good initiative to run Storm on Mesos, which makes your life much easier in this regard!

Spark Streaming

Spark was started by Matei Zaharia at UC Berkeley. Originally the project was meant to address some issues in Hadoop, such as latency, and to improve machine learning capabilities. Later on it was backed by Databricks and became an Apache project.

As Spark grew with Hadoop in mind, batch processing was its backbone. The core of Spark is the Resilient Distributed Dataset (RDD), a logical collection of data partitioned across machines.

Spark Streaming was added after the project started to grow significantly.

This fit very well with the Lambda Architecture, a concept that was discussed a lot but not implemented very often. With Spark, you get both offline batch processing and real-time streaming in one platform.

Because Spark is based on batch processing, its streaming is in fact still batch processing and not true real-time streaming. The idea is that if you shorten your batches until they become micro-batches, you get latency close to what real-time streaming offers.
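
To make the micro-batch idea concrete, here is a minimal sketch in plain Java. It is not Spark code: it only assumes that events carry a millisecond timestamp and groups them into fixed-length batches. The names MicroBatcher, Event and toBatches are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MicroBatcher {

    // Hypothetical event type: a payload plus its arrival time in milliseconds.
    public static final class Event {
        final long timestampMs;
        final String payload;

        public Event(long timestampMs, String payload) {
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    // Groups events into consecutive windows of batchIntervalMs.
    // The map key is the start time of each batch; empty batches are omitted.
    public static Map<Long, List<String>> toBatches(List<Event> events, long batchIntervalMs) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Event e : events) {
            long batchStart = (e.timestampMs / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(e.payload);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event(100, "a"));
        events.add(new Event(1900, "b"));
        events.add(new Event(2100, "c"));
        events.add(new Event(4500, "d"));

        // With 2-second batches, "a" and "b" are processed together.
        System.out.println(toBatches(events, 2000));  // {0=[a, b], 2000=[c], 4000=[d]}
    }
}
```

An event that arrives at the start of a batch waits for the whole interval before it is processed, which is why shortening the interval brings the latency closer to record-at-a-time streaming.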

While latency may be a problem, batch processing is more durable than real-time processing: fault tolerance is easier to maintain and resource utilization is more efficient.

Apache Samza

Samza was developed at LinkedIn and was aimed at addressing the pitfalls of Storm.

There is some similarity between Samza and Storm, but there are also important differences.

In general, the architecture is similar: a distributed system that can scale horizontally, in which Streams and Tasks replace Spouts and Bolts.

Fault Tolerance vs. Fast Streaming: Here, Samza chose consistency over fast streaming. Messages are always persisted to disk, which solves the problem of bottlenecks caused by temporarily high throughput. However, end-to-end processing may take a bit longer, and the delay depends on the number of hops a message passes through.

A good comparison between Storm and Samza can be found here.

Samza comes with YARN integration, so resource management should be well addressed.

Choosing a streaming framework

When deciding which streaming framework to choose for a new project, a few guidelines can help.

First, define your expected SLA (Service Level Agreement).

A distributed system is not easy to manage and monitor, and there are trade-offs that you need to decide on.

Low Latency

If you need a very low latency system that responds within milliseconds or a few seconds, then Storm is preferred. Another advantage of Storm is that it lets you view the data flow in the system as a visual topology map.

High Throughput

Choose Spark Streaming if you can relax the latency requirement to a few seconds or even minutes in exchange for knowing that you can sustain high throughput and unexpected spikes. Another benefit is that you can use the same infrastructure for both batch processing and streaming.

Conclusion

We saw that there are several good open source, cluster-based streaming frameworks to choose from today.

Each one took a different technology path, but they all address the same goal: providing a fault-tolerant infrastructure that runs on clusters, on which you can deploy small processing units that process streaming data.

Storm is quite mature and has shown very good performance results.

On the other hand, Spark Streaming has a growing community and lets you develop durable data frameworks.

Samza took a hybrid approach between the two and has not gained much popularity yet.

Go Streaming!

 

MapReduce in Hadoop: calculating an average value

Calculate the average value in WordCount MapReduce on Hadoop

The famous Word Count example, which can be found here, shows a simple MapReduce job that counts words.

In this example, instead of counting the words, we print the average of the word counts.

To do so, we changed the original program:

  • We separated the combiner from the reducer (the original used the same class for both purposes)
  • We created a new reducer that accumulates the average value across the iterations and eventually prints the final value
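
The reason for separating the two roles can be sketched in plain Java, with no Hadoop involved; the class name CombinerCheck and the numbers are made up for illustration. A combiner runs on partial data, so its operation has to give the same result however the input is grouped. Summing has that property and averaging does not, so the summing class stays as the combiner and the averaging moves to a dedicated reducer.

```java
public class CombinerCheck {

    public static int sum(int... values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    public static float average(float... values) {
        float total = 0f;
        for (float v : values) {
            total += v;
        }
        return total / values.length;
    }

    public static void main(String[] args) {
        // Summing partial sums gives the same answer as summing everything.
        System.out.println(sum(sum(1, 2), 3) == sum(1, 2, 3));  // true

        // Averaging partial averages does not:
        // (1.5 + 6) / 2 = 3.75, but (1 + 2 + 6) / 3 = 3.0
        System.out.println(average(average(1f, 2f), 6f));        // 3.75
        System.out.println(average(1f, 2f, 6f));                 // 3.0
    }
}
```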

The code can be downloaded from here.

Usage

Let’s take three files with words:

$ echo "Hadoop is an elephant" > file0
$ echo "Hadoop is as Yellow as can be" > file1
$ echo "Oh what a yellow fellow is Hadoop" > file2
$ hdfs dfs -put file* /user/ran/wordcount/input

Now we run the original WordCount program on these files. The result, with words converted to lower case as our mapper does, will look something like this:

  • a 1
  • an 1
  • as 2
  • be 1
  • can 1
  • elephant 1
  • fellow 1
  • hadoop 3
  • is 3
  • oh 1
  • what 1
  • yellow 2
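
For readers who want to check these numbers without a cluster, the same counting can be sketched in plain Java. This is not the Hadoop job, only an in-memory imitation of its two phases; the class name LocalWordCount is made up for illustration. It lower-cases and tokenizes each line the way the mapper in this post does, then sums the occurrences per word.

```java
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {

    // "Map" step: split each line into lower-cased tokens.
    // "Reduce" step: sum the occurrences of each token.
    public static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();  // keys kept in sorted order
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line.toLowerCase());
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(
                "Hadoop is an elephant",
                "Hadoop is as Yellow as can be",
                "Oh what a yellow fellow is Hadoop");
        counts.forEach((word, n) -> System.out.println(word + " " + n));
    }
}
```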

Now let’s run our WordCountAverage program. The new result will look like this:

  • avegage 1.0
  • avegage 1.0
  • avegage 1.3333334
  • avegage 1.25
  • avegage 1.2
  • avegage 1.1666666
  • avegage 1.1428572
  • avegage 1.375
  • avegage 1.5555556
  • avegage 1.5
  • avegage 1.4545455
  • avegage 1.5

The last line is the true average. Each iteration prints an intermediate result, but only the last one is final.
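
The arithmetic behind these lines can be reproduced with a few lines of plain Java. This is a sketch of the accumulation only, not the Hadoop reducer; the class name RunningAverage is made up for illustration, and the input array holds the twelve word counts listed earlier, in the same order.

```java
import java.util.ArrayList;
import java.util.List;

public class RunningAverage {

    // Returns the average after each count is folded in: one value per word,
    // computed as (sum of counts so far) / (number of words so far).
    public static List<Float> runningAverages(int[] counts) {
        List<Float> averages = new ArrayList<>();
        int sum = 0;
        int seen = 0;
        for (int c : counts) {
            sum += c;
            seen += 1;
            averages.add((float) sum / seen);
        }
        return averages;
    }

    public static void main(String[] args) {
        int[] counts = {1, 1, 2, 1, 1, 1, 1, 3, 3, 1, 1, 2};
        List<Float> averages = runningAverages(counts);
        for (float a : averages) {
            System.out.println(a);
        }
        // The last value is 18 occurrences / 12 distinct words = 1.5
    }
}
```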

The code

package com.ransilberman;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountAverage {

    public static class TokenizerMapper
            extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString().toLowerCase());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer
            extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

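    // Averaging reducer. The fields below keep state across reduce() calls,
    // so the result is only meaningful when the job runs a single reducer.
    public static class Reduce
            extends Reducer<Text,IntWritable,Text,FloatWritable> {
        private FloatWritable result = new FloatWritable();
        private final Text sumText = new Text("avegage");
        private float average = 0f;
        private float count = 0f;  // number of distinct words seen so far
        private int sum = 0;       // total number of word occurrences so far

        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
        ) throws IOException, InterruptedException {
            for (IntWritable val : values) {
                sum += val.get();
            }
            count += 1;
            average = sum / count;
            // Emit the running average; the last line written is the final value.
            result.set(average);
            context.write(sumText, result);
        }
    }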

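    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCountAverage.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(Reduce.class);
        // The running average relies on one reducer seeing every key.
        job.setNumReduceTasks(1);
        // Mapper and combiner emit (Text, IntWritable) ...
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // ... while the reducer emits (Text, FloatWritable).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }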
}

Hadoop Ecosystem – How to find your way out there?

Hadoop and HDFS are the pillars of today’s big data systems. They are the cornerstone of almost every big data project.
But in order to use them properly and efficiently, you need to know their ecosystem.
Today there are dozens of systems that together form a huge ecosystem around Hadoop and HDFS.

The Hadoop Ecosystem Table lists many of the related projects, and the list is growing.

Why are there so many projects and how can we find our way among all of them?

Why are there so many projects?

Let’s start with the first question: why are there so many?

When Hadoop emerged as an open source solution for storing big data, many companies adopted it quickly, seeing in it a big promise to solve the big data issues they had with their legacy RDBMS.

The promise was big, but the fulfilment not so much.

On the one hand, Hadoop delivered what it promised: it can store a lot of data, and we can write MapReduce jobs to query that data.

However, we all soon realized that Hadoop has significant drawbacks compared to the way we worked with RDBMS:

  1. Management tools and GUI tools are missing.
  2. SQL is not provided out of the box.
  3. It can take a very long time to get results for data queries.
  4. Installation, upgrades and housekeeping are cumbersome.
  5. It is difficult to integrate with programs for DB operations.

To resolve these issues, many projects started to emerge. But as Hadoop has no single owner, there was no order in the way those projects were written, and in many cases projects competed with each other, each trying to do the same thing better.

Define our needs

Now to the second question: how can we find our way among all the Hadoop-related projects?

This is not an easy one to answer, but it is the goal of this post, so let’s start.

Before you go searching for solutions, you first need to ask yourself these questions:

Q1: What are your requirements?

Define your business needs well. Hadoop and HDFS are mainly intended for big data storage that can scale horizontally. Do not choose Hadoop and HDFS if this is not your requirement. If you have different requirements, there are many other NoSQL systems that may suit you better.

Q2: What is your expected SLA?

Hadoop and HDFS provide big data storage on a cluster. This means that you will have to face issues of data freshness, eventual consistency and long response times. Define your system’s SLAs for those issues: response time, consistency or eventual consistency, etc.

Q3: What are you willing to compromise in your requirements?

Following on from the previous question: check carefully what you are willing to compromise in your SLA. You will not be able to get everything! There is no way to get a big data cluster that scales without limits and also provides consistently fast queries. There is no magic here!

Classification of projects and systems

This document describes the different projects, classified by the technical criteria to which they refer.

It does not make your life easier when you want to choose one.

I will try to provide a simpler classification to bring some order. The systems are divided here by the following characteristics:

  1. Systems that are based on MapReduce
  2. Systems that replace MapReduce
  3. Complementary databases
  4. Utilities

Systems that are based on MapReduce

Originally, Hadoop consisted of two parts: the file system, HDFS, and the processing mechanism, MapReduce. Many projects were built on top of MapReduce to provide a simplified API that eventually runs a MapReduce job.

Some examples are Hive, Pig and Cascading.

The benefit of those systems is that you need (almost) nothing other than vanilla Hadoop running on your cluster in order to run them.

The drawback is that they are tied to MapReduce processing which, in some regards, is pretty slow.

Systems that replace MapReduce

The limitations of MapReduce were clear to many people:

  1. It is slow.
  2. It is a general-purpose processing mechanism that may not be good for specific purposes.
  3. Its API is complicated to program against.

Several projects then started to appear. None of them replaces HDFS as the data layer; rather, they offer better processing mechanisms. Interestingly, all those projects do more or less the same thing, and end up competing with each other.

Some examples are Impala, Tez and Spark.

Most of those projects provide better response times than MapReduce. But note that they do not return results as fast as you would expect when querying an indexed column in a relational database.

Complementary Databases

HDFS is great for storing very big files, but it is not a database. Which database, then, can we use on top of HDFS or beside it while still gaining the benefits of HDFS as the storage for the big files?

There are many options here.

There are solutions that use HDFS as their file system and add indexing and management on top of it; HBase is a good example. Cassandra is another common choice, although it manages its own storage and runs without HDFS.

There are many other NoSQL databases that do not use HDFS at all. The list is long and can be found here and in other places.

A common architecture (the Lambda architecture) suggests not using the data in Hadoop directly for reporting, but rather using another database to hold a partial view of the data in Hadoop.

Utilities

There are many projects that build tools or utilities on top of Hadoop. Some examples are Hue, Mesos, Mahout and many others.

In addition, there are large projects that let you manage a Hadoop cluster in a friendly manner, such as Cloudera Manager and Ambari.

Summary

We showed here that there are many, perhaps too many, projects and systems related to Hadoop.

The reason for this large number of projects is historical and practical: Hadoop grew up as an open source project in the open source community, which led to many related projects. Because it was so successful, many people wanted to contribute and started new projects.

When you want to choose from this long list the systems that best fit your project, it is a bit overwhelming at first.

The first step is to define your requirements and SLAs very clearly. Most important is to decide where you are willing to compromise on the SLA and where you are not.

After that, look at the different projects and find the ones that best fit your requirements. This may not be a simple task, but having well-defined requirements helps you avoid taking the wrong approach.

 

Creating Scalding project with Maven

In this post I am going to show how to start a Scalding project with Maven.

Set up Scala in your IDE

The first step is to make sure your IDE (IntelliJ or Eclipse) supports Scala.

Create the Scalding project

  • Create a new Maven Scala project.
  • Use this pom.xml:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
    
     <groupId>com.mysite</groupId>
     <artifactId>scalding-test</artifactId>
     <packaging>jar</packaging>
     <version>1.1.1-SNAPSHOT</version>
    <properties>
     <scala.version>2.10.4</scala.version>
     <main>scalding.examples.WordCountJob</main>
     <targetJdk>1.6</targetJdk>
     <slf4j.log4j12.version>1.6.2</slf4j.log4j12.version>
     <junit.version>4.8.2</junit.version>
     <maven-jar-plugin-version>2.3.1</maven-jar-plugin-version>
     <cascading.version>2.5.2</cascading.version>
     <hadoop.version>2.3.0-cdh5.0.3</hadoop.version>
     </properties>
    <repositories>
     <repository>
     <id>conjars.org</id>
     <url>http://conjars.org/repo</url>
     </repository>
     <repository>
     <id>cloudera-releases</id>
     <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
     <releases>
     <enabled>true</enabled>
     </releases>
     <snapshots>
     <enabled>false</enabled>
     </snapshots>
     </repository>
    </repositories>
    
    <dependencies>
     <dependency>
     <groupId>org.scala-lang</groupId>
     <artifactId>scala-library</artifactId>
     <version>${scala.version}</version>
     </dependency>
     <dependency>
     <groupId>com.twitter</groupId>
     <artifactId>scalding-core_2.10</artifactId>
     <version>0.12.0</version>
     </dependency>
     
     <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>${hadoop.version}</version>
     </dependency>
     <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-common</artifactId>
     <version>${hadoop.version}</version>
     </dependency>
     <dependency>
     <groupId>cascading</groupId>
     <artifactId>cascading-hadoop</artifactId>
     <version>${cascading.version}</version>
     </dependency>
     <dependency>
     <groupId>cascading</groupId>
     <artifactId>cascading-core</artifactId>
     <version>${cascading.version}</version>
     </dependency>
     <dependency>
     <groupId>cascading</groupId>
     <artifactId>cascading-local</artifactId>
     <version>${cascading.version}</version>
     </dependency>
     <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
     <version>${slf4j.log4j12.version}</version>
     </dependency>
     <dependency>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-log4j12</artifactId>
     <version>${slf4j.log4j12.version}</version>
     </dependency>
    </dependencies>
    <build>
     <resources>
     <resource>
     <directory>src/main/resources</directory>
     <filtering>true</filtering>
     </resource>
     </resources>
     <plugins>
     <!--<plugin>-->
     <!--<artifactId>maven-compiler-plugin</artifactId>-->
     <!--<configuration>-->
     <!--<source>1.8</source>-->
     <!--<target>1.8</target>-->
     <!--<showDeprecation>false</showDeprecation>-->
     <!--<showWarnings>false</showWarnings>-->
     <!--</configuration>-->
     <!--</plugin>-->
     <!--<plugin>-->
     <!--<groupId>org.scala-tools</groupId>-->
     <!--<artifactId>maven-scala-plugin</artifactId>-->
     <!--<executions>-->
     <!--<execution>-->
     <!--<id>scala-compile-first</id>-->
     <!--<phase>process-resources</phase>-->
     <!--<goals>-->
     <!--<goal>add-source</goal>-->
     <!--<goal>compile</goal>-->
     <!--</goals>-->
     <!--<configuration>-->
     <!--<args>-->
     <!--<arg>-make:transitive</arg>-->
     <!--<arg>-dependencyfile</arg>-->
     <!--<arg>${project.build.directory}/.scala_dependencies</arg>-->
     <!--</args>-->
     <!--</configuration>-->
     <!--</execution>-->
     <!--<execution>-->
     <!--<id>scala-test-compile</id>-->
     <!--<phase>process-test-resources</phase>-->
     <!--<goals>-->
     <!--<goal>testCompile</goal>-->
     <!--</goals>-->
     <!--</execution>-->
     <!--</executions>-->
     <!--</plugin>-->
     <plugin>
     <groupId>org.apache.maven.plugins</groupId>
     <artifactId>maven-shade-plugin</artifactId>
     <version>2.2</version>
     <executions>
     <execution>
     <phase>package</phase>
     <goals>
     <goal>shade</goal>
     </goals>
     <configuration>
     <finalName>scalding-test-jar-with-libs-${project.version}</finalName>
     <shadedArtifactAttached>true</shadedArtifactAttached>
     <transformers>
     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
     <resource>META-INF/spring.handlers</resource>
     </transformer>
     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
     <resource>META-INF/spring.schemas</resource>
     </transformer>
     <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
     <mainClass>${main}</mainClass>
     </transformer>
     </transformers>
     <filters>
     <filter>
     <artifact>*:*</artifact>
     <excludes>
     <exclude>META-INF/*.SF</exclude>
     <exclude>META-INF/*.DSA</exclude>
     <exclude>META-INF/*.RSA</exclude>
     </excludes>
     </filter>
     </filters>
     </configuration>
     </execution>
     </executions>
     </plugin>
     <plugin>
     <groupId>org.apache.maven.plugins</groupId>
     <artifactId>maven-assembly-plugin</artifactId>
     <version>2.4</version>
     <executions>
     <execution>
     <id>assembly-zip</id>
     <goals>
     <goal>single</goal>
     </goals>
     <phase>verify</phase>
     <configuration>
     <finalName>scalding-test-${project.version}</finalName>
     <attach>false</attach>
     <appendAssemblyId>false</appendAssemblyId>
     <descriptor>src/main/assembly/assembly-descriptor.xml</descriptor>
     </configuration>
     </execution>
     </executions>
     </plugin>
     <plugin>
     <groupId>org.codehaus.mojo</groupId>
     <artifactId>build-helper-maven-plugin</artifactId>
     <version>1.5</version>
     <executions>
     <execution>
     <id>attach-artifacts</id>
     <phase>package</phase>
     <goals>
     <goal>attach-artifact</goal>
     </goals>
     <configuration>
     <artifacts>
     <artifact>
     <file>${project.build.directory}/scalding-test-jar-with-libs-${project.version}.jar</file>
     <type>jar</type>
     <classifier>dist</classifier>
     </artifact>
     <artifact>
     <file>${project.build.directory}/scalding-test-${project.version}.zip</file>
     <type>zip</type>
     <classifier>dist-zip</classifier>
     </artifact>
     </artifacts>
     </configuration>
     </execution>
     </executions>
     </plugin>
     </plugins>
    </build>
    </project>

Use this Scala class:

    package scalding.examples

    import com.twitter.scalding._
    import org.apache.hadoop

    // Entry point: hands the job class name and arguments to Scalding's Tool.
    object JobRunner {
      def main(args: Array[String]): Unit = {
        hadoop.util.ToolRunner.run(new hadoop.conf.Configuration, new Tool, args)
      }
    }

    // Counts the occurrences of each whitespace-separated word in the input.
    class WordCountJob(args: Args) extends Job(args) {
      TypedPipe.from(TextLine(args("input")))
        .flatMap { line => line.split("""\s+""") }
        .groupBy { word => word }
        .size
        .write(TypedTsv(args("output")))
    }

Run the Scalding project

  • Create a run configuration in your IDE
  • Set the Main class to: scalding.examples.JobRunner
  • Add the following program arguments: scalding.examples.WordCountJob --hdfs --input hdfs://<your-hadoop-server>:<hdfs-port (e.g., 8020)>/user/myuser/input_path --output hdfs://<your-hadoop-server>:<hdfs-port (e.g., 8020)>/user/myuser/output_path/someOutputFile.tsv

Run the program and check the file in HDFS: /user/myuser/output_path/someOutputFile.tsv

Summary

In this post I showed how you can build your first Scalding project as a simple Maven project, with no need for the sbt tool.

How to Unit Test Kafka

Note: this post was revised on January 20th, 2017 to reflect changes in Kafka.


Background:

When using Apache Kafka, one common concern is how to run unit tests for consumers without having to start a whole Kafka cluster and ZooKeeper.

In Kafka 0.9 two mock classes were added: MockProducer and MockConsumer.

The problem with those mock classes is that in many cases they are simply unusable. The reason is that we use frameworks for Kafka consumers that do not let us substitute a mock class for the real KafkaConsumer that they use internally.

Still, we want to be able to test our code somehow without always having to start ZooKeeper and a Kafka server.

In this article I suggest an approach that can be helpful when you want to write a unit test for code that uses a Kafka consumer.

Test case

As an example, I take the case of Spark Streaming with a Kafka receiver.

The whole example can be found in this GitHub repository.

In this project, I wrote a code example of a word count application in Spark Streaming that uses the Kafka receiver. There is code in both Java and Scala.

This is the Word Count Java code to be tested (complete code can be found here):

package com.myspark;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

public final class JavaKafkaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    private JavaKafkaWordCount() {
    }

    public static void main(String[] args) {
        if (args.length < 4) {
            System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
            System.exit(1);
        }

        LoggerTools.setStreamingLogLevels();
        SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
        sparkConf.setMaster("local[2]"); //set master server
        sparkConf.set("com.couchbase.bucket.travel-sample", "");
        // Create the context with 2 seconds batch size
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        int numThreads = Integer.parseInt(args[3]);
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = args[2].split(",");
        for (String topic: topics) {
            topicMap.put(topic, numThreads);
        }

        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Lists.newArrayList(SPACE.split(x));
            }
        });

        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
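
The counting logic in the class above does not itself depend on Kafka or Spark, so it can also be checked in isolation. The following is a minimal sketch, not part of the original project: the class name WordCountLogic and the method countWords are hypothetical, and plain java.util collections stand in for the DStreams. It mirrors the same three steps: split each line on a single space, pair every word with 1, and sum the counts per word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Hypothetical helper for illustration; it is not part of the original project.
final class WordCountLogic {
    // Same separator as JavaKafkaWordCount: a single space
    private static final Pattern SPACE = Pattern.compile(" ");

    private WordCountLogic() {
    }

    // Counts how often each word occurs across all lines of one batch.
    static Map<String, Integer> countWords(List<String> lines) {
        // TreeMap keeps the words sorted, which makes the output stable
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : SPACE.split(line)) {
                // Equivalent of mapToPair(word -> (word, 1)) followed by reduceByKey(sum)
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("to be or not to be", "to count words");
        // prints {be=2, count=1, not=1, or=1, to=3, words=1}
        System.out.println(countWords(batch));
    }
}
```

A check like this only covers the transformation; it says nothing about the Kafka wiring, which is what the integration test is for.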

And now, the test

In order to test the class above, we perform the following steps:

  1. Start a local Zookeeper server
  2. Start a local Kafka server
  3. Create Kafka Producer
  4. Run the Spark Streaming program
  5. Send some messages through the Kafka Producer

The code below does this (complete code can be found here):

package com.myspark;

import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.kafka.clients.producer.*;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.junit.*;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Properties;


public class JavaKafkaWordCountTest {
    private static final String TOPIC = "topic-1";
    private static final String BROKERHOST = "127.0.0.1";
    private static final String BROKERPORT = "9092";
    private static final String ZKPORT = "2181";

    private String nodeId = "0";
    private String zkConnect = "localhost:" + ZKPORT;
    private KafkaServerStartable server;
    KafkaProducer<Integer, byte[]> producer;


    @Before
    public void setup() throws IOException {
        //zookeeper
        startZK();
        //start kafka
        startKafka();
        // setup producer
        setupProducer();
    }

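    @After
    public void tearDown() throws Exception {
        // Close the producer first so it does not keep retrying against a stopped broker
        producer.close();
        server.shutdown();
        server.awaitShutdown();
    }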

    private static void startZK() throws IOException {
        final File zkTmpDir = File.createTempFile("zookeeper", "test");
        zkTmpDir.delete();
        zkTmpDir.mkdir();

        new Thread() {
            @Override
            public void run() {
                ZooKeeperServerMain.main(new String [] {ZKPORT,  zkTmpDir.getAbsolutePath()});
            }
        }.start();

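        try {
            // Crude wait for ZooKeeper to start listening before Kafka connects to it
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing the exception
            Thread.currentThread().interrupt();
        }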
    }

    private void startKafka() {
        Properties props = new Properties();
        props.put("broker.id", nodeId);
        props.put("port", BROKERPORT);
        props.put("zookeeper.connect", zkConnect);
        props.put("host.name", "127.0.0.1");
        KafkaConfig conf = new KafkaConfig(props);
        server = new KafkaServerStartable(conf);
        server.startup();
    }

    private void setupProducer() {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
        producerProps.setProperty("key.serializer","org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        producer = new KafkaProducer<>(producerProps);
    }

    @Test
    public void testSparkWordCount() throws Exception {
        Thread t =  new Thread(() -> {
            String[] args = {"localhost", "grp-1", TOPIC, "2"};
            JavaKafkaWordCount.main(args);
            System.out.println("End Child Thread");
        });
        t.start();

        for (int i=0; i<1000; i++){
            producer.send(new ProducerRecord<>(TOPIC, 0, 1, ("There are some words here to count -" + Integer.toString(i)).getBytes(Charset.forName("UTF-8"))));
            Thread.sleep(10);
        }
        System.out.println("End Test");
    }

}
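
The test above gives ZooKeeper a fixed one-second head start, which may be too short on a slow machine and is wasted time on a fast one. One possible alternative, sketched below under the assumption that an open TCP port is a good enough readiness signal, is to poll the port until a connection succeeds. The class PortWaiter and its method waitForPort are hypothetical names introduced here; they are not part of the original project or of any Kafka or ZooKeeper API.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for illustration; it is not part of the original project.
final class PortWaiter {
    private PortWaiter() {
    }

    // Polls host:port until a TCP connection succeeds or timeoutMillis elapses.
    // Returns true if something accepted the connection, false on timeout or interrupt.
    static boolean waitForPort(String host, int port, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (System.currentTimeMillis() < deadline) {
            try (Socket socket = new Socket()) {
                // Give each connection attempt at most 200 ms
                socket.connect(new InetSocketAddress(host, port), 200);
                return true;
            } catch (IOException notListeningYet) {
                try {
                    // Back off briefly before the next attempt
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
        return false;
    }
}
```

In startZK() this could stand in for the sleep, for example by calling PortWaiter.waitForPort("localhost", Integer.parseInt(ZKPORT), 10000) and failing the test if it returns false. An accepted connection only shows that the port is open, not that the server has finished initializing, so treat it as a heuristic.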

 

 

How to use Avro with Kafka and Storm

Background

In this post I’ll provide a practical example of how to integrate Avro with data flowing from Kafka to Storm.

  • Kafka is a highly available, high-throughput messaging system from LinkedIn.
  • Storm is a distributed event stream processing system from Twitter.
  • Avro is a schema-based data serialization system.

All the above are open source projects.

The challenge

There are two main challenges when making Avro data flow between Kafka and Storm:

  1. How to serialize and deserialize Avro data on Kafka.
  2. How to manage the protocol changes that come as the schema evolves.

Code example

Here I provide a simple code sample that demonstrates the use of Avro with Kafka and Storm.

The code can be downloaded from GitHub here: https://github.com/ransilberman/avro-kafka-storm.git

In order to download it, just type:

$ git clone https://github.com/ransilberman/avro-kafka-storm.git

Building from Source

$ mvn install -Dmaven.test.skip=true

Usage:

The project contains two modules: AvroMain, which is the Kafka Producer test application, and AvroStorm, which is the Storm Topology code.

Basic Usage:

LPEvent.avsc is the Avro Schema. It exists in both projects.

MainTest runs three tests, each of which sends an event to Kafka. You may need to change the variable ‘zkConnection’ to your own list of ZooKeeper servers that hold the Kafka service brokers.

AvroTopology is the class that runs the Storm Topology. You may need to change its host list to your own list of ZooKeeper servers that hold the Kafka service brokers.

First start the topology, then run the tests; the events will pass via Kafka in Avro format.

First Test: Sends an event using the GenericRecord class.

Second Test: Builds an Avro object using generated resources. This test will compile only after the code is generated using the Maven plugin avro-maven-plugin.

Third Test: Serializes the Avro objects into a file and then reads the file back.

Upgrade Avro schema version:

AvroTopology2 is a second Storm topology that demonstrates how to use a schema that differs between producer and consumer. The difference in the code is in the line:

DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema, _newSchema);

Note that there are two parameters to the GenericDatumReader constructor: the schema the data was written with and the schema the consumer wants to read it as.
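
To make the two-schema reader more concrete, here is a minimal, self-contained sketch of a round trip through Avro's binary encoding. It assumes the Avro Java library is on the classpath. The class name SchemaEvolutionSketch and both schemas are hypothetical and chosen only for illustration; they are not the contents of LPEvent.avsc. A record is written with the old schema and read back with the new one, which adds a field that has a default value.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

// Hypothetical example class; the schemas below are NOT the fields of LPEvent.avsc.
final class SchemaEvolutionSketch {
    // Schema used by the producer (the "writer" schema)
    static final Schema OLD_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"}]}");

    // Schema expected by the consumer: one added field, with a default value
    static final Schema NEW_SCHEMA = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"source\",\"type\":\"string\",\"default\":\"unknown\"}]}");

    // Producer side: turn a record into the bytes that would be sent to Kafka
    static byte[] serialize(GenericRecord record, Schema writerSchema) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(writerSchema);
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }

    // Consumer side: decode with the writer's schema, resolve into the reader's schema
    static GenericRecord deserialize(byte[] payload, Schema writerSchema, Schema readerSchema)
            throws IOException {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
        return reader.read(null, decoder);
    }

    public static void main(String[] args) throws IOException {
        GenericRecord oldEvent = new GenericData.Record(OLD_SCHEMA);
        oldEvent.put("id", 42L);
        byte[] payload = serialize(oldEvent, OLD_SCHEMA);
        // The resolved record carries "source" filled in from the default
        System.out.println(deserialize(payload, OLD_SCHEMA, NEW_SCHEMA));
    }
}
```

The binary payload carries no schema, so the consumer has to know which schema the producer used. In this sketch both are hard-coded; how the writer schema reaches the consumer in a real deployment is outside its scope.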