
Aggregate

Allows you to group data, apply aggregation methods, and perform pivot operations.

Input and Output

The Aggregate gem accepts the following input and output:

| Port | Description |
|------|-------------|
| in0  | Input DataFrame that contains the data to be aggregated. |
| out  | Output DataFrame that includes the key column and the aggregated columns. |

Parameters

Configure the Aggregate gem using the following parameters. Each section describes a different tab of the gem configuration.

Aggregate

| Parameter | Description | Required |
|-----------|-------------|----------|
| Target Column | Output column name of the aggregated column. | True |
| Expression | Aggregate function expression that generates the target column values. Example: sum(amount), count(*), avg(amount). | True |
| Propagate All Input Columns | If true, all columns from the input DataFrame are propagated to the output DataFrame. By default, all columns apart from those specified in the group by, pivot, and aggregate expressions are propagated as first(col_name) in the output DataFrame. | False |
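As a rough illustration of how these parameters map to code, a Target Column of total_amount with the Expression sum(amount) would correspond to PySpark along the lines below. This is a minimal sketch: the function, DataFrame, and column names are illustrative, and the gem's actual generated code may differ.

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, sum as sum_

def total_amount_sketch(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # The Expression becomes the aggregate call; the Target Column becomes its alias.
    return in0.agg(sum_(col("amount")).alias("total_amount"))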

Group By

| Parameter | Description | Required |
|-----------|-------------|----------|
| Target Column | Output column name of the key column for grouping. | Required if Pivot is present |
| Expression | Expression that determines how the data is grouped. In many cases, this is simply a column name. | Required for each target column |

Pivot

| Parameter | Description | Required |
|-----------|-------------|----------|
| Pivot Column | Name of the column whose unique values become the new column headers. | False |
| Unique Values | List of values in the pivot column that will be translated to columns in the output DataFrame. Providing unique values while performing a pivot operation improves performance, since Spark does not have to first compute the list of distinct values of the pivot column. | False |
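A minimal PySpark sketch of the performance point above; the DataFrame and column names here are assumed for illustration:

from pyspark.sql.functions import count, lit

# Without explicit values, Spark first runs a distinct query on
# order_status to discover the output columns.
slow = df.groupBy("order_month").pivot("order_status").agg(count(lit(1)))

# With explicit values, that extra pass over the data is skipped.
fast = df.groupBy("order_month").pivot(
    "order_status", ["Approved", "Finished", "Pending", "Started"]
).agg(count(lit(1)))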

Advanced

The Advanced tab lets you configure multiple aggregation options using a concise syntax. This is a lightweight alternative to writing full PySpark code.

Examples

These examples demonstrate common use cases of the Aggregate gem, showing how to configure aggregation operations with and without grouping, how to perform pivot operations, and how to propagate all input columns to the output. Each example includes the relevant gem parameter settings and the equivalent generated PySpark code.

Aggregation without grouping

This example counts the total number of rows in the dataset, producing a single aggregated value without any grouping.

| Tab | Parameter | Value |
|-----|-----------|-------|
| Aggregate | Target Column | number_of_orders |
| Aggregate | Expression | count(*) |

This gem configuration is compiled into the following PySpark code:

def total_orders(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0.agg(count(lit(1)).alias("number_of_orders"))
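A quick usage sketch, assuming the imports the generated function relies on and a made-up three-row input:

from pyspark.sql import SparkSession
from pyspark.sql.functions import count, lit

spark = SparkSession.builder.getOrCreate()
orders = spark.createDataFrame([(1,), (2,), (3,)], ["order_id"])
total_orders(spark, orders).show()
# +----------------+
# |number_of_orders|
# +----------------+
# |               3|
# +----------------+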

Aggregation with grouping

This example counts orders per month by extracting and grouping on the month and year from the order_date.

| Tab | Parameter | Value |
|-----|-----------|-------|
| Aggregate | Target Column | number_of_orders |
| Aggregate | Expression | count(*) |
| Group By | Target Column | order_month(MM/YYYY) |
| Group By | Expression | concat(month(col("order_date")), lit("/"), year(col("order_date"))) |
This gem configuration is compiled into the following PySpark code:

def orders_by_date(spark: SparkSession, in0: DataFrame) -> DataFrame:
    df1 = in0.groupBy(concat(month(col("order_date")), lit("/"), year(col("order_date")))
                      .alias("order_month(MM/YYYY)"))
    return df1.agg(count(lit(1)).alias("number_of_orders"))

Pivot the data

This example shows how to pivot order_status values into separate columns while grouping by month and aggregating the number of orders.

| Tab | Parameter | Value |
|-----|-----------|-------|
| Aggregate | Target Column | number_of_orders |
| Aggregate | Expression | count(*) |
| Group By | Target Column | order_month(MM/YYYY) |
| Group By | Expression | concat(month(col("order_date")), lit("/"), year(col("order_date"))) |
| Pivot | Pivot Column | order_status |
| Pivot | Unique Values | Finished, Approved, Pending, Started |
This gem configuration is compiled into the following PySpark code:

def orders_by_date_N_status(spark: SparkSession, in0: DataFrame) -> DataFrame:
    df1 = in0.groupBy(concat(month(col("order_date")), lit("/"), year(col("order_date"))).alias("order_month(MM/YYYY)"))
    df2 = df1.pivot("order_status", ["Approved", "Finished", "Pending", "Started"])
    return df2.agg(count(lit(1)).alias("number_of_orders"))
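With a single aggregation expression, Spark names the pivoted columns after the pivot values themselves, so the output of this example has one count column per order status. A sketch, assuming an orders_df input with the columns used above:

orders_by_date_N_status(spark, orders_df).columns
# ['order_month(MM/YYYY)', 'Approved', 'Finished', 'Pending', 'Started']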

Propagate all input columns

This option is used to propagate all columns from the input DataFrame to the output DataFrame. By default, first(col_name) is used as the aggregate function for columns not specified in the group by, pivot, or aggregate expressions. The generated PySpark code looks like this:

def Aggregate_1(spark: SparkSession, in0: DataFrame) -> DataFrame:
    df1 = in0.groupBy(col("customer_id"))

    return df1.agg(
        *[first(col("order_date")).alias("order_date")],
        *[
            first(col(x)).alias(x)
            for x in in0.columns
            if x not in ["order_date", "customer_id"]
        ]
    )
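A small sketch of what this produces; the data below is made up, with order_date and order_status assumed to be the only non-key columns:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, first

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, "2024-01-01", "Started"), (1, "2024-01-02", "Finished")],
    ["customer_id", "order_date", "order_status"],
)
Aggregate_1(spark, df).show()
# +-----------+----------+------------+
# |customer_id|order_date|order_status|
# +-----------+----------+------------+
# |          1|2024-01-01|     Started|
# +-----------+----------+------------+

Keep in mind that first() is order-sensitive: in a distributed run, the value it picks for each group depends on how the data is partitioned.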