Parquet
Parquet is an open-source, columnar storage data format. It handles large volumes of data by supporting predicate pushdown, nested schemas, and a wide variety of column encoding types.
This Gem allows you to read from or write to Parquet files.
Source
Reads data from Parquet files at the given path.
Source Parameters
Parameter | Description | Required | Default |
---|---|---|---|
Location | File path where the Parquet files are present | True | None |
Schema | Schema to apply to the loaded data. It can be defined or edited as JSON, or inferred using the Infer Schema button. | True | None |
Recursive File Lookup | Recursively loads files from the given Location and disables partition discovery. An exception is thrown if this option and a partitionSpec are both specified. | False | False |
Path Global Filter | An optional glob pattern to include only files whose paths match the pattern. The syntax follows GlobFilter. It does not change the behavior of partition discovery. | False | None |
Modified Before | An optional timestamp to include only files with modification times occurring before the specified time. The timestamp must be in YYYY-MM-DDTHH:mm:ss format (e.g. 2020-06-01T13:00:00). | False | None |
Modified After | An optional timestamp to include only files with modification times occurring after the specified time. The timestamp must be in YYYY-MM-DDTHH:mm:ss format (e.g. 2020-06-01T13:00:00). | False | None |
Merge Schema | Sets whether schemas should be merged from all collected Parquet part-files. This overrides spark.sql.parquet.mergeSchema. | False | (value of spark.sql.parquet.mergeSchema) |
Int96 Rebase Mode | The int96RebaseMode option allows you to specify the rebasing mode for INT96 timestamps from the Julian to the Proleptic Gregorian calendar. Supported modes are: EXCEPTION - fails on reads of ancient INT96 timestamps that are ambiguous between the two calendars; CORRECTED - loads INT96 timestamps without rebasing; LEGACY - rebases ancient timestamps from the Julian to the Proleptic Gregorian calendar. | False | (value of spark.sql.parquet.int96RebaseModeInRead) |
Datetime Rebase Mode | The datetimeRebaseMode option allows you to specify the rebasing mode for values of the DATE, TIMESTAMP_MILLIS, and TIMESTAMP_MICROS logical types from the Julian to the Proleptic Gregorian calendar. Supported modes are: EXCEPTION - fails on reads of ancient dates/timestamps that are ambiguous between the two calendars; CORRECTED - loads dates/timestamps without rebasing; LEGACY - rebases ancient dates/timestamps from the Julian to the Proleptic Gregorian calendar. | False | (value of spark.sql.parquet.datetimeRebaseModeInRead) |
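These options map directly onto Spark DataFrameReader options. As a rough illustration, here is a minimal sketch that applies a user-defined schema together with recursive lookup, a path filter, and a modification-time cutoff; the folder path, schema, and values are hypothetical and not part of any generated code:

```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

def read_filtered_parquet(spark: SparkSession) -> DataFrame:
    # Hypothetical schema and folder path, for illustration only
    schema = StructType([
        StructField("id", StringType(), True),
        StructField("amount", DoubleType(), True),
    ])
    return spark.read\
        .format("parquet")\
        .schema(schema)\
        .option("recursiveFileLookup", True)\
        .option("pathGlobFilter", "*.parquet")\
        .option("modifiedAfter", "2020-06-01T13:00:00")\
        .load("dbfs:/FileStore/Users/parquet/")
```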
Example
Generated Code
- Python

```python
def read_parquet(spark: SparkSession) -> DataFrame:
    return spark.read\
        .format("parquet")\
        .option("mergeSchema", True)\
        .load("dbfs:/FileStore/Users/parquet/test.parquet")
```

- Scala

```scala
object read_parquet {

  def apply(spark: SparkSession): DataFrame =
    spark.read
      .format("parquet")
      .option("mergeSchema", true)
      .load("dbfs:/FileStore/Users/parquet/test.parquet")

}
```
Target
Writes data as Parquet files at the specified path.
Target Parameters
Parameter | Description | Required | Default |
---|---|---|---|
Location | File path where the Parquet files will be written | True | None |
Compression | Compression codec to use when saving to file. This can be one of the known case-insensitive shortened names (none, uncompressed, snappy, gzip, lzo, brotli, lz4, and zstd). This overrides spark.sql.parquet.compression.codec. | False | snappy |
Write Mode | How to handle existing data. See the Supported Write Modes table below for the available options. | True | error |
Partition Columns | List of columns to partition the Parquet files by | False | None |
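The Compression and Partition Columns parameters correspond to the writer's compression option and its partitionBy call. A minimal sketch, with a hypothetical output path and an assumed partition column named "country":

```python
from pyspark.sql import SparkSession, DataFrame

def write_parquet_partitioned(spark: SparkSession, in0: DataFrame):
    # Hypothetical path, codec, and partition column, for illustration only
    in0.write\
        .format("parquet")\
        .option("compression", "gzip")\
        .partitionBy("country")\
        .mode("overwrite")\
        .save("dbfs:/data/test_output_partitioned.parquet")
```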
Supported Write Modes
Write Mode | Description |
---|---|
overwrite | If data already exists, overwrite it with the contents of the DataFrame. |
append | If data already exists, append the contents of the DataFrame. |
ignore | If data already exists, do nothing with the contents of the DataFrame. This is similar to CREATE TABLE IF NOT EXISTS in SQL. |
error | If data already exists, throw an exception. |
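The selected write mode is passed straight through to the DataFrameWriter's mode call. For instance, an append-only variant of the generated example below would differ only in that argument (a hypothetical variation, not generated output):

```python
from pyspark.sql import SparkSession, DataFrame

def append_parquet(spark: SparkSession, in0: DataFrame):
    # Same shape as the generated code below, but with mode("append")
    in0.write\
        .format("parquet")\
        .mode("append")\
        .save("dbfs:/data/test_output.parquet")
```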
Example
Generated Code
- Python

```python
def write_parquet(spark: SparkSession, in0: DataFrame):
    in0.write\
        .format("parquet")\
        .mode("overwrite")\
        .save("dbfs:/data/test_output.parquet")
```

- Scala

```scala
object write_parquet {

  def apply(spark: SparkSession, in: DataFrame): Unit =
    in.write
      .format("parquet")
      .mode("overwrite")
      .save("dbfs:/data/test_output.parquet")

}
```
info
To learn more about tweaking Parquet-related properties in the Spark config, click here.
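For instance, the session-level defaults that several of the options above fall back to can be set directly on the Spark session; the values shown are illustrative only:

```python
# Illustrative values only; these are the session-wide defaults that the
# Merge Schema, Compression, and Datetime Rebase Mode options fall back to.
spark.conf.set("spark.sql.parquet.mergeSchema", "true")
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInRead", "CORRECTED")
```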