CosmosDB

The azure-cosmos-spark connector is an integration between Azure CosmosDB and Apache Spark that lets you read data from and write data to CosmosDB using Spark.

Installation

For Spark 3.3, add a dependency with the following Maven coordinates to your Databricks or on-premises execution cluster:

com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1

For other versions of Spark, see Azure Cosmos DB OLTP Spark 3 connector.
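
If you manage the Spark session yourself rather than attaching the library to the cluster, a minimal sketch for pulling the connector from Maven Central at session startup (the coordinates match the Spark 3.3 build above):

```python
from pyspark.sql import SparkSession

# Resolve the connector from Maven Central when the session starts.
# On Databricks, attach the library to the cluster instead.
spark = (
    SparkSession.builder
    .appName("cosmosdb-example")
    .config(
        "spark.jars.packages",
        "com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12:4.18.1",
    )
    .getOrCreate()
)
```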

Usage

Parameters

| Parameter | Tab | Description | Default |
| --- | --- | --- | --- |
| Authentication Type | Location | How you want to authenticate. Possible values are: Master Key (PrimaryReadWriteKeys, SecondaryReadWriteKeys, PrimaryReadOnlyKeys, SecondaryReadOnlyKeys) and Service Principal Based. To see the additional parameters for each authentication type, see Master key authentication and Service principal based authentication. | Master Key |
| Data Source | Location | Whether the gem should load all data in the container, or return the data using a custom query. | DB Table |
| Azure Environment | Location | Azure environment of the CosmosDB account. Possible values are: Azure, AzureChina, AzureUsGovernment, or AzureGermany. | Azure |
| Custom Query | Location | Custom query to run against the CosmosDB endpoint instead of dynamically generating the query through predicate pushdown. It's usually recommended to rely on Spark's predicate pushdown, because it generates the most efficient set of filters based on the query plan. However, in Spark 3.1 a couple of predicates, such as aggregates, can't be pushed down yet, so the custom query is a fallback that lets you push them into the query sent to CosmosDB. If specified while schema inference is enabled, the custom query is also used to infer the schema. | SELECT 1 |
| Schema | Properties | Schema to apply on the loaded data. In the Source gem, you can define or edit the schema visually or in JSON code. In the Target gem, you can view the schema visually or as JSON code. | None |

Master key authentication

| Parameter | Description |
| --- | --- |
| Account Endpoint | Account URI of your CosmosDB account. |
| Account Key | Account key of your CosmosDB account. |
| Database | Database name of your CosmosDB account. |
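
These parameters map onto the connector's configuration options. A minimal read sketch using master key authentication, with placeholder values and the spark session from the installation sketch:

```python
# Placeholder values; substitute your own account details.
cfg = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",
    "spark.cosmos.accountKey": "<your-account-key>",
    "spark.cosmos.database": "<your-database>",
    "spark.cosmos.container": "<your-container>",
}

# Read the container through the connector's "cosmos.oltp" data source.
df = spark.read.format("cosmos.oltp").options(**cfg).load()
```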

Service principal based authentication

| Parameter | Description |
| --- | --- |
| Subscription Id | Subscription Id of your CosmosDB account. |
| Tenant Id | Tenant Id of your CosmosDB account. |
| Resource Group Name | Resource group of your CosmosDB account. |
| Client Id | Client Id (Application Id) of your service principal. |
| Client Secret | Client secret (password) of your service principal. |
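
The equivalent connector options for service principal authentication, again as a sketch with placeholder values:

```python
# Placeholder values; substitute your own tenant and service principal details.
cfg = {
    "spark.cosmos.accountEndpoint": "https://<your-account>.documents.azure.com:443/",
    "spark.cosmos.auth.type": "ServicePrincipal",
    "spark.cosmos.account.subscriptionId": "<subscription-id>",
    "spark.cosmos.account.tenantId": "<tenant-id>",
    "spark.cosmos.account.resourceGroupName": "<resource-group-name>",
    "spark.cosmos.auth.aad.clientId": "<client-id>",
    "spark.cosmos.auth.aad.clientSecret": "<client-secret>",
    "spark.cosmos.database": "<your-database>",
    "spark.cosmos.container": "<your-container>",
}

df = spark.read.format("cosmos.oltp").options(**cfg).load()
```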

Source

The Source gem reads data from CosmosDB and allows you to optionally specify the following additional properties.

Source properties

| Property | Description | Default |
| --- | --- | --- |
| Description | Description of your dataset. | None |
| Enable Infer Schema | Whether the Source gem should infer the schema of the data in the container by sampling it. | true |
| Application name | Name of your application. | None |
| Inclusion Mode | Whether to serialize null and default values to JSON or skip them. Possible values are:<br/>- Always: create JSON properties for null and default values.<br/>- NonNull: don't create JSON properties for null values.<br/>- NonEmpty: don't create JSON properties for empty values. | Always |
| DateTime Conversion Mode | How to convert DateTime values. Possible values are:<br/>- Default: convert java.sql.Date/java.time.LocalDate to EpochDay, and java.sql.Timestamp/java.time.Instant to MicrosecondsFromEpoch.<br/>- AlwaysEpochMilliseconds: convert java.sql.Date, java.time.LocalDate, java.sql.Timestamp, and java.time.Instant to MillisecondsFromEpoch.<br/>- AlwaysEpochMillisecondsWithSystemDefaultTimezone: same as AlwaysEpochMilliseconds, but also assume the system default timezone. | Default |
| Schema Conversion Mode | How to handle a document attribute that doesn't map to the schema type. Possible values are:<br/>- Relaxed: use a null value.<br/>- Strict: throw an exception. | Relaxed |
| Partitioning Strategy | Partitioning strategy to use. Possible values are: Default, Custom, Restrictive, or Aggressive. | Default |
| Preferred Regions List | List of regions you prefer. | None |
| Disable Tcp Connection Endpoint Rediscovery | Whether to disable TCP connection endpoint rediscovery. Disable it when using custom domain names with private endpoints in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. | false |
| Allow Invalid JSON With Duplicate JSON Properties | Whether the CosmosDB Java SDK and Spark connector allow reading JSON documents that contain JSON objects with multiple properties of the same name, instead of failing. | false |
| Partitioning Targeted Count | Ideal number of partitions to use. The Spark connector uses this value when you choose the Custom partitioning strategy. | None |
| Partitioning Feed Range Filter | Scopes the query to a single logical CosmosDB partition, or a subset of logical partitions. The Source gem modifies the partitioning strategy to only create partitions for the scoped logical partitions, which reduces the necessary Spark tasks and partitions. | None |
| Use Gateway Mode | Whether to use gateway mode for client operations. | false |
| Force Eventual Consistency | Whether the client uses eventual consistency for read operations instead of the default account-level consistency. | true |
| Max Item Count | Maximum number of documents to return for a single query or change feed request. Consider increasing this value only when average document sizes are significantly smaller than 1 KB, or when a projection significantly reduces the number of properties selected in queries, such as selecting only the id of documents. | 1000 |
| Max Integrated Staleness in MS | Maximum staleness window in milliseconds for point read and query results in the integrated cache when using the dedicated gateway. To learn more, see MaxIntegratedCacheStaleness. | None |
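
Several of these properties correspond to raw options in the azure-cosmos-spark configuration reference. A sketch showing a few of them on a read, reusing the cfg auth options from the earlier sketches; the query itself is illustrative:

```python
df = (
    spark.read.format("cosmos.oltp")
    .options(**cfg)  # auth options from the earlier sketches
    # Enable Infer Schema
    .option("spark.cosmos.read.inferSchema.enabled", "true")
    # Custom Query: an illustrative projection pushed down to CosmosDB
    .option("spark.cosmos.read.customQuery", "SELECT c.id, c.name FROM c")
    # Max Item Count and Partitioning Strategy
    .option("spark.cosmos.read.maxItemCount", "1000")
    .option("spark.cosmos.read.partitioning.strategy", "Default")
    # Force Eventual Consistency
    .option("spark.cosmos.read.forceEventualConsistency", "true")
    .load()
)
```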

Throughput control configs

Throughput control helps isolate the performance needs of applications running against a container by limiting the number of request units a Spark client can consume.

Several advanced scenarios benefit from client-side throughput control:

  • Different operations and tasks have different priorities

    You can prevent normal transactions from being throttled by data ingestion or copy activities. Some operations or tasks aren't sensitive to latency and are more tolerant of throttling than others.

  • Provide fairness and isolation to different end users

    Applications usually have many end users, and some may send too many requests, consuming all available throughput and causing others to be throttled.

  • Load balancing of throughput between different Azure Cosmos DB clients

    Ensure all clients get an equal share of the throughput.

| Property | Description | Default |
| --- | --- | --- |
| Enable Throughput Control Options | Whether to enable throughput control. | false |
| Throughput Control Account Endpoint | CosmosDB account endpoint URI for throughput control. If not defined, the Source gem uses the spark.cosmos.accountEndpoint property. | None |
| Throughput Control Account Key | CosmosDB account key for throughput control. | None |
| Throughput Control Preferred Regions List | Comma-separated list of regions you prefer to use for a multi-region CosmosDB account. Use a Spark cluster collocated with your CosmosDB account and pass the Spark cluster region as a preferred region. To see a list of Azure regions, see LocationNames Class. | None |
| Disable TCP Connection Endpoint Rediscovery | Whether to disable TCP connection endpoint rediscovery. Disable it when using custom domain names with private endpoints in a custom Spark environment. When using Azure Databricks or Azure Synapse as the Spark runtime, it should never be necessary to disable endpoint rediscovery. | false |
| Use Gateway Mode | Whether to use gateway mode for client operations. | false |
| Use Dedicated Container | Whether to provide a dedicated throughput control container. | true |
| Throughput control group name | Name of your throughput control group. | None |
| Renew Interval in MS | How often the client updates its own throughput usage. | 5s |
| Expire Interval in MS | How quickly to detect an offline client. | 11s |
| Throughput control group target throughput | Target throughput of your throughput control group, in request units. | None |
| Throughput control group target throughput threshold | Target throughput threshold of your throughput control group, as a fraction of the available container throughput. | None |
| Database which will be used for throughput global control | Database to use for throughput global control. | None |
| Container which will be used for throughput global control | Container to use for throughput global control. | None |

To learn more about throughput control, see Azure Cosmos DB Spark connector: Throughput control.
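
In raw connector options, a throughput control configuration might look like the following sketch; the group name, threshold, and global-control database and container names are illustrative:

```python
df = (
    spark.read.format("cosmos.oltp")
    .options(**cfg)  # auth options from the earlier sketches
    .option("spark.cosmos.throughputControl.enabled", "true")
    .option("spark.cosmos.throughputControl.name", "SourceReadGroup")
    # Cap this group at 95% of the available container throughput.
    .option("spark.cosmos.throughputControl.targetThroughputThreshold", "0.95")
    # Database and container the clients use to coordinate throughput usage.
    .option("spark.cosmos.throughputControl.globalControl.database", "ThroughputControlDB")
    .option("spark.cosmos.throughputControl.globalControl.container", "ThroughputControl")
    .load()
)
```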

Target

The Target gem writes data to CosmosDB and allows you to optionally specify the following additional properties.

Target properties

| Property | Description | Default |
| --- | --- | --- |
| Description | Description of your dataset. | None |
| Write Mode | How to handle existing data. For the possible values, see Supported write modes. | append |
| Write Strategy | How to write your data. For the possible values, see Supported write strategies. | ItemOverwrite |
| Enable Write Bulk | Whether to write to CosmosDB in bulk. | true |
| Date Format | String that indicates a date format. | yyyy-MM-dd |
| Timestamp Format | String that indicates a timestamp format. | yyyy-MM-dd'T'HH:mm:ss.SSSXXX |
| Max Retry Attempts | Number of times CosmosDB retries a write on retriable failures, such as a connection or write error. | 10 |
| Max Concurrency | Maximum number of jobs to run at any time. If you don't specify a value, the Target gem determines a value based on the Spark executor VM size. | None |
| Max No. of Pending Bulk Operations | Limit on bulk operations being processed concurrently. If you don't specify a value, the Target gem determines a value based on the Spark executor VM size. If the data volume is large for the provisioned throughput on the destination container, adjust this setting using the estimate of 1000 x number of cores. | None |
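
A write sketch mapping a few of these properties onto raw connector options, reusing cfg from the earlier sketches:

```python
(
    df.write.format("cosmos.oltp")
    .options(**cfg)  # auth options from the earlier sketches
    # Write Strategy and Enable Write Bulk
    .option("spark.cosmos.write.strategy", "ItemOverwrite")
    .option("spark.cosmos.write.bulk.enabled", "true")
    # Max Retry Attempts
    .option("spark.cosmos.write.maxRetryCount", "10")
    .mode("append")  # Write Mode
    .save()
)
```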

Supported write modes

| Write mode | Description |
| --- | --- |
| error | If the data already exists, throw an exception. |
| overwrite | If the data already exists, overwrite it with the contents of the DataFrame. |
| append | If the data already exists, append the contents of the DataFrame. |
| ignore | If the data already exists, do nothing with the contents of the DataFrame. Similar to the CREATE TABLE IF NOT EXISTS clause in SQL. |

Supported write strategies

| Write strategy | Description |
| --- | --- |
| ItemOverwrite | Performs an upsert. |
| ItemOverwriteIfNotModified | If the ETag property of the row is empty or null, inserts the document, or ignores it if it already exists (same as ItemAppend). If an ETag value exists, the connector attempts to replace the document with an ETag pre-condition; if the document has changed, the Target gem skips the update and doesn't update the document with the content of the DataFrame row. |
| ItemAppend | Inserts the document, and ignores it if a document with the same id already exists. |
| ItemDelete | Deletes all documents. |
| ItemDeleteIfNotModified | Deletes all documents whose ETag hasn't changed. |
| ItemPatch | Partially updates all documents based on the patch configuration. Also provide values for the additional properties listed in ItemPatch properties. |

ItemPatch properties

These configurations are valid only when you select ItemPatch as your Write Strategy.

| Property | Description | Default |
| --- | --- | --- |
| Default Patch Operation Type | Default CosmosDB patch operation type. Possible values are: None, Add, Set, Replace, Remove, or Increment. Choose None for no operation; for the rest, see Supported operations to determine which one to select. | Replace |
| Patch Column Configs | CosmosDB patch column configurations. Can contain multiple comma-separated definitions that match the pattern col(column).op(operationType) or col(column).path(patchInCosmosdb).op(operationType), where the latter also lets you define a different CosmosDB path. | None |
| Patch Filter | Conditional patch to use. | None |
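
A sketch of an ItemPatch write in raw connector options; the column config and filter values are illustrative and follow the patterns described above:

```python
(
    df.write.format("cosmos.oltp")
    .options(**cfg)  # auth options from the earlier sketches
    .option("spark.cosmos.write.strategy", "ItemPatch")
    # Default Patch Operation Type
    .option("spark.cosmos.write.patch.defaultOperationType", "Replace")
    # Patch Column Configs: comma-separated col(column).op(operationType) definitions
    .option("spark.cosmos.write.patch.columnConfigs", "col(name).op(Set), col(price).op(Increment)")
    # Patch Filter: an illustrative conditional patch predicate
    .option("spark.cosmos.write.patch.filter", "from c where c.isActive = true")
    .mode("append")
    .save()
)
```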