Delta Table
Reads and writes Delta tables that are managed by the execution environment's Metadata catalog (Metastore).
note
Set the property provider to Delta on the properties page.
Source
Source Parameters
Parameter | Description | Required |
---|---|---|
Database name | Name of the database | True |
Table name | Name of the table | True |
Provider | Must be set to Delta | True |
Filter Predicate | Where clause to filter the table | False |
Read Timestamp | Time travel to a specific timestamp | False |
Read Version | Time travel to a specific version of the table | False |
note
For time travel on Delta tables:
- Only one of Read Timestamp or Read Version can be selected, not both.
- Timestamp must fall between the first commit timestamp and the latest commit timestamp in the table.
- Version must be an integer between the minimum and maximum versions of the table.
If no time travel option is used, the most recent version of each row is fetched by default.
info
To read more about Delta time travel and its use cases, see the Delta Lake time travel documentation.
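For reference, a time-travel read might look like the following. This is a minimal hand-written sketch rather than tool-generated output: it assumes a Spark 3.x session with Delta Lake, and the version number and timestamp are placeholder values passed through Delta's documented versionAsOf and timestampAsOf read options.

from pyspark.sql import SparkSession, DataFrame

def SourceByVersion(spark: SparkSession) -> DataFrame:
    # Read Version: pin the read to a specific table version (placeholder value).
    return spark.read.option("versionAsOf", 3).table("test_db.test_table")

def SourceByTimestamp(spark: SparkSession) -> DataFrame:
    # Read Timestamp: read the latest commit at or before this time (placeholder value).
    return spark.read.option("timestampAsOf", "2024-01-01 00:00:00").table("test_db.test_table")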
Source Example
Generated Code
Without filter predicate
- Python
- Scala
def Source(spark: SparkSession) -> DataFrame:
    return spark.read.table("test_db.test_table")
object Source {
  def apply(spark: SparkSession): DataFrame = {
    spark.read.table("test_db.test_table")
  }
}
With filter predicate
- Python
- Scala
def Source(spark: SparkSession) -> DataFrame:
    return spark.sql("SELECT * FROM test_db.test_table WHERE col > 10")
object Source {
  def apply(spark: SparkSession): DataFrame =
    spark.sql("SELECT * FROM test_db.test_table WHERE col > 10")
}
Target
Target Parameters
Parameter | Description | Required |
---|---|---|
Database name | Name of the database | True |
Table name | Name of the table | True |
Custom file path | Use custom file path to store underlying files. | False |
Provider | Must be set to Delta | True |
Write Mode | How to handle existing data. See the note below for how each mode behaves. (Default is error.) | True |
Use insert into | Flag to use the insertInto method to write instead of save | False |
Optimize write | If true, it optimizes Spark partition sizes based on the actual data. | False |
Overwrite table schema | If true, overwrites the schema of the Delta table. | False |
Merge schema | If true, any columns present in the DataFrame but missing from the target table are automatically added to the end of the schema as part of the write transaction. | False |
Partition Columns | List of columns to partition the Delta table by | False |
Overwrite partition predicate | If specified, then it selectively overwrites only the data that satisfies the given where clause expression. | False |
note
Among these write modes, overwrite, append, ignore, and error work the same way as they do with other native Spark-supported formats such as Parquet.
To read more about the merge write mode, see the Delta merge documentation.
To read more about the SCD2 merge write mode, see the SCD2 documentation.
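To make the partition and schema flags above concrete, here is a hand-written sketch (not generated code) of a partitioned, selective overwrite. The column name event_date and the predicate are hypothetical; the options map to Delta Lake's documented writer options.

from pyspark.sql import SparkSession, DataFrame

def Target(spark: SparkSession, in0: DataFrame):
    (in0.write
        .format("delta")
        .mode("overwrite")
        # Partition Columns: partition the table by this (hypothetical) column.
        .partitionBy("event_date")
        # Overwrite partition predicate: replace only rows matching the clause.
        .option("replaceWhere", "event_date >= '2024-01-01'")
        # Merge schema corresponds to .option("mergeSchema", "true");
        # Overwrite table schema corresponds to .option("overwriteSchema", "true").
        .saveAsTable("test_db.test_table"))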
Target Example
Generated Code
- Python
- Scala
def Target(spark: SparkSession, in0: DataFrame):
    in0.write\
        .format("delta")\
        .mode("overwrite")\
        .saveAsTable("test_db.test_table")
object Target {
  def apply(spark: SparkSession, in: DataFrame): Unit = {
    in.write
      .format("delta")
      .mode("overwrite")
      .saveAsTable("test_db.test_table")
  }
}
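When Use insert into is enabled, the writer calls insertInto instead of saveAsTable. Below is a minimal sketch of what that might look like, assuming test_db.test_table already exists; note that Spark's insertInto matches columns by position, not by name, so the DataFrame's column order must match the table schema.

from pyspark.sql import SparkSession, DataFrame

def Target(spark: SparkSession, in0: DataFrame):
    # insertInto writes into an existing table using the table's own format;
    # mode("overwrite") would replace existing data instead of appending.
    in0.write.mode("append").insertInto("test_db.test_table")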