CosmosDB
The azure-cosmos-spark connector is an integration between Azure CosmosDB and Apache Spark that allows you to read and write data from and to CosmosDB using Spark.
Installation
For Spark 3.3, add a dependency with the following Maven coordinates to your Databricks or on-premises execution cluster:
com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1
For other versions of Spark, see Azure Cosmos DB OLTP Spark 3 connector.
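If you prefer to resolve the dependency in code rather than through the cluster UI, one option is Spark's spark.jars.packages setting; a minimal sketch (the application name is a placeholder):

```python
from pyspark.sql import SparkSession

# Resolve the connector from Maven at session startup. On Databricks you
# would typically attach the library to the cluster instead.
spark = (
    SparkSession.builder
    .appName("cosmos-example")  # placeholder application name
    .config(
        "spark.jars.packages",
        "com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1",
    )
    .getOrCreate()
)
```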
Usage
Parameters
Parameter | Tab | Description | Default |
---|---|---|---|
Authentication Type | Location | How you want to authenticate. Possible values are: Master Key (PrimaryReadWriteKeys, SecondaryReadWriteKeys, PrimaryReadOnlyKeys, SecondaryReadOnlyKeys) and Service Principal Based. To see all the additional parameters for each authentication type, see Master key authentication and Service principal based authentication. | Master Key |
Data Source | Location | Whether the gem should load all data in the container, or return the data using a custom query. | DB Table |
Azure Environment | Location | Azure environment of the CosmosDB account. Possible values are: Azure , AzureChina , AzureUsGovernment , or AzureGermany . | Azure |
Custom Query | Location | Custom query to run against the CosmosDB endpoint instead of dynamically generating the query through predicate pushdown. Usually it's recommended to rely on Spark's predicate pushdown, because that generates the most efficient set of filters based on the query plan. However, some predicates, such as aggregates, cannot be pushed down yet (as of Spark 3.1), so the custom query is a fallback that allows you to push them into the query sent to CosmosDB. If specified while schema inference is enabled, the custom query is also used to infer the schema. | SELECT 1 |
Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code. | None |
Master key authentication
Parameter | Description |
---|---|
Account Endpoint | Account URI of your CosmosDB account. |
Account Key | Account key of your CosmosDB account. |
Database | Database name of your CosmosDB account. |
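In raw Spark code, these parameters map to the connector's spark.cosmos.* options. A minimal read sketch using master key authentication, with placeholder endpoint, key, database, and container names:

```python
# Placeholder credentials; in practice, load the key from a secret scope.
cosmos_config = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<account-key>",
    "spark.cosmos.database": "<database>",
    "spark.cosmos.container": "<container>",
}

df = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
```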
Service principal based authentication
Parameter | Description |
---|---|
Subscription Id | Subscription Id of your CosmosDB account. |
Tenant Id | Tenant Id of your CosmosDB account. |
Resource Group Name | Resource group of your CosmosDB account. |
Client Id | Client Id, or Application Id, of your service principal. |
Client Secret | Client secret, or password, of your service principal. |
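These parameters correspond to the connector's Azure AD options. A sketch with placeholder identifiers:

```python
# Service principal authentication; every identifier below is a placeholder.
spn_config = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",
    "spark.cosmos.auth.type": "ServicePrincipal",
    "spark.cosmos.account.subscriptionId": "<subscription-id>",
    "spark.cosmos.account.tenantId": "<tenant-id>",
    "spark.cosmos.account.resourceGroupName": "<resource-group>",
    "spark.cosmos.auth.aad.clientId": "<client-id>",
    "spark.cosmos.auth.aad.clientSecret": "<client-secret>",
    "spark.cosmos.database": "<database>",
    "spark.cosmos.container": "<container>",
}
```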
Source
The Source gem reads data from CosmosDB and allows you to optionally specify the following additional properties.
Source properties
Property | Description | Default |
---|---|---|
Description | Description of your dataset. | None |
Enable Infer Schema | Whether the Source gem infers the schema by sampling documents from the container, instead of using an explicitly defined schema. | true |
Application name | Name of your application. | None |
Inclusion Mode | Determines whether to serialize null and default values to JSON or skip them. Possible values are: Always, which creates JSON properties for null and default values; NonNull, which does not create JSON properties for null values; and NonEmpty, which does not create JSON properties for empty values. | Always |
DateTime Conversion Mode | How to convert your DateTime values. Possible values are: Default, which converts java.sql.Date/java.time.LocalDate to EpochDay and java.sql.Timestamp/java.time.Instant to MicrosecondsFromEpoch; AlwaysEpochMilliseconds, which converts java.sql.Date, java.time.LocalDate, java.sql.Timestamp, and java.time.Instant to MillisecondsFromEpoch; and AlwaysEpochMillisecondsWithSystemDefaultTimezone, which converts like AlwaysEpochMilliseconds but also assumes the system default timezone. | Default |
Schema Conversion Mode | How to handle a document attribute that does not map to the schema type. Possible values are: Relaxed, which uses a null value, and Strict, which throws an exception. | Relaxed |
Partitioning Strategy | Partitioning strategy to use. Possible values are: Default, Custom, Restrictive, or Aggressive. | Default |
Preferred Regions List | List of regions you prefer. | None |
Disable Tcp Connection Endpoint Rediscovery | Whether to disable TCP connection endpoint rediscovery. Disable it when you use custom domain names with private endpoints in a custom Spark environment. When you use Azure Databricks or Azure Synapse as the Spark runtime, you should never need to disable endpoint rediscovery. | false |
Allow Invalid JSON With Duplicate JSON Properties | Whether to allow the Source gem to read JSON documents that contain JSON objects with multiple properties of the same name, instead of having the CosmosDB Java SDK and Spark connector fail. | false |
Partitioning Targeted Count | Ideal number of partitions to use. If you have a custom strategy, the Spark Connector uses this value. | None |
Partitioning Feed Range Filter | Scopes the query to a single logical CosmosDB partition, or a subset of logical partitions. The Source gem modifies the partitioning strategy to only create partitions for the scoped logical partitions. This property reduces the necessary Spark tasks and partitions. | None |
Use Gateway Mode | Whether to use gateway mode for the client operations. | false |
Force Eventual Consistency | Whether to make the client use eventual consistency for read operations instead of using the default account level consistency. | true |
Max Item Count | Maximum number of documents to return for a single query or change feed request. Consider increasing this value only for average document sizes significantly smaller than 1KB, or when projection reduces the number of properties selected in queries significantly, such as only selecting id of documents. | 1000 |
Max Integrated Staleness in MS | Maximum staleness window in milliseconds for the point read or query request results in the integrated cache when using the dedicated gateway. To learn more, see MaxIntegratedCacheStaleness. | None |
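Several of these properties map directly to spark.cosmos.read.* options. A read sketch combining schema inference, a custom query, and a partitioning strategy, reusing the placeholder cosmos_config from the master key example (the query itself is hypothetical):

```python
df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)
    # Infer the schema from sampled documents instead of declaring one.
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    # Hypothetical custom query as a fallback for predicates that
    # cannot be pushed down.
    .option(
        "spark.cosmos.read.customQuery",
        "SELECT c.id, c.name FROM c WHERE c.type = 'order'",
    )
    .option("spark.cosmos.read.partitioning.strategy", "Default")
    .option("spark.cosmos.read.maxItemCount", "1000")
    .load()
)
```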
Throughput control configs
Throughput control helps isolate the performance needs of applications running against a container by limiting the number of request units a Spark client can consume.
Several advanced scenarios benefit from client-side throughput control:
- Different operations and tasks have different priorities. You can prevent normal transactions from being throttled by data ingestion or copy activities. Some operations or tasks aren't sensitive to latency and are more tolerant to being throttled than others.
- Provide fairness and isolation to different end users. Applications usually have many end users, and some may send too many requests, consuming all available throughput and causing others to be throttled.
- Load balancing of throughput between different Azure Cosmos DB clients. You can assure that all clients get an equal share of the throughput.
Property | Description | Default |
---|---|---|
Enable Throughput Control Options | Whether to enable throughput control. | false |
Throughput Control Account Endpoint | CosmosDB Account Endpoint URI for throughput control. If not defined, the Source gem uses the spark.cosmos.accountEndpoint property. | None |
Throughput Control Account Key | CosmosDB Account Key for throughput control. | None |
Throughput Control Preferred Regions List | Comma-separated list of regions you prefer to use for a multi-region CosmosDB account. Use a collocated Spark cluster with your CosmosDB account and pass the Spark cluster region as a preferred region. To see a list of Azure regions, see LocationNames Class. | None |
Disable TCP connection endpoint Rediscovery | Whether to disable TCP connection endpoint rediscovery. Disable it when you use custom domain names with private endpoints in a custom Spark environment. When you use Azure Databricks or Azure Synapse as the Spark runtime, you should never need to disable endpoint rediscovery. | false |
Use Gateway Mode | Whether to use gateway mode for the client operations. | false |
Use Dedicated Container | Whether to provide a dedicated throughput control container. | true |
Throughput control group name | Name of your throughput control group. | None |
Renew Interval in MS | How often the client updates its own throughput usage. | 5s |
Expire Interval in MS | How quickly to detect an offline client. | 11s |
Throughput control group target throughput | Number of request units in your throughput control group target throughput. | None |
Throughput control group target throughput threshold | Threshold in request units of your throughput control group target throughput. | None |
Database which will be used for throughput global control | Database to use for throughput global control. | None |
Container which will be used for throughput global control | Container to use for throughput global control. | None |
To learn more about throughput control, see Azure Cosmos DB Spark connector: Throughput control.
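In option form, a throughput control configuration might look like the following sketch; the group name, database, and container names are placeholders, and the threshold caps this client group at 20% of the container's provisioned throughput:

```python
throughput_options = {
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "sparkIngestGroup",  # placeholder
    # Cap this group at 20% of the container's provisioned request units.
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.20",
    # Dedicated container that coordinates throughput across clients.
    "spark.cosmos.throughputControl.globalControl.database": "ThroughputControlDB",
    "spark.cosmos.throughputControl.globalControl.container": "ThroughputControl",
}

df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)       # connection options from earlier
    .options(**throughput_options)  # throughput control on top
    .load()
)
```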
Target
The Target gem writes data to CosmosDB and allows you to optionally specify the following additional properties.
Target properties
Property | Description | Default |
---|---|---|
Description | Description of your dataset. | None |
Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | append |
Write Strategy | How to write your data. For a list of the possible values, see Supported write strategies. | ItemOverwrite |
Enable Write Bulk | Whether to write to CosmosDB in bulk. | true |
Date Format | String that indicates a date format. | yyyy-MM-dd |
Timestamp Format | String that indicates a timestamp format. | yyyy-MM-dd'T'HH:mm:ss.SSSXXX |
Max Retry Attempts | Number of retry attempts for CosmosDB writes on retriable failures, such as a connection or write error. | 10 |
Max Concurrency | Maximum number of jobs to run at any time. If you don't specify a value, the Target gem determines a value based on the Spark executor VM Size. | None |
Max No. of Pending Bulk Operations | Limit on the number of bulk operations processed concurrently. If you don't specify a value, the Target gem determines a value based on the Spark executor VM size. If the volume of data is large relative to the provisioned throughput on the destination container, you can adjust this setting using the estimate of 1000 x Cores. | None |
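In raw Spark code, a write that exercises these properties might look like this sketch, reusing the placeholder cosmos_config from the authentication example; the write mode and strategy values come from the tables below:

```python
(
    df.write.format("cosmos.oltp")
    .options(**cosmos_config)
    # Upsert documents; see the supported write strategies below.
    .option("spark.cosmos.write.strategy", "ItemOverwrite")
    .option("spark.cosmos.write.bulk.enabled", "true")
    .option("spark.cosmos.write.maxRetryCount", "10")
    # Spark write mode; see the supported write modes below.
    .mode("append")
    .save()
)
```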
Supported write modes
Write mode | Description |
---|---|
error | If the data already exists, throw an exception. |
overwrite | If the data already exists, overwrite the data with the contents of the DataFrame . |
append | If the data already exists, append the contents of the DataFrame . |
ignore | If the data already exists, do nothing with the contents of the DataFrame . This is similar to the CREATE TABLE IF NOT EXISTS clause in SQL. |
Supported write strategies
Write strategy | Description |
---|---|
ItemOverwrite | Performs an upsert. |
ItemOverwriteIfNotModified | If the ETag property of the row is empty or null, inserts the document, or ignores it if the document already exists. If an ETag value exists, the connector attempts to replace the document with an ETag pre-condition; if the document changed in the meantime, the Target gem skips the update and does not overwrite the document with the contents of the data frame row. |
ItemAppend | Inserts the document, or ignores it if a document with the same id already exists. |
ItemDelete | Deletes all documents. |
ItemDeleteIfNotModified | Deletes all documents where the ETag hasn't changed. |
ItemPatch | Partially updates all documents based on the patch configuration. If you choose this strategy, also provide values for the ItemPatch properties below. |
ItemPatch properties
These configurations are valid only when you select ItemPatch as your Write Strategy.
Property | Description | Default Value |
---|---|---|
Default Patch Operation Type | Default CosmosDB patch operation type. Possible values are: None, Add, Set, Replace, Remove, or Increment. Choose None for no operation; for the rest, see Supported operations to determine which one to select. | Replace |
Patch Column Configs | CosmosDB patch column configurations. This can contain multiple comma-separated definitions that match the pattern col(column).op(operationType) or col(column).path(patchInCosmosdb).op(operationType), where the latter also allows you to define a different CosmosDB path. | None |
Patch Filter | Conditional patch to use. | None |
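As a sketch, a partial update with ItemPatch might look like the following; the column names, patch operations, and filter predicate are hypothetical:

```python
(
    df.write.format("cosmos.oltp")
    .options(**cosmos_config)
    .option("spark.cosmos.write.strategy", "ItemPatch")
    # Columns without an explicit operation type fall back to Replace.
    .option("spark.cosmos.write.patch.defaultOperationType", "Replace")
    # Hypothetical columns: increment a counter, set a state property.
    .option(
        "spark.cosmos.write.patch.columnConfigs",
        "[col(voteCount).op(increment), col(state).op(set)]",
    )
    # Hypothetical conditional patch: only patch open documents.
    .option("spark.cosmos.write.patch.filter", "FROM products p WHERE p.state = 'open'")
    .mode("append")
    .save()
)
```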