Deduplicate removes rows with duplicate values of certain keys.



Let’s read the same customer data three times and union it, as shown here. Notice that we now have 300 rows, with each customer row appearing three times.
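As a sketch of that step (the file path and SparkSession setup here are illustrative, not part of the generated pipeline; any 100-row customer dataset behaves the same way):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("dedup-demo").getOrCreate()

// Hypothetical source file; assumes a header row with a customer_id column.
val customers: DataFrame = spark.read.option("header", "true").csv("customers.csv")

// Union the same data three times: 100 rows become 300,
// with every customer row appearing three times.
val tripled = customers.union(customers).union(customers)
tripled.count()  // 300 when the source has 100 rows
```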



Now, if we want to deduplicate the data, we need to supply the key that identifies when two rows refer to the same record. Here, since we’re reading customer data, we know that customer_id uniquely identifies a customer, so we enter that.


Now, the next question is: once we have a group of duplicate rows, how do we decide which one to keep? In our case the rows are identical, so it doesn’t matter, but you can also choose to keep the first row, the last row, or only rows that were never duplicated at all. Sending in sorted data makes choosing first or last more meaningful.
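To illustrate the options, here is a sketch assuming the ProphecyDataFrame extension from the Prophecy Internals section is in scope, a DataFrame of customers with duplicate customer_id values, and a hypothetical updated_at column to sort on:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import ProphecyDataFrame._ // extension from the Prophecy Internals section

def illustrate(customers: DataFrame): Unit = {
  // "any": keep one arbitrary row per customer_id.
  val anyRow = customers.deduplicate("any", List(col("customer_id")))

  // "first"/"last": keep the first or last row in each group.
  // Sorting first is what makes this meaningful: after ordering by
  // updated_at (hypothetical column), "last" means "most recent".
  val latest = customers
    .orderBy(col("updated_at"))
    .deduplicate("last", List(col("customer_id")))

  // "unique-only": keep only customer_ids that appear exactly once,
  // discarding every row that has any duplicate.
  val singles = customers.deduplicate("unique-only", List(col("customer_id")))
}
```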


Example Code

Here is the generated code. We’re using deduplicate, a utility function written by Prophecy.

object RemoveCopies {

  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    import spark.implicits._

    val out = in.deduplicate(
      typeToKeep = "any",
      groupByColumns = List(col("customer_id"))
    )
    out
  }

}




Python code coming soon!


Prophecy Internals

For the curious, here is our extension of the Spark DataFrame that adds the deduplicate function:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, row_number}

object ProphecyDataFrame {

  implicit class ExtendedDataFrame(var dataFrame: DataFrame) {

    /**
      * Method for the Deduplicate operation when the rows to be kept in each group are first, last, or
      * unique-only. It first groups by all passed groupByColumns and then, depending on the typeToKeep value,
      * does further operations.
      * For both the first and last options, it adds a temporary row_number column that returns the row number
      * within a group of rows grouped by groupByColumns. To find the first record it simply keeps the rows with
      * row_number equal to 1. To find the last record within each group it also computes the count value for
      * each group and keeps the rows where row_number equals the group count.
      * For the unique-only case it adds a temporary count column that returns the count of rows within a window
      * partition, then keeps the rows with a count of 1.
      * @param typeToKeep     which rows to keep. Possible values are any, first, last, and unique-only
      * @param groupByColumns columns used to group input records
      * @return DataFrame with the first, last, or unique-only record from each group of input records
      */
    def deduplicate(
      typeToKeep:     String,
      groupByColumns: List[Column]
    ): DataFrame = {
      val window = Window
        .partitionBy(groupByColumns: _*)
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
      val windowForCount = Window
        .partitionBy(groupByColumns: _*)
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

      typeToKeep match {
        case "any" =>
          val columns =
            if (groupByColumns.isEmpty) dataFrame.columns.toList
            else groupByColumns.map(_.toString)
          dataFrame.dropDuplicates(columns)
        case "first" =>
          dataFrame
            .withColumn("row_number", row_number().over(window))
            .filter(col("row_number") === lit(1))
            .drop("row_number")
        case "last" =>
          dataFrame
            .withColumn("row_number", row_number().over(window))
            .withColumn("count", count("*").over(windowForCount))
            .filter(col("row_number") === col("count"))
            .drop("row_number", "count")
        case "unique-only" =>
          val dataFrameWithCount =
            dataFrame.withColumn("count", count("*").over(windowForCount))
          dataFrameWithCount.filter(col("count") === lit(1)).drop("count")
        case _ =>
          throw new IllegalArgumentException(s"Unknown typeToKeep: $typeToKeep")
      }
    }
  }
}