Kafka

Apache Kafka is an open-source distributed event streaming platform. Supporting a number of streaming paradigms, it is used by thousands of companies and organizations for scenarios such as data ingestion, analytics, and more.

This source currently connects to Kafka brokers in batch mode.

Source

Reads data from a Kafka stream in batch mode. Data is read incrementally, starting from the last offset stored in the specified Metadata table. If the Metadata table is not present, data is read from the earliest offset.

Source Parameters

| Parameter | Description | Required |
|---|---|---|
| Broker List | Comma-separated list of Kafka brokers | True |
| Group Id | Kafka consumer group ID | True |
| Session Timeout | Session timeout for Kafka, in milliseconds (default: 6000) | False |
| Security Protocol | Security protocol for Kafka (default: SASL_SSL) | True |
| SASL Mechanism | SASL mechanism for SASL_SSL (default: SCRAM-SHA-256) | True |
| Credential Type | Credential provider type (Databricks Secrets or Username/Password) | True |
| Credential Scope | Scope to use for Databricks Secrets | True |
| Kafka Topic | Comma-separated list of Kafka topics | True |
| Metadata Table | Table used to store the last-read offset for each topic and partition | True |
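
When a Metadata table exists, the stored offsets are turned into the startingOffsets option that Spark's Kafka batch source accepts: a JSON string keyed by topic and then by partition (the generated code below does exactly this). A minimal sketch of that shape, using illustrative topic names and offsets:

import json

# Offsets loaded from the Metadata table, incremented by one so the next batch
# starts just after the last message that was already processed.
offset_dict = {
    "my_first_topic": {"0": 11, "1": 6},
    "my_second_topic": {"0": 11, "1": 6},
}

# Passed to the reader, e.g. .option("startingOffsets", json.dumps(offset_dict))
starting_offsets = json.dumps(offset_dict)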

Example


Generated Code

def KafkaSource(spark: SparkSession) -> DataFrame:
    from delta.tables import DeltaTable
    import json
    from pyspark.dbutils import DBUtils
    from pyspark.sql.functions import col

    if spark.catalog._jcatalog.tableExists("metadata.kafka_offsets"):
        # Build the starting offsets from the Metadata table so that only
        # messages newer than the last processed offset are read.
        offset_dict = {}

        for row in DeltaTable.forName(spark, "metadata.kafka_offsets").toDF().collect():
            if row["topic"] in offset_dict.keys():
                offset_dict[row["topic"]].update({row["partition"]: row["max_offset"] + 1})
            else:
                offset_dict[row["topic"]] = {row["partition"]: row["max_offset"] + 1}

        return (spark.read
                .format("kafka")
                .options(
                    **{
                        "kafka.sasl.jaas.config": (
                            "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                            + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                        ),
                        "kafka.sasl.mechanism": "SCRAM-SHA-256",
                        "kafka.security.protocol": "SASL_SSL",
                        "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                        "kafka.session.timeout.ms": "6000",
                        "group.id": "group_id_1",
                        "subscribe": "my_first_topic,my_second_topic",
                        "startingOffsets": json.dumps(offset_dict),
                    }
                )
                .load()
                .withColumn("value", col("value").cast("string"))
                .withColumn("key", col("key").cast("string")))
    else:
        # No Metadata table yet: read every topic from the earliest offset.
        return (spark.read
                .format("kafka")
                .options(
                    **{
                        "kafka.sasl.jaas.config": (
                            "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                            + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                        ),
                        "kafka.sasl.mechanism": "SCRAM-SHA-256",
                        "kafka.security.protocol": "SASL_SSL",
                        "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                        "kafka.session.timeout.ms": "6000",
                        "group.id": "group_id_1",
                        "subscribe": "my_first_topic,my_second_topic"
                    }
                )
                .load()
                .withColumn("value", col("value").cast("string"))
                .withColumn("key", col("key").cast("string")))


Target

Writes each row of the DataFrame to the configured Kafka topic(s) as a JSON message.

Target Parameters

| Parameter | Description | Required |
|---|---|---|
| Broker List | Comma-separated list of Kafka brokers | True |
| Security Protocol | Security protocol for Kafka (default: SASL_SSL) | True |
| SASL Mechanism | SASL mechanism for SASL_SSL (default: SCRAM-SHA-256) | True |
| Credential Type | Credential provider type (Databricks Secrets or Username/Password) | True |
| Credential Scope | Scope to use for Databricks Secrets | True |
| Kafka Topic | Comma-separated list of Kafka topics | True |
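
Regardless of the Credential Type chosen, the credentials end up inside the SASL JAAS configuration string used in the generated code. A minimal sketch of the two variants, assuming an active SparkSession named spark; the scope, key names, and literal credentials are placeholders:

from pyspark.dbutils import DBUtils

# Databricks Secrets: credentials are resolved at runtime from a secret scope.
username = DBUtils(spark).secrets.get(scope="test", key="username")
password = DBUtils(spark).secrets.get(scope="test", key="password")

jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
    f' required username="{username}" password="{password}";'
)

# Username/Password: the same JAAS string with values supplied directly
# (credentials then live in plain text, so prefer secrets for production).
jaas_config_plain = (
    "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
    ' required username="my_user" password="my_password";'
)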

Example


Generated Code

def KafkaTarget(spark: SparkSession, in0: DataFrame):
    from pyspark.dbutils import DBUtils
    from pyspark.sql.functions import to_json, struct

    # Serialize every input row into a single JSON string in the `value` column.
    df1 = in0.select(to_json(struct("*")).alias("value"))
    df2 = df1.selectExpr("CAST(value AS STRING)")
    df2.write\
        .format("kafka")\
        .options(
            **{
                "kafka.sasl.jaas.config": (
                    "kafkashaded.org.apache.kafka.common.security.scram.ScramLoginModule"
                    + f' required username="{DBUtils(spark).secrets.get(scope = "test", key = "username")}" password="{DBUtils(spark).secrets.get(scope = "test", key = "password")}";'
                ),
                "kafka.sasl.mechanism": "SCRAM-SHA-256",
                "kafka.security.protocol": "SASL_SSL",
                "kafka.bootstrap.servers": "broker1.aws.com:9094,broker2.aws.com:9094",
                "topic": "my_first_topic,my_second_topic",
            }
        )\
        .save()
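
The generated code targets the topic(s) named in the topic option. Spark's Kafka sink can alternatively route each row individually when the DataFrame contains a topic column and no topic option is set; a minimal sketch of that variant (the event_type column and the routing rule are hypothetical):

from pyspark.sql.functions import to_json, struct, when, col, lit

def write_with_topic_column(df, kafka_options):
    # Derive a per-row `topic` column instead of a fixed `topic` option
    # (hypothetical rule: split rows by their `event_type` value).
    out = df.select(
        when(col("event_type") == "order", lit("my_first_topic"))
            .otherwise(lit("my_second_topic"))
            .alias("topic"),
        to_json(struct("*")).alias("value"),
    )
    # `kafka_options` should carry the bootstrap servers and SASL settings,
    # but no `topic` key, so the per-row `topic` column is used.
    out.write.format("kafka").options(**kafka_options).save()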

Example Pipelines

Source Pipeline Example

In this example we'll read JSON messages from Kafka, parse them, remove any null messages, and then persist the result to a Delta table.


Metadata Table

To avoid reprocessing messages on subsequent Pipeline runs, we update the Metadata table with the last processed offset for each Kafka topic and partition. On the next run, this table is used to fetch only the batch of messages that have arrived since the previously processed offsets.

For this example, we're going to update metadata.kafka_offsets, which has the following structure:

| topic | partition | max_offset |
|---|---|---|
| my_first_topic | 0 | 10 |
| my_first_topic | 1 | 5 |
| my_second_topic | 0 | 10 |
| my_second_topic | 1 | 5 |

Taking this approach gives us the following benefits:

  1. Build the Pipeline interactively without committing any offsets
  2. Production workflows will only consume messages that have arrived since the previously-processed offset
  3. We can replay old messages by modifying the Metadata table (see the sketch after this list)
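
Because the Metadata table is an ordinary Delta table, replaying is just a matter of lowering the stored offsets before the next run. A minimal sketch, assuming the metadata.kafka_offsets table shown above:

# Rewind partition 0 of my_first_topic to offset 3; the next run will then
# start reading from offset 4 (max_offset + 1) and re-process those messages.
spark.sql("""
    UPDATE metadata.kafka_offsets
    SET max_offset = 3
    WHERE topic = 'my_first_topic' AND `partition` = 0
""")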
note

For production workflows, the Phase of the Script Gem that updates the offsets should be greater than the Phase of the Target Gem. This ensures that offsets are only updated in the table after data has been safely persisted to the Target.

Spark code used for the Script Gem

def UpdateOffsets(spark: SparkSession, in0: DataFrame):

    if not ("SColumnExpression" in locals()):
        from delta.tables import DeltaTable
        import pyspark.sql.functions as f
        from pyspark.sql.functions import col

        metadataTable = "metadata.kafka_offsets"
        # Compute the highest offset processed in this run for every
        # topic/partition pair.
        metaDataDf = in0.groupBy("partition", "topic").agg(f.max(f.col("`offset`").cast("int")).alias("max_offset"))

        if not spark.catalog._jcatalog.tableExists(metadataTable):
            # First run: create the Metadata table.
            metaDataDf.write.format("delta").mode("overwrite").saveAsTable(metadataTable)
        else:
            # Subsequent runs: upsert the latest offsets per topic/partition.
            DeltaTable\
                .forName(spark, metadataTable)\
                .alias("target")\
                .merge(
                    metaDataDf.alias("source"),
                    (
                        (col("source.`partition`") == col("target.`partition`"))
                        & (col("source.`topic`") == col("target.`topic`"))
                    )
                )\
                .whenMatchedUpdateAll()\
                .whenNotMatchedInsertAll()\
                .execute()