Deduplicate

Deduplicate removes rows with duplicate values of certain keys.

Example

Let’s read the same customer data three times and union it, as shown here. Notice that we now have 300 rows, with each customer row appearing three times.

Now, to deduplicate the data, we need to specify the key that identifies when two rows are duplicates. Since we’re reading customer data, we know that customer_id uniquely identifies a customer, so we enter that.

The next question is: once we have a group of duplicate rows, how do we decide which one to keep? In our case the rows are identical, so it doesn’t matter, but you can also choose to keep the first or last row in each group, or only the rows that are unique. Sending in sorted data makes choosing first or last more meaningful.
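To make the keep-first / keep-last / unique-only choices concrete, here is a minimal sketch of their semantics on plain Scala collections (illustrative only — not Prophecy’s implementation; the data and names are made up):

```scala
object DedupSemantics {
  // (customer_id, amount) pairs; customer_id is the deduplication key.
  // Customers 1 and 3 have duplicate rows, customer 2 is already unique.
  val rows = List((1, 10), (1, 20), (2, 30), (3, 40), (3, 50), (3, 60))

  // groupBy on a List preserves the original order of rows within each group.
  private def groups = rows.groupBy { case (id, _) => id }

  // Keep the first row seen in each group.
  def keepFirst: List[(Int, Int)] = groups.values.map(_.head).toList.sortBy(_._1)

  // Keep the last row seen in each group.
  def keepLast: List[(Int, Int)] = groups.values.map(_.last).toList.sortBy(_._1)

  // Keep only rows whose key appears exactly once.
  def uniqueOnly: List[(Int, Int)] = groups.values.filter(_.size == 1).flatten.toList
}
```

As the text above notes, first and last are only meaningful when the input arrives in a known order.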

Example Code

Here is the generated code. We’re using deduplicate, a utility function provided by Prophecy.


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object RemoveCopies {

  // deduplicate comes from Prophecy's ExtendedDataFrame implicit, shown below.
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    import spark.implicits._

    val out = in.deduplicate(
      typeToKeep = "any",
      groupByColumns = List(col("customer_id"))
    )

    out
  }

}
    

# Python code coming soon!
    

Prophecy Internals

For the curious, here is our extension of the Spark DataFrame that adds the deduplicate function.

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, lit, row_number}

object ProphecyDataFrame {

  implicit class ExtendedDataFrame(var dataFrame: DataFrame) {
    /**
      * Deduplicates a DataFrame, keeping either any one row, the first row, the last row, or only
      * the unique rows within each group. It first groups rows by the passed groupByColumns and
      * then, depending on the typeToKeep value, performs further operations.
      *
      * For both the first and last options, it adds a temporary row_number column holding each
      * row's position within its group. To keep first records, it then retains only the rows with
      * row_number 1. To keep last records, it also computes the row count for each group and
      * retains only the rows whose row_number equals the group count.
      *
      * For the unique-only case, it adds a temporary count column holding the number of rows in
      * each window partition, then retains only the rows with a count of 1.
      *
      * @param typeToKeep     which rows to keep. Possible values are any, first, last and unique-only.
      * @param groupByColumns columns used to group input records.
      * @return DataFrame with any, first, last or unique-only records from each group of input records.
      */
    def deduplicate(
      typeToKeep:     String,
      groupByColumns: List[Column]
    ): DataFrame = {
      val window = Window
        .partitionBy(groupByColumns: _*)
        .orderBy(lit(1))
        .rowsBetween(Window.unboundedPreceding, Window.currentRow)
      val windowForCount = Window
        .partitionBy(groupByColumns: _*)
        .orderBy(lit(1))
        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

      typeToKeep match {
        case "any" =>
          val columns =
            if (groupByColumns.isEmpty) dataFrame.columns.toList
            else groupByColumns.map(_.toString())
          dataFrame.dropDuplicates(columns)

        case "first" =>
          val dataFrameWithRowNumber =
            dataFrame.withColumn("row_number", row_number().over(window))
          dataFrameWithRowNumber
            .filter(col("row_number") === lit(1))
            .drop("row_number")

        case "last" =>
          val dataFrameWithRowNumber = dataFrame
            .withColumn("row_number", row_number().over(window))
            .withColumn("count", count("*").over(windowForCount))
          dataFrameWithRowNumber
            .filter(col("row_number") === col("count"))
            .drop("row_number")
            .drop("count")

        case "unique-only" =>
          val dataFrameWithCount =
            dataFrame.withColumn("count", count("*").over(windowForCount))
          dataFrameWithCount.filter(col("count") === lit(1)).drop("count")
      }
    }
  }
}
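To see why the filters in the first and last branches work, the row_number/count window logic can be mirrored on plain Scala collections (an illustrative model with made-up names, not the Prophecy code). Note that in Spark itself the window orders by a constant, so which row is "first" within a group is not deterministic unless the input is sorted:

```scala
object WindowModel {
  // Mirrors the two temporary columns the implementation adds:
  // rowNumber = position within the key group, count = size of the group.
  final case class Row(key: Int, value: String, rowNumber: Int, count: Int)

  def withWindowColumns(rows: List[(Int, String)]): List[Row] =
    rows.groupBy { case (k, _) => k }.values.toList.flatMap { group =>
      group.zipWithIndex.map { case ((k, v), i) =>
        Row(k, v, rowNumber = i + 1, count = group.size)
      }
    }

  // "first": keep the rows numbered 1 within their group.
  def first(rows: List[(Int, String)]): List[(Int, String)] =
    withWindowColumns(rows).filter(_.rowNumber == 1).map(r => (r.key, r.value)).sortBy(_._1)

  // "last": keep the rows whose number equals the group count.
  def last(rows: List[(Int, String)]): List[(Int, String)] =
    withWindowColumns(rows).filter(r => r.rowNumber == r.count).map(r => (r.key, r.value)).sortBy(_._1)
}
```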