Aggregate

Spark Gem

Allows you to group the data, apply aggregation functions, and optionally pivot the result.

Parameters

| Parameter | Description | Required |
|---|---|---|
| DataFrame | Input DataFrame | True |
| Target column (Aggregate Tab) | Output column name of the aggregated column | True |
| Expression (Aggregate Tab) | Aggregate function expression, e.g. `sum("amount")`, `count(*)`, `avg("amount")` | True |
| Target column (Group By Tab) | Output column name of the grouped column | Required if a Pivot column is present |
| Expression (Group By Tab) | Column expression to group on, e.g. `col("id")`, `month(col("order_date"))` | Required if a Target column (Group By) is present |
| Pivot column | Column name to pivot on | False |
| Unique values | List of values in the Pivot column that will be translated to columns in the output DataFrame | False |
| Propagate All Input Columns | If true, all columns from the input DataFrame are propagated to the output DataFrame. By default, all columns other than those used in the group-by, pivot, and aggregate expressions are propagated as `first(col_name)` in the output DataFrame. | False |
info

Providing Unique values while performing a pivot operation improves performance, since Spark does not have to first compute the list of distinct values of the Pivot column internally.
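As a minimal sketch of why this helps (the `orders` DataFrame and its column names below are illustrative assumptions, not output of the gem):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input; the column names are assumptions for illustration only.
orders = spark.createDataFrame(
    [(1, "Approved"), (1, "Pending"), (2, "Finished")],
    ["customer_id", "order_status"],
)
grouped = orders.groupBy("customer_id")

# Without unique values: Spark first runs an extra job to collect the
# distinct order_status values before it can lay out the output columns.
wide_inferred = grouped.pivot("order_status").count()

# With unique values: the output columns are fixed up front and the extra
# distinct-value scan is skipped.
wide_fixed = grouped.pivot("order_status", ["Approved", "Finished", "Pending"]).count()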

Examples

Aggregation without Grouping

Example usage of Aggregate - Aggregation without Grouping

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import count, lit

def total_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0.agg(count(lit(1)).alias("number_of_orders"))
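Because no group-by expression is supplied here, the entire input is treated as a single group, so the result is a one-row DataFrame with a single `number_of_orders` column. `count(lit(1))` counts every row regardless of nulls, matching SQL's `COUNT(*)`.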

Aggregation with Grouping

Example usage of Aggregate - Aggregation with Grouping

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, concat, count, lit, month, year

def orders_by_date(spark: SparkSession, in0: DataFrame) -> DataFrame:
    df1 = in0.groupBy(concat(month(col("order_date")), lit("/"), year(col("order_date")))
                      .alias("order_month(MM/YYYY)"))
    return df1.agg(count(lit(1)).alias("number_of_orders"))
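One caveat worth noting: `month()` returns an unpadded integer, so the label reads `1/2023` rather than `01/2023` for January through September, despite the `MM/YYYY` alias. If zero-padded months matter, `date_format` is a common alternative; the following is a sketch assuming the `in0` DataFrame from the example above, not the gem's generated code:

from pyspark.sql.functions import col, date_format

# Formats order_date directly as zero-padded "MM/yyyy", e.g. "01/2023".
df1 = in0.groupBy(date_format(col("order_date"), "MM/yyyy").alias("order_month(MM/YYYY)"))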

Pivot Columns

Example usage of Aggregate - Pivoting

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, concat, count, lit, month, year

def orders_by_date_N_status(spark: SparkSession, in0: DataFrame) -> DataFrame:
    df1 = in0.groupBy(concat(month(col("order_date")), lit("/"), year(col("order_date"))).alias("order_month(MM/YYYY)"))
    df2 = df1.pivot("order_status", ["Approved", "Finished", "Pending", "Started"])
    return df2.agg(count(lit(1)).alias("number_of_orders"))
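The output has one row per month and one column per listed status value: `order_month(MM/YYYY)` plus `Approved`, `Finished`, `Pending`, and `Started`, each holding that month's order count. Combinations with no matching rows come out as null rather than 0.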

Propagate All Input Columns

This option is used to propagate all columns from the input DataFrame to the output DataFrame. By default, `first(col_name)` is used as the aggregate function for any column not specified in the group-by, pivot, or aggregate expressions.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, first

def Aggregate_1(spark: SparkSession, in0: DataFrame) -> DataFrame:
    df1 = in0.groupBy(col("customer_id"))

    return df1.agg(
        *[first(col("order_date")).alias("order_date")],
        # Carry every remaining input column through as first(col_name).
        *[
            first(col(x)).alias(x)
            for x in in0.columns
            if x not in ["order_date", "customer_id"]
        ]
    )
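The explicit `first(col("order_date"))` term and the list comprehension together mirror what the gem generates: every input column outside the group-by expression is carried through as `first(col_name)`. Since `first` picks an arbitrary value per group and row order within a group is not guaranteed, these propagated values are non-deterministic unless the data is otherwise ordered.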