Apache Spark 2: Data Processing and Real-Time Analytics

Kafka

Apache Kafka (http://kafka.apache.org/) is a top-level open source project in Apache. It is a big data publish/subscribe messaging system that is fast and highly scalable. It uses message brokers for data management and ZooKeeper for configuration so that data can be organized into consumer groups and topics.

Data in Kafka is split into partitions. In this example, we will demonstrate a receiverless Spark-based Kafka consumer so that we don't need to configure Spark data partitions to match our Kafka partitions. In order to demonstrate Kafka-based message production and consumption, we will use the Perl RSS script from the last section as a data source. The data passing into Kafka and on to Spark will be Reuters RSS news data in JSON format. As topic messages are created by message producers, they are appended to partitions in the order in which they arrive. The messages in the partitions are retained for a configurable time period. Kafka then stores an offset value for each consumer, which is that consumer's position (in terms of message consumption) in that partition.
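To make the idea of a partition and a consumer offset more concrete, here is a toy sketch in shell. It is not how Kafka is implemented; it simply models a partition as an append-only file with one message per line, and an offset as the number of messages already read. The helper names produce and consume_from are hypothetical and exist only for this illustration.

```shell
#!/bin/sh
# Toy model only: a partition is an append-only file, one message per line.
# An offset is the count of messages that a consumer has already read.

# Append a message to the end of a partition log (hypothetical helper).
produce() {
    # $1 = partition log file, $2 = message
    printf '%s\n' "$2" >> "$1"
}

# Print every message at or after a zero-based offset (hypothetical helper).
consume_from() {
    # $1 = partition log file, $2 = zero-based offset
    tail -n "+$(( $2 + 1 ))" "$1"
}

partition_log=$(mktemp)
produce "$partition_log" "msg-0"
produce "$partition_log" "msg-1"
produce "$partition_log" "msg-2"

offset=1    # this consumer has already read msg-0
consume_from "$partition_log" "$offset"
rm -f "$partition_log"
```

Because the log is never rewritten, a second consumer with a different offset could read the same file independently, which is the property that lets several consumer groups share one topic.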

We are currently using Kafka 0.10.1.0. We have used Kafka message brokers on the Hortonworks HDP 2.6 Sandbox virtual machine. We then set the Kafka broker ID values for each Kafka broker server, giving them a broker.id number of 1 through 4. As Kafka uses ZooKeeper for cluster data configuration, we wanted to keep all the Kafka data in a top-level node called kafka in ZooKeeper. In order to do this, we set the Kafka ZooKeeper root value, called zookeeper.chroot, to /kafka. After making these changes, we restarted the Kafka servers for the changes to take effect.

With Kafka installed, we can check the scripts that are available for testing. The following list shows Kafka-based scripts for message producers and consumers as well as scripts to manage topics and check consumer offsets. These scripts will be used in this section in order to demonstrate Kafka functionality:

[hadoop@hc2nn ~]$ ls /usr/bin/kafka*
/usr/bin/kafka-console-consumer /usr/bin/kafka-run-class
/usr/bin/kafka-console-producer /usr/bin/kafka-topics
/usr/bin/kafka-consumer-offset-checker

In order to run the installed Kafka servers, we need to have the broker server ID (broker.id) values set; otherwise, an error will occur. Once Kafka is running, we will need to prepare a message producer script. The simple Bash script given next, called kafka.bash, defines a comma-separated broker list of hosts and ports. It also defines a topic called rss. It then calls the Perl script rss.perl to generate the RSS-based data. This data is then piped into the Kafka producer script called kafka-console-producer to be sent to Kafka:

[hadoop@hc2r1m1 stream]$ more kafka.bash
#!/bin/bash
#BROKER_LIST="hc2r1m1:9092,hc2r1m2:9092,hc2r1m3:9092,hc2r1m4:9092"
BROKER_LIST="hc2r1m1:9092"

TOPIC="rss"
./rss.perl | /usr/bin/kafka-console-producer --broker-list $BROKER_LIST --topic $TOPIC
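Since the broker list is just a comma-separated string of host:port pairs, a malformed entry is easy to miss. The following is a minimal sketch of a sanity check that could be run before starting the producer. The function name valid_broker_list is hypothetical, and the check only looks at the shape of the string; it does not contact any broker.

```shell
#!/bin/sh
# Return 0 if every comma-separated entry looks like host:port.
# Hypothetical helper; it validates the format only, not reachability.
valid_broker_list() {
    # $1 = broker list such as "host1:9092,host2:9092"
    [ -n "$1" ] || return 1
    old_ifs=$IFS
    IFS=,
    set -- $1                  # split on commas into positional parameters
    IFS=$old_ifs
    for entry in "$@"; do
        host=${entry%:*}       # text before the last colon
        port=${entry##*:}      # text after the last colon
        case $entry in
            *:*) ;;                      # an entry must contain a colon
            *)   return 1 ;;
        esac
        case $host in
            ''|*:*) return 1 ;;          # empty host, or more than one colon
        esac
        case $port in
            ''|*[!0-9]*) return 1 ;;     # port must be all digits
        esac
    done
    return 0
}

if valid_broker_list "hc2r1m1:9092,hc2r1m2:9092"; then
    echo "broker list looks valid"
fi
```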

Notice that we are only running against a single broker; the commented-out BROKER_LIST line shows the form used for multiple brokers. Also, notice that we have not yet created any Kafka topics. When a topic is created in Kafka, the number of partitions can be specified. In the following example, the kafka-topics script has been called with the create option. The number of partitions has been set to 5, and the data replication factor has been set to 3. The ZooKeeper server string has been defined as hc2r1m2-4 with a port number of 2181. Also, note that the top-level ZooKeeper Kafka node has been defined as /kafka in the ZooKeeper string:

/usr/bin/kafka-topics \
--create \
--zookeeper hc2r1m2:2181,hc2r1m3:2181,hc2r1m4:2181/kafka \
--replication-factor 3 \
--partitions 5 \
--topic rss
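A ZooKeeper connection string of this kind is a comma-separated list of host:port pairs, with the optional root path appended once at the very end rather than after each host. As a small sketch, the hypothetical helper zk_connect_string below assembles such a string from a list of hosts, a port, and a root path, which can help avoid hand-typing mistakes.

```shell
#!/bin/sh
# Build "host1:port,host2:port/chroot" from space-separated host names.
# Hypothetical helper for illustration only.
zk_connect_string() {
    # $1 = space-separated hosts, $2 = port, $3 = root path (may be empty)
    result=""
    for host in $1; do
        if [ -z "$result" ]; then
            result="$host:$2"
        else
            result="$result,$host:$2"
        fi
    done
    # The root path is appended once, after the last host:port pair
    printf '%s%s\n' "$result" "$3"
}

zk_connect_string "hc2r1m2 hc2r1m3 hc2r1m4" 2181 /kafka
```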

We have also created a Bash script called kafka_list.bash for use during testing, which lists all the Kafka topics that have been created and checks the Kafka consumer offsets. It calls the kafka-topics command with a list option and a ZooKeeper string to get a list of created topics. It then calls the Kafka script called kafka-consumer-offset-checker with a ZooKeeper string, the topic name, and a group name to get a list of consumer offset values. Using this script, we can check that our topics are created and that the topic data is being consumed correctly:

[hadoop@hc2r1m1 stream]$ cat kafka_list.bash
#!/bin/bash
ZOOKEEPER="hc2r1m1:2181/kafka"
TOPIC="rss"
GROUP="group1"
echo ""
echo "================================"
echo " Kafka Topics "
echo "================================"
/usr/bin/kafka-topics --list --zookeeper $ZOOKEEPER
echo ""
echo "================================"
echo " Kafka Offsets "
echo "================================"
/usr/bin/kafka-consumer-offset-checker \
--group $GROUP \
--topic $TOPIC \
--zookeeper $ZOOKEEPER

Next, we need to create the Apache Spark Scala-based Kafka consumer code. As we said, we will create a receiverless example so that the data partitions match in both Kafka and Spark. The example is called stream6. First, the classes are imported for Kafka, Spark, context, and streaming. Then, the object called stream6 and its main method are defined. The code looks like this:

import kafka.serializer.StringDecoder
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._

object stream6 {
  def main(args: Array[String]): Unit = {

Next, the class parameters (broker string, group ID, and topics) are checked and processed. If the class parameters are incorrect, an error is printed and execution stops; otherwise, the parameter variables are defined:

    if (args.length < 3) {
      System.err.println("Usage: stream6 <brokers> <groupid> <topics>\n")
      System.err.println("<brokers> = host1:port1,host2:port2\n")
      System.err.println("<groupid> = group1\n")
      System.err.println("<topics> = topic1,topic2\n")
      System.exit(1)
    }
    val brokers = args(0).trim
    val groupid = args(1).trim
    val topics = args(2).trim
    println("brokers : " + brokers)
    println("groupid : " + groupid)
    println("topics : " + topics)

The Spark context is defined in terms of an application name. The Spark URL has again been left as the default. The streaming context has been created using the Spark context. We have left the stream batch interval at 10 seconds, which is the same as the last example. However, you can set it using a parameter of your choice:

    val appName = "Stream example 6"
    val conf = new SparkConf()
    conf.setAppName(appName)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

Next, the topic list is split into a set, and the broker list and group ID are placed in a Kafka parameter map. These values are then used to create a Kafka-based Spark stream called rawDstream:

    val topicsSet = topics.split(",").toSet
    val kafkaParams: Map[String, String] =
      Map("metadata.broker.list" -> brokers,
          "group.id" -> groupid)
    val rawDstream = KafkaUtils.createDirectStream[
      String,
      String,
      StringDecoder,
      StringDecoder](ssc, kafkaParams, topicsSet)

We have again printed the stream event count for debugging purposes so that we know when the application is receiving and processing the data:

    rawDstream.count().map(cnt => ">>>>>>>>>>>>>>> Received events : " + cnt).print()

The HDFS location for the Kafka data has been defined as /data/spark/kafka/rss/. The message values have been mapped from the DStream into the variable lines. Using the foreachRDD method, a check on the data count is carried out on each RDD in lines before saving the data in HDFS using the saveAsTextFile method, under a directory named file_ followed by a millisecond timestamp:

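    val hdfsdir = "hdfs://hc2nn:8020/data/spark/kafka/rss/"
    // Keep only the message value from each (key, value) record
    val lines = rawDstream.map(record => record._2)
    lines.foreachRDD(rdd => {
      if (rdd.count() > 0) {
        // Take the timestamp per batch so that each non-empty batch
        // is written to its own directory instead of reusing one path
        val now: Long = System.currentTimeMillis
        rdd.saveAsTextFile(hdfsdir + "file_" + now.toString())
      }
    })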

Finally, the Scala script closes by starting the stream processing and setting the application class to run until terminated with awaitTermination:

    ssc.start()
    ssc.awaitTermination()
  } // end main
} // end stream6

With all of the scripts explained and the Kafka brokers running, it is time to examine the Kafka configuration, which, if you remember, is maintained by Apache ZooKeeper. (All of the code samples that have been described so far will be released with the book.) We will use the zookeeper-client tool and connect to the ZooKeeper server on the host called hc2r1m2 on the 2181 port. As you can see here, we have received a connected message from the client session:

[hadoop@hc2r1m1 stream]$ /usr/bin/zookeeper-client -server hc2r1m2:2181
[zk: hc2r1m2:2181(CONNECTED) 0]

If you remember, we specified the top-level ZooKeeper directory for Kafka to be /kafka. If we examine this now via a client session, we can see the Kafka ZooKeeper structure. We will be interested in brokers (the Kafka broker servers) and consumers (the previous Spark Scala code). The ZooKeeper ls command shows that the four Kafka servers are registered with ZooKeeper and are listed by their broker.id configuration values, one to four:

[zk: hc2r1m2:2181(CONNECTED) 2] ls /kafka
[consumers, config, controller, admin, brokers, controller_epoch]
[zk: hc2r1m2:2181(CONNECTED) 3] ls /kafka/brokers
[topics, ids]
[zk: hc2r1m2:2181(CONNECTED) 4] ls /kafka/brokers/ids
[3, 2, 1, 4]

We will create the topic that we want to use for this test using the Kafka script, kafka-topics, with a create flag. We do this manually because we want to demonstrate the definition of the data partitions while we do it. Note that we have set the partitions in the Kafka topic rss to 5, as shown in the following piece of code. Note also that the ZooKeeper connection string for the command has a comma-separated list of ZooKeeper servers, terminated by the top-level ZooKeeper Kafka directory called /kafka. This means that the command puts the new topic in the proper place:

[hadoop@hc2nn ~]$ /usr/bin/kafka-topics \
> --create \
> --zookeeper hc2r1m2:2181,hc2r1m3:2181,hc2r1m4:2181/kafka \
> --replication-factor 3 \
> --partitions 5 \
> --topic rss
Created topic "rss".

Now, when we use the ZooKeeper client to check the Kafka topic configuration, we can see the correct topic name and the expected number of the partitions:

[zk: hc2r1m2:2181(CONNECTED) 5] ls /kafka/brokers/topics
[rss]
[zk: hc2r1m2:2181(CONNECTED) 6] ls /kafka/brokers/topics/rss
[partitions]
[zk: hc2r1m2:2181(CONNECTED) 7] ls /kafka/brokers/topics/rss/partitions
[3, 2, 1, 0, 4]
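The ZooKeeper client prints child nodes in no particular order, so it can be handy to normalize such a list when checking that all five partitions (0 to 4) are present. The following is a small sketch that takes the bracketed list as text, in the form shown above, and prints the entries one per line in numeric order. The function name zk_list_sorted is hypothetical.

```shell
#!/bin/sh
# Turn a zookeeper-client child list such as "[3, 2, 1, 0, 4]"
# into one entry per line, sorted numerically. Hypothetical helper.
zk_list_sorted() {
    # $1 = the bracketed list exactly as printed by the ls command
    printf '%s\n' "$1" |
        sed 's/[][ ]//g' |    # drop the brackets and spaces
        tr ',' '\n' |         # one entry per line
        sort -n
}

zk_list_sorted "[3, 2, 1, 0, 4]"
```

Piping the result through wc -l gives the partition count, which can be compared with the value passed to the partitions option when the topic was created.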

This describes the configuration for the Kafka broker servers in ZooKeeper, but what about the data consumers? Well, the following list shows where the data will be held. Remember that, at this time, there is no consumer running, so it is not represented in ZooKeeper:

[zk: hc2r1m2:2181(CONNECTED) 9] ls /kafka/consumers
[]
[zk: hc2r1m2:2181(CONNECTED) 10] quit

In order to start this test, we will run our Kafka data producer and consumer scripts. We will also check the output of the Spark application class and check the Kafka partition offsets and HDFS to make sure that the data has arrived. This is quite complicated, so we will add a diagram here to explain the test architecture:

The Perl script called rss.perl will be used to provide a data source for a Kafka data producer, which will feed data to the Hortonworks Kafka broker servers. The brokers hold the message data, while the broker and topic configuration is stored in ZooKeeper, in the structure that has just been examined, under the top-level node called /kafka. The Apache Spark Scala-based application will then act as a Kafka consumer and read the data, which it will then store in HDFS.

To help explain the complexity here, we will examine our method of running the Apache Spark class. It will be started via the spark-submit command. Remember that all of these scripts will be released with this book, so you can examine them in your own time. We always use scripts for server test management so that we encapsulate complexity, and command execution is quickly repeatable. The script, run_stream.bash, is like many example scripts that have already been used in this chapter and in this book. It accepts a class name and class parameters, and runs the class via spark-submit:

[hadoop@hc2r1m1 stream]$ more run_stream.bash
#!/bin/bash
SPARK_HOME=/usr/local/spark
SPARK_BIN=$SPARK_HOME/bin
SPARK_SBIN=$SPARK_HOME/sbin
JAR_PATH=/home/hadoop/spark/stream/target/scala-2.10/streaming_2.10-1.0.jar
CLASS_VAL=$1
CLASS_PARAMS="${*:2}"
STREAM_JAR=/usr/local/spark/lib/spark-examples-1.3.1-hadoop2.3.0.jar
cd $SPARK_BIN
./spark-submit \
--class $CLASS_VAL \
--master spark://hc2nn.semtech-solutions.co.nz:7077 \
--executor-memory 100M \
--total-executor-cores 50 \
--jars $STREAM_JAR \
$JAR_PATH \
$CLASS_PARAMS
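The script treats its first argument as the class name and passes everything after it on as class parameters; in Bash, the expression ${*:2} expands to the second and later arguments joined by spaces. The following sketch shows the same split using shift, which also works in plain sh. The function name split_args is hypothetical and only prints what it receives.

```shell
#!/bin/sh
# Separate the first argument (the class name) from the remaining
# arguments (the class parameters). Hypothetical helper for illustration.
split_args() {
    [ "$#" -ge 1 ] || return 1    # a class name is required
    class_val=$1
    shift                         # drop the class name
    class_params="$*"             # remaining arguments joined by spaces
    printf 'class=%s params=%s\n' "$class_val" "$class_params"
}

split_args nz.co.semtechsolutions.stream6 hc2r1m1:9092 group1 rss
```

Note that joining the parameters into one string means an argument containing spaces would be split again when it is later expanded unquoted, which is acceptable here because broker lists, group IDs, and topic names contain none.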

We then used a second script, called run_kafka_example.bash, which calls the run_stream.bash script to execute the Kafka consumer code in the previous stream6 application class. Note that this script sets up the full application class name and the broker server list. It also sets up the topic name, called rss, to use for data consumption. Finally, it defines a consumer group called group1. Remember that Kafka is a publish/subscribe message brokering system. There may be many producers and consumers organized by topic, group, and partition:

[hadoop@hc2r1m1 stream]$ more run_kafka_example.bash
#!/bin/bash
RUN_CLASS=nz.co.semtechsolutions.stream6
BROKERS="hc2r1m1:9092,hc2r1m2:9092,hc2r1m3:9092,hc2r1m4:9092"
GROUPID=group1
TOPICS=rss
# run the Apache Spark Kafka example
./run_stream.bash $RUN_CLASS \
$BROKERS \
$GROUPID \
$TOPICS

So, we will start the Kafka consumer by running the run_kafka_example.bash script, which, in turn, will run the previous stream6 Scala code using spark-submit. While monitoring Kafka data consumption using the script called kafka_list.bash, we were able to get the kafka-topics script to list the Kafka topics, but the kafka-consumer-offset-checker script could not find the consumer offsets in ZooKeeper. A likely explanation is that the receiverless approach tracks its offsets within Spark rather than writing them to ZooKeeper, so no offsets node exists for the group, as shown here:

[hadoop@hc2r1m1 stream]$ ./kafka_list.bash
================================
Kafka Topics
================================
__consumer_offsets
rss
================================
Kafka Offsets
================================
Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/group1/offsets/rss/4.
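The error names a node of the form /consumers/group/offsets/topic/partition. As a rough sketch, and assuming that this layout (taken from the error message itself) sits under the /kafka root used in this setup, the hypothetical helper below prints the node paths for every partition of a topic, so that each one can be checked by hand with the ZooKeeper client get or ls commands.

```shell
#!/bin/sh
# Print the ZooKeeper offset node path for each partition of a topic.
# The path layout is an assumption based on the error message above.
offset_paths() {
    # $1 = root path, $2 = consumer group, $3 = topic, $4 = partition count
    p=0
    while [ "$p" -lt "$4" ]; do
        printf '%s/consumers/%s/offsets/%s/%s\n' "$1" "$2" "$3" "$p"
        p=$(( p + 1 ))
    done
}

offset_paths /kafka group1 rss 5
```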

By starting the Kafka producer RSS feed using the kafka.bash script, we can now start feeding the RSS-based data through Kafka into Spark, and then into HDFS. Periodically checking the spark-submit session output shows that events are passing through the Spark-based Kafka DStream. The following output comes from the stream count in the Scala code and shows that, at that point, 28 events were processed:

-------------------------------------------
Time: 1436834440000 ms
-------------------------------------------
>>>>>>>>>>>>>>> Received events : 28

By checking HDFS under the /data/spark/kafka/rss/ directory via the Hadoop filesystem ls command, it can be seen that there is now data stored on HDFS:

[hadoop@hc2r1m1 stream]$ hdfs dfs -ls /data/spark/kafka/rss
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907

By checking the contents of this directory, it can be seen that an HDFS part data file exists, which should contain the RSS-based data from Reuters:

[hadoop@hc2r1m1 stream]$ hdfs dfs -ls /data/spark/kafka/rss/file_1436833769907
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 8205 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907/part-00001
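When many batch directories accumulate, reading such listings by eye becomes tedious. The following sketch summarizes a listing in the format shown above: it reports whether a _SUCCESS marker is present, how many part files there are, and their total size in bytes. The function name summarize_listing is hypothetical, and the sketch assumes the file size is the fifth whitespace-separated column, as it is in the listing above.

```shell
#!/bin/sh
# Summarize `hdfs dfs -ls` style output read from standard input.
# Hypothetical helper; assumes the size is in the fifth column.
summarize_listing() {
    awk '
        /\/_SUCCESS$/    { ok = 1 }                  # job completion marker
        /\/part-[0-9]+$/ { parts++; bytes += $5 }    # data files and sizes
        END { printf "success=%d parts=%d bytes=%d\n", ok, parts, bytes }
    '
}

# Example input copied from the listing above
summarize_listing <<'EOF'
Found 2 items
-rw-r--r-- 3 hadoop supergroup 0 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 8205 2015-07-14 12:40 /data/spark/kafka/rss/file_1436833769907/part-00001
EOF
```

In practice, the input would come from piping the hdfs dfs -ls command for a batch directory into the function.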

Using the Hadoop filesystem cat command, we can dump the contents of this HDFS-based file to check its contents. We have used the Linux head command to limit the output to save space. Clearly, this is Reuters science-based RSS information that the Perl script rss.perl has converted from XML to JSON format:

[hadoop@hc2r1m1 stream]$ hdfs dfs -cat /data/spark/kafka/rss/file_1436833769907/part-00001 | head -2
{"category": "science", "title": "Bear necessities: low metabolism lets pandas survive on bamboo", "summary": "WASHINGTON (Reuters) - Giant pandas eat vegetables even though their bodies are better equipped to eat meat. So how do these black-and-white bears from the remote misty mountains of central China survive on a diet almost exclusively of a low-nutrient food like bamboo?"}
{"category": "science", "title": "PlanetiQ tests sensor for commercial weather satellites", "summary": "CAPE CANAVERAL (Reuters) - PlanetiQ a privately owned company is beginning a key test intended to pave the way for the first commercial weather satellites."}
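Because each record is a single JSON object on its own line, quick spot checks are possible with standard text tools. The following is a deliberately simple sketch that pulls one string field out of each line with sed. The function name json_field is hypothetical, and the pattern assumes the exact spacing shown above and values without embedded double quotes; a proper JSON parser would be the safer choice for anything beyond a quick look.

```shell
#!/bin/sh
# Extract the value of one string field from JSON lines on standard input.
# Hypothetical helper; assumes `"name": "value"` spacing and no escaped quotes.
json_field() {
    # $1 = field name, for example category or title
    sed -n 's/.*"'"$1"'": "\([^"]*\)".*/\1/p'
}

# Example input shortened from the second record above
json_field title <<'EOF'
{"category": "science", "title": "PlanetiQ tests sensor for commercial weather satellites", "summary": "CAPE CANAVERAL (Reuters) - PlanetiQ a privately owned company is beginning a key test."}
EOF
```

In practice, the input would come from the hdfs dfs -cat command shown above, and listing the category field for every record is a quick way to confirm which feed the data came from.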

This ends the Kafka example. We have seen that the Kafka brokers were configured and that an RSS data-based Kafka producer fed data to them. Using the ZooKeeper client, we confirmed that the Kafka structure of brokers, topics, and partitions was set up in ZooKeeper. Finally, the Apache Spark-based Scala code in the stream6 application showed that the Kafka data was consumed and saved to HDFS.