SchemaTransformer

SchemaTransformer is used to add, edit, rename, or remove a few columns of the incoming stream. Unlike Reformat, which is a set operation where all the transforms are applied in parallel, SchemaTransformer applies its transforms in order, so each transform sees the output of the one before it.
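To make the difference between ordered and set-based application concrete, here is a small Spark-free sketch. It models a row as an ordered map from column name to value. The names TransformOrder, AddOrReplace, Remove, Rename, applyInOrder, and applyAsSelect are hypothetical and are not part of Prophecy or Spark. applyInOrder folds over the transforms the way SchemaTransformer is described above, while applyAsSelect evaluates every output against the original input row, as a Reformat-style select does.

```scala
import scala.collection.immutable.ListMap

// Toy model only (hypothetical names, not Prophecy's or Spark's API):
// a row is an ordered map from column name to value.
object TransformOrder {
  type Row = ListMap[String, String]

  sealed trait Transform
  final case class AddOrReplace(name: String, expr: Row => String) extends Transform
  final case class Remove(name: String) extends Transform
  final case class Rename(from: String, to: String) extends Transform

  // SchemaTransformer-style: fold over the list, so each transform
  // sees the row produced by the transform before it.
  def applyInOrder(row: Row, transforms: List[Transform]): Row =
    transforms.foldLeft(row) { (r, t) =>
      t match {
        case AddOrReplace(name, expr) => r.updated(name, expr(r))
        case Remove(name)             => r - name
        case Rename(from, to) =>
          // Rebuild the map so the renamed column keeps its position.
          ListMap(r.toList.map { case (k, v) => (if (k == from) to else k) -> v }: _*)
      }
    }

  // Reformat-style set operation: every output expression reads the
  // original input row, so none of them can see a sibling's result.
  def applyAsSelect(row: Row, outputs: List[(String, Row => String)]): Row =
    ListMap(outputs.map { case (name, expr) => name -> expr(row) }: _*)
}
```

In this model, renaming a to b and then defining c from b works when applied in order. In the select-style version, c has to be written in terms of a, the input column, because b does not exist yet when c is evaluated.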

Note: This component applies the same kinds of transformations as Reformat. Reformat, however, is a SQL select and is preferable when making many changes.

Example

Commonly, data arrives from multiple operational systems where the data is the same or similar, but some column names and formats don’t match exactly. Let’s take an example where the incoming data has first_name and last_name columns, but we want to output a single name column instead.

The incoming data has separate first_name and last_name columns, as well as a country_code column.

Going out, we want a single name column in place of first_name and last_name, with country_code renamed to cc.

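So, we’ll add a SchemaTransformer component called Standardize to the pipeline, with four transforms applied in order: add a name column computed as concat(first_name, " ", last_name), remove first_name, remove last_name, and rename country_code to cc.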
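Applied to a single row, the four transforms behave as in the following Spark-free sketch. The object StandardizeSketch, its standardize method, and the sample values in the usage note are hypothetical and made up for illustration; a row is modeled as an ordered map from column name to value. The sketch also shows that a column no transform mentions passes through unchanged.

```scala
import scala.collection.immutable.ListMap

// Toy, Spark-free model of the Standardize transforms (hypothetical helper,
// not Prophecy's API). A row is an ordered map from column name to value.
object StandardizeSketch {
  type Row = ListMap[String, String]

  // Columns not named in any transform pass through untouched.
  def standardize(in: Row): Row = {
    // 1. Add "name" (appended at the end); unlike Spark's concat, this toy ignores nulls.
    val withName = in.updated("name", in("first_name") + " " + in("last_name"))
    // 2-3. Remove the two source columns.
    val withoutParts = withName - "first_name" - "last_name"
    // 4. Rename country_code to cc, keeping its position.
    ListMap(withoutParts.toList.map { case (k, v) =>
      (if (k == "country_code") "cc" else k) -> v
    }: _*)
  }
}
```

For example, standardizing ListMap("id" -> "7", "first_name" -> "Ada", "last_name" -> "Lovelace", "country_code" -> "GB") yields the columns id, cc, and name, with name set to "Ada Lovelace" and id left as it was.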

Example Code

Notice that the transforms are applied in order, and that instead of listing every output column, we provide only the changes to the incoming data; columns that no transform mentions pass through unchanged.

Warning: Applying many transforms this way can slow down Spark's query compilation, because each transform adds another layer to the logical plan, producing a very large tree. After optimization, however, the query performs the same as one built with Reformat.

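import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat, lit}

object Standardize {

  // The transforms run in order: each call works on the previous call's output.
  def apply(spark: SparkSession, in: DataFrame): DataFrame = {
    val out = in
      // 1. Add "name" by joining first and last name with a space
      .withColumn("name", concat(col("first_name"), lit(" "), col("last_name")))
      // 2-3. Remove the source columns, which are no longer needed
      .drop("first_name")
      .drop("last_name")
      // 4. Rename country_code to cc
      .withColumnRenamed("country_code", "cc")

    out
  }

}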
    

# Python code coming soon!
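The compilation-cost warning in the example code section can be illustrated with a toy model of a logical plan. In Spark, each chained call such as withColumn, drop, or withColumnRenamed adds another node (typically a projection) on top of the existing plan, so a long chain yields a deep tree that the analyzer must process; an optimizer rule in Spark's Catalyst (CollapseProject) later merges adjacent projections. The Plan, Relation, Project, and PlanSketch names below are hypothetical and model only the shape of the tree, not Spark's actual classes or expression handling.

```scala
// Toy logical plan (hypothetical, far simpler than Spark's Catalyst trees).
sealed trait Plan
final case class Relation(columns: List[String]) extends Plan
final case class Project(columns: List[String], child: Plan) extends Plan

object PlanSketch {
  // Number of nodes on the path from the root down to the leaf relation.
  def depth(p: Plan): Int = p match {
    case Relation(_)       => 1
    case Project(_, child) => 1 + depth(child)
  }

  // One Project per chained call: each step wraps the plan built so far.
  def chain(base: Plan, steps: List[List[String]]): Plan =
    steps.foldLeft(base)((child, cols) => Project(cols, child))

  // Loosely mimics a rule that merges adjacent projections, keeping the
  // outermost column list; the real rule also inlines expressions.
  def collapse(p: Plan): Plan = p match {
    case Project(cols, Project(_, grandchild)) => collapse(Project(cols, grandchild))
    case Project(cols, child)                  => Project(cols, collapse(child))
    case r: Relation                           => r
  }
}
```

Chaining the four Standardize steps over a relation gives a depth of 5 (four projections over the relation); collapsing leaves a single projection over the relation, the same shape a single select would produce.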