
Checkpointing
On batch processing, we are used to having fault tolerance. This means, in case a node crashed, the job doesn't lose its state and the lost tasks are rescheduled on other workers. Intermediate results are written to persistent storage (which of course has to be fault tolerant as well which is the case for HDFS, GPFS or Cloud Object Storage). Now we want to achieve the same guarantees in streaming as well since it might be crucial that the data stream we are processing is not lost.
It is possible to set up an HDFS-based checkpoint directory to store Apache Spark-based streaming information. In this Scala example, data will be stored in HDFS under /data/spark/checkpoint. The following HDFS filesystem ls command shows that before starting, the directory does not exist:
[hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint
ls: `/data/spark/checkpoint': No such file or directory
For replicating the following example, Twitter API credentials are used in order to connect to the Twitter API and obtain a stream of tweets. The following link explains how such credentials are created within the Twitter UI: https://dev.twitter.com/oauth/overview/application-owner-access-tokens.
The following Scala code sample starts by importing Spark Streaming Context and Twitter-based functionality. It then defines an application object named stream1:
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
object stream1 {
Next, a method is defined called createContext, which will be used to create both the Spark and Streaming contexts. It will also checkpoint the stream to the HDFS-based directory using the streaming context checkpoint method, which takes a directory path as a parameter. The directory path the value (cpDir) that was passed to the createContext method:
def createContext( cpDir : String ) : StreamingContext = {
val appName = "Stream example 1"
val conf = new SparkConf()
conf.setAppName(appName)
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5) )
ssc.checkpoint( cpDir )
ssc
}
Now, the main method is defined as is the HDFS directory, as well as Twitter access authority and parameters. The Spark Streaming context ssc is either retrieved or created using the HDFS checkpoint directory via the StreamingContext method--checkpoint. If the directory doesn't exist, then the previous method called createContext is called, which will create the context and checkpoint. Obviously, we have truncated our own Twitter auth.keys in this example for security reasons:
def main(args: Array[String]) {
val hdfsDir = "/data/spark/checkpoint"
val consumerKey = "QQpxx"
val consumerSecret = "0HFzxx"
val accessToken = "323xx"
val accessTokenSecret = "IlQxx"
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val ssc = StreamingContext.getOrCreate(hdfsDir,
() => { createContext( hdfsDir ) })
val stream = TwitterUtils.createStream(ssc,None).window( Seconds(60) )
// do some processing
ssc.start()
ssc.awaitTermination()
} // end main
Having run this code, which has no actual processing, the HDFS checkpoint directory can be checked again. This time, it is apparent that the checkpoint directory has been created and the data has been stored:
[hadoop@hc2nn stream]$ hdfs dfs -ls /data/spark/checkpoint
Found 1 items
drwxr-xr-x - hadoop supergroup 0 2015-07-02 13:41 /data/spark/checkpoint/0fc3d94e-6f53-40fb-910d-1eef044b12e9
This example, taken from the Apache Spark website, shows you how checkpoint storage can be set up and used. How often is checkpointing carried out? The metadata is stored during each stream batch. The actual data is stored within a period, which is the maximum of the batch interval, or ten seconds. This might not be ideal for you, so you can reset the value using the following method:
DStream.checkpoint( newRequiredInterval )
Here, newRequiredInterval is the new checkpoint interval value that you require; generally, you should aim for a value that is five to ten times your batch interval. Checkpointing saves both the stream batch and metadata (data about the data).
In the next section, we will examine the streaming sources and provide some examples of each type.