CosmosDB
The azure-cosmos-spark connector is an integration between Azure CosmosDB and Apache Spark, enabling you to read and write data from and to CosmosDB using Spark. This document covers the key properties available for configuration when using this connector.
Installation
On your Execution Cluster (Databricks or OnPrem), attach a dependency with the following Maven coordinates (for Spark 3.3; for other versions, please check this link):
com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1
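For illustration, on a custom Spark environment the connector can also be pulled in at session startup via the standard spark.jars.packages setting; the sketch below assumes a PySpark session, and the application name is a placeholder.

```python
from pyspark.sql import SparkSession

# Pull the connector from Maven when the session starts (Spark 3.3 / Scala 2.12 build).
spark = (
    SparkSession.builder
    .appName("cosmos-connector-example")  # placeholder name
    .config("spark.jars.packages",
            "com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1")
    .getOrCreate()
)
```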
Usage
Common Auth Properties
These properties are shared between the read and write operations.
Property | Required | Default Value | Description |
---|---|---|---|
Authentication Type | True | MasterKey | Two auth types are currently supported: MasterKey (PrimaryReadWriteKeys, SecondaryReadWriteKeys, PrimaryReadOnlyKeys, SecondaryReadOnlyKeys) and ServicePrincipal. |
Azure Environment | True | Azure | The azure environment of the CosmosDB account: Azure, AzureChina, AzureUsGovernment, AzureGermany. |
Account Endpoint | True | None | The CosmosDB account URI. |
Account Key | True | None | The CosmosDB account key. |
Subscription ID | False | None | The subscriptionId of the CosmosDB account. Required for ServicePrincipal authentication. |
Tenant | False | None | The tenantId of the CosmosDB account. Required for ServicePrincipal authentication. |
Resource Group Name | False | None | The resource group of the CosmosDB account. Required for ServicePrincipal authentication. |
Client Id | False | None | The clientId/ApplicationId of the service principal. Required for ServicePrincipal authentication. |
Client Secret | False | None | The client secret/password of the service principal. Required for ServicePrincipal authentication. |
Database | True | None | The CosmosDB database name. |
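As a minimal sketch, these auth properties map onto connector options roughly as follows when using MasterKey authentication; the endpoint, key, database, and container values are placeholders, and the option keys should be checked against the connector's configuration reference.

```python
# Placeholder account details - replace with your own.
cosmos_config = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<your-account-key>",
    "spark.cosmos.database": "<your-database>",
    "spark.cosmos.container": "<your-container>",
}

# Read the container through the connector's 'cosmos.oltp' data source.
df = spark.read.format("cosmos.oltp").options(**cosmos_config).load()
```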
Source Properties
Important Source Properties
These properties are specific to read operations.
Property | Required | Default Value | Description |
---|---|---|---|
Data Source | True | DB Table | Whether all data in the container should be loaded or data should be returned using a custom query. |
Container | False | None | The CosmosDB container name. |
Custom Query | False | SELECT 1 | When provided, the custom query will be processed against the CosmosDB endpoint instead of dynamically generating the query via predicate push-down. Usually it is recommended to rely on Spark's predicate push-down, because that allows generating the most efficient set of filters based on the query plan. But there are a couple of predicates, like aggregates (count, group by, avg, sum, etc.), that cannot be pushed down yet (at least in Spark 3.1), so the custom query is a fallback that allows them to be pushed into the query sent to Cosmos. If specified, with schema inference enabled, the custom query will also be used to infer the schema. |
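For example, an aggregate can be pushed to Cosmos through a custom query, as sketched below; the spark.cosmos.read.customQuery option key and the query text are assumptions to verify against the connector's documentation.

```python
# Push an aggregate down to Cosmos via a custom query, since Spark's
# predicate push-down cannot push aggregates yet.
df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)  # auth options from the earlier sketch
    .option("spark.cosmos.read.customQuery",
            "SELECT c.category, COUNT(1) AS cnt FROM c GROUP BY c.category")
    .load()
)
```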
Other Source Properties
Property | Required | Default Value | Description |
---|---|---|---|
Use Gateway Mode | False | false | Use gateway mode for the client operations. |
Force Eventual Consistency | False | true | Makes the client use Eventual consistency for read operations instead of using the default account level consistency. |
Application name | False | None | Application name |
Preferred Regions List | False | None | Preferred regions list to be used for a multi-region CosmosDB account. This is a comma-separated list (e.g., [East US, West US] or East US, West US); the provided preferred regions will be used as a hint. You should use a Spark cluster collocated with your CosmosDB account and pass the Spark cluster's region as the preferred region. See the list of Azure regions here. Note that you can also use spark.cosmos.preferredRegions as an alias. |
Disable Tcp Connection Endpoint Rediscovery | False | false | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. |
Allow Invalid JSON With Duplicate JSON Properties | False | false | By default (when set to false) the CosmosDB Java SDK and Spark connector will raise a hard failure when JSON documents are read that contain JSON objects with multiple properties of the same name. This config option can be used to override the behavior and silently ignore the invalid JSON, instead using the last occurrence of the property when parsing the JSON. NOTE: This is only meant to be used as a temporary workaround. We strongly recommend preventing the invalid JSON from being ingested in the first place, and only using this workaround while cleaning up the documents with invalid JSON. |
Max Item Count | False | 1000 | Overrides the maximum number of documents that can be returned for a single query or change feed request. The default value is 1000. Consider increasing this only for average document sizes significantly smaller than 1KB or when projection reduces the number of properties selected in queries significantly (like when only selecting "id" of documents, etc.). |
Max Integrated Staleness in milliseconds | False | None | Sets the max staleness window in milliseconds for the point read or query request results in the integrated cache when using the dedicated gateway. Learn more about max integrated cache staleness here. |
Inclusion Mode | False | Always | Determines whether null/default values will be serialized to JSON or whether properties with null/default values will be skipped. The behavior follows the same ideas as Jackson's JsonInclude.Include. Always means JSON properties are created even for null and default values. NonNull means no JSON properties will be created for explicit null values. NonEmpty means JSON properties will not be created for empty string values or empty arrays/maps. NonDefault means JSON properties will be skipped not just for null/empty values but also when the value is identical to the default value (for example, 0 for numeric properties). |
DateTime Conversion Mode | False | Default | The date/time conversion mode options are Default, AlwaysEpochMilliseconds, AlwaysEpochMillisecondsWithSystemDefaultTimezone. With Default, the standard Spark 3.* behavior is used: java.sql.Date/java.time.LocalDate are converted to EpochDay, java.sql.Timestamp/java.time.Instant are converted to MicrosecondsFromEpoch. With AlwaysEpochMilliseconds, the same behavior as the CosmosDB connector for Spark 2.4 is used: java.sql.Date, java.time.LocalDate, java.sql.Timestamp and java.time.Instant are converted to MillisecondsFromEpoch. The behavior for AlwaysEpochMillisecondsWithSystemDefaultTimezone is identical to AlwaysEpochMilliseconds except that the connector will assume the system default time zone / Spark session time zone (specified via spark.sql.session.timeZone) instead of UTC when the date/time to be parsed has no explicit time zone. |
Schema Conversion Mode | False | Relaxed | The schema conversion behavior (Relaxed, Strict). When reading JSON documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a null value (Relaxed) or an exception (Strict). |
Partitioning Strategy | False | Default | The partitioning strategy used (Default, Custom, Restrictive or Aggressive). |
Partitioning Targeted Count | False | None | An integer value representing the targeted partition count. This parameter is optional and ignored unless the strategy is Custom. In this case the Spark connector won't dynamically calculate the number of partitions but will stick with this value. |
Partitioning Feed Range Filter | False | None | Can be used to scope the query to a single logical CosmosDB partition (or a subset of logical partitions). If this optional parameter is provided, the partitioning strategy will be modified so that only partitions for the scoped logical partitions are created. So, the main benefit of this config option is to reduce the number of necessary Spark tasks/partitions. |
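A sketch of how a few of these read options can be combined follows; the option keys spark.cosmos.preferredRegionsList, spark.cosmos.read.partitioning.strategy, and spark.cosmos.read.maxItemCount follow the connector's naming but should be treated as assumptions and verified against its configuration reference.

```python
df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)
    # Hint the region of the collocated Spark cluster.
    .option("spark.cosmos.preferredRegionsList", "East US, West US")
    # Fewer, larger Spark partitions than the Default strategy would produce.
    .option("spark.cosmos.read.partitioning.strategy", "Restrictive")
    # Larger page size; only useful for very small documents or narrow projections.
    .option("spark.cosmos.read.maxItemCount", "5000")
    .load()
)
```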
Schema Inference Related Configs
Property | Required | Default Value | Description |
---|---|---|---|
Enable Infer Schema | True | true | Whether schema inference is enabled. When schema inference is disabled and no schema is provided, raw JSON will be returned. |
Enable Custom Query for Inferring Schema | False | None | Checkbox which, if checked, opens an editor for writing a custom SQL query used for schema inference. |
Make all Columns Nullable | False | true | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default, all columns (except cosmos system properties) will be treated as nullable even if all rows within the sample set have non-null values. When disabled, the inferred columns are treated as nullable or not depending on whether any record in the sample set has null-values within a column. |
Record Sample Size for Schema Inference | False | 1000 | Sampling size to use when inferring schema and not using a query. |
Include all System Properties | False | false | When schema inference is enabled, whether the resulting schema will include all CosmosDB system properties. |
Include Document Timestamp field | False | false | When schema inference is enabled, whether the resulting schema will include the document Timestamp (_ts). Not required if spark.cosmos.read.inferSchema.includeSystemProperties is enabled, as it will already include all system properties. |
Infer Schema Query | False | SELECT * FROM r | When schema inference is enabled, used as custom query to infer it. For example, if you store multiple entities with different schemas within a container and you want to ensure inference only looks at certain document types or you want to project only particular columns. |
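For instance, schema inference can be scoped to one document type and a projection, as sketched below; the spark.cosmos.read.inferSchema.* option keys and the query are assumptions based on the properties above.

```python
df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    # Infer the schema only from projected columns of one (hypothetical) document type.
    .option("spark.cosmos.read.inferSchema.query",
            "SELECT c.id, c.name, c._ts FROM c WHERE c.type = 'order'")
    .option("spark.cosmos.read.inferSchema.samplingSize", "500")
    .load()
)
```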
Throughput Control Configs
Having throughput control helps to isolate the performance needs of applications running against a container, by limiting the amount of request units that can be consumed by a given Spark client. There are several advanced scenarios that benefit from client-side throughput control:
- Different operations and tasks have different priorities - there can be a need to prevent normal transactions from being throttled due to data ingestion or copy activities. Some operations and/or tasks aren't sensitive to latency and are more tolerant of being throttled than others.
- Provide fairness/isolation to different end users/tenants - An application will usually have many end users. Some users may send too many requests, which consume all available throughput, causing others to get throttled.
- Load balancing of throughput between different Azure Cosmos DB clients - in some use cases, it's important to make sure all the clients get a fair (equal) share of the throughput.
Property | Required | Default Value | Description |
---|---|---|---|
Throughput Control Enabled | False | false | Whether throughput control is enabled |
Throughput Control: Account Endpoint | False | None | CosmosDB Account Endpoint Uri for throughput control. If not defined, then spark.cosmos.accountEndpoint will be used. |
Throughput Control Account Key | False | None | CosmosDB Account Key for throughput control. |
Throughput Control Preferred Regions List | False | None | Preferred regions list to be used for a multi-region CosmosDB account. This is a comma-separated list (e.g., [East US, West US] or East US, West US); the provided preferred regions will be used as a hint. You should use a Spark cluster collocated with your CosmosDB account and pass the Spark cluster's region as the preferred region. See the list of Azure regions here. |
Disable TCP connection endpoint Rediscovery | False | false | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when using custom domain names with private endpoints in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. |
Use Gateway Mode | False | false | Use gateway mode for the client operations |
Use Dedicated Container | False | true | Flag to indicate whether, when throughput control is configured, a dedicated throughput control container will be provided. |
Throughput control group name | False | None | Throughput control group name |
Throughput control group target throughput | False | None | Throughput control group target throughput in number of request units |
Throughput control group target throughput threshold | False | None | Throughput control group target throughput threshold in request units |
Database which will be used for throughput global control | False | None | Database which will be used for throughput global control |
Container which will be used for throughput global control | False | None | Container which will be used for throughput global control |
Renew Interval in milliseconds | False | 5s | How often the client updates its own throughput usage. |
Expire Interval in milliseconds | False | 11s | How quickly an offline client will be detected |
More information about Throughput control is available here.
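As an illustration, a read job could be capped at a fraction of the container's provisioned throughput via a global throughput control group; the spark.cosmos.throughputControl.* option keys mirror the table above but are assumptions to verify, and the group/database/container names are placeholders.

```python
df = (
    spark.read.format("cosmos.oltp")
    .options(**cosmos_config)
    .option("spark.cosmos.throughputControl.enabled", "true")
    .option("spark.cosmos.throughputControl.name", "sparkIngestionGroup")  # placeholder group name
    # Cap this client at ~30% of the container's provisioned request units.
    .option("spark.cosmos.throughputControl.targetThroughputThreshold", "0.3")
    # Database/container used for global coordination across clients (placeholders).
    .option("spark.cosmos.throughputControl.globalControl.database", "ThroughputControlDB")
    .option("spark.cosmos.throughputControl.globalControl.container", "ThroughputControl")
    .load()
)
```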
Target Properties
These properties are specific to write operations.
Write Configurations
Property | Required | Default Value | Description |
---|---|---|---|
Write Mode | True | append | Spark Write Mode. The possible values are error, append, overwrite and ignore. More information on Write Modes is available here. |
Date Format | False | yyyy-MM-dd | Format for Date columns. More information on the possible formats is given here. |
Timestamp Format | False | yyyy-MM-dd'T'HH:mm:ss.SSSXXX | Format for Timestamp columns. More information on the possible formats is given here. |
Write Strategy | True | ItemOverwrite | CosmosDB Item Write Strategy: ItemOverwrite (using upsert); ItemOverwriteIfNotModified (if the etag property of the row is empty/null it will just do an insert and ignore it if the document already exists - same as ItemAppend; if an etag value exists the connector will attempt to replace the document with an etag pre-condition. If the document has changed - identified by a precondition failure - the update is skipped and the document is not updated with the content of the data frame row); ItemAppend (using create, ignoring pre-existing items, i.e., conflicts); ItemDelete (delete all documents); ItemDeleteIfNotModified (delete all documents for which the etag hasn't changed); ItemPatch (partially update all documents based on the patch config). |
Max Retry Attempts | False | 10 | CosmosDB Write Max Retry Attempts on retriable failures (e.g., connection error, write error, etc.) |
Max Concurrency | False | None | CosmosDB Item Write Max concurrency. If not specified it will be determined based on the Spark executor VM Size. |
Max No. of Pending Bulk Operations | False | None | CosmosDB Item Write bulk mode maximum pending operations. Defines a limit of bulk operations being processed concurrently. If not specified it will be determined based on the Spark executor VM Size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimation of 1000 x Cores . |
Enable Write Bulk | False | true | CosmosDB Item Write bulk enabled |
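A minimal write sketch with upsert semantics and bulk mode follows; the spark.cosmos.write.* option keys follow the properties above but should be verified, and df is assumed to contain an id column plus the container's partition key.

```python
(
    df.write.format("cosmos.oltp")
    .options(**cosmos_config)
    .option("spark.cosmos.write.strategy", "ItemOverwrite")  # upsert semantics
    .option("spark.cosmos.write.bulk.enabled", "true")
    .mode("append")
    .save()
)
```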
Patch Specific Configurations
These Configurations are valid only when Write Strategy
is given as ItemPatch
Property | Required | Default Value | Description |
---|---|---|---|
Default Patch Operation Type | True | Replace | Default CosmosDB patch operation type. Supported types include none, add, set, replace, remove, increment. Choose none for a no-op; for the others, please see the reference here for full context. |
Patch Column Configs | False | None | CosmosDB patch column configs. It can contain multiple definitions that match the following patterns (separated by commas): col(column).op(operationType) or col(column).path(patchInCosmosdb).op(operationType). The difference with the second pattern is that it also allows you to define a different CosmosDB path. |
Patch Filter | False | None | Used for Conditional patch |
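A hedged sketch of a partial-update (patch) write follows; the option keys spark.cosmos.write.patch.defaultOperationType, spark.cosmos.write.patch.columnConfigs, and spark.cosmos.write.patch.filter, as well as the column names and filter text, are assumptions for illustration only.

```python
(
    df.write.format("cosmos.oltp")
    .options(**cosmos_config)
    .option("spark.cosmos.write.strategy", "ItemPatch")
    .option("spark.cosmos.write.patch.defaultOperationType", "Set")
    # Column configs follow the col(...).op(...) pattern described above (illustrative columns).
    .option("spark.cosmos.write.patch.columnConfigs",
            "col(viewCount).op(increment), col(status).path(/state/status).op(set)")
    # Conditional patch: only apply when the predicate holds for the stored document.
    .option("spark.cosmos.write.patch.filter", "FROM c WHERE c.isActive = true")
    .mode("append")
    .save()
)
```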