
Gem Builder reference for Spark

This page provides information about how gems are written in code. Reference this page when you are building or editing custom gems.

Requirements

Some options require a specific gemLibsVersion. To update this, you must manually change the gemLibsVersion value inside pbt_project.yml in your project Git repository.
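
For example, assuming the value sits at the top level of pbt_project.yml, the entry looks roughly like this (the version number is illustrative; use the version that the option requires):

gemLibsVersion: 1.1.47  # illustrative value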

Mode

There are a few different types of gems that you can create. The table below describes each mode that you can choose.

| Mode | Description | Additional settings |
|------|-------------|---------------------|
| Transformation | Edits intermediate data in the pipeline that is in-memory. | Choose the category of the transformation gem |
| Dataset Format | Reads and writes data between storage and memory. | Choose whether the type is batch or streaming |
| Custom Subgraph (Python only) | Controls the flow of gems. Visit the Subgraph page for an example. | None |

Classes

The following classes must be included in all Spark gems. Each class extends a base class that Prophecy has defined.

  • A class that defines the overall representation of the gem.
  • A class that contains the properties exposed to the user for this particular gem.
  • A class that defines the Spark code that needs to run on your Spark cluster.
| Class | Base Class for Transformation | Base Class for Dataset Format | Base Class for Custom Subgraph |
|-------|-------------------------------|-------------------------------|--------------------------------|
| class CustomGem(BaseClass) | ComponentSpec | DatasetSpec | MetaComponentSpec |
| class YourProperties(BaseClass) | ComponentProperties | ComponentProperties | MetaComponentProperties |
| class YourCode(BaseClass) | ComponentCode | ComponentCode | MetaComponentCode |
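
For a Transformation gem, the three classes typically live in one Python file, with the properties and code classes nested inside the spec class (as in the Filter and Parquet examples below). The following is a minimal sketch with hypothetical names; the import path for the base classes is an assumption modeled on the Dataset Format example further down this page.

from dataclasses import dataclass

from pyspark.sql import SparkSession, DataFrame

# Assumed import path, modeled on the Dataset Format example below.
from prophecy.cb.server.base.ComponentBuilderBase import ComponentSpec, ComponentProperties, ComponentCode


class MyGem(ComponentSpec):
    # 1. Representation of the overall gem
    name: str = "MyGem"
    category: str = "Transform"

    @dataclass(frozen=True)
    class MyGemProperties(ComponentProperties):
        # 2. Properties exposed to the user
        message: str = ""  # hypothetical property

    class MyGemCode(ComponentCode):
        # 3. Spark code that runs on the cluster
        def __init__(self, newProps):
            self.props: MyGem.MyGemProperties = newProps

        def apply(self, spark: SparkSession, in0: DataFrame) -> DataFrame:
            # Hypothetical pass-through logic
            return in0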

Functions

The following functions can be used to customize Spark gems.

| Function | Purpose | Return | Gem Mode |
|----------|---------|--------|----------|
| optimizeCode | Enables the Prophecy optimizer to simplify the gem code when it runs. | Boolean | All |
| customOutputSchemaEnabled | Enables the custom schema option by default in the gem. Requires gemLibsVersion 1.1.47+ for Scala. | Boolean | Transformation |
| dialog | Defines how the gem looks in the visual interface. | Dialog object | Transformation and Subgraph |
| sourceDialog | Defines how the source gem looks in the visual interface. | DatasetDialog object | Dataset and Subgraph |
| targetDialog | Defines how the target gem looks in the visual interface. | DatasetDialog object | Dataset and Subgraph |
| validate | Defines how to detect user errors when using the gem. | Diagnostics array | All |
| onChange | Defines UI state transformations. | Properties object | All |
| serializeProperty | (Scala only) Takes a Properties object and converts it into JSON format. | String | All |
| deserializeProperty | (Scala only) Parses a JSON string and converts it into a Properties object. | Properties object | All |
| apply | Included in the class that extends component code to define Spark logic. | None, DataFrame, or list of DataFrames | Transformation and Subgraph |
| sourceApply | Included in the class that extends component code to define Spark logic. | DataFrame | Dataset |
| targetApply | Included in the class that extends component code to define Spark logic. | None | Dataset |

Examples

Parent Class

class Filter(ComponentSpec):
    name: str = "Filter"
    category: str = "Transform"

    def optimizeCode(self) -> bool:
        return True

Properties Classes

@dataclass(frozen=True)
class FilterProperties(ComponentProperties):
    columnsSelector: List[str] = field(default_factory=list)
    condition: SColumn = SColumn("lit(True)")

Dialog (UI)

def dialog(self) -> Dialog:
    return Dialog("Filter").addElement(
        ColumnsLayout(height="100%")
        .addColumn(PortSchemaTabs(selectedFieldsProperty=("columnsSelector")).importSchema(), "2fr")
        .addColumn(
            StackLayout(height=("100%"))
            .addElement(TitleElement("Filter Condition"))
            .addElement(
                Editor(height=("100%")).withSchemaSuggestions().bindProperty("condition.expression")
            ),
            "5fr"
        )
    )

After the Dialog object is defined, it is serialized as JSON and rendered in the UI. When you preview the visual interface of the example code above, it looks like this:

Dialog

You can add various UI components to this function, such as scroll boxes, tabs, and buttons. You can also group these components into different panels.

Column Selector: This is a special property to add if you want to select columns from the UI and then highlight the used columns with the onChange function.
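
For example, the following sketch groups controls into two panels and wires the column selector property into the schema tabs. The property names firstExpr and secondExpr are hypothetical; the UI components are the same ones used elsewhere on this page.

def dialog(self) -> Dialog:
    return Dialog("MyGem").addElement(
        ColumnsLayout(gap=("1rem"), height=("100%"))
        # Left panel: schema tabs bound to the columnsSelector property.
        .addColumn(PortSchemaTabs(selectedFieldsProperty=("columnsSelector")).importSchema(), "2fr")
        # Right panel: a scrollable stack of titled expression editors.
        .addColumn(
            ScrollBox().addElement(
                StackLayout(height=("100%"))
                .addElement(TitleElement("First Expression"))
                .addElement(Editor(height=("40%")).withSchemaSuggestions().bindProperty("firstExpr.expression"))
                .addElement(TitleElement("Second Expression"))
                .addElement(Editor(height=("40%")).withSchemaSuggestions().bindProperty("secondExpr.expression"))
            ),
            "5fr"
        )
    )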

Validation

def validate(self, component: Component[FilterProperties]) -> List[Diagnostic]:
    return validateSColumn(component.properties.condition, "condition", component)
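
If you need additional checks, you can build the diagnostics list yourself and append your own Diagnostic objects, as the Dataset Format example later on this page does. The following sketch adds a hypothetical rule and assumes SeverityLevelEnum also provides a Warning level (the Parquet example below only uses Error):

def validate(self, component: Component[FilterProperties]) -> List[Diagnostic]:
    diagnostics = validateSColumn(component.properties.condition, "condition", component)
    # Hypothetical extra rule: warn when the condition does not reference any columns.
    # SeverityLevelEnum.Warning is an assumption; the Parquet example uses SeverityLevelEnum.Error.
    if len(component.properties.columnsSelector) == 0:
        diagnostics.append(
            Diagnostic("properties.condition", "Condition does not reference any columns", SeverityLevelEnum.Warning)
        )
    return diagnostics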

State Changes

def onChange(self, oldState: Component[FilterProperties], newState: Component[FilterProperties]) -> Component[FilterProperties]:
    newProps = newState.properties
    usedColExps = getColumnsToHighlight2([newProps.condition], newState)
    return newState.bindProperties(replace(newProps, columnsSelector=usedColExps))

Component Code

class FilterCode(ComponentCode):
    def __init__(self, newProps):
        self.props: Filter.FilterProperties = newProps

    def apply(self, spark: SparkSession, in0: DataFrame) -> DataFrame:
        return in0.filter(self.props.condition.column())

To test your Spark code, modify the properties in the visual preview and save the changes. You can then inspect the generated Spark code that will run on your cluster.

info

To keep gems generally compatible with each other, they must conform to a common interface. Therefore, as defined in the apply() method, gems must accept and produce DataFrame objects at the input and output ports.
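
For example, a hypothetical gem with two input ports receives one DataFrame per port and returns the DataFrame for its output port:

class MyJoinCode(ComponentCode):
    def __init__(self, newProps):
        self.props = newProps

    # One DataFrame argument per input port; the return value feeds the output port.
    def apply(self, spark: SparkSession, in0: DataFrame, in1: DataFrame) -> DataFrame:
        # Hypothetical logic: stack the two inputs by column name.
        return in0.unionByName(in1)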

note

To help the Spark Catalyst Optimizer build scalable code, Prophecy performs some minor optimizations on the code generated by the apply() method.

Dataset Format example

The previous examples were for a Transformation gem. The following example shows the code for a Dataset Format gem.

from dataclasses import dataclass
from typing import List, Optional

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType

from prophecy.cb.server.base.ComponentBuilderBase import ComponentCode, Diagnostic, SeverityLevelEnum
from prophecy.cb.server.base.DatasetBuilderBase import DatasetSpec, DatasetProperties, Component
from prophecy.cb.ui.uispec import *


class ParquetFormat(DatasetSpec):
    name: str = "parquet"
    datasetType: str = "File"

    def optimizeCode(self) -> bool:
        return True

    @dataclass(frozen=True)
    class ParquetProperties(DatasetProperties):
        schema: Optional[StructType] = None
        description: Optional[str] = ""
        useSchema: Optional[bool] = False
        path: str = ""
        mergeSchema: Optional[bool] = None
        datetimeRebaseMode: Optional[str] = None
        int96RebaseMode: Optional[str] = None
        compression: Optional[str] = None
        partitionColumns: Optional[List[str]] = None
        writeMode: Optional[str] = None
        pathGlobFilter: Optional[str] = None
        modifiedBefore: Optional[str] = None
        modifiedAfter: Optional[str] = None
        recursiveFileLookup: Optional[bool] = None

    def sourceDialog(self) -> DatasetDialog:
        return DatasetDialog("parquet") \
            .addSection("LOCATION", TargetLocation("path")) \
            .addSection(
                "PROPERTIES",
                ColumnsLayout(gap=("1rem"), height=("100%"))
                .addColumn(
                    ScrollBox().addElement(
                        StackLayout(height=("100%"))
                        .addElement(
                            StackItem(grow=(1)).addElement(
                                FieldPicker(height=("100%"))
                                .addField(
                                    TextArea("Description", 2, placeholder="Dataset description..."),
                                    "description",
                                    True
                                )
                                .addField(Checkbox("Use user-defined schema"), "useSchema", True)
                                .addField(Checkbox("Merge schema"), "mergeSchema")
                                .addField(
                                    SelectBox("Datetime Rebase Mode")
                                    .addOption("EXCEPTION", "EXCEPTION")
                                    .addOption("CORRECTED", "CORRECTED")
                                    .addOption("LEGACY", "LEGACY"),
                                    "datetimeRebaseMode"
                                )
                                .addField(
                                    SelectBox("Int96 Rebase Mode")
                                    .addOption("EXCEPTION", "EXCEPTION")
                                    .addOption("CORRECTED", "CORRECTED")
                                    .addOption("LEGACY", "LEGACY"),
                                    "int96RebaseMode"
                                )
                                .addField(Checkbox("Recursive File Lookup"), "recursiveFileLookup")
                                .addField(TextBox("Path Global Filter").bindPlaceholder(""), "pathGlobFilter")
                                .addField(TextBox("Modified Before").bindPlaceholder(""), "modifiedBefore")
                                .addField(TextBox("Modified After").bindPlaceholder(""), "modifiedAfter")
                            )
                        )
                    ),
                    "auto"
                )
                .addColumn(SchemaTable("").bindProperty("schema"), "5fr")
            ) \
            .addSection(
                "PREVIEW",
                PreviewTable("").bindProperty("schema")
            )

    def targetDialog(self) -> DatasetDialog:
        return DatasetDialog("parquet") \
            .addSection("LOCATION", TargetLocation("path")) \
            .addSection(
                "PROPERTIES",
                ColumnsLayout(gap=("1rem"), height=("100%"))
                .addColumn(
                    ScrollBox().addElement(
                        StackLayout(height=("100%")).addElement(
                            StackItem(grow=(1)).addElement(
                                FieldPicker(height=("100%"))
                                .addField(
                                    TextArea("Description", 2, placeholder="Dataset description..."),
                                    "description",
                                    True
                                )
                                .addField(
                                    SelectBox("Write Mode")
                                    .addOption("error", "error")
                                    .addOption("overwrite", "overwrite")
                                    .addOption("append", "append")
                                    .addOption("ignore", "ignore"),
                                    "writeMode"
                                )
                                .addField(
                                    SchemaColumnsDropdown("Partition Columns")
                                    .withMultipleSelection()
                                    .bindSchema("schema")
                                    .showErrorsFor("partitionColumns"),
                                    "partitionColumns"
                                )
                                .addField(
                                    SelectBox("Compression Codec")
                                    .addOption("none", "none")
                                    .addOption("uncompressed", "uncompressed")
                                    .addOption("gzip", "gzip")
                                    .addOption("lz4", "lz4")
                                    .addOption("snappy", "snappy")
                                    .addOption("lzo", "lzo")
                                    .addOption("brotli", "brotli")
                                    .addOption("zstd", "zstd"),
                                    "compression"
                                )
                            )
                        )
                    ),
                    "auto"
                )
                .addColumn(SchemaTable("").isReadOnly().withoutInferSchema().bindProperty("schema"), "5fr")
            )

    def validate(self, component: Component) -> list:
        diagnostics = super(ParquetFormat, self).validate(component)
        if len(component.properties.path) == 0:
            diagnostics.append(
                Diagnostic("properties.path", "path variable cannot be empty [Location]", SeverityLevelEnum.Error))
        return diagnostics

    def onChange(self, oldState: Component, newState: Component) -> Component:
        return newState

    class ParquetFormatCode(ComponentCode):
        def __init__(self, props):
            self.props: ParquetFormat.ParquetProperties = props

        def sourceApply(self, spark: SparkSession) -> DataFrame:
            reader = spark.read.format("parquet")
            if self.props.mergeSchema is not None:
                reader = reader.option("mergeSchema", self.props.mergeSchema)
            if self.props.datetimeRebaseMode is not None:
                reader = reader.option("datetimeRebaseMode", self.props.datetimeRebaseMode)
            if self.props.int96RebaseMode is not None:
                reader = reader.option("int96RebaseMode", self.props.int96RebaseMode)
            if self.props.modifiedBefore is not None:
                reader = reader.option("modifiedBefore", self.props.modifiedBefore)
            if self.props.modifiedAfter is not None:
                reader = reader.option("modifiedAfter", self.props.modifiedAfter)
            if self.props.recursiveFileLookup is not None:
                reader = reader.option("recursiveFileLookup", self.props.recursiveFileLookup)
            if self.props.pathGlobFilter is not None:
                reader = reader.option("pathGlobFilter", self.props.pathGlobFilter)

            if self.props.schema is not None and self.props.useSchema:
                reader = reader.schema(self.props.schema)

            return reader.load(self.props.path)

        def targetApply(self, spark: SparkSession, in0: DataFrame):
            writer = in0.write.format("parquet")
            if self.props.compression is not None:
                writer = writer.option("compression", self.props.compression)

            if self.props.writeMode is not None:
                writer = writer.mode(self.props.writeMode)
            if self.props.partitionColumns is not None and len(self.props.partitionColumns) > 0:
                writer = writer.partitionBy(*self.props.partitionColumns)

            writer.save(self.props.path)