Process Streaming Data with Spark Structured Streaming

Catherine Shen
6 min read · Apr 14, 2020


Get started with Apache Spark Structured Streaming for your real-time applications.

Why Spark

Spark offers a compute engine that easily scales big data applications. For ETL (extract, transform, load) pipelines, Spark provides a scalable approach that lets you ingest data from a number of different sources, transform and clean it, compute statistics, and load the results to their final destinations.
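For instance, a minimal batch ETL sketch might look like the following (the file paths and column names here are hypothetical placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("BatchETLExample").getOrCreate()

# Extract: read raw CSV data (hypothetical path and columns)
raw = spark.read.option("header", "true").csv("/data/raw/events.csv")
# Transform: drop records without a country
clean = raw.filter(col("country").isNotNull())
# Compute statistics: count events per country
stats = clean.groupBy("country").count()
# Load: write the result to its final destination
stats.write.mode("overwrite").parquet("/data/curated/events_by_country")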

Spark can easily connect to a lot of different data sources such as databases, file systems and message brokers. It can be used in both batch data processing and streaming applications.

[Image from Databricks]

Dealing with streaming data

A data stream is an unbounded sequence of data arriving continuously. It can be live logs or IoT device data flowing through an event bus such as Kafka or Kinesis. Spark Streaming enables scalable and fault-tolerant ETL operations that allow continuously aggregating data with low latency.

Spark Streaming vs. Spark Structured Streaming

From the Spark 2.x release onwards, Structured Streaming, built on the Spark SQL library, came into the picture. The model of Structured Streaming is based on the DataFrame and Dataset APIs, so we can easily apply any SQL query using the DataFrame API on streaming datasets. In Structured Streaming there is no explicit batch concept: each row of the data stream is processed and the result is updated in the unbounded result table.

In general, Structured Streaming is closer to real-time streaming, while Spark Streaming leans more toward (micro-)batch processing. The APIs in Structured Streaming are higher-level and better optimized, whereas Spark Streaming is still based on RDDs (via DStreams).
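To make that concrete, here is a small sketch, assuming a SparkSession named spark, that uses Spark's built-in rate test source (it emits a timestamp and a value column). The same DataFrame and SQL operations you would write for a static table apply unchanged to the streaming DataFrame:

# The "rate" source emits rows with (timestamp, value) columns, which makes
# it handy for quick experiments without any external system.
streamDF = (spark.readStream
  .format("rate")
  .option("rowsPerSecond", 5)
  .load())

# Register the streaming DataFrame and query it with ordinary SQL
streamDF.createOrReplaceTempView("events")
evens = spark.sql("SELECT timestamp, value FROM events WHERE value % 2 = 0")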

Get started with the Spark Structured Streaming API

Streaming data sources in Spark offer the same DataFrame API for interacting with your data. The major difference is that in Structured Streaming the DataFrame is unbounded. In other words, data arrives on an input stream and new records are appended to the input DataFrame.

Let’s learn the Structured Streaming API with an example that processes streaming data from Kafka.

Step 1: Initialize the Spark session

SparkSession is the unified entry point of a Spark application (prior to Spark 2.0, SparkContext was the entry point). It can be created using a builder pattern. (You don’t need to create a SparkSession object when using spark-shell; it is already created for you as the variable spark.)

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("StructuredStreamingKafkaExample") \
.getOrCreate()
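One practical note: the Kafka source is shipped as a separate connector package rather than bundled with Spark itself. A sketch of one way to pull it in at session creation (the package coordinates are an assumption for Spark 3.x built for Scala 2.12; match the version to your installation):

spark = (SparkSession.builder
  .appName("StructuredStreamingKafkaExample")
  # Kafka connector package; adjust the version to your Spark installation
  .config("spark.jars.packages",
          "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1")
  .getOrCreate())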

Step 2: Define the schema of the data in the stream.

By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures.

from pyspark.sql.types import BooleanType, StructType, StringType, TimestampType

schema = (StructType()
  .add("user", StringType())
  .add("isActiveUser", BooleanType())
  .add("timestamp", TimestampType())
  .add("geocoding", StructType()
    .add("city", StringType())
    .add("country", StringType())
    .add("stateProvince", StringType())
  )
)
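For illustration, here is a hypothetical JSON record with the shape this schema describes; we assume messages like this are sitting in the Kafka topic's value field:

# A hypothetical message that matches the schema above
sample_event = """{
  "user": "catherine",
  "isActiveUser": true,
  "timestamp": "2020-04-14T12:00:00.000Z",
  "geocoding": {"city": "San Francisco", "country": "US", "stateProvince": "CA"}
}"""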

Step 3: Load the data. To read from a stream, use spark.readStream, which returns a DataStreamReader. Then configure the stream with the following options:

  • The format: kafka
  • The server endpoint (kafka.bootstrap.servers)
  • The topic to subscribe to: en
  • A location to log checkpoint metadata (this is set on the writer rather than the reader; see the file-sink sketch under the output modes below)

We call .format() on the DataStreamReader to specify the type of source our data will be read from. Here we use kafka to read the DataFrame from one or more topics in Kafka. The required options are kafka.bootstrap.servers (a comma-separated list of Kafka broker host:port addresses) and subscribe (the Kafka topic or topics to read from).

Use the load method, which loads the data stream from the Kafka source and returns it as an unbounded DataFrame.

kafkaDF = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "your.server.name:port")
  .option("subscribe", "en")
  .load()
)
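Note that the Kafka source always exposes the same fixed set of columns, with the actual message payload in the binary value column; that is why we parse it against our schema in the next step. Roughly, the schema looks like this:

kafkaDF.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)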

Step 4: Transform the stream

We can now apply transformation logic to our data in real time. The Kafka value column holds the raw message bytes, so we first parse it with the schema from Step 2, keep only the records with a non-null country, and count events per country.

from pyspark.sql.functions import col, from_json

filterDF = (kafkaDF
  .select(from_json(col("value").cast("string"), schema).alias("e"))
  .select("e.timestamp", "e.user", "e.geocoding.country", "e.geocoding.city")
  .filter(col("country").isNotNull())
  .groupBy("country").count()
)

DataFrames have a built-in check for when we need to quickly verify our stream’s status. Accessing .isStreaming on a DataFrame returns a Boolean value, which tells us whether or not the DataFrame is streaming:

filterDF.isStreaming

The output:

True

Step 5: Save the data.

Now we have a streaming DataFrame, but it isn’t streaming to any destination. To stream to a destination, we use writeStream, which returns a DataStreamWriter.

query = filterDF \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()

The console sink prints the output to the console/stdout every time there is a trigger, which is useful for debugging: we can view the stream in real time. Once start() is called, the streaming computation runs in the background.

Here we choose complete as our outputMode, which means the entire updated Result Table will be written to the external storage.

Structured Streaming treats a data stream as a table that is being continuously appended. Every data point that is arriving on the stream is like a new row being appended to the Input Table.

The “Output” is defined as what gets written out to the external storage. The output can be defined in 3 different modes:

Complete Mode — The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

Append Mode — Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

Update Mode — Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
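Since our query aggregates per country, complete mode is a natural fit. For a non-aggregated stream, append mode with a file sink is the typical choice, and that is also where the checkpoint metadata location mentioned earlier is usually configured. A sketch, assuming a parsed, pre-aggregation DataFrame named eventsDF (hypothetical here) and hypothetical paths:

# Append mode on an aggregation would also require a watermark, so this
# sketch writes the non-aggregated, parsed events to Parquet files.
fileQuery = (eventsDF.writeStream
  .outputMode("append")
  .format("parquet")
  .option("path", "/data/streaming/events")              # hypothetical output path
  .option("checkpointLocation", "/data/streaming/_cp")   # checkpoint metadata
  .start())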

query.awaitTermination()

We wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.
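While the query is running, the StreamingQuery handle returned by start() can be inspected, for example from another thread or an interactive shell. A few useful calls:

query.status          # current status of the query, as a dict
query.lastProgress    # metrics from the most recent micro-batch (or None)
spark.streams.active  # list of all active streaming queries in this session
query.stop()          # stop the query; awaitTermination() then returns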

Conclusion

Spark Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. The model of Structured Streaming is based on the DataFrame and Dataset APIs. Structured Streaming treats a data stream as a table that is being continuously appended, so you can express your streaming computation the same way you would express a batch computation on static data.

We went through a simple pipeline that connects the Spark Structured Streaming API to a Kafka data stream for real-time data analysis. For more detailed information on the Structured Streaming API, check out the official Spark documentation.


Written by Catherine Shen

Software Engineer working on building big data & machine learning platforms.
