CompareColumns

Spark Gem

The CompareColumns Gem compares the columns of two DataFrames, matching rows on the ID (key) columns you select.

Parameters

| Parameter | Description | Required |
|---|---|---|
| DataFrame 1 | First input DataFrame | True |
| DataFrame 2 | Second input DataFrame | True |
| ID columns to retain (Select Id Columns) | List of columns used to join the two DataFrames | True |
| Output Column Name (Select Output Columns) | Name of the output column that holds the name of each compared column | True |
| Match Count Column Name (Select Output Columns) | Name of the output column that holds the number of rows that matched between the two DataFrames | True |
| Mismatch Count Column Name (Select Output Columns) | Name of the output column that holds the number of rows that did not match between the two DataFrames | True |
| Mismatch Example Left Column Name (Select Output Columns) | Name of the output column that shows an example mismatched value from the left DataFrame | True |
| Mismatch Example Right Column Name (Select Output Columns) | Name of the output column that shows an example mismatched value from the right DataFrame | True |
| Mismatch Example ID Column Prefix (Select Output Columns) | Prefix for the output columns that show the ID column values of the example mismatched row | True |

Example - Compare columns of two DataFrames

Generated code

from functools import reduce

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import coalesce, col, first, lit, sum, when

def CompareColumns_1(spark: SparkSession, in0: DataFrame, in1: DataFrame) -> DataFrame:
    # exploded1 and exploded2 are not defined in this excerpt; per the steps
    # listed below, they are the pivoted (long-format) forms of in0 and in1.
    # Full outer join on the compared column name and the ID column.
    joined = exploded1\
        .join(
          exploded2,
          reduce(
            lambda a, c: a & c,
            [col(f"exploded1.column_name") == col(f"exploded2.column_name"),
             col(f"exploded1.customer_id") == col(f"exploded2.customer_id")],
            lit(True)
          ),
          "full_outer"
        )\
        .select(
          coalesce(col(f"exploded1.column_name"), col(f"exploded2.column_name")).alias("column_name"),
          coalesce(col(f"exploded1.customer_id"), col(f"exploded2.customer_id")).alias("customer_id"),
          col(f"exploded1.##value##").alias("##left_value##"),
          col(f"exploded2.##value##").alias("##right_value##")
        )\
        .withColumn(
          "match_count",
          # Values match when they are equal or when both are null.
          when(
            coalesce(
              (col("##left_value##") == col("##right_value##")),
              (col("##left_value##").isNull() & col("##right_value##").isNull())
            ),
            lit(1)
          )\
            .otherwise(lit(0))
        )\
        .withColumn(
          "mismatch_count",
          # Values mismatch when they differ or when exactly one side is null.
          when(
            coalesce(
              (col("##left_value##") != col("##right_value##")),
              ~ (col("##left_value##").isNull() & col("##right_value##").isNull())
            ),
            lit(1)
          )\
            .otherwise(lit(0))
        )
    # One example row per compared column, with zeroed counts.
    mismatchExamples = joined\
        .select(
          col("column_name"),
          col("customer_id"),
          lit(0).alias("match_count"),
          lit(0).alias("mismatch_count"),
          col("##left_value##").alias("mismatch_example_left"),
          col("##right_value##").alias("mismatch_example_right")
        )\
        .dropDuplicates(["column_name"])

    # Aggregate counts and examples per compared column, worst columns first.
    return joined\
        .union(mismatchExamples)\
        .groupBy("column_name")\
        .agg(
          sum("match_count").alias("match_count"),
          sum("mismatch_count").alias("mismatch_count"),
          first(col("mismatch_example_left"), ignorenulls = True).alias("mismatch_example_left"),
          first(col("mismatch_example_right"), ignorenulls = True).alias("mismatch_example_right"),
          first(
            when(coalesce(col("mismatch_example_left"), col("mismatch_example_right")).isNotNull(), col("customer_id"))\
              .otherwise(lit(None)),
            ignorenulls = True
          )\
            .alias("mismatch_example_customer_id")
        )\
        .orderBy(col("mismatch_count").desc(), col("column_name"))

The CompareColumns Gem performs the following steps to compare two DataFrames:

  • Pivot each DataFrame into rows that hold the key columns, the compared column name, and its value
  • Join the pivoted DataFrames on the key columns and the compared column name, then compare the values
  • Calculate the match and mismatch record counts
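
The steps above can be sketched in plain Python, without Spark, to show what the counts mean. This is only an illustration under stated assumptions, not the Gem's implementation: `explode`, `compare_columns`, and the sample rows are hypothetical names and data, rows are modeled as dictionaries, and a key missing from one side is treated like a null value, as a full outer join would.

```python
from collections import defaultdict

def explode(rows, id_cols):
    """Step 1: turn each row into ((key, column_name) -> value) entries."""
    out = {}
    for row in rows:
        key = tuple(row[c] for c in id_cols)
        for name, value in row.items():
            if name not in id_cols:
                out[(key, name)] = value
    return out

def compare_columns(left_rows, right_rows, id_cols):
    """Steps 2 and 3: join on (key, column_name) and count per column."""
    left = explode(left_rows, id_cols)
    right = explode(right_rows, id_cols)
    stats = defaultdict(lambda: {"match_count": 0, "mismatch_count": 0})
    # The union of the keys plays the role of the full outer join.
    for key, name in left.keys() | right.keys():
        l_val = left.get((key, name))
        r_val = right.get((key, name))
        # Equal values, or None on both sides, count as a match.
        if l_val == r_val:
            stats[name]["match_count"] += 1
        else:
            stats[name]["mismatch_count"] += 1
    return dict(stats)

# Hypothetical sample data keyed by customer_id.
left = [
    {"customer_id": 1, "name": "Ann", "city": "Oslo"},
    {"customer_id": 2, "name": "Bob", "city": "Rome"},
]
right = [
    {"customer_id": 1, "name": "Ann", "city": "Oslo"},
    {"customer_id": 2, "name": "Rob", "city": "Rome"},
    {"customer_id": 3, "name": "Cy", "city": None},
]
result = compare_columns(left, right, ["customer_id"])
```

In this sketch, customer 3 exists only on the right side, so its `name` counts as a mismatch, while its null `city` counts as a match because both sides are null after the join.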
Note

Repartition the input DataFrames before using this Gem, because they are exploded and then joined with each other.
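
One way to follow this note is sketched below. It assumes the inputs are PySpark DataFrames and uses `DataFrame.repartition(numPartitions, *cols)`; the helper name `repartition_inputs`, partitioning on the ID columns, and the partition count of 200 are illustrative assumptions rather than values recommended by the Gem.

```python
def repartition_inputs(df1, df2, id_columns, num_partitions=200):
    """Repartition both inputs on the ID columns before comparing them.

    num_partitions=200 is an arbitrary placeholder; tune it for your
    data volume and cluster size.
    """
    # repartition(n, *cols) hash-partitions the rows by the given columns,
    # so rows sharing an ID land in the same partition on both sides.
    return (
        df1.repartition(num_partitions, *id_columns),
        df2.repartition(num_partitions, *id_columns),
    )
```

The two returned DataFrames can then be connected to the Gem's inputs in place of the originals.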