Low Code Spark

MultiJoin

MultiJoin allows you to join many tables at once with a single component instead of using multiple join components. This comes up frequently when joining a fact table with its dimension tables.
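For intuition, a single MultiJoin component stands in for a chain of pairwise joins like the sketch below. This is only an illustration: fact, dim1, dim2 and the key columns are hypothetical, and the snippet assumes a spark-shell session where those DataFrames already exist.

import org.apache.spark.sql.functions.col

// Without MultiJoin: one join component, and one intermediate DataFrame, per dimension table.
val step1 = fact.join(dim1, col("fact_dim1_key") === col("dim1_key"), "inner")
val step2 = step1.join(dim2, col("fact_dim2_key") === col("dim2_key"), "inner")
// With MultiJoin: both conditions are configured in the same component, followed by a single select.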


Example Usage

Let’s take the example of TPC-DS query 19, which joins six tables:

SELECT i_brand_id              brand_id, 
               i_brand                 brand, 
               i_manufact_id, 
               i_manufact, 
               Sum(ss_ext_sales_price) ext_price 
FROM   date_dim, 
       store_sales, 
       item, 
       customer, 
       customer_address, 
       store 
WHERE  d_date_sk = ss_sold_date_sk 
       AND ss_item_sk = i_item_sk 
       AND i_manager_id = 38 
       AND d_moy = 12 
       AND d_year = 1998 
       AND ss_customer_sk = c_customer_sk 
       AND c_current_addr_sk = ca_address_sk 
       AND Substr(ca_zip, 1, 5) <> Substr(s_zip, 1, 5) 
       AND ss_store_sk = s_store_sk 
GROUP  BY i_brand, 
          i_brand_id, 
          i_manufact_id, 
          i_manufact 
ORDER  BY ext_price DESC, 
          i_brand, 
          i_brand_id, 
          i_manufact_id, 
          i_manufact
LIMIT 100; 


For the multi-join part of the query, we’re combining store_sales with multiple dimension tables in a snowflake schema. Here is what the visual representation of the graph looks like:

[Image: visual representation of the pipeline graph with the MultiJoin component]

If you open the MultiJoin component, you see the input ports on the left. You can rename these ports to make the join expressions easier to write. Starting from the first table, every time the next table is added, we provide the condition for joining it with the tables already present. For example, here date_dim can only refer to store_sales, item can refer to date_dim or store_sales, and so on. Here is what the component looks like:

[Image: MultiJoin component showing the input ports and join conditions]
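Concretely, the per-port conditions for this example boil down to the column equalities below. This is a spark-shell style sketch, not the component’s own syntax; the qualified column names assume port names matching the table names, and customer_address is included because the SQL query joins it as well.

import org.apache.spark.sql.functions.col

// Each newly added port may only reference ports that were added before it.
val dateDimCondition         = col("store_sales.ss_sold_date_sk") === col("date_dim.d_date_sk")
val itemCondition            = col("store_sales.ss_item_sk") === col("item.i_item_sk")
val customerCondition        = col("store_sales.ss_customer_sk") === col("customer.c_customer_sk")
val customerAddressCondition = col("customer.c_current_addr_sk") === col("customer_address.ca_address_sk")
val storeCondition           = col("store_sales.ss_store_sk") === col("store.s_store_sk")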

In the expressions section, the selected columns are added. Any expression can be used here. Notice that in the SQL query i_brand_id is renamed to brand_id and i_brand to brand, so we add those renames. Here are the selected expressions:

[Image: selected expressions in the MultiJoin component]
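In Spark terms, those expressions are roughly equivalent to the selectExpr call below. This is only a sketch; the component has its own expression editor, and joined here is a hypothetical name for the already-joined output.

// Hypothetical equivalent of the selected expressions, applied to the joined output.
val selected = joined.selectExpr(
  "ca_zip", "s_zip", "ss_ext_sales_price",
  "i_manufact", "i_manufact_id",
  "i_brand AS brand", "i_brand_id AS brand_id",
  "i_manager_id", "d_moy", "d_year"
)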

Example Code

The generated code is quite simple: a series of join statements followed by a select statement. The names of the ports become the arguments of the function, so it is easy to understand the data coming into it.


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object CombineStore {

  def apply(
    spark:            SparkSession,
    store_sales:      DataFrame,
    date_dim:         DataFrame,
    item:             DataFrame,
    customer:         DataFrame,
    customer_address: DataFrame,
    store:            DataFrame
  ): DataFrame = {
    import spark.implicits._

    // Alias each input port so join conditions can reference its columns unambiguously.
    val df_store_sales      = store_sales.as("store_sales")
    val df_date_dim         = date_dim.as("date_dim")
    val df_item             = item.as("item")
    val df_customer         = customer.as("customer")
    val df_customer_address = customer_address.as("customer_address")
    val df_store            = store.as("store")

    // Join each port against the ports added before it.
    val df_join0 = df_store_sales.join(df_date_dim, col("ss_sold_date_sk") === col("d_date_sk"), "inner")

    val df_join1 = df_join0.join(df_item, col("ss_item_sk") === col("i_item_sk"), "inner")

    val df_join2 = df_join1.join(df_customer, col("ss_customer_sk") === col("c_customer_sk"), "inner")

    val df_join3 = df_join2.join(df_customer_address, col("c_current_addr_sk") === col("ca_address_sk"), "inner")

    val df_join4 = df_join3.join(df_store, col("ss_store_sk") === col("s_store_sk"), "inner")

    // Selected expressions, including the brand_id and brand renames from the SQL query.
    val out = df_join4.select(
      col("ca_zip"),
      col("s_zip"),
      col("ss_ext_sales_price"),
      col("i_manufact"),
      col("i_manufact_id"),
      col("i_brand").as("brand"),
      col("i_brand_id").as("brand_id"),
      col("i_manager_id"),
      col("d_moy"),
      col("d_year")
    )

    out

  }

}
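To finish query 19, the MultiJoin output only needs the remaining filters, the aggregation, and the ordering. Below is a minimal sketch of that downstream step; the variable names are illustrative, and it assumes the six input DataFrames and the SparkSession are already in scope.

import org.apache.spark.sql.functions.{col, substring, sum}

// Hypothetical downstream step completing TPC-DS query 19 on top of CombineStore's output.
val joined = CombineStore(spark, store_sales, date_dim, item, customer, customer_address, store)

val query19 = joined
  .filter(col("i_manager_id") === 38 && col("d_moy") === 12 && col("d_year") === 1998)
  .filter(substring(col("ca_zip"), 1, 5) =!= substring(col("s_zip"), 1, 5))
  .groupBy(col("brand"), col("brand_id"), col("i_manufact_id"), col("i_manufact"))
  .agg(sum(col("ss_ext_sales_price")).as("ext_price"))
  .orderBy(col("ext_price").desc, col("brand"), col("brand_id"), col("i_manufact_id"), col("i_manufact"))
  .limit(100)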
    

# Python code coming soon!