Deduplicate
Spark Gem
Removes rows with duplicate values in the specified columns.
Parameters
Parameter | Description | Required |
---|---|---|
Dataframe | Input dataframe | True |
Row to keep | Any: keeps any one row among duplicates, using the underlying dropDuplicates construct. First: keeps the first occurrence of the duplicate row. Last: keeps the last occurrence of the duplicate row. Unique Only: keeps only rows that have no duplicates. Distinct Rows: keeps all distinct rows, equivalent to performing df.distinct(). Default is Any. Each option is illustrated in the sketch below and in the Examples section. | True |
Deduplicate columns | Columns to consider when removing duplicate rows (not required for Distinct Rows) | True |
Order columns | Columns to sort the Dataframe on before de-duplicating, used when Row to keep is First or Last | False |
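To make the Row to keep options concrete, here is a minimal PySpark sketch that applies each mode to a small made-up DataFrame. The SparkSession variable spark, the columns customer_id and order_dt, and the sample rows are assumptions for illustration only; they are not produced or required by the gem. Last is analogous to First and is shown in the generated code under Examples.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Hypothetical toy data: customer 1 has two orders, customer 2 has one.
df = spark.createDataFrame(
    [(1, "2023-01-01"), (1, "2023-01-05"), (2, "2023-02-01")],
    ["customer_id", "order_dt"],
)

# Any: keep one arbitrary row per customer_id (2 rows survive).
any_rows = df.dropDuplicates(["customer_id"])

# First: keep the earliest order_dt per customer_id.
w = Window.partitionBy("customer_id").orderBy(F.col("order_dt").asc())
first_rows = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")

# Unique Only: keep customers that appear exactly once (only customer 2 survives).
unique_only = (df.withColumn("cnt", F.count("*").over(Window.partitionBy("customer_id")))
                 .filter(F.col("cnt") == 1)
                 .drop("cnt"))

# Distinct Rows: de-duplicate on all columns (all 3 rows survive; none are exact duplicates).
distinct_rows = df.distinct()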
Examples
Rows to keep - Any
- Python
def dedup(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0.dropDuplicates(["tran_id"])
- Scala
object dedup {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    in.dropDuplicates(List("tran_id"))
  }
}
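With Any, the particular row retained among duplicates is not deterministic: dropDuplicates keeps whichever duplicate it encounters first, and that can vary between runs depending on partitioning. Use First or Last together with Order columns when a specific occurrence must be kept.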
Rows to keep - First
- Python
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

def earliest_cust_order(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Keep the earliest order per customer: rank rows by order_dt and keep rank 1.
    return in0\
        .withColumn(
            "row_number",
            row_number().over(
                Window
                    .partitionBy("customer_id")
                    .orderBy(col("order_dt").asc())
            )
        )\
        .filter(col("row_number") == lit(1))\
        .drop("row_number")
- Scala
object earliest_cust_order {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, lit, row_number}
    in.withColumn(
        "row_number",
        row_number().over(
          Window
            .partitionBy("customer_id")
            .orderBy(col("order_date").asc)
        )
      )
      .filter(col("row_number") === lit(1))
      .drop("row_number")
  }
}
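If several rows share the same value of the Order columns within a group, row_number still assigns them distinct ranks in an arbitrary order, so ties are broken non-deterministically; add further Order columns if a stable tie-break is needed.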
Rows to keep - Last
- Python
from pyspark.sql.functions import col, count, row_number
from pyspark.sql.window import Window

def latest_cust_order(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Keep the latest order per customer: the row whose rank equals the group's row count.
    return in0\
        .withColumn(
            "row_number",
            row_number().over(
                Window
                    .partitionBy("customer_id")
                    .orderBy(col("order_dt").asc())
            )
        )\
        .withColumn(
            "count",
            count("*").over(
                Window.partitionBy("customer_id")
            )
        )\
        .filter(col("row_number") == col("count"))\
        .drop("row_number")\
        .drop("count")
- Scala
object latest_cust_order {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, count, row_number}
    in.withColumn(
        "row_number",
        row_number().over(
          Window
            .partitionBy("customer_id")
            .orderBy(col("order_date").asc)
        )
      )
      .withColumn(
        "count",
        count("*").over(
          Window.partitionBy("customer_id")
        )
      )
      .filter(col("row_number") === col("count"))
      .drop("row_number")
      .drop("count")
  }
}
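The generated code finds the last row by comparing each row's row_number against the group's total count over an ascending sort. An equivalent hand-written approach, shown below purely as an illustrative alternative rather than the code the gem emits, is to sort the window descending and keep the first-ranked row (in0 refers to the same input DataFrame as in the Python example above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Alternative sketch: latest order per customer via a descending sort.
w_desc = Window.partitionBy("customer_id").orderBy(F.col("order_dt").desc())
latest = in0.withColumn("rn", F.row_number().over(w_desc)).filter(F.col("rn") == 1).drop("rn")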
Rows to keep - Unique Only
- Python
from pyspark.sql.functions import col, count, lit
from pyspark.sql.window import Window

def single_order_customers(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Keep only customers that occur exactly once; rows with any duplicate are dropped.
    return in0\
        .withColumn(
            "count",
            count("*").over(
                Window.partitionBy("customer_id")
            )
        )\
        .filter(col("count") == lit(1))\
        .drop("count")
- Scala
object single_order_customers {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, count, lit}
    in.withColumn(
        "count",
        count("*").over(
          Window.partitionBy("customer_id")
        )
      )
      .filter(col("count") === lit(1))
      .drop("count")
  }
}
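Unique Only is stricter than the other modes: when a customer_id occurs more than once, all of its rows are dropped rather than being collapsed to a single representative row.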
Rows to keep - Distinct Rows
- Python
def single_order_customers(spark: SparkSession, in0: DataFrame) -> DataFrame:
    return in0.distinct()
- Scala
object single_order_customers {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    in.distinct()
  }
}
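Distinct Rows considers every column of the Dataframe, which is why no Deduplicate columns need to be specified for this mode; df.distinct() is an alias for dropDuplicates() called without a column list.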