Spark integration with kafka (Streaming)

Apache Kafka — Spark structured streaming is one of the best combinations for building real time applications. In the previous article, we discussed about integration of spark(2.4.x) with kafka for batch processing of queries. In this article, we will discuss about the integration of spark structured streaming with kafka.

Kafka is a distributed publisher/subscriber messaging system that acts as a pipeline for transfer of real time data in fault-tolerant and parallel manner. Kafka acts as the central hub for real-time streams of data that are processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming can be used to publish results into yet another Kafka topic or store in HDFS, databases or dashboards.
In this article, we will discuss ingestion of data from kafka using structured streaming. We will discuss interaction of spark with kafka and the spark APIs used for reading as well as writing of data.

Kafka Source (Read):-

Dataset<Row> kafka_df = spark.readStream().format(“kafka”).option(“kafka.bootstrap.servers”, “host1:port1,host2:port2”).option(“subscribe”, “topic1,topic2”).load();

Each dataframe created from kafka ingestion has seven columns. These columns define attributes of each message ingested from kafka.

1. key
2. value
3. topic
4. partition
5. offset
6. timestamp
7. timestampType

‘Key’ and ‘value’ columns are used to extract the content of the message. Mostly value contains data that can be expanded into a dataframe.
Data can be present in multiple formats in kafka. Here we have provided methods for two formats.

1) JSON:-

a) If Schema is present:-

StructType json_schema=”schema of the dataframe”
Dataset<Row> input = kafka_df.withColumn(“data”, functions.from_json(kafka_df.col(“value”), schema)).select(“data.*”);

This method can be used if schema of the data is fixed and already defined. Mostly this is not the scenario in the real world as columns can be added or deleted leading to changes in schema.

b) If Schema is not present:-

Schema can be derived by reading the kafka topic first in batch mode. This schema can then be used for extracting streaming data from the topic. Steps have been given below:-

1. Read the kafka topic in batch mode:-

Dataset<Row> kafka_batch_df =“kafka”).option(“kafka.bootstrap.servers”, “host1:port1,host2:port2”).option(“subscribe”, “topic1,topic2”).option(“startingOffsets”, “{\”topic1\”:{\”0\”:23,\”1\”:-2},\”topic2\”:{\”0\”:-2}}”).load();

2. As the ‘value’ column contains rows consisting of json strings, we can use the method discussed in the previous article to extract data and create a dataset:-

Dataset<Row> input_batch =“CAST(value AS STRING) as value”).map(Row::mkString, Encoders.STRING()))

3. Use the schema of input_batch to read the same topic using structured streaming:-

Dataset<Row> input = kafka_df.selectExpr(“CAST(value AS STRING) as value”).select(from_json(col(“value”), input_batch.schema()).as(“value”)).select(“value.*”);

Just reading a single record in batch mode can help in determining the schema. This method is really helpful in scenarios of changing schema as explicit declaration of schema is not a prerequisite.

2) AVRO:-

Ingestion of data in avro format needs schema to be present in the form of json string.

String json_schema=”schema of the dataframe”
Dataset<Row> store =$.MODULE$.from_avro(kafka_df.col(“value”),json_schema).as(“data”));
Dataset<Row> input =“data.*”);

Kafka Sink (Write):-

Data can be published into kafka in batches or using a streaming job. ‘Value’ column is required to be published and rest of the columns are optional.

1) JSON:-

output.selectExpr(“to_json(struct(*)) AS value”).writeStream().format(“kafka”).option(“kafka.bootstrap.servers”, “host:port”).option(“topic”, “topic_name”).option("checkpointLocation", "checkpointJson").start().awaitTermination();

2) AVRO:-$.MODULE$.to_avro(struct(“*”)).as(“value”)).writeStream().format(“kafka”).option(“kafka.bootstrap.servers”, “localhost:9093”).option(“topic”,”test_avro”).option("checkpointLocation", "checkpointAvro").start().awaitTermination();

The idea in both cases is to construct a column of type struct having all the columns as sub-columns and write this column (‘value’) into kafka so that later this column can be retrieved and used to recreate the dataframe.

| — value: struct (nullable = true)
| | — column1: string (nullable = true)
| | — column2: string (nullable = true)

The above methods simplify the process of integration of spark structured streaming and kafka. This combination is widely used for building real-time applications.

Code can be found on the link below.