WindowFunction

Spark Gem

The WindowFunction Gem lets you define a WindowSpec and apply window functions to a DataFrame.

Parameters

| Parameter | Description | Required |
| --- | --- | --- |
| DataFrame | Input DataFrame | True |
| Target column | Output Column name | True |
| Source expression | Window function expression to perform over the created Window | True |
| Order columns | Columns to order by in Window. Must be a numeric type column if a Range Frame is selected | Required when Source expression has a Ranking/Analytical function OR when Range Frame is selected |
| Partition column | Column to partition by in Window | False |
| Row frame | Row based frame boundary to apply on Window | False |
| Range frame | Range based frame boundary to apply on Window | False |

Info: When Order Columns are not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default.

Info: When Order Columns are defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
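
The two defaults differ most visibly when the order column contains ties. The sketch below is a plain-Python model of a sum over a single partition under each default frame; it does not call Spark. The helper name `default_frame_sums` and its arguments are hypothetical, introduced only for illustration. It assumes the usual range-frame semantics, in which currentRow covers every row that ties with the current row on the order value.

```python
from typing import List, Sequence, Tuple


def default_frame_sums(
    rows: Sequence[Tuple[int, float]],  # (order_value, amount) pairs of one partition
    ordered: bool,
) -> List[float]:
    """Model sum(amount) over the default frame for each row of one partition."""
    if not ordered:
        # No order columns: (rowFrame, unboundedPreceding, unboundedFollowing),
        # so every row sees the whole partition.
        total = sum(amount for _, amount in rows)
        return [total for _ in rows]
    # Order columns defined: (rangeFrame, unboundedPreceding, currentRow).
    # The frame holds every row whose order value is <= the current row's,
    # which includes rows that tie with the current row.
    ordered_rows = sorted(rows, key=lambda r: r[0])
    return [
        sum(amount for key, amount in ordered_rows if key <= current_key)
        for current_key, _ in ordered_rows
    ]


partition = [(1, 10.0), (2, 20.0), (2, 30.0), (3, 40.0)]
unordered_sums = default_frame_sums(partition, ordered=False)
ordered_sums = default_frame_sums(partition, ordered=True)
```

In this model the unordered case yields the partition total on every row, while the ordered case yields a running total in which both rows with order value 2 receive the same sum, because the range frame treats them as peers.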

Examples


Ranking Functions with Window

Examples of ranking functions are row_number(), rank(), dense_rank(), and ntile().

Info: Only the default window frame (rowFrame, unboundedPreceding, currentRow) can be used with Ranking functions.

Example usage of Window - Ranking

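from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import col, ntile, row_number


def rank_cust_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0\
        .withColumn(
            # Sequential number of each order per customer, oldest first
            "order_number",
            row_number().over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_date").asc())
            )
        )\
        .withColumn(
            # Split each customer's orders into 2 buckets by date (1 = older, 2 = newer)
            "order_recency",
            ntile(2).over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_date").asc())
            )
        )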
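
The ranking functions listed above only differ in how they treat ties and bucket sizes. As a rough illustration, here is a plain-Python model of what they return over one ordered partition; it does not use Spark. The function `rank_partition` and its parameters are hypothetical names for this sketch, and the ntile logic assumes the common convention that earlier buckets receive the extra rows when the partition does not divide evenly.

```python
from typing import Dict, List, Sequence


def rank_partition(values: Sequence[int], buckets: int) -> List[Dict[str, int]]:
    """Model row_number, rank, dense_rank and ntile over one partition."""
    ordered = sorted(values)
    n = len(ordered)
    # ntile: the first `extra` buckets hold `base + 1` rows, the rest hold `base`.
    base, extra = divmod(n, buckets)
    big_rows = extra * (base + 1)  # number of rows that fall in the larger buckets

    out: List[Dict[str, int]] = []
    rank = dense = 0
    for i, value in enumerate(ordered):
        if i == 0 or value != ordered[i - 1]:
            rank = i + 1   # rank skips ahead by the number of tied rows
            dense += 1     # dense_rank never leaves a gap
        if i < big_rows:
            tile = i // (base + 1) + 1
        else:
            tile = extra + (i - big_rows) // base + 1
        out.append(
            {"row_number": i + 1, "rank": rank, "dense_rank": dense, "ntile": tile}
        )
    return out


ranked = rank_partition([10, 20, 20, 30, 40], buckets=2)
```

With the tie at value 20, rank leaves a gap after the tied rows while dense_rank does not, and row_number stays strictly sequential. With five rows and two buckets, the first bucket gets three rows and the second gets two.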

Analytical Functions with Window

Examples of analytical functions are: lead(), lag(), cume_dist(), etc.

Info: Window frame for lead() and lag() cannot be specified.

Info: Only the default window frame (rangeFrame, unboundedPreceding, currentRow) can be used with cume_dist().

Example usage of Window - Analytical

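from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import col, lag, lead


def analyse_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0\
        .withColumn(
            # order_date of the customer's previous order (null for the first order)
            "previous_order_date",
            lag(col("order_date")).over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_id").asc())
            )
        )\
        .withColumn(
            # order_date of the customer's next order (null for the last order)
            "next_order_date",
            lead(col("order_date")).over(
                Window.partitionBy(col("customer_id")).orderBy(col("order_id").asc())
            )
        )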
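
To make the semantics of these analytical functions concrete, the following plain-Python sketch models lag, lead, and cume_dist over one partition whose rows are already in window order. It does not use Spark; the names `lag_model`, `lead_model`, and `cume_dist_model` are hypothetical, and the offset is assumed to be a positive integer.

```python
from typing import List, Optional, Sequence


def lag_model(values: Sequence[str], offset: int = 1) -> List[Optional[str]]:
    """Value `offset` rows before the current row, or None if there is none."""
    return [values[i - offset] if i - offset >= 0 else None for i in range(len(values))]


def lead_model(values: Sequence[str], offset: int = 1) -> List[Optional[str]]:
    """Value `offset` rows after the current row, or None if there is none."""
    n = len(values)
    return [values[i + offset] if i + offset < n else None for i in range(n)]


def cume_dist_model(order_values: Sequence[int]) -> List[float]:
    """Fraction of partition rows whose order value is <= the current row's."""
    n = len(order_values)
    return [sum(1 for v in order_values if v <= cur) / n for cur in order_values]


order_dates = ["2024-01-05", "2024-02-10", "2024-03-15"]
previous_dates = lag_model(order_dates)
next_dates = lead_model(order_dates)
distribution = cume_dist_model([1, 2, 2, 4])
```

The first row has no previous value and the last row has no next value, so those positions hold None (null in Spark). In the cume_dist model, tied order values share the same result because both count every row up to and including their peers, which matches the range frame noted above.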

Aggregate Functions with Window

Examples of aggregate functions are: min(), max(), avg(), etc.

Example usage of Window - Aggregate

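from pyspark.sql import DataFrame, SparkSession, Window
from pyspark.sql.functions import avg, col, max  # note: max shadows Python's built-in here


def agg_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Row frame from the start of the partition up to the current row.
    # No orderBy is set, so the row order inside each partition
    # (and therefore each running value) is not guaranteed.
    return in0\
        .withColumn(
            "running_avg_spend",
            avg(col("amount"))\
                .over(Window.partitionBy(col("customer_id"))\
                .rowsBetween(Window.unboundedPreceding, Window.currentRow))
        )\
        .withColumn("running_max_spend", max(col("amount"))\
            .over(Window.partitionBy(col("customer_id"))\
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)))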