MongoDB

Built on

Built on MongoDB Spark Connector v10.0.
Add the mongodb-spark-connector jar as a dependency to your cluster to use this Gem.
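
If you manage cluster libraries yourself, the connector is published under the org.mongodb.spark group. A minimal sbt sketch (the exact 10.0.x patch version shown is an assumption; pick the release matching your Spark and Scala versions):

```scala
// build.sbt -- minimal sketch; 10.0.5 is an assumed patch version of the v10.0 connector.
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "10.0.5"
```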

Allows read and write operations on MongoDB.

Source

Source Parameters

Official documentation

| Parameter | Description | Required |
|-----------|-------------|----------|
| Username | Username for the MongoDB instance | True |
| Password | Password for the MongoDB instance | True |
| Driver | Driver string for the MongoDB connection, e.g. mongodb or mongodb+srv | True |
| Cluster IP Address and Options | Cluster IP and options (if required) for the MongoDB connection, e.g. cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority | True |
| Database | Database from which we want to read the data. | True |
| Collection | Collection from which we want to read the data. | True |
| mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | False |
| partitioner | The full class name of the partitioner. You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface. Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | False |
| partitioner.options.partition.field | The field to use for partitioning, which must be a unique field. Default: _id | False |
| partitioner.options.partition.size | The size (in MB) of each partition. Smaller partition sizes create more partitions containing fewer documents. Default: 64 | False |
| partitioner.options.samples.per.partition | The number of samples to take per partition. The total number of samples taken is: samples per partition * (count / number of documents per partition). Default: 10 | False |
| sampleSize | The number of documents to sample from the collection when inferring the schema. Default: 1000 | False |
| sql.inferSchema.mapTypes.enabled | Whether to enable Map types when inferring the schema. When enabled, large compatible struct types are inferred as a MapType instead. Default: true | False |
| sql.inferSchema.mapTypes.minimum.key.size | Minimum size of a StructType before it is inferred as a MapType. Default: 250 | False |
| aggregation.pipeline | A custom aggregation pipeline to apply to the collection before sending data to Spark. The value must be either an extended JSON single document or a list of documents. A single document should resemble: {"$match": {"closed": false}}. A list of documents should resemble: [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] | False |
| aggregation.allowDiskUse | Whether to allow storage to disk when running the aggregation. Default: true | False |
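
The partitioner keys above are passed as ordinary spark.read options. As a rough sketch (the connection URI is a placeholder and accountId is a hypothetical unique field), a read that splits the collection on accountId into 32 MB partitions could look like this:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative only: the URI is a placeholder and accountId is a hypothetical unique field.
def readPartitioned(spark: SparkSession): DataFrame =
  spark.read
    .format("mongodb")
    .option("connection.uri", "<connection-uri>")
    .option("database", "test_input")
    .option("collection", "listAndReviews")
    .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
    .option("partitioner.options.partition.field", "accountId")
    .option("partitioner.options.partition.size", "32")   // MB per partition
    .option("partitioner.options.samples.per.partition", "10")
    .load()
```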

Example

Below is an example of configuring the MongoDB Source using the Prophecy IDE. We will read the Airbnb public listingReviews dataset using the built-in MongoDB Source Gem.
After configuration, you can view the schema by clicking Infer Schema in the Properties tab, and view the data by clicking Load in the Preview tab.

Generated Code

object input_mongodb {

  def apply(context: Context): DataFrame = {
    context.spark.read
      .format("mongodb")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test_input")
      .option("collection", "listAndReviews")
      .load()
  }

}
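
The optional keys from the parameters table can be added to the same reader. Below is a hand-written variation of the read above (not produced by the IDE; the URI placeholder and the bedrooms predicate are purely illustrative) that pushes a $match stage down to MongoDB and samples more documents for schema inference:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hand-written variation of the generated read: apply a server-side $match and
// raise the schema-inference sample size. Field names and URI are illustrative.
def readFiltered(spark: SparkSession): DataFrame =
  spark.read
    .format("mongodb")
    .option("connection.uri", "<connection-uri>")
    .option("database", "test_input")
    .option("collection", "listAndReviews")
    .option("aggregation.pipeline", """{"$match": {"bedrooms": {"$gte": 2}}}""")
    .option("sampleSize", "5000")
    .load()
```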

Target

Official documentation

Target Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| Username | Username for the MongoDB instance | True |
| Password | Password for the MongoDB instance | True |
| Driver | Driver string for the MongoDB connection, e.g. mongodb or mongodb+srv | True |
| Cluster IP Address and Options | Cluster IP and options (if required) for the MongoDB connection, e.g. cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority | True |
| Database | Database to which we want to write the data. | True |
| Collection | Collection to which we want to write the data. | True |
| mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation, which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | False |
| maxBatchSize | The maximum number of operations to batch in bulk operations. Default: 512 | False |
| ordered | Whether to perform ordered bulk operations. Default: true | False |
| operationType | The type of write operation to perform. You can set this to one of the following values: insert, replace, or update. Default: replace | False |
| idFieldList | Field or list of fields by which to split the collection data. To specify more than one field, separate them with a comma, e.g. "fieldName1,fieldName2". Default: _id | False |
| writeConcern.w | Specifies w, a write concern option to acknowledge the level to which the change propagated in the MongoDB replica set. You can specify one of the following values: MAJORITY, W1, W2, W3, ACKNOWLEDGED, or UNACKNOWLEDGED. Default: ACKNOWLEDGED | False |
| writeConcern.journal | Specifies j, a write concern option to request acknowledgment that the data has been committed to the on-disk journal for the criteria specified in the w option. You can specify either true or false. | False |
| writeConcern.wTimeoutMS | Specifies wTimeoutMS, a write concern option to return an error when a write operation exceeds the given number of milliseconds. If you use this optional setting, you must specify a non-negative integer. | False |
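
The write-concern and batching keys are likewise plain write options. A minimal sketch (the connection URI is a placeholder; the values simply mirror the table above) that requests a journaled, majority write concern and smaller bulk batches:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative only: the URI is a placeholder; option values mirror the table above.
def writeDurably(df: DataFrame): Unit =
  df.write
    .format("mongodb")
    .mode("append")
    .option("connection.uri", "<connection-uri>")
    .option("database", "test")
    .option("collection", "test_output")
    .option("maxBatchSize", "256")           // smaller bulk-write batches
    .option("writeConcern.w", "MAJORITY")    // wait for a replica-set majority
    .option("writeConcern.journal", "true")  // and for the on-disk journal
    .save()
```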

Supported Write Modes

| Write Mode | Description |
|------------|-------------|
| overwrite | If data already exists, overwrite the contents of the Collection with the new data. |
| append | If data already exists, append the new data to the contents of the Collection. |
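
The write mode is set on the DataFrameWriter itself and combines with the options above. For example, a hedged sketch of an append that updates existing documents matched on a hypothetical listing_id field:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative only: listing_id is a hypothetical match key and the URI is a placeholder.
def appendAsUpdates(df: DataFrame): Unit =
  df.write
    .format("mongodb")
    .mode("append")                           // keep what is already in the collection
    .option("connection.uri", "<connection-uri>")
    .option("database", "test")
    .option("collection", "test_output")
    .option("operationType", "update")        // update matched documents rather than replacing them
    .option("idFieldList", "listing_id")      // field used to match existing documents
    .save()
```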

Example

Below is an example of configuring the MongoDB Target using the Prophecy IDE. We will write the Airbnb public listingReviews data back into a MongoDB collection using the built-in Target Gem.

Generated Code

object output_mongodb {

  def apply(context: Context, df: DataFrame): Unit = {
    df.write
      .format("mongodb")
      .mode("overwrite")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test")
      .option("collection", "test_output")
      .option("ordered", "true")
      .option("operationType", "replace")
      .save()
  }

}
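
In a generated pipeline, these two objects are typically wired together by calling their apply methods; a minimal sketch, assuming a Context value is in scope as in the snippets above:

```scala
// Hypothetical wiring, mirroring how generated pipeline code usually calls each Gem's apply method.
def pipeline(context: Context): Unit = {
  val listings = input_mongodb(context)  // Source Gem: read from MongoDB
  output_mongodb(context, listings)      // Target Gem: write back to MongoDB
}
```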