JDBC

You can read from and write to other databases using JDBC.

Parameters

Parameter | Tab | Description
Username | Location | Username for your JDBC instance.
Password | Location | Password for your JDBC instance.
JDBC URL | Location | JDBC URL to connect to. The source-specific connection properties may be specified in the URL. For example: jdbc:postgresql://test.us-east-1.rds.amazonaws.com:5432/postgres or jdbc:mysql://database-mysql.test.us-east-1.rds.amazonaws.com:3306/mysql
Data Source | Location | Strategy to read data. In the Source gem, you can select DB Table or SQL Query. In the Target gem, you must enter a table. To learn more, see DB Table and SQL Query.
Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code.

DB Table

The DB Table option specifies which table to read from as the source. You can use anything that is valid in a FROM clause of a SQL query. For example, instead of a table name, you can use a subquery in parentheses.
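
For example, a minimal sketch of this, assuming the same Config connection values and demo table used in the compiled code examples below (the column names and the src alias are illustrative):

df_subquery = (
    spark.read.format("jdbc")
    .option("url", Config.jdbc_url)
    .option("user", Config.jdbc_username)
    .option("password", Config.jdbc_password)
    .option("driver", "org.postgresql.Driver")
    # A parenthesized subquery with an alias is accepted anywhere a table name is.
    .option("dbtable", "(SELECT customer_id, country FROM public.demo_customers_raw) AS src")
    .load()
)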

caution

The DB Table option and the query parameter are mutually exclusive, which means that you cannot specify both at the same time.

SQL Query

The SQL Query option specifies which query to use as a subquery in the FROM clause. Spark also assigns an alias to the subquery clause. For example, Spark issues the following query to the JDBC Source:

SELECT columns FROM (<user_specified_query>) spark_gen_alias

The following restrictions exist when you use this option:

  1. You cannot use the query and partitionColumn options at the same time.
  2. If you must specify the partitionColumn option, specify the subquery using the dbtable option instead and qualify your partition columns using the subquery alias provided as part of dbtable, as in the sketch below.
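
Both restrictions can be seen in a minimal sketch, assuming the same Config connection values and demo table as the compiled examples below (column names, bounds, and the src alias are illustrative):

# Option 1: pass the query directly. Spark wraps it in a subquery with its own
# alias, so it cannot be combined with partitionColumn.
df_query = (
    spark.read.format("jdbc")
    .option("url", Config.jdbc_url)
    .option("user", Config.jdbc_username)
    .option("password", Config.jdbc_password)
    .option("query", "SELECT customer_id, country FROM public.demo_customers_raw")
    .load()
)

# Option 2: to partition the read, express the same query through dbtable and
# qualify the partition column with the subquery alias.
df_partitioned = (
    spark.read.format("jdbc")
    .option("url", Config.jdbc_url)
    .option("user", Config.jdbc_username)
    .option("password", Config.jdbc_password)
    .option("dbtable", "(SELECT customer_id, country FROM public.demo_customers_raw) AS src")
    .option("partitionColumn", "src.customer_id")
    .option("lowerBound", "1")
    .option("upperBound", "100000")
    .option("numPartitions", "4")
    .load()
)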

Source

The Source gem reads data from JDBC and allows you to optionally specify the following additional properties.

Source properties

Property | Description | Default
Description | Description of your dataset. | None
Driver | Class name of the JDBC driver to connect to this URL. For PostgreSQL: org.postgresql.Driver. For MySQL: com.mysql.cj.jdbc.Driver. | None
Push-down Predicate | Whether Spark should push down filters to the JDBC data source as much as possible. Predicate push-down is usually disabled when Spark performs predicate filtering faster than the JDBC data source. | true
Number of Partitions | Maximum number of partitions to use for parallelism in table reading, which is also the maximum number of concurrent JDBC connections. | None
Query Timeout | Number of seconds the driver waits for a Statement object to execute. To specify no limit, enter 0. | 0
Fetch Size | Number of rows to fetch per round trip. This can help performance on JDBC drivers that default to a low fetch size. | 0
Session Init Statement | Custom SQL statement or PL/SQL block to execute after the database session to the remote database opens and before data is read. Use this to implement session initialization code. For example: option("sessionInitStatement", """BEGIN execute immediate 'alter session set "_serial_direct_read"=true'; END;""") | None
Push-down Aggregate | Whether Spark should push down aggregates to the JDBC data source. Aggregate push-down is usually disabled when Spark performs the aggregate faster than the JDBC data source. NOTE: Aggregates can only be pushed down if all the aggregate functions and the related filters can be pushed down. Spark assumes that the data source can't fully complete the aggregate and does a final aggregate over the data source output. | false
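
These properties correspond to Spark's standard JDBC reader options. A minimal sketch of how a few of them might be set together, assuming the same Config connection values as the compiled example below (the option values are illustrative, not recommendations):

df_source = (
    spark.read.format("jdbc")
    .option("url", Config.jdbc_url)
    .option("user", Config.jdbc_username)
    .option("password", Config.jdbc_password)
    .option("driver", "org.postgresql.Driver")   # Driver
    .option("dbtable", "public.demo_customers_raw")
    .option("pushDownPredicate", "true")          # Push-down Predicate
    .option("pushDownAggregate", "false")         # Push-down Aggregate
    .option("numPartitions", "4")                 # Number of Partitions
    .option("queryTimeout", "30")                 # Query Timeout, in seconds
    .option("fetchsize", "1000")                  # Fetch Size
    .load()
)
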
caution

If you get a class not found error while running your pipeline, add the missing dependency to your cluster.
To learn how to add dependencies for a specific JDBC jar, see Spark dependencies.

Example

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

def Source(spark: SparkSession) -> DataFrame:
    return spark.read\
        .format("jdbc")\
        .option("url", f"{Config.jdbc_url}")\
        .option("user", f"{Config.jdbc_username}")\
        .option("password", f"{Config.jdbc_password}")\
        .option("dbtable", "public.demo_customers_raw")\
        .option("pushDownPredicate", True)\
        .option("driver", "org.postgresql.Driver")\
        .load()

Target

The Target gem writes data to JDBC and allows you to optionally specify the following additional properties.

Target properties

Property | Description | Default
Description | Description of your dataset. | None
Driver | Class name of the JDBC driver to connect to this URL. For PostgreSQL: org.postgresql.Driver. For MySQL: com.mysql.cj.jdbc.Driver. | None
Number of Partitions | Maximum number of partitions to use for parallelism in table writing, which is also the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, Spark decreases it to this limit by calling coalesce(numPartitions) before writing. | None
Query Timeout | Number of seconds the driver waits for a Statement object to execute. To specify no limit, enter 0. This option depends on how the JDBC driver implements the setQueryTimeout API. For example, the h2 JDBC driver checks the timeout of each query instead of an entire JDBC batch. | 0
Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | error
Batch Size | Number of rows to insert per round trip. This can help performance on JDBC drivers. | 1000
Isolation Level | Transaction isolation level to apply to the current connection. Possible values are: NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE. To learn more, see Interface Connection. | READ_UNCOMMITTED
Truncate | When enabled and the write mode is overwrite, Spark truncates the existing table instead of dropping and recreating it. This can be more efficient and prevents table metadata, such as indices, from being removed. However, it does not work in some cases, such as when the new data has a different schema. If a truncated write fails, disable the truncate option so that Spark uses DROP TABLE again. Also, because TRUNCATE TABLE behaves differently across DBMSs, it's not always safe to use this. MySQLDialect, DB2Dialect, MsSqlServerDialect, DerbyDialect, and OracleDialect support this, while PostgresDialect and the default JDBCDialect don't. For an unknown or unsupported JDBCDialect, the truncate option is ignored. | false
Cascade Truncate | Whether to allow a TRUNCATE TABLE t CASCADE to execute. For PostgreSQL, TRUNCATE TABLE ONLY t CASCADE executes to prevent inadvertently truncating descendant tables. Because this can affect other tables, use it with care. | Default according to the JDBCDialect
Create Table Options | Database-specific table and partition options to set when creating a table. For example: CREATE TABLE t (name string) ENGINE=InnoDB | None
Create Table Column Types | Database column data types to use instead of the defaults when creating the table. Specify valid Spark SQL data type information in the same format as the CREATE TABLE columns syntax. For example: "name CHAR(64), comments VARCHAR(1024)" | None
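
As with the Source gem, these properties correspond to Spark's standard JDBC writer options. A minimal sketch, where in0 stands for the incoming DataFrame as in the compiled Target example below and the option values are illustrative, not recommendations:

(
    in0.write.format("jdbc")
    .option("url", Config.jdbc_url)
    .option("user", Config.jdbc_username)
    .option("password", Config.jdbc_password)
    .option("driver", "org.postgresql.Driver")        # Driver
    .option("dbtable", "public.demo_customers_raw_output")
    .option("numPartitions", "4")                      # Number of Partitions
    .option("queryTimeout", "30")                      # Query Timeout, in seconds
    .option("batchsize", "5000")                       # Batch Size
    .option("isolationLevel", "READ_COMMITTED")        # Isolation Level
    .option("truncate", "true")                        # Truncate; only applies with the overwrite mode
    .mode("overwrite")                                 # Write Mode
    .save()
)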

Supported write modes

Write mode | Description
error | If the data already exists, throw an exception.
overwrite | If the data already exists, overwrite it with the contents of the DataFrame.
append | If the data already exists, append the contents of the DataFrame to it.
ignore | If the data already exists, do nothing with the contents of the DataFrame. This is similar to the CREATE TABLE IF NOT EXISTS clause in SQL.
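
In Spark code, the selected write mode corresponds to DataFrameWriter.mode(). A short sketch, with the same assumptions as the Target examples above:

writer = (
    in0.write.format("jdbc")
    .option("url", Config.jdbc_url)
    .option("user", Config.jdbc_username)
    .option("password", Config.jdbc_password)
    .option("dbtable", "public.demo_customers_raw_output")
)

# Exactly one mode applies per write; "error" is the default.
writer.mode("append").save()   # or "error", "overwrite", "ignore"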

Example

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

def Target(spark: SparkSession, in0: DataFrame):
    in0.write\
        .format("jdbc")\
        .option("url", f"{Config.jdbc_url}")\
        .option("dbtable", "public.demo_customers_raw_output")\
        .option("user", f"{Config.jdbc_username}")\
        .option("password", f"{Config.jdbc_password}")\
        .option("driver", "org.postgresql.Driver")\
        .save()