
Delta

Reads and writes Delta tables, including Delta Merge operations and Time travel.

Source

Source Parameters

Parameter | Description | Required
Location | File path for the Delta table | True
Read Timestamp | Time travel to a specific timestamp | False
Read Version | Time travel to a specific version of the table | False
note

For time travel on Delta tables:

  1. Only Read Timestamp OR Read Version can be selected, not both.
  2. The timestamp must fall between the first and the latest commit timestamps of the table.
  3. The version must be an integer between the minimum and maximum versions of the table.

If no time travel option is used, the most recent version of each row is fetched by default.

info

To read more about Delta time travel and its use cases, click here.

Example

Delta source example

Generated Code

Without time travel

def ReadDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Timestamp-based time travel

def ReadDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").option("timestampAsOf", "2022-05-05")\
        .load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Version-based time travel

def readDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").option("versionAsOf", "0")\
        .load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")


Target

Target Parameters

Parameter | Description | Required
Location | File path to write the Delta table to | True
Write mode | Write mode for the DataFrame | True
Optimise write | If true, optimizes Spark partition sizes based on the actual data | False
Overwrite table schema | If true, overwrites the schema of the Delta table with the schema of the incoming DataFrame | False
Merge schema | If true, any columns that are present in the DataFrame but not in the target table are automatically appended to the end of the schema as part of the write transaction | False
Partition Columns | List of columns to partition the Delta table by | False
Overwrite partition predicate | If specified, selectively overwrites only the data that satisfies the given where-clause expression | False

Supported Write Modes

Write Mode | Description
overwrite | If data already exists, overwrite it with the contents of the DataFrame
append | If data already exists, append the contents of the DataFrame
ignore | If data already exists, do nothing with the contents of the DataFrame. This is similar to CREATE TABLE IF NOT EXISTS in SQL.
error | If data already exists, throw an exception
merge | Insert, delete, and update data using the Delta merge command
SCD2 merge | A Delta merge operation that stores and manages both current and historical data over time

Among these write modes, overwrite, append, ignore, and error work the same way as for Parquet file writes. Merge is explained with several examples in the following sections.
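
For instance, an append that only adds the incoming rows, with none of the optional schema or partition options, reduces to a minimal writer call. This is a hypothetical sketch reusing the example path from above, not code generated by the gem:

def appendDelta(spark: SparkSession, in0: DataFrame):
    # Append the incoming DataFrame to the existing Delta table;
    # the write fails if the schemas are incompatible and mergeSchema is not set.
    in0.write\
        .format("delta")\
        .mode("append")\
        .save("dbfs:/FileStore/data_engg/delta_demo/silver/orders")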

Target Example

Delta Target Example

Generated Code

def writeDelta(spark: SparkSession, in0: DataFrame):
    return in0.write\
        .format("delta")\
        .option("optimizeWrite", True)\
        .option("mergeSchema", True)\
        .option("replaceWhere", "order_dt > '2022-01-01'")\
        .option("overwriteSchema", True)\
        .mode("overwrite")\
        .partitionBy("order_dt")\
        .save("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Delta MERGE

You can upsert data from a source DataFrame into a target Delta table by using the MERGE operation. Delta MERGE supports inserts, updates, and deletes in a variety of use cases, and Delta is particularly well suited to data whose individual records change slowly over time. Here we consider the most common types of slowly changing dimensions (SCD): SCD1, SCD2, and SCD3. Records are modified in one of the following ways: history is not retained (SCD1), history is retained at the row level (SCD2), or history is retained at the column level (SCD3).
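
Before the SCD variants, the sketch below illustrates how a single merge can combine all three actions. It is a hypothetical example (the is_deleted column and the customers path are assumptions), keyed on customer_id like the examples that follow: matched rows flagged for deletion are removed, other matched rows are updated, and unmatched rows are inserted.

def upsertWithDelete(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable
    from pyspark.sql.functions import col

    DeltaTable\
        .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers")\
        .alias("target")\
        .merge(in0.alias("source"), col("source.customer_id") == col("target.customer_id"))\
        .whenMatchedDelete(condition="source.is_deleted = true")\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()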

SCD1

Let's take the simplest case to illustrate a MERGE condition.

Parameters

Parameter | Description | Required
Source alias | Alias to use for the source DataFrame | True
Target alias | Alias to use for the existing target Delta table | True
Merge Condition | Condition used to match rows of the source DataFrame against the target table, on which the specified update, delete, or insert actions are performed | True
When Matched Update Action | Update the row from Source that already exists in Target (based on the Merge Condition) | False
When Matched Update Condition | Optional additional condition for updating a row. If specified, it must evaluate to true for the row to be updated. | False
When Matched Update Expressions | Optional expressions for setting the values of columns that need to be updated | False
When Matched Delete Action | Delete rows if the Merge Condition (and the optional additional condition) evaluates to true | False
When Matched Delete Condition | Optional additional condition for deleting a row. If specified, it must evaluate to true for the row to be deleted. | False
When Not Matched Action | The action to perform if the row from Source is not present in Target (based on the Merge Condition) | False
When Not Matched Condition | Optional condition for inserting a row. If specified, it must evaluate to true for the row to be inserted. | False
When Not Matched Expressions | Optional expressions for setting the values of columns that need to be inserted | False
note
  1. At least one action out of update, delete, or insert needs to be set.
  2. Delete removes the data from the latest version of the Delta table but does not remove it from physical storage until the old versions are explicitly vacuumed. See vacuum for details.
  3. A merge operation can fail if multiple rows of the source DataFrame match, and the merge attempts to update the same rows of the target Delta table. A Deduplicate gem can be placed before the target if duplicate rows are expected at the source.

When possible, provide predicates on the partition columns of a partitioned Delta table, as such predicates can significantly speed up the operation.
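
To illustrate the optional conditions and expressions above, and a partition predicate inside the merge condition, here is a hedged sketch. It assumes the target table is partitioned by customer_state and that only one state's data is being merged ('CA' is a placeholder value); the other columns match the customer examples below, and updated_dt is assumed to exist on both source and target.

def mergeWithConditions(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable

    DeltaTable\
        .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")\
        .alias("target")\
        .merge(
            in0.alias("source"),
            # The customer_state predicate lets Delta prune untouched partitions
            "source.customer_id = target.customer_id AND target.customer_state = 'CA'"
        )\
        .whenMatchedUpdate(
            # Only rewrite rows whose zip code actually changed
            condition="source.customer_zip_code <> target.customer_zip_code",
            set={
                "customer_zip_code": "source.customer_zip_code",
                "updated_dt": "source.updated_dt"
            }
        )\
        .whenNotMatchedInsertAll()\
        .execute()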

Example

Let's assume our initial customers table is as below:

Initial customer table

And we have the below updates coming into customers table:

Customer table updates

Our output and configurations for SCD1 merge will look like below:

Generated Code

def writeDeltaMerge(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable, DeltaMergeBuilder
    from pyspark.sql.functions import col

    if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1"):
        DeltaTable\
            .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")\
            .alias("target")\
            .merge(in0.alias("source"), (col("source.customer_id") == col("target.customer_id")))\
            .whenMatchedUpdateAll()\
            .whenNotMatchedInsertAll()\
            .execute()
    else:
        in0.write\
            .format("delta")\
            .mode("overwrite")\
            .save("dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")

SCD2

Let's use a Delta merge to capture the history of customer_zip_code at the row level.

Parameters

Parameter | Description | Required
Key columns | List of key columns that remain constant | True
Historic columns | List of columns that change over time and for which history needs to be maintained | True
From time column | Time from which a particular row became valid | True
To time column | Time until which a particular row was valid | True
Min/old-value flag | Column placeholder to store the flag as true for the first entry of a particular key | True
Max/latest flag | Column placeholder to store the flag as true for the last entry of a particular key | True
Flag values | Option to choose whether the min/max flags are stored as true/false or 0/1 | True
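
Relating these parameters to the generated code below: customer_id is the key column; customer_zip_code, customer_city, and customer_state are the historic columns; minFlag and maxFlag are the min/old-value and max/latest flag columns, with true/false as the flag values; and end_date (populated from the source's updated_dt) marks the time until which a row was valid.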

Example

Using the same customer tables as in our merge example above, the output and configuration for the SCD2 merge are shown below:

Generated Code

def writeDeltaSCD2(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable, DeltaMergeBuilder
    from pyspark.sql.functions import concat, lit

    if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2"):
        existingTable = DeltaTable.forPath(
            spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2"
        )
        updatesDF = in0.withColumn("minFlag", lit("true")).withColumn(
            "maxFlag", lit("true")
        )
        existingDF = existingTable.toDF()
        updateColumns = updatesDF.columns
        # Rows whose historic columns changed are staged twice: once to close the
        # current record (mergeKey = null, minFlag = false) and once to insert the
        # new version (mergeKey = customer_id).
        stagedUpdatesDF = (
            updatesDF.join(existingDF, ["customer_id"])
            .where(
                (existingDF["maxFlag"] == lit("true"))
                & (
                    (existingDF["customer_zip_code"] != updatesDF["customer_zip_code"])
                    | (existingDF["customer_city"] != updatesDF["customer_city"])
                    | (existingDF["customer_state"] != updatesDF["customer_state"])
                )
            )
            .select(*[updatesDF[val] for val in updateColumns])
            .withColumn("minFlag", lit("false"))
            .withColumn("mergeKey", lit(None))
            .union(updatesDF.withColumn("mergeKey", concat("customer_id")))
        )
        existingTable.alias("existingTable").merge(
            stagedUpdatesDF.alias("staged_updates"),
            concat(existingDF["customer_id"]) == stagedUpdatesDF["mergeKey"],
        ).whenMatchedUpdate(
            condition=(
                (existingDF["maxFlag"] == lit("true"))
                & (
                    (existingDF["customer_zip_code"] != stagedUpdatesDF["customer_zip_code"])
                    | (existingDF["customer_city"] != stagedUpdatesDF["customer_city"])
                    | (existingDF["customer_state"] != stagedUpdatesDF["customer_state"])
                )
            ),
            # Close the current record and stamp the end of its validity window
            set={"maxFlag": "false", "end_date": "staged_updates.updated_dt"},
        )\
        .whenNotMatchedInsertAll()\
        .execute()
    else:
        in0.write\
            .format("delta")\
            .mode("overwrite")\
            .save("dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2")


SCD3

Using the same customer tables as in our merge example above, the output and configuration for an SCD3 merge will look like below. Let's track changes to customer_zip_code by adding a column that shows the previous value.
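
Conceptually, an SCD3 merge keeps the latest customer_zip_code next to its previous value. The sketch below is a hypothetical illustration of that pattern (the previous_customer_zip_code column and the customers_scd3 path are assumptions), not the gem's generated code:

def writeDeltaSCD3(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable
    from pyspark.sql.functions import col, lit

    path = "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd3"
    if DeltaTable.isDeltaTable(spark, path):
        DeltaTable.forPath(spark, path)\
            .alias("target")\
            .merge(in0.alias("source"), col("source.customer_id") == col("target.customer_id"))\
            .whenMatchedUpdate(
                condition="source.customer_zip_code <> target.customer_zip_code",
                set={
                    # Shift the current value into the history column before overwriting it
                    "previous_customer_zip_code": "target.customer_zip_code",
                    "customer_zip_code": "source.customer_zip_code"
                }
            )\
            .whenNotMatchedInsertAll()\
            .execute()
    else:
        # First load: start with an empty history column
        in0.withColumn("previous_customer_zip_code", lit(None).cast("string"))\
            .write.format("delta").mode("overwrite").save(path)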


info

To check out our blog post on making the data lakehouse easier using Delta with Prophecy, click here.