Iceberg

Reads from and writes to Iceberg tables, including Iceberg merge operations and time travel.

Required Settings

To use the Source gem with the Iceberg catalog table type, you must configure the following required settings at the environment, initialization, and runtime stages.

Environment Setting

Configure the following Spark JAR dependency package in your fabric environment:

https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.5.0/iceberg-spark-runtime-3.3_2.12-1.5.0.jar

Make sure this dependency is available on the compute platform where Spark runs, such as your Databricks cluster, EMR cluster, or Dataproc cluster.

Initialization Settings

Configure the following Spark session property during the Spark session initialization.

  • Key: spark.sql.extensions
  • Value: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
tip

You can also set this property during cluster bootstrap. For example, you can pass --properties "spark:spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" to a cluster create command, such as gcloud dataproc clusters create.
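
If you build the Spark session yourself, for example in a local test, you can set the same property at session creation. The following is a minimal sketch; the application name is illustrative, and it assumes the Iceberg runtime JAR from the Environment Setting is already available to Spark.

from pyspark.sql import SparkSession

# Minimal sketch: enable the Iceberg SQL extensions when building the session.
# Assumes the iceberg-spark-runtime JAR is already on the Spark classpath.
spark = (
    SparkSession.builder
        .appName("iceberg-demo")  # illustrative name
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .getOrCreate()
)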

Runtime Settings

The following properties allow you to configure multiple catalogs and their respective metastores for Iceberg tables and data management.

To configure Spark conf properties:

  1. Navigate to Pipeline Settings.

    a. Click ... at the top of the Prophecy canvas.
    b. Under Manage, click Pipeline Settings.


  2. In your Spark tab, under Spark Configuration, add your Spark conf properties.


To configure Hadoop as a catalog, add the following Spark conf properties:

  • spark.sql.catalog.<catalog_name>=org.apache.iceberg.spark.SparkCatalog
  • spark.sql.catalog.<catalog_name>.type=hadoop
  • spark.sql.catalog.<catalog_name>.warehouse=gs://<bucket>/<folder_1>/<folder_2>/

To configure Hive as a catalog, add the following Spark conf properties:

  • spark.sql.catalog.<catalog_name>=org.apache.iceberg.spark.SparkCatalog
  • spark.sql.catalog.<catalog_name>.type=hive
  • spark.sql.catalog.<catalog_name>.warehouse=gs://<bucket>/<folder_1>/<folder_2>/
  • spark.sql.catalog.<catalog_name>.uri=thrift://10.91.64.30:9083

tip

You can set the default catalog by using spark.sql.defaultCatalog=<catalog_name>.
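
For reference, the catalog properties above can also be supplied when the Spark session is built instead of through Pipeline Settings. The following sketch assumes a Hadoop catalog named hadoop_catalog_1 (the name used in the compiled code examples below) and a placeholder warehouse path; substitute your own catalog name and bucket.

from pyspark.sql import SparkSession

# Sketch only: register a Hadoop catalog named hadoop_catalog_1 and make it the default catalog.
# The warehouse path is a placeholder; replace it with your own bucket and folders.
spark = (
    SparkSession.builder
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        .config("spark.sql.catalog.hadoop_catalog_1", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.hadoop_catalog_1.type", "hadoop")
        .config("spark.sql.catalog.hadoop_catalog_1.warehouse", "gs://<bucket>/<folder_1>/<folder_2>/")
        .config("spark.sql.defaultCatalog", "hadoop_catalog_1")
        .getOrCreate()
)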

Parameters

The Source and Target gems require the following parameters to read from and write to Iceberg:

Parameter | Tab | Description
Use Catalog | Location | Whether to use a configured Hadoop or Hive catalog name.
Catalog Name | Location | Name of your catalog if you enable Use Catalog.
Schema Name (Database Name) | Location | Name of the database to connect to.
Table Name | Location | Name of the table to connect to.
Catalog Type | Location | Type of catalog to use when writing the Iceberg table in the Target gem. Possible values are: hive, hadoop, or other.
File location | Location | Location of your file if you select hive or other for the Catalog Type.
Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code.

Source

The Source gem reads data from Iceberg and allows you to optionally specify the following additional properties.

Source properties

Property | Description | Default
Description | Description of your dataset. | None
Read timestamp | Time travel in milliseconds to a specific timestamp. This value should be between the first commit timestamp and the latest commit timestamp in the table. | None
Read snapshot | Snapshot ID to time travel to a specific table version. | None
note

You can only select Read timestamp or Read snapshot, not both.

If you don't use a time travel option, the Source gem fetches the most recent version of each row by default.

To learn more about Iceberg time travel and its use cases, see Apache Iceberg TIMETRAVEL.

Example

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

def iceberg_read(spark: SparkSession) -> DataFrame:
    return spark.read.format("iceberg").load("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`")
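
The compiled code above reads the latest version of the table. If you set Read timestamp or Read snapshot, the read carries the corresponding Iceberg read option. The following is a sketch with a placeholder timestamp (in milliseconds) and a snapshot ID passed in by the caller:

from pyspark.sql import SparkSession, DataFrame

def iceberg_read_as_of_timestamp(spark: SparkSession) -> DataFrame:
    # Time travel to the table state as of a timestamp in milliseconds (placeholder value).
    return spark.read.format("iceberg") \
        .option("as-of-timestamp", "1710000000000") \
        .load("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`")

def iceberg_read_snapshot(spark: SparkSession, snapshot_id: int) -> DataFrame:
    # Time travel to a specific snapshot ID supplied by the caller.
    return spark.read.format("iceberg") \
        .option("snapshot-id", str(snapshot_id)) \
        .load("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`")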

Target

The Target gem writes data to Iceberg and allows you to optionally specify the following additional properties.

Target properties

Property | Description | Default
Description | Description of your dataset. | None
Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | None
Merge schema | Whether to automatically add columns present in the DataFrame but not in the Target table to the end of the schema as part of a write transaction. | false
Partition Columns | List of columns to partition the Iceberg table by. Provide this during a createOrReplace write mode to leverage the overwritePartitions write mode in the future. | None

Supported write modes

Write Mode | Description
createOrReplace | Create or replace the Iceberg table.
append | If data already exists, append the contents of the DataFrame to the Iceberg table.
overwritePartitions | Dynamically overwrite partitions in the Iceberg table.
overwrite | If data already exists, explicitly overwrite the partitions with the Overwrite Condition you specify.
tip

Among these write modes, overwrite and append behave the same way as they do for Parquet file writes.

Target Example

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

def iceberg_write(spark: SparkSession, in0: DataFrame):
    df1 = in0.writeTo("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`")
    df2 = df1.using("iceberg")
    df3 = df2.partitionedBy("Department")
    df4 = df3.tableProperty("write.spark.accept-any-schema", "true")
    df4.createOrReplace()
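
For reference, the remaining write modes map onto Spark's DataFrameWriterV2 API as sketched below, using the same table. The overwrite condition column comes from the partitioning example above; its value is a placeholder.

from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def iceberg_write_append(in0: DataFrame):
    # append: add the incoming rows to the existing Iceberg table.
    in0.writeTo("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`").append()

def iceberg_write_overwrite_partitions(in0: DataFrame):
    # overwritePartitions: dynamically replace only the partitions present in the incoming data.
    in0.writeTo("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`").overwritePartitions()

def iceberg_write_overwrite(in0: DataFrame):
    # overwrite: explicitly replace rows that match the condition (placeholder value).
    in0.writeTo("`hadoop_catalog_1`.`prophecy_doc_demo`.`employees_test`").overwrite(col("Department") == "Sales")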