CosmosDB

The azure-cosmos-spark connector is an integration between Azure CosmosDB and Apache Spark, enabling you to read data from and write data to CosmosDB using Spark. This document covers the key properties available when configuring this connector.

Installation

On your execution cluster (Databricks or on-premises), install a library with the following Maven coordinates (for Spark 3.3; for other Spark versions, please check this link):

com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1
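
If attaching the library through the cluster UI is not an option, one alternative is to pull it from Maven at session start. A minimal sketch, assuming a PySpark session you control (on Databricks the cluster-level library install above is usually preferable):

```python
from pyspark.sql import SparkSession

# Pull the connector from Maven Central when the session starts.
# On Databricks, installing the library on the cluster is usually preferable.
spark = (
    SparkSession.builder
    .appName("cosmos-spark-example")
    .config(
        "spark.jars.packages",
        "com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1",
    )
    .getOrCreate()
)
```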

Usage

Common Auth Properties

These properties are shared between the read and write operations.

| Property | Required | Default Value | Description |
|---|---|---|---|
| Authentication Type | True | MasterKey | Two authentication types are currently supported: MasterKey (primary/secondary read-write or read-only keys) and ServicePrincipal. |
| Azure Environment | True | Azure | The Azure environment of the CosmosDB account: Azure, AzureChina, AzureUsGovernment, AzureGermany. |
| Account Endpoint | True | None | The CosmosDB account URI. |
| Account Key | True | None | The CosmosDB account key. |
| Subscription ID | False | None | The subscription ID of the CosmosDB account. Required for ServicePrincipal authentication. |
| Tenant | False | None | The tenant ID of the CosmosDB account. Required for ServicePrincipal authentication. |
| Resource Group Name | False | None | The resource group of the CosmosDB account. Required for ServicePrincipal authentication. |
| Client Id | False | None | The client ID/application ID of the service principal. Required for ServicePrincipal authentication. |
| Client Secret | False | None | The client secret/password of the service principal. Required for ServicePrincipal authentication. |
| Database | True | None | The CosmosDB database name. |
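
The properties above map to the connector's underlying spark.cosmos.* options. Below is a minimal sketch of the two authentication styles, assuming the standard azure-cosmos-spark option keys; the endpoint, key, database and service-principal values are placeholders:

```python
# MasterKey authentication (endpoint, key and database are placeholders).
master_key_auth = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<account-key>",
    "spark.cosmos.database": "<database>",
}

# ServicePrincipal authentication (subscription, tenant, resource group,
# client id and client secret are all required in this mode).
service_principal_auth = {
    "spark.cosmos.accountEndpoint": "https://<account>.documents.azure.com:443/",
    "spark.cosmos.auth.type": "ServicePrincipal",
    "spark.cosmos.account.subscriptionId": "<subscription-id>",
    "spark.cosmos.account.tenantId": "<tenant-id>",
    "spark.cosmos.account.resourceGroupName": "<resource-group>",
    "spark.cosmos.auth.aad.clientId": "<client-id>",
    "spark.cosmos.auth.aad.clientSecret": "<client-secret>",
    "spark.cosmos.database": "<database>",
}
```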

Source Properties

Important Source Properties

These properties are specific to read operations.

| Property | Required | Default Value | Description |
|---|---|---|---|
| Data Source | True | DB Table | Whether all data in the container should be loaded, or whether data should be returned using a custom query. |
| Container | False | None | The CosmosDB container name. |
| Custom Query | False | SELECT 1 | When provided, the custom query will be processed against the CosmosDB endpoint instead of dynamically generating the query via predicate pushdown. Usually it is recommended to rely on Spark's predicate pushdown, because that generates the most efficient set of filters based on the query plan. However, a few predicates such as aggregates (count, group by, avg, sum, etc.) cannot be pushed down yet (at least in Spark 3.1), so the custom query is a fallback that allows them to be pushed into the query sent to Cosmos. If specified, with schema inference enabled, the custom query will also be used to infer the schema. |
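
As a sketch of the two read styles, assuming the standard azure-cosmos-spark option keys (spark.cosmos.container, spark.cosmos.read.customQuery); master_key_auth is the placeholder option map from the authentication sketch above, and the container name and query are illustrative:

```python
# Load the whole container and rely on Spark's predicate pushdown.
full_df = (
    spark.read.format("cosmos.oltp")
    .options(**master_key_auth)
    .option("spark.cosmos.container", "<container>")
    .load()
)

# Push an aggregate down to Cosmos via a custom query instead.
agg_df = (
    spark.read.format("cosmos.oltp")
    .options(**master_key_auth)
    .option("spark.cosmos.container", "<container>")
    .option("spark.cosmos.read.customQuery",
            "SELECT c.category, COUNT(1) AS cnt FROM c GROUP BY c.category")
    .load()
)
```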

Other Source Properties

| Property | Required | Default Value | Description |
|---|---|---|---|
| Use Gateway Mode | False | false | Use gateway mode for the client operations. |
| Force Eventual Consistency | False | true | Makes the client use Eventual consistency for read operations instead of the default account-level consistency. |
| Application name | False | None | Application name. |
| Preferred Regions List | False | None | Preferred regions list to be used for a multi-region CosmosDB account. This is a comma-separated list (e.g., [East US, West US] or East US, West US); the provided preferred regions will be used as a hint. You should use a Spark cluster collocated with your CosmosDB account and pass the Spark cluster region as the preferred region. See the list of Azure regions here. Please note that you can also use spark.cosmos.preferredRegions as an alias. |
| Disable Tcp Connection Endpoint Rediscovery | False | false | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when custom domain names with private endpoints are used in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. |
| Allow Invalid JSON With Duplicate JSON Properties | False | false | By default (when set to false) the CosmosDB Java SDK and Spark connector will raise a hard failure when reading JSON documents that contain a JSON object with multiple properties of the same name. This config option can be used to override the behavior and silently ignore the invalid JSON, instead using the last occurrence of the property when parsing the JSON. NOTE: This is only meant to be used as a temporary workaround. We strongly recommend preventing the invalid JSON from being ingested in the first place and using this workaround only while cleaning up documents with invalid JSON. |
| Max Item Count | False | 1000 | Overrides the maximum number of documents that can be returned for a single query or change feed request. The default value is 1000. Consider increasing this only when average document sizes are significantly smaller than 1KB or when projection significantly reduces the number of properties selected in queries (for example, when only selecting the "id" of documents). |
| Max Integrated Staleness in milliseconds | False | None | Sets the max staleness window in milliseconds for point read or query request results in the integrated cache when using the dedicated gateway. Learn more about max integrated cache staleness here. |
| Inclusion Mode | False | Always | Determines whether null/default values will be serialized to JSON or whether properties with null/default values will be skipped. The behavior follows the same ideas as Jackson's JsonInclude.Include. Always means JSON properties are created even for null and default values. NonNull means no JSON properties will be created for explicit null values. NonEmpty means JSON properties will not be created for empty string values or empty arrays/maps. NonDefault means JSON properties will be skipped not just for null/empty values but also when the value is identical to the default value (0 for numeric properties, for example). |
| DateTime Conversion Mode | False | Default | The date/time conversion mode options are Default, AlwaysEpochMilliseconds, AlwaysEpochMillisecondsWithSystemDefaultTimezone. With Default, the standard Spark 3.* behavior is used: java.sql.Date/java.time.LocalDate are converted to EpochDay, java.sql.Timestamp/java.time.Instant are converted to MicrosecondsFromEpoch. With AlwaysEpochMilliseconds, the same behavior as the CosmosDB connector for Spark 2.4 is used: java.sql.Date, java.time.LocalDate, java.sql.Timestamp and java.time.Instant are converted to MillisecondsFromEpoch. The behavior of AlwaysEpochMillisecondsWithSystemDefaultTimezone is identical to AlwaysEpochMilliseconds, except that the connector will assume the system default time zone / Spark session time zone (specified via spark.sql.session.timezone) instead of UTC when the date/time to be parsed has no explicit time zone. |
| Schema Conversion Mode | False | Relaxed | The schema conversion behavior (Relaxed, Strict). When reading JSON documents, if a document contains an attribute that does not map to the schema type, the user can decide whether to use a null value (Relaxed) or raise an exception (Strict). |
| Partitioning Strategy | False | Default | The partitioning strategy used (Default, Custom, Restrictive or Aggressive). |
| Partitioning Targeted Count | False | None | An integer value representing the targeted partition count. This parameter is optional and ignored unless the strategy is Custom, in which case the Spark connector won't dynamically calculate the number of partitions but will stick with this value. |
| Partitioning Feed Range Filter | False | None | Can be used to scope the query to a single logical CosmosDB partition (or a subset of logical partitions). If this parameter is provided, the partitioning strategy will be modified so that only partitions for the scoped logical partitions are created. The main benefit of this config option is to reduce the number of Spark tasks/partitions required. |
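
A hedged sketch of how a few of these tuning options might be combined on a read; the option keys are the assumed azure-cosmos-spark names, and the values are only examples:

```python
# Illustrative read tuning: preferred regions, gateway mode, page size
# and partitioning strategy. Values are examples, not recommendations.
tuned_df = (
    spark.read.format("cosmos.oltp")
    .options(**master_key_auth)
    .option("spark.cosmos.container", "<container>")
    .option("spark.cosmos.preferredRegionsList", "East US, West US")
    .option("spark.cosmos.useGatewayMode", "false")
    .option("spark.cosmos.read.maxItemCount", "5000")
    .option("spark.cosmos.read.partitioning.strategy", "Default")
    .load()
)
```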

Schema Inference Properties

| Property | Required | Default Value | Description |
|---|---|---|---|
| Enable Infer Schema | True | true | Whether the schema should be inferred by sampling documents from the container. When disabled and no schema is provided, items are returned as raw JSON. |
| Enable Custom Query for Inferring Schema | False | None | Checkbox which, when enabled, opens an editor for writing a custom SQL query used for schema inference. |
| Make all Columns Nullable | False | true | When schema inference is enabled, whether the resulting schema will make all columns nullable. By default, all columns (except Cosmos system properties) are treated as nullable even if all rows within the sample set have non-null values. When disabled, the inferred columns are treated as nullable or not depending on whether any record in the sample set has null values within a column. |
| Record Sample Size for Schema Inference | False | 1000 | Sampling size to use when inferring the schema without a custom query. |
| Include all System Properties | False | false | When schema inference is enabled, whether the resulting schema will include all CosmosDB system properties. |
| Include Document Timestamp field | False | false | When schema inference is enabled, whether the resulting schema will include the document timestamp (_ts). Not required if spark.cosmos.read.inferSchema.includeSystemProperties is enabled, as that already includes all system properties. |
| Infer Schema Query | False | SELECT * FROM r | When schema inference is enabled, the custom query used to infer it. For example, if you store multiple entities with different schemas within a container, you can use this to ensure inference only looks at certain document types or only projects particular columns. |
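
For example, schema inference can be constrained to a sample and a projection roughly as follows (a sketch, assuming the spark.cosmos.read.inferSchema.* option keys; the column and document-type names are placeholders):

```python
# Infer the schema from a sample, restricted to one document type and a
# projection of the columns of interest (names are placeholders).
inferred_df = (
    spark.read.format("cosmos.oltp")
    .options(**master_key_auth)
    .option("spark.cosmos.container", "<container>")
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    .option("spark.cosmos.read.inferSchema.samplingSize", "1000")
    .option("spark.cosmos.read.inferSchema.query",
            "SELECT c.id, c.name, c.price FROM c WHERE c.type = 'product'")
    .load()
)
```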

Throughput Control Configs

Having throughput control helps to isolate the performance needs of applications running against a container, by limiting the amount of request units that can be consumed by a given Spark client. There are several advanced scenarios that benefit from client-side throughput control:

  • Different operations and tasks have different priorities - there can be a need to prevent normal transactions from being throttled due to data ingestion or copy activities. Some operations and/or tasks aren't sensitive to latency, and are more tolerant to being throttled than others.
  • Provide fairness/isolation to different end users/tenants - An application will usually have many end users. Some users may send too many requests, which consume all available throughput, causing others to get throttled.
  • Load balancing of throughput between different Azure Cosmos DB clients - in some use cases, it's important to make sure all the clients get a fair (equal) share of the throughput.
| Property | Required | Default Value | Description |
|---|---|---|---|
| Throughput Control Enabled | False | false | Whether throughput control is enabled. |
| Throughput Control: Account Endpoint | False | None | CosmosDB account endpoint URI for throughput control. If not defined, spark.cosmos.accountEndpoint will be used. |
| Throughput Control Account Key | False | None | CosmosDB account key for throughput control. |
| Throughput Control Preferred Regions List | False | None | Preferred regions list to be used for a multi-region CosmosDB account. This is a comma-separated list (e.g., [East US, West US] or East US, West US); the provided preferred regions will be used as a hint. You should use a Spark cluster collocated with your CosmosDB account and pass the Spark cluster region as the preferred region. See the list of Azure regions here. |
| Disable TCP connection endpoint Rediscovery | False | false | Can be used to disable TCP connection endpoint rediscovery. TCP connection endpoint rediscovery should only be disabled when custom domain names with private endpoints are used in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. |
| Use Gateway Mode | False | false | Use gateway mode for the client operations. |
| Use Dedicated Container | False | true | Flag to indicate whether a dedicated throughput control container will be provided when throughput control is configured. |
| Throughput control group name | False | None | Throughput control group name. |
| Throughput control group target throughput | False | None | Throughput control group target throughput in request units. |
| Throughput control group target throughput threshold | False | None | Throughput control group target throughput threshold, expressed as a fraction of the container's provisioned throughput. |
| Database which will be used for throughput global control | False | None | Database which will be used for throughput global control. |
| Container which will be used for throughput global control | False | None | Container which will be used for throughput global control. |
| Renew Interval in milliseconds | False | 5s | How often the client updates its own throughput usage. |
| Expire Interval in milliseconds | False | 11s | How quickly an offline client will be detected. |

More information about Throughput control is available here.
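
A sketch of a read capped to a share of the container's throughput, assuming the spark.cosmos.throughputControl.* option keys; the group name, threshold and global-control database/container are placeholders (the global-control container must already exist):

```python
# Cap this Spark client at roughly 30% of the container's throughput.
throughput_control = {
    "spark.cosmos.throughputControl.enabled": "true",
    "spark.cosmos.throughputControl.name": "SparkIngestionGroup",
    "spark.cosmos.throughputControl.targetThroughputThreshold": "0.30",
    "spark.cosmos.throughputControl.globalControl.database": "<database>",
    "spark.cosmos.throughputControl.globalControl.container": "ThroughputControl",
}

limited_df = (
    spark.read.format("cosmos.oltp")
    .options(**master_key_auth)
    .options(**throughput_control)
    .option("spark.cosmos.container", "<container>")
    .load()
)
```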

Target Properties

These properties are specific to write operations.

Write Configurations

| Property | Required | Default Value | Description |
|---|---|---|---|
| Write Mode | True | append | Spark write mode. The possible values are error, append, overwrite and ignore. More information on write modes is available here. |
| Date Format | False | yyyy-MM-dd | Format for Date columns. More information on the possible formats is given here. |
| Timestamp Format | False | yyyy-MM-dd'T'HH:mm:ss.SSSXXX | Format for Timestamp columns. More information on the possible formats is given here. |
| Write Strategy | True | ItemOverwrite | CosmosDB item write strategy: ItemOverwrite (using upsert); ItemOverwriteIfNotModified (if the etag property of the row is empty/null it will just do an insert and ignore if the document already exists, same as ItemAppend; if an etag value exists, the connector will attempt to replace the document with an etag pre-condition; if the document changed, identified by a precondition failure, the update is skipped and the document is not updated with the content of the data frame row); ItemAppend (using create, ignoring pre-existing items, i.e., conflicts); ItemDelete (delete all documents); ItemDeleteIfNotModified (delete all documents for which the etag hasn't changed); ItemPatch (partially update all documents based on the patch config). |
| Max Retry Attempts | False | 10 | CosmosDB write max retry attempts on retriable failures (e.g., connection error, write error, etc.). |
| Max Concurrency | False | None | CosmosDB item write max concurrency. If not specified, it will be determined based on the Spark executor VM size. |
| Max No. of Pending Bulk Operations | False | None | CosmosDB item write bulk mode maximum pending operations. Defines a limit on bulk operations being processed concurrently. If not specified, it will be determined based on the Spark executor VM size. If the volume of data is large for the provisioned throughput on the destination container, this setting can be adjusted by following the estimate of 1000 x cores. |
| Enable Write Bulk | False | true | Whether CosmosDB item write bulk mode is enabled. |
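
A minimal write sketch, assuming the spark.cosmos.write.* option keys; orders_df is a placeholder DataFrame whose rows carry an id column matching the container's documents:

```python
# Upsert the DataFrame into the container in bulk mode.
(
    orders_df.write.format("cosmos.oltp")
    .options(**master_key_auth)
    .option("spark.cosmos.container", "<container>")
    .option("spark.cosmos.write.strategy", "ItemOverwrite")
    .option("spark.cosmos.write.bulk.enabled", "true")
    .option("spark.cosmos.write.maxRetryCount", "10")
    .mode("append")
    .save()
)
```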

Patch Specific Configurations

These configurations are valid only when the Write Strategy is set to ItemPatch.

| Property | Required | Default Value | Description |
|---|---|---|---|
| Default Patch Operation Type | True | Replace | Default CosmosDB patch operation type. Supported types include none, add, set, replace, remove and increment. Choose none for a no-op; for the others, please reference here for full context. |
| Patch Column Configs | False | None | CosmosDB patch column configs. Can contain multiple definitions that match the following patterns (separated by commas): col(column).op(operationType) or col(column).path(patchInCosmosdb).op(operationType). The second pattern also allows you to define a different CosmosDB path. |
| Patch Filter | False | None | Used for conditional patch. |
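
A hedged patch sketch, assuming the spark.cosmos.write.patch.* option keys; the column config and filter strings follow the col(column).op(operationType) pattern described above, and updates_df, the column names and the filter are placeholders:

```python
# Partially update documents: set 'price', increment 'version', and only
# patch documents matching the filter (all names are placeholders).
(
    updates_df.write.format("cosmos.oltp")
    .options(**master_key_auth)
    .option("spark.cosmos.container", "<container>")
    .option("spark.cosmos.write.strategy", "ItemPatch")
    .option("spark.cosmos.write.patch.defaultOperationType", "replace")
    .option("spark.cosmos.write.patch.columnConfigs",
            "[col(price).op(set), col(version).op(increment)]")
    .option("spark.cosmos.write.patch.filter",
            "from c where c.category = 'electronics'")
    .mode("append")
    .save()
)
```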