Low Code Spark

Catalog

Spark (and, more broadly, Hadoop) environments ship with a built-in metastore (the Hive Metastore) in which databases and tables are registered; the metastore also keeps table statistics that help with query planning. Databricks additionally provides a catalog of Delta tables.
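
For readers who prefer code to screenshots, the same catalog can be inspected from a notebook. A minimal sketch, assuming a SparkSession named spark and an illustrative database name:

// List the databases registered in the metastore, then the tables
// inside one of them (the database name here is illustrative).
spark.catalog.listDatabases().show(truncate = false)
spark.catalog.listTables("tpcds").show(truncate = false)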

Example

In this example, we’ll read a catalog table, take a few sample rows, and write them out to another catalog table.
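
For reference, here is a minimal sketch of the plain Spark code this pipeline corresponds to; the table names and sample size are illustrative:

// Read the source table from the catalog, keep a small sample, and
// write the sample out to another catalog table.
val sample = spark.read.table("tpcds.store_sales").limit(100)

sample.write
  .mode("overwrite")
  .saveAsTable("tpcds.store_sales_sample")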

We generated a TPC-DS dataset and registered its tables in the catalog. When you add a Source component and choose Catalog as the type, the next screen lets you browse the metastore and see the available databases and tables. Here we choose the TPCDS database and can see all the tables inside it.

Once the table is chosen, the next screen shows the schema from the catalog and a few rows of sample data. We click Create, and that’s it.
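
The same preview is easy to do by hand; a sketch with an illustrative table name:

// Print the schema recorded in the catalog and show a few sample rows.
val preview = spark.read.table("tpcds.store_sales")
preview.printSchema()
preview.show(5)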

Example Code

The generated code declares the table’s schema and needs only a single line to read the table.


// Spark imports required by this snippet; UsesDataset, Config, and Source
// come from the project libraries generated by the tool.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

object StoreSales {

  @UsesDataset(id = "60", version = 0)
  def apply(spark: SparkSession): Source = {
    import spark.implicits._

    // The fabric identifies the execution environment (here, dev).
    val fabric = Config.fabricName

    val out = fabric match {
      case "dev" =>
        // Schema captured from the catalog at design time; the read below
        // resolves its schema from the metastore, so this is informational.
        val schemaArg = StructType(
          Array(
            StructField("ss_sold_time_sk",       IntegerType, true),
            StructField("ss_item_sk",            IntegerType, true),
            StructField("ss_customer_sk",        IntegerType, true),
            StructField("ss_cdemo_sk",           IntegerType, true),
            StructField("ss_hdemo_sk",           IntegerType, true),
            StructField("ss_addr_sk",            IntegerType, true),
            StructField("ss_store_sk",           IntegerType, true),
            StructField("ss_promo_sk",           IntegerType, true),
            StructField("ss_ticket_number",      LongType,    true),
            StructField("ss_quantity",           IntegerType, true),
            StructField("ss_wholesale_cost",     DoubleType,  true),
            StructField("ss_list_price",         DoubleType,  true),
            StructField("ss_sales_price",        DoubleType,  true),
            StructField("ss_ext_discount_amt",   DoubleType,  true),
            StructField("ss_ext_sales_price",    DoubleType,  true),
            StructField("ss_ext_wholesale_cost", DoubleType,  true),
            StructField("ss_ext_list_price",     DoubleType,  true),
            StructField("ss_ext_tax",            DoubleType,  true),
            StructField("ss_coupon_amt",         DoubleType,  true),
            StructField("ss_net_paid",           DoubleType,  true),
            StructField("ss_net_paid_inc_tax",   DoubleType,  true),
            StructField("ss_net_profit",         DoubleType,  true),
            StructField("ss_sold_date_sk",       IntegerType, true)
          )
        )
        spark.read.table("tpcdsdb_test.store_sales")
      case _ => throw new Exception(s"The fabric '$fabric' is not handled")
    }

    out

  }

}
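
To round out the example, here is a hypothetical driver that wires the generated source into the rest of the pipeline, assuming Source resolves to a DataFrame as in the generated code; the sample size and target table name are illustrative:

// Call the generated source, take a few rows, and write them to
// another catalog table (hypothetical target name).
val storeSales = StoreSales(spark)
storeSales
  .limit(100)
  .write
  .mode("overwrite")
  .saveAsTable("tpcdsdb_test.store_sales_sample")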
    

Python code coming soon!