Deduplicate
Spark Gem
Removes rows with duplicate values in the specified columns.
Parameters

Parameter | Description
---|---
DataFrame | Input DataFrame
Row to keep | Which row(s) to keep among duplicates: Any, First, Last, Unique Only, or Distinct Rows
Deduplicate columns | Columns to consider when removing duplicate rows (not required for Distinct Rows)
Order columns | Columns to sort the DataFrame on before de-duplicating; used when Row to keep is First or Last
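
The examples below assume a hypothetical orders DataFrame with columns named `tran_id`, `customer_id`, and `order_date` (these names are illustrative, not part of the Gem). A minimal sketch for trying the examples out:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data matching the column names used in the examples.
orders = spark.createDataFrame(
    [
        (1, 100, "2023-01-05"),
        (2, 100, "2023-02-10"),
        (3, 200, "2023-01-20"),
    ],
    ["tran_id", "customer_id", "order_date"],
)
```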
Examples
Rows to keep: Any

- Python

```python
from pyspark.sql import DataFrame, SparkSession

def dedup(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Keep one arbitrary row for each tran_id.
    return in0.dropDuplicates(["tran_id"])
```

- Scala

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object dedup {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    // Keep one arbitrary row for each tran_id.
    in.dropDuplicates(List("tran_id"))
  }
}
```
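
Note that `dropDuplicates` keeps one arbitrary row per set of duplicate key values; which row survives is not guaranteed, which is why this option is called Any.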
Rows to keep: First

- Python

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

def earliest_cust_order(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Number each customer's orders by ascending order date, then keep row 1.
    return (in0
        .withColumn(
            "row_number",
            row_number().over(
                Window
                    .partitionBy("customer_id")
                    .orderBy(col("order_date").asc())
            )
        )
        .filter(col("row_number") == lit(1))
        .drop("row_number"))
```

- Scala

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, row_number}

object earliest_cust_order {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    // Number each customer's orders by ascending order date, then keep row 1.
    in.withColumn(
        "row_number",
        row_number().over(
          Window
            .partitionBy("customer_id")
            .orderBy(col("order_date").asc)
        )
      )
      .filter(col("row_number") === lit(1))
      .drop("row_number")
  }
}
```
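
If the Order columns do not uniquely order the rows within a partition, `row_number` breaks ties arbitrarily, so which of the tied rows is kept is not deterministic.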
Rows to keep: Last

- Python

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, count, row_number
from pyspark.sql.window import Window

def latest_cust_order(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Number each customer's orders by ascending order date, count the rows
    # per customer, and keep the row whose number equals the count (the last).
    return (in0
        .withColumn(
            "row_number",
            row_number().over(
                Window
                    .partitionBy("customer_id")
                    .orderBy(col("order_date").asc())
            )
        )
        .withColumn(
            "count",
            count("*").over(
                Window.partitionBy("customer_id")
            )
        )
        .filter(col("row_number") == col("count"))
        .drop("row_number")
        .drop("count"))
```

- Scala

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

object latest_cust_order {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    // Number each customer's orders by ascending order date, count the rows
    // per customer, and keep the row whose number equals the count (the last).
    in.withColumn(
        "row_number",
        row_number().over(
          Window
            .partitionBy("customer_id")
            .orderBy(col("order_date").asc)
        )
      )
      .withColumn(
        "count",
        count("*").over(
          Window.partitionBy("customer_id")
        )
      )
      .filter(col("row_number") === col("count"))
      .drop("row_number")
      .drop("count")
  }
}
```
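
An equivalent way to keep the last row per key is to order descending and keep row number 1, which avoids the second count window. A minimal Python sketch (`latest_cust_order_desc` is a hypothetical name, not generated by the Gem):

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

def latest_cust_order_desc(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Sort each customer's orders descending by date and keep the first row.
    return (in0
        .withColumn(
            "row_number",
            row_number().over(
                Window
                    .partitionBy("customer_id")
                    .orderBy(col("order_date").desc())
            )
        )
        .filter(col("row_number") == lit(1))
        .drop("row_number"))
```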
Rows to keep: Unique Only

- Python

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, count, lit
from pyspark.sql.window import Window

def single_order_customers(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Keep only rows whose customer_id appears exactly once.
    return (in0
        .withColumn(
            "count",
            count("*").over(
                Window.partitionBy("customer_id")
            )
        )
        .filter(col("count") == lit(1))
        .drop("count"))
```

- Scala

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit}

object single_order_customers {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    // Keep only rows whose customer_id appears exactly once.
    in.withColumn(
        "count",
        count("*").over(
          Window.partitionBy("customer_id")
        )
      )
      .filter(col("count") === lit(1))
      .drop("count")
  }
}
```
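
Unlike the other options, Unique Only does not keep a representative row for duplicated keys: any key that appears more than once is dropped entirely.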
Rows to keep: Distinct Rows

- Python

```python
from pyspark.sql import DataFrame, SparkSession

def distinct_rows(spark: SparkSession, in0: DataFrame) -> DataFrame:
    # Collapse fully identical rows into one; all columns are compared.
    return in0.distinct()
```

- Scala

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object distinct_rows {
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    // Collapse fully identical rows into one; all columns are compared.
    in.distinct()
  }
}
```
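
`distinct()` is equivalent to calling `dropDuplicates()` with no column list: rows must match on every column to be considered duplicates, which is why Distinct Rows needs no Deduplicate columns.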