Kafka

The Kafka file type is used to connect to Apache Kafka, which:

  • Is an open-source distributed event streaming platform.
  • Handles high volumes of data and delivers messages with low latency.
  • Supports real-time analytics, stream processing, fault tolerance, scalability, data integration, and event-driven architectures.

Parameters

| Parameter | Tab | Description |
| --- | --- | --- |
| Bootstrap Server/Broker List | Location | Comma-separated list of Kafka brokers. |
| Security Protocol | Location | Security protocol for Kafka. (Default value is SASL_SSL.) |
| SASL Mechanisms | Location | SASL mechanism to use with SASL_SSL. (Default value is SCRAM-SHA-256.) |
| Credentials | Location | How to provide your credentials. You can select Databricks Secrets, Username & Password, or Environment variables. |
| Kafka topic | Location | Comma-separated list of Kafka topics. |
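The Credentials option only controls where the username and password come from at runtime. The sketch below is not generated by Prophecy; it merely illustrates the three choices, and the secret scope, key names, and environment variable names are placeholders.

def resolve_credentials(spark, source="databricks_secrets"):
    # Minimal sketch of the three Credentials options (placeholder names).
    if source == "databricks_secrets":
        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        username = dbutils.secrets.get(scope="kafka", key="username")  # hypothetical scope/keys
        password = dbutils.secrets.get(scope="kafka", key="password")
    elif source == "environment_variables":
        import os
        username = os.environ["KAFKA_USERNAME"]  # hypothetical variable names
        password = os.environ["KAFKA_PASSWORD"]
    else:
        # Username & Password entered directly in the gem (placeholders).
        username, password = "my_user", "my_password"
    return username, password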

Source

The Source gem reads data from a Kafka stream in batch mode, and you can optionally specify the additional properties below. When you configure a Metadata table, Kafka reads data incrementally, starting from the last offsets stored in that table. If the Metadata table is not present, Kafka reads data from the earliest offset.
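The incremental read works by turning the stored offsets into the startingOffsets option of the Kafka reader, as the generated code below shows. The snippet here only illustrates the shape of that JSON, using the example offsets from the Metadata Table section further down (the Source adds 1 to each stored max_offset so the next run begins at the first unread message):

import json

# topic -> {partition -> next offset to read}; the values mirror the example
# metadata.kafka_offsets table shown later (max_offset 10 and 5, plus 1).
offset_dict = {
    "my_first_topic": {0: 11, 1: 6},
    "my_second_topic": {0: 11, 1: 6},
}
print(json.dumps(offset_dict))
# {"my_first_topic": {"0": 11, "1": 6}, "my_second_topic": {"0": 11, "1": 6}}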

Source properties

| Property name | Description | Default |
| --- | --- | --- |
| Group Id | Kafka consumer group ID. | None |
| Session Timeout | Session timeout for Kafka, in milliseconds. | 6000 |
| Store offsets read per partition in Delta table | Whether to store the offsets read per partition in a Delta table. | false |
| Metadata Table | Delta table to store offsets for each topic and partition. | None |
| Kerberos service name for Kafka SASL | Name of your Kerberos service to use in Kafka. | None |
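These properties are passed through to the Kafka reader as options. The mapping below is only a rough illustration with placeholder values; the Kerberos entry is an assumption that applies only when a service name is set.

# Illustrative option names only; values are placeholders.
source_options = {
    "group.id": "group_id_1",                     # Group Id
    "kafka.session.timeout.ms": "6000",           # Session Timeout
    "kafka.sasl.kerberos.service.name": "kafka",  # Kerberos service name for Kafka SASL
}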

Example

Example usage of the Kafka Source gem

Generated Code

tip

To see the generated source code of your project, switch to the Code view in the project header.

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

def KafkaSource(spark: SparkSession) -> DataFrame:
    from delta.tables import DeltaTable
    import json
    from pyspark.dbutils import DBUtils

    if spark.catalog._jcatalog.tableExists("metadata.kafka_offsets"):
        offset_dict = {}

        # Build topic -> {partition: next offset} from the stored max offsets.
        for row in DeltaTable.forName(spark, "metadata.kafka_offsets").toDF().collect():
            if row["topic"] in offset_dict.keys():
                offset_dict[row["topic"]].update({row["partition"]: row["max_offset"] + 1})
            else:
                offset_dict[row["topic"]] = {row["partition"]: row["max_offset"] + 1}

        return (spark.read\
            .format("kafka")\
            .options(
                **{
                    "kafka.sasl.jaas.config": (
                        "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                        + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                    ),
                    "kafka.sasl.mechanism": "SCRAM-SHA-256",
                    "kafka.security.protocol": "SASL_SSL",
                    "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                    "kafka.session.timeout.ms": "6000",
                    "group.id": "group_id_1",
                    "subscribe": "my_first_topic,my_second_topic",
                    "startingOffsets": json.dumps(offset_dict),
                }
            )\
            .load()\
            .withColumn("value", col("value").cast("string"))\
            .withColumn("key", col("key").cast("string")))
    else:
        # No metadata table yet: read from the earliest offsets.
        return (spark.read\
            .format("kafka")\
            .options(
                **{
                    "kafka.sasl.jaas.config": (
                        "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                        + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                    ),
                    "kafka.sasl.mechanism": "SCRAM-SHA-256",
                    "kafka.security.protocol": "SASL_SSL",
                    "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                    "kafka.session.timeout.ms": "6000",
                    "group.id": "group_id_1",
                    "subscribe": "my_first_topic,my_second_topic"
                }
            )\
            .load()\
            .withColumn("value", col("value").cast("string"))\
            .withColumn("key", col("key").cast("string")))

Target

The Target gem writes each row of the DataFrame to a Kafka topic as a JSON message, and you can optionally specify the following additional properties.

Target properties

| Property name | Description | Default |
| --- | --- | --- |
| Message Unique Key | Key that helps determine which partition to write the data to. | None |
| Kerberos service name for Kafka SASL | Name of your Kerberos service to use in Kafka. | None |
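When a Message Unique Key is set, the key is written alongside the JSON value so that rows with the same key land in the same Kafka partition. The generated code below does not include a key; the following is only a sketch of what the key/value selection could look like, assuming a hypothetical order_id column is chosen as the key:

from pyspark.sql.functions import col, struct, to_json

def with_message_key(in0):
    # "order_id" is a hypothetical column used as the Message Unique Key.
    return in0.select(
        col("order_id").cast("string").alias("key"),
        to_json(struct("*")).alias("value"),
    )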

Example

Example usage of the Kafka Target gem

Generated Code

tip

To see the generated source code of your project, switch to the Code view in the project header.

from pyspark.dbutils import DBUtils
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import struct, to_json

def KafkaTarget(spark: SparkSession, in0: DataFrame):
    # Serialize every row to a single JSON string in the "value" column.
    df1 = in0.select(to_json(struct("*")).alias("value"))
    df2 = df1.selectExpr("CAST(value AS STRING)")
    df2.write\
        .format("kafka")\
        .options(
            **{
                "kafka.sasl.jaas.config": (
                    "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                    + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                ),
                "kafka.sasl.mechanism": "SCRAM-SHA-256",
                "kafka.security.protocol": "SASL_SSL",
                "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                "topic": "my_first_topic,my_second_topic",
            }
        )\
        .save()

Example Pipeline

Source Pipeline Example

In this example, you read JSON messages from Kafka, parse them, remove any null messages, and persist the data to a Delta table.

Example pipeline using the Kafka Source

tip

To see the generated source code of your project, switch to the Code view in the project header.
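In the actual pipeline, the parsing, null filtering, and Delta write between the Kafka Source and the Target are separate gems. The sketch below is only an outline of those steps, with a hypothetical message schema and target table name:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Hypothetical schema of the JSON messages; replace with your own.
message_schema = StructType([
    StructField("id", StringType()),
    StructField("event", StringType()),
])

def parse_and_persist(kafka_df):
    parsed = kafka_df.withColumn("message", from_json(col("value"), message_schema))
    non_null = parsed.filter(col("message").isNotNull())  # drop messages that fail to parse
    (non_null
        .select("message.*", "topic", "partition", "offset")
        .write.format("delta").mode("append")
        .saveAsTable("bronze.kafka_events"))  # hypothetical Delta table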

Metadata Table

To avoid reprocessing messages on subsequent pipeline runs, update a table with the last processed offset for each Kafka topic and partition. When you run the pipeline, the Source only reads the batch of messages that arrived after the previously processed offsets.

In this example, you update metadata.kafka_offsets, which has the following structure:

| topic | partition | max_offset |
| --- | --- | --- |
| my_first_topic | 0 | 10 |
| my_first_topic | 1 | 5 |
| my_second_topic | 0 | 10 |
| my_second_topic | 1 | 5 |

Taking this approach provides you with the following benefits:

  1. You can build the pipeline interactively without committing any offsets.
  2. Production workflows only consume messages that arrived since the previously processed offset.
  3. You can replay old messages by modifying the Metadata table (see the sketch after this list).
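For example, to replay a partition you can rewind its stored offset. A minimal sketch, assuming the metadata.kafka_offsets table above; because the Source adds 1 to max_offset, setting it to -1 makes the next run re-read that partition from offset 0:

# Rewind my_first_topic, partition 0 so the next pipeline run re-reads it from the start.
spark.sql("""
    UPDATE metadata.kafka_offsets
    SET max_offset = -1
    WHERE topic = 'my_first_topic' AND `partition` = 0
""")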
note

For production workflows, the phase of the Script gem that updates the offsets should be greater than the phase of the Target gem. This ensures that the offsets only update in the table after Prophecy safely persists the data to the Target.

Spark code used for the Script gem

tip

To see the generated source code of your project, switch to the Code view in the project header.

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

def UpdateOffsets(spark: SparkSession, in0: DataFrame):

    if not ("SColumnExpression" in locals()):
        from delta.tables import DeltaTable
        import pyspark.sql.functions as f
        metadataTable = "metadata.kafka_offsets"
        # Highest offset read in this run, per topic and partition.
        metaDataDf = in0.groupBy("partition", "topic").agg(f.max(f.col("`offset`").cast("int")).alias("max_offset"))

        if not spark.catalog._jcatalog.tableExists(metadataTable):
            metaDataDf.write.format("delta").mode("overwrite").saveAsTable(metadataTable)
        else:
            # Upsert the latest offsets into the existing metadata table.
            DeltaTable\
                .forName(spark, metadataTable)\
                .alias("target")\
                .merge(
                    metaDataDf.alias("source"),
                    (
                        (col("source.`partition`") == col("target.`partition`"))
                        & (col("source.`topic`") == col("target.`topic`"))
                    )
                )\
                .whenMatchedUpdateAll()\
                .whenNotMatchedInsertAll()\
                .execute()