SQLStatement

The SQLStatement component lets you write free-form SQL queries.

Example Usage

Consider TPC-DS query 19 (TPCDS_19), which joins six tables:

SELECT i_brand_id              brand_id,
       i_brand                 brand,
       i_manufact_id,
       i_manufact,
       Sum(ss_ext_sales_price) ext_price
FROM   date_dim,
       store_sales,
       item,
       customer,
       customer_address,
       store
WHERE  d_date_sk = ss_sold_date_sk
       AND ss_item_sk = i_item_sk
       AND i_manager_id = 38
       AND d_moy = 12
       AND d_year = 1998
       AND ss_customer_sk = c_customer_sk
       AND c_current_addr_sk = ca_address_sk
       AND Substr(ca_zip, 1, 5) <> Substr(s_zip, 1, 5)
       AND ss_store_sk = s_store_sk
GROUP  BY i_brand,
          i_brand_id,
          i_manufact_id,
          i_manufact
ORDER  BY ext_price DESC,
          i_brand,
          i_brand_id,
          i_manufact_id,
          i_manufact
LIMIT 100;

We can build the entire query from visual components.
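As a rough sketch of what such a visual version computes, the same logic can be written with Spark's DataFrame API as a chain of joins on the surrogate keys, one filter for the remaining predicates, an aggregation, a sort and a limit. This assumes each visual component corresponds to one of these DataFrame operations; it is plain Spark, not the code Prophecy generates, and the object name Query19Dsl is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical DataFrame-API rendering of TPC-DS query 19 (not Prophecy output).
// TPC-DS column names carry table prefixes, so unqualified col(...) is unambiguous.
object Query19Dsl {
  def apply(
      storeSales: DataFrame,
      item: DataFrame,
      customer: DataFrame,
      customerAddress: DataFrame,
      dateDim: DataFrame,
      store: DataFrame
  ): DataFrame =
    storeSales
      // Join conditions from the WHERE clause become explicit inner joins
      .join(dateDim, col("ss_sold_date_sk") === col("d_date_sk"))
      .join(item, col("ss_item_sk") === col("i_item_sk"))
      .join(customer, col("ss_customer_sk") === col("c_customer_sk"))
      .join(customerAddress, col("c_current_addr_sk") === col("ca_address_sk"))
      .join(store, col("ss_store_sk") === col("s_store_sk"))
      // Remaining (non-join) predicates go into a single filter
      .where(
        col("i_manager_id") === 38 &&
          col("d_moy") === 12 &&
          col("d_year") === 1998 &&
          substring(col("ca_zip"), 1, 5) =!= substring(col("s_zip"), 1, 5)
      )
      .groupBy("i_brand", "i_brand_id", "i_manufact_id", "i_manufact")
      .agg(sum("ss_ext_sales_price").as("ext_price"))
      // Apply the same output aliases as the SQL SELECT list
      .select(
        col("i_brand_id").as("brand_id"),
        col("i_brand").as("brand"),
        col("i_manufact_id"),
        col("i_manufact"),
        col("ext_price")
      )
      // After aliasing, i_brand / i_brand_id are referenced as brand / brand_id
      .orderBy(
        col("ext_price").desc,
        col("brand"),
        col("brand_id"),
        col("i_manufact_id"),
        col("i_manufact")
      )
      .limit(100)
}
```

Writing the joins explicitly makes each join key visible, which roughly mirrors how a visual pipeline wires one dataset into the next.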

Alternatively, you can write the SQL query directly, reading from reusable datasets.

Next, open the SQL query and, in the left panel, rename the incoming ports to match the table names. (This step is needed only because this is a contrived case in which we are matching an existing query; when writing a query from scratch, the port names would not matter.)
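Renaming matters because each incoming port appears to the query as a table with the port's name. A minimal sketch of that mechanism in plain Spark, assuming ports map one-to-one onto temporary views as in the generated code in the next section; runWithNamedInputs is a hypothetical helper, not part of Prophecy's API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object NamedInputsSketch {
  // Hypothetical helper: registers each input DataFrame as a temporary view
  // named after its port, then runs the query text against those views.
  def runWithNamedInputs(
      spark: SparkSession,
      inputs: Map[String, DataFrame],
      query: String
  ): DataFrame = {
    inputs.foreach { case (portName, df) => df.createOrReplaceTempView(portName) }
    // spark.sql analyzes the query eagerly, so a table name that matches
    // no registered view fails here with an AnalysisException
    spark.sql(query)
  }
}
```

With the ports renamed to store_sales, item, and so on, the TPC-DS text runs unchanged; otherwise the query text would have to use whatever names the ports already have.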

Example Code

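The generated code is what you'd expect: each input DataFrame is registered as a temporary view under its table name, the query runs unchanged through spark.sql, and the schema of the output is registered.
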
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

// SQLStatement (the return type, presumably an alias for DataFrame) and
// register_output_schema are assumed to come from the Prophecy libraries;
// they are not defined in this snippet.
object Query19 {

  def apply(
    spark:            SparkSession,
    store_sales:      DataFrame,
    item:             DataFrame,
    customer:         DataFrame,
    customer_address: DataFrame,
    date_dim:         DataFrame,
    store:            DataFrame
  ): SQLStatement = {
    import spark.implicits._

    // Expose each input DataFrame to Spark SQL under the name the query uses
    store_sales.createOrReplaceTempView("store_sales")
    item.createOrReplaceTempView("item")
    customer.createOrReplaceTempView("customer")
    customer_address.createOrReplaceTempView("customer_address")
    date_dim.createOrReplaceTempView("date_dim")
    store.createOrReplaceTempView("store")

    // Run the TPC-DS query 19 text against the temporary views
    val out0 = spark.sql("""
      SELECT i_brand_id              brand_id,
             i_brand                 brand,
             i_manufact_id,
             i_manufact,
             Sum(ss_ext_sales_price) ext_price
      FROM   date_dim,
             store_sales,
             item,
             customer,
             customer_address,
             store
      WHERE  d_date_sk = ss_sold_date_sk
             AND ss_item_sk = i_item_sk
             AND i_manager_id = 38
             AND d_moy = 12
             AND d_year = 1998
             AND ss_customer_sk = c_customer_sk
             AND c_current_addr_sk = ca_address_sk
             AND Substr(ca_zip, 1, 5) <> Substr(s_zip, 1, 5)
             AND ss_store_sk = s_store_sk
      GROUP  BY i_brand,
                i_brand_id,
                i_manufact_id,
                i_manufact
      ORDER  BY ext_price DESC,
                i_brand,
                i_brand_id,
                i_manufact_id,
                i_manufact
      LIMIT 100
    """)

    // Declare the schema of the output port out0
    register_output_schema(
      "out0",
      StructType(
        Array(
          StructField("brand_id",      IntegerType, true),
          StructField("brand",         StringType,  true),
          StructField("i_manufact_id", IntegerType, true),
          StructField("i_manufact",    StringType,  true),
          StructField("ext_price",     DoubleType,  true)
        )
      )
    )

    out0
  }

}
