Delta Table

Reads from and writes to Delta tables managed by your execution environment's metadata catalog.

Prerequisites

Before you specify parameters and properties, select the Delta table type:

  1. Open the Source or Target gem configuration.
  2. On the Type & Format page, select Catalog Table.
  3. On the Properties page, set the provider property to delta.

Parameters

| Parameter | Tab | Description |
| --- | --- | --- |
| Use Unity Catalog | Location | Whether to use a Unity catalog. |
| Catalog | Location | If you use a Unity catalog, specify which catalog to use. |
| Database | Location | Name of the database to connect to. |
| Table | Location | Name of the table to connect to. |
| Use file path | Location | Whether to use a custom file path to store the underlying files in the Target gem (see the sketch after this table). |
| Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code. |
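
As a point of reference, the Use file path option in the Target gem typically corresponds to passing an explicit storage location alongside `saveAsTable()`. The following is a minimal sketch, assuming the standard Spark `path` writer option; the path and table name are illustrative, and `in0` stands for the incoming DataFrame:

```python
# Sketch only: store the table's underlying Delta files at a custom
# location instead of the catalog-managed default. The path and table
# name below are hypothetical.
(
    in0.write.format("delta")
    .option("path", "dbfs:/custom/location/test_table")
    .saveAsTable("test_db.test_table")
)
```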

Source

The Source gem reads data from Delta tables. You can optionally specify the following additional properties.

Source properties

| Property | Description | Default |
| --- | --- | --- |
| Description | Description of your dataset. | None |
| Provider | Provider to use. You must set this to `delta`. | `delta` |
| Filter Predicate | `WHERE` clause to filter the table by. | None |
| Read timestamp | Timestamp to time travel to, in milliseconds. The value must fall between the first and the latest commit timestamps in the table. | None |
| Read version | Version of the table to time travel to. | None |
note

You can only select Read timestamp or Read version, not both.

If you don't use a time travel option, the Source gem fetches the most recent version of each row by default.

To learn more about Delta time travel and its use cases, see Introducing Delta Time Travel for Large Scale Data Lakes.
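
For reference, the two time travel properties line up with Delta Lake's documented reader options. The following is a minimal sketch of what the compiled reads might look like, assuming the standard `versionAsOf` and `timestampAsOf` options; the table name and values are illustrative:

```python
from pyspark.sql import DataFrame, SparkSession

def read_at_version(spark: SparkSession) -> DataFrame:
    # Read version: pin the read to a specific table version.
    return spark.read.option("versionAsOf", 3).table("test_db.test_table")

def read_at_timestamp(spark: SparkSession) -> DataFrame:
    # Read timestamp: read the table as of the last commit at or
    # before the given time.
    return (
        spark.read.option("timestampAsOf", "2024-01-01 00:00:00")
        .table("test_db.test_table")
    )
```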

Source example

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

Without filter predicate

```python
def Source(spark: SparkSession) -> DataFrame:
    return spark.read.table("test_db.test_table")
```

With filter predicate

```python
def Source(spark: SparkSession) -> DataFrame:
    return spark.sql("SELECT * FROM test_db.test_table WHERE col > 10")
```

Target

The Target gem writes data to Delta tables. You can optionally specify the following additional properties.

Target properties

| Property | Description | Default |
| --- | --- | --- |
| Description | Description of your dataset. | None |
| Provider | Provider to use. You must set this to `delta`. | `delta` |
| Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | `error` |
| Use insert into | Whether to write with the `insertInto()` method instead of the `save()` method. | `false` |
| Overwrite table schema | Whether to overwrite the schema of the Delta table (see the sketch after this table). | `false` |
| Merge DataFrame schema into table schema | Whether to automatically add columns that are present in the DataFrame but not in the target table to the end of the schema as part of a write transaction. | `false` |
| Partition Columns | List of columns to partition the Delta table by. | None |
| Overwrite partition predicate | Selectively overwrite only the data that satisfies the given `WHERE` clause expression. | None |
| Optimize write | Whether to optimize Spark partition sizes based on the actual data. | `false` |
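
The two schema options above correspond to Delta Lake's documented writer options. The following is a hedged sketch of how they might appear in compiled form, where `overwriteSchema` and `mergeSchema` are the standard Delta option names and `in0` stands for the incoming DataFrame:

```python
# Overwrite table schema: replace the existing table schema on overwrite.
(
    in0.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("test_db.test_table")
)

# Merge DataFrame schema into table schema: add new columns on write.
(
    in0.write.format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("test_db.test_table")
)
```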

Supported write modes

Write modeDescription
overwriteIf the data already exists, overwrite the data with the contents of the DataFrame.
errorIf the data already exists, throw an exception.
appendIf the data already exists, append the contents of the DataFrame.
ignoreIf the data already exists, do nothing with the contents of the DataFrame.
This is similar to the CREATE TABLE IF NOT EXISTS clause in SQL.
mergeUse the Delta merge command to insert, delete and update data. For more information, see DeltaTableOperations.
scd2 mergeStore and manage the current and historical data over time. For more information, see DeltaTableOperations.
tip

The overwrite, append, ignore, and error write modes operate the same way as they do with other native Spark-supported formats such as Parquet.
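
The merge and scd2 merge modes go beyond plain DataFrameWriter calls. As an illustration, a merge write is commonly expressed with the Delta Lake `DeltaTable` API; the sketch below assumes the open-source `delta-spark` package and a hypothetical `id` key column:

```python
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession

def merge_target(spark: SparkSession, in0: DataFrame):
    # Upsert the incoming rows into the existing Delta table.
    target = DeltaTable.forName(spark, "test_db.test_table")
    (
        target.alias("t")
        .merge(in0.alias("s"), "t.id = s.id")  # "id" is a hypothetical key
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
```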

Target example

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

```python
def Target(spark: SparkSession, in0: DataFrame):
    in0.write \
        .format("delta") \
        .mode("overwrite") \
        .saveAsTable("test_db.test_table")
```