
Catalog

Spark (and, more broadly, Hadoop) environments have a built-in metastore (the Hive Metastore) in which databases and tables are registered; it also keeps statistics about them that help with query planning. Databricks additionally has the Delta catalog of Delta tables.
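
For orientation, the same metastore can be explored directly from Spark code. The snippet below is a minimal sketch using Spark's catalog API; the database name tpcds is only an example.

import org.apache.spark.sql.SparkSession

object ExploreCatalog {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("explore-catalog").enableHiveSupport().getOrCreate()

    // List the databases registered in the metastore
    spark.catalog.listDatabases().show(truncate = false)

    // List the tables inside one database (the name "tpcds" is illustrative)
    spark.catalog.listTables("tpcds").show(truncate = false)
  }
}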


Warning: A poor choice of partition column is the single biggest performance issue in the big data ecosystem.

Example

In this example, we’ll read a catalog table, take a few sample rows and write them out to another catalog table.

We’ll use the Target component and choose Catalog as the type. The next screen allows you to browse the metastore and see the available databases and tables. Here we choose the TPCDS database and can see all the tables inside it; however, we will write to a new table, store_sales_sample.


Next we see the schema that will be written, along with the primary options on the left. We pick overwrite mode since we want this table to hold only one set of sample data. If it were being updated periodically with new data, append might be a better option.
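
As a rough sketch of what these modes translate to in Spark code (the helper names and table name are illustrative):

import org.apache.spark.sql.DataFrame

object WriteModes {
  // Overwrite: the table ends up holding only this run's rows
  def writeOverwrite(sample: DataFrame): Unit =
    sample.write.mode("overwrite").saveAsTable("tpcdsdb_test.store_sales_sample")

  // Append: each run adds its rows to whatever is already in the table
  def writeAppend(sample: DataFrame): Unit =
    sample.write.mode("append").saveAsTable("tpcdsdb_test.store_sales_sample")
}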


Next we pick the provider. If you’re using Databricks, you might choose delta; otherwise hive is the standard option.
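
In plain Spark code the provider corresponds to the writer’s format. A minimal sketch, with illustrative helper and table names:

import org.apache.spark.sql.DataFrame

object ProviderChoice {
  // Delta provider (typically on Databricks, or with the Delta Lake package on the classpath)
  def writeDelta(sample: DataFrame): Unit =
    sample.write.format("delta").mode("overwrite").saveAsTable("tpcdsdb_test.store_sales_sample")

  // Hive provider: a Hive-compatible table registered in the metastore
  def writeHive(sample: DataFrame): Unit =
    sample.write.format("hive").mode("overwrite").saveAsTable("tpcdsdb_test.store_sales_sample")
}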


For the file format, you should pick parquet or orc, the primary columnar formats that work well with big data. In the Spark ecosystem parquet is more popular; however, if you’re going to read this data with Hive, or you have a large Hive deployment, you’ll often already be using orc, and choosing it can make sense.
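
With the Hive provider, the underlying file format can be set through the writer’s fileFormat option. A sketch under that assumption (helper and table names are illustrative):

import org.apache.spark.sql.DataFrame

object FileFormatChoice {
  // Parquet is the common default in the Spark ecosystem
  def writeParquet(sample: DataFrame): Unit =
    sample.write.format("hive").option("fileFormat", "parquet")
      .mode("overwrite").saveAsTable("tpcdsdb_test.store_sales_sample")

  // ORC is a reasonable choice if the table will mostly be read by an existing Hive deployment
  def writeOrc(sample: DataFrame): Unit =
    sample.write.format("hive").option("fileFormat", "orc")
      .mode("overwrite").saveAsTable("tpcdsdb_test.store_sales_sample")
}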


Finally, we come to the partition column, and here you should choose very carefully. Spark assumes you’ve partitioned the data sensibly when reading it, so if you partition it poorly when writing, all downstream workflows will become slow. For very large datasets (fact tables), partitioning by date is a common strategy. Other good partition keys are columns that will be used in downstream read queries (for dimension tables this is often a good choice). The goals of partitioning are (see the sketch after this list):

  • Parallelism when reading and writing
    • Too few partitions and too few reader tasks will be allocated when reading this data, slowing down subsequent reads.
    • Too many partitions and many tasks will be reading files that contain almost no data.
  • Partition keys relevant to queries
    • If your read queries mostly read data from one region, then partitioning by region ensures that most of the data is skipped when reading.
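
As a rough sketch, partitioning is specified on the writer with partitionBy. Here the partition column and table name follow the store_sales example; the code generated for your pipeline may differ.

import org.apache.spark.sql.DataFrame

object PartitionedWrite {
  // Partition the fact table by date so that date-filtered reads can skip most files
  def writePartitioned(sample: DataFrame): Unit =
    sample.write
      .mode("overwrite")
      .partitionBy("ss_sold_date_sk")
      .saveAsTable("tpcdsdb_test.store_sales_sample")
}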


Example Code

The code is fairly straightforward and contains the schema, the table to write to, and any chosen options.


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._
// Config, UsesDataset and Target are provided by the project's framework code

object SampleStoreSales {

  @UsesDataset(id = "61", version = 0)
  def apply(spark: SparkSession, in: DataFrame): Target = {
    import spark.implicits._

    val fabric = Config.fabricName
    fabric match {
      case "dev" =>
        val schemaArg = StructType(
          Array(
            StructField("ss_sold_time_sk",       IntegerType, true),
            StructField("ss_item_sk",            IntegerType, true),
            StructField("ss_customer_sk",        IntegerType, true),
            StructField("ss_cdemo_sk",           IntegerType, true),
            StructField("ss_hdemo_sk",           IntegerType, true),
            StructField("ss_addr_sk",            IntegerType, true),
            StructField("ss_store_sk",           IntegerType, true),
            StructField("ss_promo_sk",           IntegerType, true),
            StructField("ss_ticket_number",      LongType,    true),
            StructField("ss_quantity",           IntegerType, true),
            StructField("ss_wholesale_cost",     DoubleType,  true),
            StructField("ss_list_price",         DoubleType,  true),
            StructField("ss_sales_price",        DoubleType,  true),
            StructField("ss_ext_discount_amt",   DoubleType,  true),
            StructField("ss_ext_sales_price",    DoubleType,  true),
            StructField("ss_ext_wholesale_cost", DoubleType,  true),
            StructField("ss_ext_list_price",     DoubleType,  true),
            StructField("ss_ext_tax",            DoubleType,  true),
            StructField("ss_coupon_amt",         DoubleType,  true),
            StructField("ss_net_paid",           DoubleType,  true),
            StructField("ss_net_paid_inc_tax",   DoubleType,  true),
            StructField("ss_net_profit",         DoubleType,  true),
            StructField("ss_sold_date_sk",       IntegerType, true)
          )
        )
        in.write
          .mode("overwrite")
          .saveAsTable("tpcdsdb_test.store_sales_sample")
      case _ => throw new Exception("Unknown Fabric")
    }

  }

}
    
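Once the pipeline has run, the sample table can be read back like any other catalog table. A quick check, assuming the table name used above:

import org.apache.spark.sql.SparkSession

object CheckSample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("check-sample").enableHiveSupport().getOrCreate()

    // Read the sample table back from the catalog and inspect a few rows
    val sample = spark.table("tpcdsdb_test.store_sales_sample")
    sample.show(5)
    println(s"row count: ${sample.count()}")
  }
}
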

Python code coming soon!