WindowFunction
Spark Gem
The WindowFunction gem lets you define a WindowSpec and apply window functions (ranking, analytical, or aggregate) on a DataFrame.
Parameters
Parameter | Description | Required |
---|---|---|
DataFrame | Input DataFrame | True |
Target column | Output column name | True |
Source expression | Window function expression to apply over the created window | True |
Order columns | Columns to order by in the window. Must be of numeric type if a Range frame is selected | Required when the Source expression contains a ranking or analytical function, or when a Range frame is selected |
Partition column | Column to partition by in the window | False |
Row frame | Row-based frame boundary to apply to the window | False |
Range frame | Range-based frame boundary to apply to the window | False |
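Conceptually, these parameters map onto an ordinary Spark WindowSpec. A minimal Python sketch of the equivalent hand-written code (the column names, sample data, and `running_total` target column are hypothetical):

```py
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, sum as sum_

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data standing in for the input DataFrame.
df = spark.createDataFrame(
    [("c1", "2023-01-01", 10.0), ("c1", "2023-01-05", 20.0), ("c2", "2023-01-02", 5.0)],
    ["customer_id", "order_date", "amount"],
)

# Partition column, Order columns, and Row frame combine into one WindowSpec.
spec = (
    Window.partitionBy(col("customer_id"))                      # Partition column
    .orderBy(col("order_date").asc())                           # Order columns
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)  # Row frame
)

# The Source expression is applied over the window and written to the Target column.
df.withColumn("running_total", sum_(col("amount")).over(spec)).show()
```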
When Order columns are not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default.
When Order columns are defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
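Both defaults can be written out explicitly. A minimal sketch, assuming hypothetical customer_id, order_id, and amount columns:

```py
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import avg, col

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("c1", 1, 10.0), ("c1", 2, 20.0), ("c1", 3, 30.0)],
    ["customer_id", "order_id", "amount"],
)

# Without Order columns, the frame covers the whole partition, equivalent to:
unbounded = Window.partitionBy(col("customer_id")).rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)

# With Order columns, the frame grows up to the current row, equivalent to:
growing = (
    Window.partitionBy(col("customer_id"))
    .orderBy(col("order_id"))
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)

df.withColumn("avg_all", avg("amount").over(unbounded)) \
  .withColumn("running_avg", avg("amount").over(growing)) \
  .show()
```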
Examples
Ranking Functions with Window
Examples of ranking functions are row_number(), rank(), dense_rank(), and ntile().
Only the default window frame (rowFrame, unboundedPreceding, currentRow) can be used with ranking functions.
- Python
- Scala
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import col, ntile, row_number

def rank_cust_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0 \
        .withColumn(
            "order_number",
            # Sequential position of each order in the customer's history.
            row_number().over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_date").asc())
            )
        ) \
        .withColumn(
            "order_recency",
            # ntile(2) splits each customer's orders into an older and a newer half.
            ntile(2).over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_date").asc())
            )
        )
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, ntile, row_number}

object rank_cust_orders {

  def apply(spark: SparkSession, in1: DataFrame): DataFrame = {
    import org.apache.spark.sql.expressions.Window
    in1
      .withColumn(
        "order_number",
        // Sequential position of each order in the customer's history.
        row_number().over(
          Window.partitionBy(col("customer_id")).orderBy(col("order_date").asc)
        )
      )
      .withColumn(
        "order_recency",
        // ntile(2) splits each customer's orders into an older and a newer half.
        ntile(2).over(
          Window.partitionBy(col("customer_id")).orderBy(col("order_date").asc)
        )
      )
  }

}
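As a quick usage sketch with the Python function above (sample data is hypothetical): for a customer with four orders, order_number runs 1 through 4 chronologically, and ntile(2) puts the two earliest orders in bucket 1 and the two latest in bucket 2.

```py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame(
    [(1, "c1", "2023-01-01"), (2, "c1", "2023-02-01"),
     (3, "c1", "2023-03-01"), (4, "c1", "2023-04-01")],
    ["order_id", "customer_id", "order_date"],
)
rank_cust_orders(spark, orders).show()
```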
Analytical Functions with Window
Examples of analytical functions are lead(), lag(), cume_dist(), etc.
A window frame cannot be specified for lead() and lag().
Only the default window frame (rangeFrame, unboundedPreceding, currentRow) can be used with cume_dist().
- Python
- Scala
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import col, lag, lead

def analyse_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0 \
        .withColumn(
            "previous_order_date",
            # lag() looks one row back within the customer's ordered history.
            lag(col("order_date")).over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_id").asc())
            )
        ) \
        .withColumn(
            "next_order_date",
            # lead() looks one row ahead within the customer's ordered history.
            lead(col("order_date")).over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_id").asc())
            )
        )
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lag, lead}

object analyse_orders {

  def apply(spark: SparkSession, in1: DataFrame): DataFrame = {
    import org.apache.spark.sql.expressions.Window
    in1
      .withColumn(
        "previous_order_date",
        // lag() looks one row back within the customer's ordered history.
        lag(col("order_date"), 1).over(
          Window.partitionBy(col("customer_id")).orderBy(col("order_id").asc)
        )
      )
      .withColumn(
        "next_order_date",
        // lead() looks one row ahead within the customer's ordered history.
        lead(col("order_date"), 1).over(
          Window.partitionBy(col("customer_id")).orderBy(col("order_id").asc)
        )
      )
  }

}
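cume_dist() is mentioned above but not demonstrated. A minimal Python sketch (the sample data and amount_percentile column are hypothetical); note that no frame is specified, since cume_dist() only accepts the default growing frame:

```py
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, cume_dist

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame(
    [("c1", 10.0), ("c1", 20.0), ("c1", 40.0), ("c2", 5.0)],
    ["customer_id", "amount"],
)

# Fraction of rows in the partition with amount <= the current row's amount.
orders.withColumn(
    "amount_percentile",
    cume_dist().over(
        Window.partitionBy(col("customer_id")).orderBy(col("amount").asc())
    ),
).show()
```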
Aggregate Functions with Window
Examples of aggregate functions are min(), max(), avg(), etc.
- Python
- Scala
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import avg, col, max

def agg_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Running aggregates over all rows from the start of the partition to the current row.
    return in0 \
        .withColumn(
            "running_avg_spend",
            avg(col("amount")).over(
                Window.partitionBy(col("customer_id"))
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
            )
        ) \
        .withColumn(
            "running_max_spend",
            max(col("amount")).over(
                Window.partitionBy(col("customer_id"))
                .rowsBetween(Window.unboundedPreceding, Window.currentRow)
            )
        )
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, col, max}

object agg_orders {

  def apply(spark: SparkSession, in1: DataFrame): DataFrame = {
    import org.apache.spark.sql.expressions.Window
    // Running aggregates over all rows from the start of the partition to the current row.
    in1
      .withColumn(
        "running_avg_spend",
        avg(col("amount")).over(
          Window
            .partitionBy(col("customer_id"))
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
      )
      .withColumn(
        "running_max_spend",
        max(col("amount")).over(
          Window
            .partitionBy(col("customer_id"))
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
      )
  }

}
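The Range frame option has no example above. A minimal Python sketch of a range-based frame (sample data and bounds are hypothetical); unlike a row frame, a range frame requires ordering by a single numeric column and selects rows by value rather than by physical position:

```py
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import avg, col

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame(
    [("c1", 1, 10.0), ("c1", 3, 20.0), ("c1", 4, 30.0)],
    ["customer_id", "order_id", "amount"],
)

# Includes every row whose order_id is within 2 of the current row's order_id.
orders.withColumn(
    "nearby_avg_spend",
    avg(col("amount")).over(
        Window.partitionBy(col("customer_id"))
        .orderBy(col("order_id").asc())
        .rangeBetween(-2, Window.currentRow)
    ),
).show()
```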