Delta Table
Reads from and writes to Delta tables managed by your execution environment's metadata catalog.
Prerequisites
Before you specify parameters and properties, select the Delta table type:
- Open the Source or Target gem configuration.
- On the Type & Format page, select Catalog Table.
- On the Properties page, set the provider property to `delta`.
Parameters
Parameter | Tab | Description |
---|---|---|
Use Unity Catalog | Location | Whether to use Unity Catalog. |
Catalog | Location | If you use Unity Catalog, specify which catalog to use. |
Database | Location | Name of the database to connect to. |
Table | Location | Name of the table to connect to. |
Use file path | Location | Whether to use a custom file path to store underlying files in the Target gem. |
Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code. |
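For context, the Location parameters combine into the table identifier that the gem reads or writes. A minimal sketch, assuming a hypothetical Unity Catalog named `main` and the `test_db.test_table` names used in the examples below:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With Use Unity Catalog enabled, the table is addressed by a
# three-level identifier: <catalog>.<database>.<table>.
# "main" is an illustrative catalog name, not one the gem requires.
df = spark.read.table("main.test_db.test_table")

# Without Unity Catalog, a two-level <database>.<table> identifier is used.
df = spark.read.table("test_db.test_table")
```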
Source
The Source gem reads data from Delta tables. You can optionally specify the following additional properties.
Source properties
Property | Description | Default |
---|---|---|
Description | Description of your dataset. | None |
Provider | Provider to use. You must set this to `delta`. | `delta` |
Filter Predicate | Where clause to filter the table by. | None |
Read timestamp | Timestamp, in milliseconds, to time travel to. The value must be between the first commit timestamp and the latest commit timestamp in the table. | None |
Read version | Time travel to a specific version of the table. | None |
You can only select Read timestamp or Read version, not both.
If you don't use a time travel option, the Source gem fetches the most recent version of each row by default.
To learn more about Delta time travel and its use cases, see Introducing Delta Time Travel for Large Scale Data Lakes.
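As a rough sketch of what these options correspond to in Delta Lake's reader API, assuming a Delta Lake version that supports time travel options on catalog table reads (the table name and values are illustrative, and this is not the gem's exact compiled code):

```python
# Read version: time travel to a specific version of the table.
df_v = spark.read.option("versionAsOf", 3).table("test_db.test_table")

# Read timestamp: time travel to the table state at a given point in time.
# The gem accepts the value in milliseconds; Delta's timestampAsOf option
# also accepts a timestamp string, as shown here.
df_t = (
    spark.read
    .option("timestampAsOf", "2024-01-15 00:00:00")
    .table("test_db.test_table")
)

# As with the gem, set only one of versionAsOf or timestampAsOf per read.
```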
Source example
Compiled code
To see the compiled code of your project, switch to the Code view in the project header.
Without filter predicate
- Python

```python
def Source(spark: SparkSession) -> DataFrame:
    return spark.read.table("test_db.test_table")
```

- Scala

```scala
object Source {
  def apply(spark: SparkSession): DataFrame = {
    spark.read.table("test_db.test_table")
  }
}
```
With filter predicate
- Python

```python
def Source(spark: SparkSession) -> DataFrame:
    return spark.sql("SELECT * FROM test_db.test_table WHERE col > 10")
```

- Scala

```scala
object Source {
  def apply(spark: SparkSession): DataFrame =
    spark.sql("SELECT * FROM test_db.test_table WHERE col > 10")
}
```
Target
The Target gem writes data to Delta tables. You can optionally specify the following additional properties.
Target properties
Property | Description | Default |
---|---|---|
Description | Description of your dataset. | None |
Provider | Provider to use. You must set this to `delta`. | `delta` |
Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | error |
Use insert into | Whether to use the insertInto() method to write instead of the save() method. | false |
Overwrite table schema | Whether to overwrite the schema of the Delta table. | false |
Merge DataFrame schema into table schema | Whether to automatically add columns that are present in the DataFrame but not in the Target table to the end of the schema as part of a write transaction. | false |
Partition Columns | List of columns to partition the Delta table by. | None |
Overwrite partition predicate | Selectively overwrite only the data that satisfies the given where clause expression. | None |
Optimize write | Whether to optimize Spark partition sizes based on the actual data. | false |
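To make these properties concrete, here is a hedged sketch of how they roughly map onto Delta's DataFrame writer options. The table name, `event_date` column, and predicate are placeholders, and the gem's compiled code may differ:

```python
# in0 stands for the incoming DataFrame.
# Partition Columns, Overwrite table schema, and Overwrite partition
# predicate expressed as Delta writer options:
(
    in0.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")                              # Partition Columns
    .option("overwriteSchema", "true")                      # Overwrite table schema
    .option("replaceWhere", "event_date >= '2024-01-01'")   # Overwrite partition predicate
    .saveAsTable("test_db.test_table")
)

# Merge DataFrame schema into table schema corresponds to mergeSchema:
(
    in0.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("test_db.test_table")
)

# Use insert into swaps saveAsTable()/save() for insertInto():
in0.write.format("delta").mode("append").insertInto("test_db.test_table")
```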
Supported write modes
Write mode | Description |
---|---|
overwrite | If the data already exists, overwrite it with the contents of the DataFrame. |
error | If the data already exists, throw an exception. |
append | If the data already exists, append the contents of the DataFrame. |
ignore | If the data already exists, do nothing with the contents of the DataFrame. This is similar to the CREATE TABLE IF NOT EXISTS clause in SQL. |
merge | Use the Delta merge command to insert, delete, and update data. For more information, see DeltaTableOperations. |
scd2 merge | Store and manage the current and historical data over time. For more information, see DeltaTableOperations. |
The overwrite, append, ignore, and error write modes operate the same way as they do with other native Spark-supported formats, such as Parquet.
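For the merge write mode, the gem delegates to Delta's merge command. A minimal sketch of an equivalent merge using the Delta Lake Python API, where `updates_df` and the `id` key column are hypothetical and this is not the gem's exact compiled code:

```python
from delta.tables import DeltaTable

# updates_df is a hypothetical DataFrame of incoming rows keyed by "id".
target = DeltaTable.forName(spark, "test_db.test_table")

(
    target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()      # update rows whose key already exists
    .whenNotMatchedInsertAll()   # insert rows that are new to the table
    .execute()
)
```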
Target example
Compiled code
To see the compiled code of your project, switch to the Code view in the project header.
- Python

```python
def Target(spark: SparkSession, in0: DataFrame):
    in0.write\
        .format("delta")\
        .mode("overwrite")\
        .saveAsTable("test_db.test_table")
```

- Scala

```scala
object Target {
  def apply(spark: SparkSession, in: DataFrame): Unit = {
    in.write
      .format("delta")
      .mode("overwrite")
      .saveAsTable("test_db.test_table")
  }
}
```