Kafka
The Kafka file type reads from and writes to Apache Kafka, which:
- Is an open-source distributed event streaming platform.
- Handles high volumes of data and delivers messages with low latency.
- Supports real-time analytics, stream processing, fault tolerance, scalability, data integration, and event-driven architectures.
Parameters
Parameter | Tab | Description |
---|---|---|
Bootstrap Server/Broker List | Location | Comma-separated list of Kafka brokers. |
Security Protocol | Location | Security protocol for Kafka. The default is `SASL_SSL`. |
SASL Mechanisms | Location | SASL mechanism to use with `SASL_SSL`. The default is `SCRAM-SHA-256`. |
Credentials | Location | How to provide your credentials. You can select Databricks Secrets, Username & Password, or Environment variables. |
Kafka topic | Location | Comma-separated list of Kafka topics. |
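For reference, these parameters map onto standard options of Spark's Kafka data source when the gem is compiled. A minimal sketch with placeholder broker addresses and topic names (not the exact generated code):

# Placeholder values: substitute your own brokers, topics, and credentials.
kafka_options = {
    "kafka.bootstrap.servers": "broker1.example.com:9094,broker2.example.com:9094",  # Bootstrap Server/Broker List
    "kafka.security.protocol": "SASL_SSL",                                           # Security Protocol
    "kafka.sasl.mechanism": "SCRAM-SHA-256",                                         # SASL Mechanisms
    "subscribe": "my_first_topic,my_second_topic",                                   # Kafka topic
}
df = spark.read.format("kafka").options(**kafka_options).load()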
Source
The Source gem reads data from a Kafka stream in batch mode and lets you optionally specify the following additional properties. In this mode, Kafka reads data incrementally, starting from the last offsets stored in the specified Metadata table. If the Metadata table is not present, Kafka reads data from the earliest offset.
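The stored offsets are passed to Spark's startingOffsets option as a JSON string keyed by topic and partition. A sketch of that payload, assuming two topics with two partitions each:

import json

# topic -> partition -> next offset to read (max_offset + 1 from the Metadata table)
offset_dict = {
    "my_first_topic": {0: 11, 1: 6},
    "my_second_topic": {0: 11, 1: 6},
}
print(json.dumps(offset_dict))  # {"my_first_topic": {"0": 11, "1": 6}, "my_second_topic": {"0": 11, "1": 6}}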
Source properties
Property name | Description | Default |
---|---|---|
Group Id | Kafka consumer group ID. | None |
Session Timeout | Session timeout for the Kafka consumer, in milliseconds. | 6000 |
Store offsets read per partition in Delta table | Whether to store the offsets read per partition in a Delta table. | false |
Metadata Table | Delta table to store offsets for each topic and partition. | None |
Kerberos service name for Kafka SASL | Name of your Kerberos service to use in Kafka. | None |
Example
Generated Code
To see the generated source code of your project, switch to the Code view in the project header.
- Python
def KafkaSource(spark: SparkSession) -> DataFrame:
    from delta.tables import DeltaTable
    from pyspark.sql.functions import col
    import json
    from pyspark.dbutils import DBUtils

    if spark.catalog._jcatalog.tableExists(f"metadata.kafka_offsets"):
        # Metadata table exists: resume from the last stored offset per topic and partition.
        offset_dict = {}

        for row in DeltaTable.forName(spark, f"metadata.kafka_offsets").toDF().collect():
            if row["topic"] in offset_dict.keys():
                offset_dict[row["topic"]].update({row["partition"] : row["max_offset"] + 1})
            else:
                offset_dict[row["topic"]] = {row["partition"] : row["max_offset"] + 1}

        return (spark.read\
            .format("kafka")\
            .options(
              **{
                "kafka.sasl.jaas.config": (
                    f"kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                    + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                ),
                "kafka.sasl.mechanism": "SCRAM-SHA-256",
                "kafka.security.protocol": "SASL_SSL",
                "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                "kafka.session.timeout.ms": "6000",
                "group.id": "group_id_1",
                "subscribe": "my_first_topic,my_second_topic",
                "startingOffsets": json.dumps(offset_dict),
              }
            )\
            .load()\
            .withColumn("value", col("value").cast("string"))\
            .withColumn("key", col("key").cast("string")))
    else:
        # No Metadata table yet: read from the earliest available offset.
        return (spark.read\
            .format("kafka")\
            .options(
              **{
                "kafka.sasl.jaas.config": (
                    f"kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                    + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                ),
                "kafka.sasl.mechanism": "SCRAM-SHA-256",
                "kafka.security.protocol": "SASL_SSL",
                "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                "kafka.session.timeout.ms": "6000",
                "group.id": "group_id_1",
                "subscribe": "my_first_topic,my_second_topic"
              }
            )\
            .load()\
            .withColumn("value", col("value").cast("string"))\
            .withColumn("key", col("key").cast("string")))
Target
The Target gem writes each row of the DataFrame to a Kafka topic as a JSON message and lets you optionally specify the following additional properties.
Target properties
Property name | Description | Default |
---|---|---|
Message Unique Key | Key to help determine which partition to write the data to. | None |
Kerberos service name for Kafka SASL | Name of your Kerberos service to use in Kafka. | None |
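When you set a Message Unique Key, Kafka uses the record key to route messages with the same key to the same partition. A minimal sketch of preparing a keyed payload (the order_id column is illustrative, not part of the generated code):

from pyspark.sql.functions import to_json, struct, col

# Illustrative only: serialize each row as JSON and carry a key column so that
# messages sharing a key land on the same Kafka partition.
keyed_df = in0.select(
    col("order_id").cast("string").alias("key"),
    to_json(struct("*")).alias("value"),
)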
Example
Generated Code
To see the generated source code of your project, switch to the Code view in the project header.
- Python
def KafkaTarget(spark: SparkSession, in0: DataFrame):
    from pyspark.sql.functions import to_json, struct
    from pyspark.dbutils import DBUtils
    # Serialize each row of the incoming DataFrame as a JSON string message.
    df1 = in0.select(to_json(struct("*")).alias("value"))
    df2 = df1.selectExpr("CAST(value AS STRING)")
    df2.write\
        .format("kafka")\
        .options(
          **{
            "kafka.sasl.jaas.config": (
                f"kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
            ),
            "kafka.sasl.mechanism": "SCRAM-SHA-256",
            "kafka.security.protocol": "SASL_SSL",
            "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
            "topic": "my_first_topic,my_second_topic",
          }
        )\
        .save()
Example Pipeline
Source Pipeline Example
In this example, you read JSON messages from Kafka, parse them, remove any null messages, and persist the data to a Delta table.
To see the generated source code of your project, switch to the Code view in the project header.
Metadata Table
To avoid reprocessing messages on subsequent pipeline runs, update a table with the last processed offsets for each Kafka topic and partition. When you run the pipeline, it only reads the batch of messages that arrived after the previously processed offsets.
In this example, you update `metadata.kafka_offsets`, which has the following structure:
topic | partition | max_offset |
---|---|---|
my_first_topic | 0 | 10 |
my_first_topic | 1 | 5 |
my_second_topic | 0 | 10 |
my_second_topic | 1 | 5 |
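If you prefer to create the Metadata table ahead of the first pipeline run instead of letting the Script gem create it, a Delta DDL matching this structure might look like the following sketch (table name taken from this example; column types are an assumption based on the generated code):

spark.sql("""
    CREATE TABLE IF NOT EXISTS metadata.kafka_offsets (
        topic STRING,
        `partition` INT,
        max_offset INT
    ) USING DELTA
""")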
Taking this approach provides you with the following benefits:
- You can build the pipeline interactively without committing any offsets.
- Production workflows only consume messages that arrived since the previously processed offsets.
- You can replay old messages by modifying the Metadata table.
For production workflows, the phase of the Script gem that updates the offsets should be greater than the phase of the Target gem. This ensures that the offsets only update in the table after Prophecy safely persists the data to the Target.
Spark code used for the Script gem
To see the generated source code of your project, switch to the Code view in the project header.
- Python
def UpdateOffsets(spark: SparkSession, in0: DataFrame):
    if not ("SColumnExpression" in locals()):
        from delta.tables import DeltaTable
        import pyspark.sql.functions as f
        metadataTable = "metadata.kafka_offsets"
        # Compute the highest offset read per topic and partition in this batch.
        metaDataDf = in0.groupBy("partition", "topic").agg(f.max(f.col("`offset`").cast("int")).alias("max_offset"))

        if not spark.catalog._jcatalog.tableExists(metadataTable):
            metaDataDf.write.format("delta").mode("overwrite").saveAsTable(metadataTable)
        else:
            # Upsert the latest offsets into the existing Metadata table.
            DeltaTable\
                .forName(spark, metadataTable)\
                .alias("target")\
                .merge(
                  metaDataDf.alias("source"),
                  (
                    (f.col("source.`partition`") == f.col("target.`partition`"))
                    & (f.col("source.`topic`") == f.col("target.`topic`"))
                  )
                )\
                .whenMatchedUpdateAll()\
                .whenNotMatchedInsertAll()\
                .execute()
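After a pipeline run, you can sanity-check the merge by inspecting the stored offsets; the max_offset for each topic and partition should advance as new messages are processed:

# Sanity check: the stored offsets should move forward after each run.
spark.table("metadata.kafka_offsets").orderBy("topic", "partition").show()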