
Delta

A Delta (Delta Lake) file type:

  • Is an optimized storage layer that lets you store data and tables in the Databricks lakehouse.
  • Extends Parquet data files with a file-based transaction log for ACID transactions and scalable metadata handling.
  • Integrates tightly with Structured Streaming, so you can use a single copy of data for both batch and streaming operations and get incremental processing at scale (see the sketch below).
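
For example, a single Delta location can serve both a batch read and an incremental streaming read. The following is a minimal sketch, reusing the hypothetical orders path from the examples below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical Delta table location, reused from the examples on this page.
path = "dbfs:/FileStore/data_engg/delta_demo/silver/orders"

# Batch: read the current snapshot of the table.
batch_df = spark.read.format("delta").load(path)

# Streaming: incrementally process new commits to the same copy of the data.
stream_df = spark.readStream.format("delta").load(path)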

Parameters

| Parameter | Tab | Description |
| --- | --- | --- |
| Location | Location | File path to read the Delta file from or write it to. |
| Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code. |

Source

The Source gem reads data from Delta files and allows you to optionally specify the following additional properties.

Source properties

| Property name | Description | Default |
| --- | --- | --- |
| Description | Description of your dataset. | None |
| Read timestamp | Time travel to a specific timestamp. This value must be between the first commit timestamp and the latest commit timestamp in the table. | None |
| Read version | Time travel to a specific version of the table. This value must be an integer between the minimum and maximum version of the table. If you don't use a time travel option, the Source gem fetches the most recent version of each row by default. | None |
note

You can only set Read timestamp or Read version, not both.

Example

Delta source example

Generated Code

tip

To see the generated source code of your project, switch to the Code view in the project header.

Without time travel

def readDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Timestamp-based time travel

def readDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").option("timestampAsOf", "2022-05-05")\
        .load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Version-based time travel

def readDelta(spark: SparkSession) -> DataFrame:
    return spark.read.format("delta").option("versionAsOf", "0")\
        .load("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Target

The Target gem writes data to Delta files and allows you to optionally specify the following additional properties.

Target properties

| Property name | Description | Default |
| --- | --- | --- |
| Description | Description of your dataset. | None |
| Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | error |
| Overwrite table schema | Whether to overwrite the schema of the Delta table with the schema of the incoming DataFrame. | false |
| Merge DataFrame schema into table schema | Whether to automatically add any columns present in the DataFrame but not in the target table to the end of the schema as part of a write transaction. | false |
| Partition Columns | List of columns to partition the Delta table by. | None |
| Overwrite partition predicate | Selectively overwrite only the data that satisfies the given where clause expression. | None |
| Optimize write | Whether to optimize the Spark partition sizes based on the actual data. | false |

Supported write modes

| Write mode | Description |
| --- | --- |
| error | If the data already exists, throw an exception. |
| overwrite | If the data already exists, overwrite it with the contents of the DataFrame. |
| append | If the data already exists, append the contents of the DataFrame. |
| ignore | If the data already exists, do nothing with the contents of the DataFrame. This is similar to the CREATE TABLE IF NOT EXISTS clause in SQL. |
| merge | Use the Delta MERGE command to insert, update, and delete data. For more information, see Delta MERGE. |
| scd2 merge | Store and manage the current and historical data over time. For more information, see Delta MERGE. |
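
For reference, the first four modes map directly onto the Spark DataFrameWriter `.mode()` setting, while merge and scd2 merge use the Delta MERGE command described below. The following is a minimal sketch of an append write, reusing the hypothetical orders path from the other examples on this page:

from pyspark.sql import SparkSession, DataFrame

def appendDelta(spark: SparkSession, in0: DataFrame):
    # "append" adds the incoming rows to the existing Delta table;
    # "error", "overwrite", and "ignore" are selected the same way via .mode().
    in0.write\
        .format("delta")\
        .mode("append")\
        .save("dbfs:/FileStore/data_engg/delta_demo/silver/orders")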

Target Example

Delta Target Example

Generated Code

tip

To see the generated source code of your project, switch to the Code view in the project header.

def writeDelta(spark: SparkSession, in0: DataFrame):
    in0.write\
        .format("delta")\
        .option("optimizeWrite", True)\
        .option("mergeSchema", True)\
        .option("replaceWhere", "order_dt > '2022-01-01'")\
        .option("overwriteSchema", True)\
        .mode("overwrite")\
        .partitionBy("order_dt")\
        .save("dbfs:/FileStore/data_engg/delta_demo/silver/orders")

Delta MERGE

You can upsert data from a source DataFrame into a target Delta table by using the MERGE operation. Delta MERGE supports insert, update, and delete actions, and handles the most common slowly changing dimension (SCD) cases in one of the following ways:

  • SCD1: Delta tables do not retain history.
  • SCD2: Delta tables retain history at the row level.
  • SCD3: Delta tables retain history at the column level.

SCD1

The following lists the properties of an SCD1 MERGE, where Delta tables do not retain history.

Properties

| Property name | Description | Default |
| --- | --- | --- |
| Source Alias | Alias to use for the source DataFrame. | source |
| Target Alias | Alias to use for the existing target Delta table. | target |
| Merge condition | Condition for merging data from the source DataFrame into the target table. Delta can perform an update, delete, or insert action. | None |
| When Matched Update Action | Update rows from your Source gem that exist in your Target gem, based on your When Matched Update Condition property. | update |
| When Matched Update Condition | Additional condition for updating a row. If you specify a condition, it must evaluate to true for the Target gem to update the row. | None |
| When Matched Update Expressions | Expressions for setting the values of columns that the Target gem needs to update. | None |
| When Matched Delete Action | Delete rows if your When Matched Delete Condition property and the optional additional condition evaluate to true. Delete removes the data from the latest version of the Delta table, but does not remove it from physical storage until you explicitly vacuum the old versions. To learn more, see Remove files no longer referenced by a Delta table. | ignore |
| When Matched Delete Condition | Additional condition for deleting a row. If you specify a condition, it must evaluate to true for the Target gem to delete the row. | false |
| When Not Matched Action | Action to perform if a row from your Source gem is not present in your Target gem, based on your When Not Matched Condition property. | insert |
| When Not Matched Condition | Condition for inserting a row. If you specify a condition, it must evaluate to true for the Target gem to insert a new row. | None |
| When Not Matched Expressions | Expressions for setting the values of columns that the Target gem needs to insert. | None |
note
  1. You must set at least one of the update, delete, or insert actions.
  2. A merge operation fails if multiple rows of the source DataFrame match and the merge attempts to update the same row of the target Delta table. If you expect duplicate rows in your Source gem, place a deduplication gem before your Target gem, as sketched after this note.
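
For example, one simple guard is to deduplicate the incoming DataFrame on the merge key before the merge. This is a minimal sketch, assuming customer_id is the merge key as in the example that follows:

from pyspark.sql import DataFrame

def dedupeOnMergeKey(in0: DataFrame) -> DataFrame:
    # Keep one row per merge key so the merge never attempts to update
    # the same target row twice.
    return in0.dropDuplicates(["customer_id"])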
tip

When possible, provide predicates on the partition columns for a partitioned Delta table because predicates can significantly speed up the operations.
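
For example, if the target table were partitioned by a hypothetical order_dt column, adding a predicate on that column to the merge condition lets Delta prune partitions during the scan. This is an illustrative sketch only (order_id and the predicate value are assumptions):

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
from delta.tables import DeltaTable

def mergeWithPartitionPruning(spark: SparkSession, in0: DataFrame):
    # Hypothetical table partitioned by order_dt.
    DeltaTable\
        .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/orders")\
        .alias("target")\
        .merge(
            in0.alias("source"),
            (col("source.order_id") == col("target.order_id"))
            # The partition-column predicate restricts the merge scan
            # to the matching partitions.
            & (col("target.order_dt") > "2022-01-01"),
        )\
        .whenMatchedUpdateAll()\
        .whenNotMatchedInsertAll()\
        .execute()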

Example

Assume you have the following customers table:

Initial customer table

And, you want to make the following updates to the table:

Customer table updates

The following shows the output and configurations for an SCD1 merge:


Generated Code

tip

To see the generated source code of your project, switch to the Code view in the project header.

def writeDeltaMerge(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable, DeltaMergeBuilder
    from pyspark.sql.functions import col

    if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1"):
        DeltaTable\
            .forPath(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")\
            .alias("target")\
            .merge(in0.alias("source"), (col("source.customer_id") == col("target.customer_id")))\
            .whenMatchedUpdateAll()\
            .whenNotMatchedInsertAll()\
            .execute()
    else:
        in0.write\
            .format("delta")\
            .mode("overwrite")\
            .save("dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd1")

SCD2

The following lists the properties of an SCD2 MERGE, where Delta tables retain history at the row level.

Properties

| Property name | Description | Default |
| --- | --- | --- |
| Key Columns | List of key columns that remain constant. | None |
| Historic Columns | List of columns that change over time and whose history is maintained. | None |
| From time column | Time from which a particular row is valid. | None |
| To time column | Time until which a particular row is valid. | None |
| Name of the column used as min/old-value flag | Column that stores the flag as true for the first entry of a particular key. | None |
| Name of the column used as max/latest flag | Column that stores the flag as true for the last entry of a particular key. | None |
| Flag values | Format of the min and max flags. Possible values are true/false or 0/1. | None |

Example

Continuing from the SCD1 example, you can use the Delta log to capture the historical customer_zip_code at the row level. The following shows the output and configurations for an SCD2 merge:


Generated Code

tip

To see the generated source code of your project, switch to the Code view in the project header.

def writeDeltaSCD2(spark: SparkSession, in0: DataFrame):
    from delta.tables import DeltaTable, DeltaMergeBuilder
    from pyspark.sql.functions import concat, lit

    if DeltaTable.isDeltaTable(spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2"):
        existingTable = DeltaTable.forPath(
            spark, "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2"
        )
        updatesDF = in0.withColumn("minFlag", lit("true")).withColumn(
            "maxFlag", lit("true")
        )
        existingDF = existingTable.toDF()
        updateColumns = updatesDF.columns
        stagedUpdatesDF = (
            updatesDF.join(existingDF, ["customer_id"])
            .where(
                (existingDF["maxFlag"] == lit("true"))
                & (
                    (existingDF["customer_zip_code"] != updatesDF["customer_zip_code"])
                    | (existingDF["customer_city"] != updatesDF["customer_city"])
                    | (existingDF["customer_state"] != updatesDF["customer_state"])
                )
            )
            .select(*[updatesDF[val] for val in updateColumns])
            .withColumn("minFlag", lit("false"))
            .withColumn("mergeKey", lit(None))
            .union(updatesDF.withColumn("mergeKey", concat("customer_id")))
        )
        existingTable.alias("existingTable").merge(
            stagedUpdatesDF.alias("staged_updates"),
            concat(existingDF["customer_id"]) == stagedUpdatesDF["mergeKey"],
        ).whenMatchedUpdate(
            condition=(
                (existingDF["maxFlag"] == lit("true"))
                & (
                    (existingDF["customer_zip_code"] != stagedUpdatesDF["customer_zip_code"])
                    | (existingDF["customer_city"] != stagedUpdatesDF["customer_city"])
                    | (existingDF["customer_state"] != stagedUpdatesDF["customer_state"])
                )
            ),
            set={"maxFlag": "false", "end_date": "staged_updates.updated_dt"},
        ).whenNotMatchedInsertAll().execute()
    else:
        in0.write\
            .format("delta")\
            .mode("overwrite")\
            .save("dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd2")

SCD3

Continuing from the SCD2 example, you can use the Delta log to capture the historical customer_zip_code at the column level. The following shows the output and configurations for an SCD3 merge:


tip

To see the generated source code of your project, switch to the Code view in the project header.
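
The generated code for SCD3 is not reproduced here. As an illustration only (not Prophecy's generated code), the following is a minimal sketch of SCD3-style merge logic, assuming a hypothetical previous_customer_zip_code history column on the target table:

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, lit
from delta.tables import DeltaTable

def writeDeltaSCD3(spark: SparkSession, in0: DataFrame):
    # Hypothetical path for the SCD3 variant of the customers table.
    path = "dbfs:/FileStore/data_engg/delta_demo/silver/customers_scd3"
    if DeltaTable.isDeltaTable(spark, path):
        DeltaTable\
            .forPath(spark, path)\
            .alias("target")\
            .merge(in0.alias("source"), col("source.customer_id") == col("target.customer_id"))\
            .whenMatchedUpdate(
                condition="target.customer_zip_code <> source.customer_zip_code",
                set={
                    # Shift the current value into the history column,
                    # then overwrite it with the incoming value.
                    "previous_customer_zip_code": "target.customer_zip_code",
                    "customer_zip_code": "source.customer_zip_code",
                },
            )\
            .whenNotMatchedInsertAll()\
            .execute()
    else:
        # First load: no history yet, so the history column starts as null.
        in0.withColumn("previous_customer_zip_code", lit(None).cast("string"))\
            .write.format("delta").mode("overwrite").save(path)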

info

To learn more about how Prophecy uses the Delta file type, see Prophecy with Delta — making data lakehouses easier.