
BigQuery

Built on

This connector is built on top of the Apache Spark SQL connector for Google BigQuery.

For non-Databricks clusters, install the corresponding library and see the Spark BigQuery library compatibility matrix documentation.
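On a non-Databricks cluster, one way to attach the connector is when you build the Spark session. The following is a minimal sketch; the Maven coordinates and version are assumptions, so pick the artifact from the compatibility matrix that matches your Spark and Scala versions.

from pyspark.sql import SparkSession

# Minimal sketch for a non-Databricks cluster: pull the open-source
# spark-bigquery connector at session startup. The version below is an
# assumption; check the compatibility matrix for your Spark/Scala versions.
spark = (
    SparkSession.builder
    .appName("bigquery-example")
    .config(
        "spark.jars.packages",
        "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.36.1",
    )
    .getOrCreate()
)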

You can read from and write to BigQuery.

Parameters

Parameter | Tab | Description
Parent Project Name | Location | Google Cloud Project ID of the table to bill for the export.
Table Name | Location | Name of the table.
Credentials | Location | How you want to connect to BigQuery. Possible values are: None, JSON Credentials Filepath, or Databricks secrets. To learn more, see Credentials.
Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code.
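For reference, the location parameters above surface as options on the underlying connector in the compiled code. The sketch below is illustrative only: the option names parentProject and table come from the open-source connector, and the project, dataset, and table names are placeholders.

from pyspark.sql import SparkSession, DataFrame

def read_customers(spark: SparkSession) -> DataFrame:
    # parentProject is the project billed for the read; the table can be given
    # fully qualified as project.dataset.table. All names are placeholders.
    return spark.read \
        .format("bigquery") \
        .option("parentProject", "my-billing-project") \
        .option("table", "my-project.my_dataset.customers") \
        .load()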

Credentials

Credential type | Description
None | You don't have to set credentials if the BigQuery configurations are set at the cluster level.
JSON Credentials Filepath | BigQuery JSON key configuration that you pass to BigQuery. To learn how to obtain it, see Retrieve JSON Credentials.
Databricks secrets | If the JSON configuration is stored directly in the pipeline configuration as a Databricks secret, refer to the configuration variable as ${config_variable}. If that configuration variable is Base64 encoded, enable Is secret base64 encoded.
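As an illustration of the Databricks secrets option, the sketch below reads a Base64-encoded service account key from a secret scope and hands it to the connector. The scope and key names are hypothetical, and the credentials option name is taken from the open-source connector rather than the gem's generated code.

from pyspark.sql import SparkSession, DataFrame

def read_with_secret(spark: SparkSession) -> DataFrame:
    # dbutils is available on Databricks clusters; the scope and key names
    # here are hypothetical.
    encoded_key = dbutils.secrets.get(scope="gcp", key="bigquery-sa-base64")
    # The open-source connector accepts a Base64-encoded service account JSON
    # string through the "credentials" option.
    return spark.read \
        .format("bigquery") \
        .option("credentials", encoded_key) \
        .option("table", "my-project.my_dataset.customers") \
        .load()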

Retrieve JSON credentials

To get your JSON Credentials from BigQuery:

  1. Navigate to your Google Cloud Credentials page.

  2. In the top navigation bar, click + CREATE CREDENTIALS.

  3. Select Service account.

  4. If you don't have a Service account, create one.

    a. Enter your Service account name, Service account ID, and Service account description. Then click Create and continue.
    b. Click Done.

  5. Under the Service Accounts section, click on your service account email.

  6. Navigate to the Keys tab.

  7. Create a new key.

    a. Click Add key.
    b. Click Create new key.
    c. Select the JSON key type.
    d. Click Create.
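After the key file downloads, you can optionally sanity-check it outside Spark with the BigQuery Python client. This is only a sketch: it assumes the google-cloud-bigquery and google-auth packages are installed, and the key path is a placeholder.

from google.cloud import bigquery
from google.oauth2 import service_account

# Load the downloaded service account key (path is a placeholder) and list
# the datasets it can see, just to confirm the credentials work.
credentials = service_account.Credentials.from_service_account_file("key.json")
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

for dataset in client.list_datasets():
    print(dataset.dataset_id)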

Source

The Source gem reads data from BigQuery and allows you to optionally specify the following additional properties.

Source properties

Property | Description | Default
Description | Description of your dataset. | None
Project Name | Google Cloud Project ID of the table. | Project of the Service Account
Dataset Name | Dataset containing the table. This is required unless you specify it in the Table Name. | None
Maximum partitions | Maximum number of partitions to split the data into. The actual number may be less if BigQuery deems the data small enough. | None
Minimum partitions | Minimum number of partitions to split the data into. The actual number may be less if BigQuery deems the data small enough. | None
Enables read views | Whether to enable the connector to read from views and not only tables. | false
MaterializedView projectID | Project ID where the materialized view is created. | None
MaterializedView dataset | Dataset where the materialized view is created. This dataset should be in the same location as the view or the queried tables. | None
Materialized expiration time in min's | Expiration time in minutes of the temporary table holding the materialized data of a view or a query. The connector may reuse the temporary table because of the local cache and to reduce BigQuery computation, so very low values may cause errors. | None
Read dataformat | Data format for reading from BigQuery. Possible values are: ARROW or AVRO. Note: Unsupported Arrow filters are not pushed down and results are filtered later by Spark. Currently, Arrow does not support disjunction across columns. | None
Enable optimize-empty-projection | Whether the connector uses an optimized empty projection (a SELECT without any columns) logic for count() execution. | false
Enable push-all-filters | Whether to push all the filters Spark can delegate to the BigQuery Storage API. This reduces the amount of data that BigQuery Storage API servers need to send to Spark clients. | true
Additional Job Labels | Labels to add to the connector-initiated query and load BigQuery jobs. | None
Traceability Application Name | Application name used to trace BigQuery Storage read and write sessions. You must set this property to set the trace ID on the sessions. | None
Traceability Job ID | Job ID used to trace BigQuery Storage read and write sessions. | None
Proxy URL | HTTP proxy address in host:port format. You can alternatively set this in the Spark configuration spark.conf.set(...) or the Hadoop configuration fs.gs.proxy.address. | None
Proxy username | Username to connect to the proxy. You can alternatively set this in the Spark configuration spark.conf.set(...) or the Hadoop configuration fs.gs.proxy.username. | None
Proxy password | Password to connect to the proxy. You can alternatively set this in the Spark configuration spark.conf.set(...) or the Hadoop configuration fs.gs.proxy.password. | None
Maximum HTTP retries | Maximum number of retries for the low-level HTTP requests to BigQuery. You can alternatively set this in the Spark configuration spark.conf.set("httpMaxRetry", ...) or the Hadoop configuration fs.gs.http.max.retry. | 10
HTTP Connection timeout in MSec's | Timeout in milliseconds to establish a connection with BigQuery. You can alternatively set this in the Spark configuration spark.conf.set("httpConnectTimeout", ...) or the Hadoop configuration fs.gs.http.connect-timeout. | 60000
HTTP Read timeout in MSec's | Timeout in milliseconds to read data from an established connection. You can alternatively set this in the Spark configuration spark.conf.set("httpReadTimeout", ...) or the Hadoop configuration fs.gs.http.read-timeout. | 60000
Arrow Compression Codec | Compression codec to use while reading from a BigQuery table when using the Arrow format. Possible values are: ZSTD, LZ4_FRAME, or COMPRESSION_UNSPECIFIED. | COMPRESSION_UNSPECIFIED
Cache expiration time in min's | Expiration time in minutes of the in-memory cache storing query information. To disable caching, set the value to 0. | 15
Cache read session timeout in sec's | Timeout in seconds to create a read session when reading a table. For extremely large tables, increase this value. | 600
GCP Access Token | GCP access token that allows you to use Google APIs. | None
Conversion datetime zone ID | Time zone ID to use when converting BigQuery's DATETIME into Spark's Timestamp, and vice versa. To see a full list, run java.time.ZoneId.getAvailableZoneIds() in Java/Scala, or sc._jvm.java.time.ZoneId.getAvailableZoneIds() in PySpark. | UTC
Job query priority | Priority level set for the job while reading data from a BigQuery query. Possible values are: BATCH, which queues the query and starts it as soon as idle resources are available (if the query hasn't started within 3 hours, its priority changes to INTERACTIVE), and INTERACTIVE, which runs the query as soon as possible. | None
Filter Condition | One or more boolean expressions to further filter the results for the output. Supports SQL, Python, and Scala expressions. | None
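Many of these properties surface as options on the underlying connector. The sketch below shows a few of them on a read from a view. The option names (viewsEnabled, materializationDataset, readDataFormat, maxParallelism, filter) come from the open-source connector and are assumptions about what the gem emits; the dataset and table names are placeholders.

from pyspark.sql import SparkSession, DataFrame

def read_bigquery_view(spark: SparkSession) -> DataFrame:
    # Sketch: read from a BigQuery view, materializing it in a scratch dataset,
    # using the Arrow format, a partition cap, and a pushed-down filter.
    # Option names follow the open-source spark-bigquery connector and may
    # differ from the gem's generated code.
    return spark.read \
        .format("bigquery") \
        .option("viewsEnabled", "true") \
        .option("materializationDataset", "scratch_dataset") \
        .option("readDataFormat", "ARROW") \
        .option("maxParallelism", "100") \
        .option("filter", "country = 'US'") \
        .option("table", "my-project.my_dataset.customer_view") \
        .load()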

Example

The following example fetches all customer data from a BigQuery table using Prophecy.

Compiled code

Tip: To see the compiled code of your project, switch to the Code view in the project header.

def read_bigquery(spark: SparkSession) -> DataFrame:
    return spark.read \
        .format("bigquery") \
        .option("credentials", "dbfs:/bucket/prefix/file.json") \
        .option("table", "tablename") \
        .load()

Target

The Target gem writes data to BigQuery and allows you to optionally specify the following additional properties.

Target properties

Property | Description | Default
Description | Description of your dataset. | None
Job query priority | Priority level set for the job while reading data from a BigQuery query. This property applies when the DIRECT write method is used with OVERWRITE mode, where the connector overwrites the destination table using a MERGE statement. Possible values are: BATCH, which queues the query and starts it as soon as idle resources are available (if the query hasn't started within 3 hours, its priority changes to INTERACTIVE), and INTERACTIVE, which runs the query as soon as possible. | None
Project Name | Google Cloud Project ID of the table. | Project of the Service Account
Dataset Name | Dataset containing the table. This is required unless you specify it in the Table Name. | None
Table labels | One or more labels to add to the table while writing. | None
Disposition creation | Specifies whether the job can create new tables. Possible values are: Create table if not exists, which configures the job to create the table if it does not exist, and Don't create table, which configures the job to fail if the table does not exist. | None
Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | None
Write Method | Method to write data to BigQuery. Possible values are: Use storage write API, which writes directly using the BigQuery Storage Write API, and Write first to GCS and Load, which writes the data to GCS first and then triggers a BigQuery load operation. | None
Temporary GCS Bucket | GCS bucket that temporarily holds the data before it loads to BigQuery. This is required unless set in the Spark configuration spark.conf.set(...). Note: This is not supported by the DIRECT write method. | None
Persistent GCS Bucket | GCS bucket that holds the data before it loads to BigQuery. If set, the data isn't deleted after it is written to BigQuery. Note: This is not supported by the DIRECT write method. | None
Persistent GCS Path | GCS path that holds the data before it loads to BigQuery. Only used with Persistent GCS Bucket. Note: This is not supported by the DIRECT write method. | None
Intermediate dataformat | Format of the data before it loads to BigQuery. Possible values are: Parquet, ORC, or Avro. To use the Avro format, you must add the spark-avro package at runtime. | Parquet
Date partition | Date partition, in the format YYYYMMDD, that the data writes to. You can use this to overwrite the data of a single partition: df.write.format("bigquery").option("datePartition", "20220331").mode("overwrite").save("table"). You can also use this with different partition types: HOUR: YYYYMMDDHH, MONTH: YYYYMM, YEAR: YYYY. | None
Partition expiration MSec's | Number of milliseconds to keep the storage for partitions in the table. The storage in a partition has an expiration time of its partition time plus this value. Note: This is not supported by the DIRECT write method. | None
Partition type of the field | Type of the partition field. Possible values are: Hour, Day, Month, or Year. Note: This property is mandatory for a target table to be partitioned, and is not supported by the DIRECT write method. | Day
Partition field | Field to partition the table by. The field must be a top-level TIMESTAMP or DATE field, and its mode must be NULLABLE or REQUIRED. If this property is not set for a partitioned table, the table is partitioned by a pseudo column, referenced through either '_PARTITIONTIME' as TIMESTAMP type or '_PARTITIONDATE' as DATE type. Note: You must specify this field together with the partition type. This is not supported by the DIRECT write method. | None
Enable allow-field-addition | Whether to add the ALLOW_FIELD_ADDITION SchemaUpdateOption to the BigQuery LoadJob. | false
Enable allow-field-relaxation | Whether to add the ALLOW_FIELD_RELAXATION SchemaUpdateOption to the BigQuery LoadJob. | false
Proxy URL | HTTP proxy address in host:port format. You can alternatively set this in the Spark configuration spark.conf.set(...) or the Hadoop configuration fs.gs.proxy.address. | None
Proxy username | Username to connect to the proxy. You can alternatively set this in the Spark configuration spark.conf.set(...) or the Hadoop configuration fs.gs.proxy.username. | None
Proxy password | Password to connect to the proxy. You can alternatively set this in the Spark configuration spark.conf.set(...) or the Hadoop configuration fs.gs.proxy.password. | None
Maximum HTTP retries | Maximum number of retries for the low-level HTTP requests to BigQuery. You can alternatively set this in the Spark configuration spark.conf.set("httpMaxRetry", ...) or the Hadoop configuration fs.gs.http.max.retry. | 10
HTTP Connection timeout in MSec's | Timeout in milliseconds to establish a connection with BigQuery. You can alternatively set this in the Spark configuration spark.conf.set("httpConnectTimeout", ...) or the Hadoop configuration fs.gs.http.connect-timeout. | 60000
Enable mode-check-for-schema-fields | Whether to check, during a DIRECT write, that the mode of every field in the destination schema equals the mode of the corresponding source field. | true
Enable list-interface | Whether to use schema inference when the mode is Parquet. | true
Conversion datetime zone ID | Time zone ID to use when converting BigQuery's DATETIME into Spark's Timestamp, and vice versa. To see a full list, run java.time.ZoneId.getAvailableZoneIds() in Java/Scala, or sc._jvm.java.time.ZoneId.getAvailableZoneIds() in PySpark. | UTC
GCP Access Token | GCP access token that allows you to use Google APIs. | None
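As with the Source gem, these properties surface as connector options. The sketch below shows an indirect (GCS-staged) write into a day-partitioned table. The option names (writeMethod, temporaryGcsBucket, intermediateFormat, partitionField, partitionType) come from the open-source connector and are assumptions about the generated code; the bucket, dataset, and column names are placeholders.

from pyspark.sql import SparkSession, DataFrame

def write_bigquery_partitioned(spark: SparkSession, in0: DataFrame):
    # Sketch: stage the data in a temporary GCS bucket as Parquet, then load it
    # into a table partitioned by day on a timestamp column. Names and option
    # spellings follow the open-source spark-bigquery connector.
    in0.write \
        .format("bigquery") \
        .option("writeMethod", "indirect") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .option("intermediateFormat", "parquet") \
        .option("partitionField", "created_at") \
        .option("partitionType", "DAY") \
        .mode("append") \
        .save("my_dataset.customers")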

Supported write modes

Write mode | Description
overwrite | If the data already exists, overwrite it with the contents of the DataFrame.
append | If the data already exists, append the contents of the DataFrame to it.
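In the compiled code these write modes map to the standard Spark DataFrameWriter mode. A minimal sketch, with placeholder bucket and table names:

from pyspark.sql import DataFrame

def overwrite_customers(in0: DataFrame):
    # "overwrite" replaces the existing rows; switch to .mode("append") to add rows.
    in0.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .mode("overwrite") \
        .save("my_dataset.customers")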

Compiled code

Tip: To see the compiled code of your project, switch to the Code view in the project header.

Direct write using the BigQuery Storage Write API:

def write_bigquery(spark: SparkSession, in0: DataFrame):
    in0.write \
        .format("bigquery") \
        .option("writeMethod", "direct") \
        .save("dataset.table")

Indirect write, which stages the data in GCS and then loads it into BigQuery:

def write_bigquery(spark: SparkSession, in0: DataFrame):
    in0.write \
        .format("bigquery") \
        .option("temporaryGcsBucket", "some-bucket") \
        .save("dataset.table")