
MongoDB

Built on

Built on MongoDB Spark Connector v10.0.
To add the mongodb-spark-connector JAR as a dependency, see Spark dependencies.
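The connector is published on Maven Central under the org.mongodb.spark group. As a rough sketch only, the coordinates below are assumptions (the exact artifact name, Scala suffix, and version may differ), so confirm them on the Spark dependencies page before using them.

// build.sbt sketch: coordinates assumed for MongoDB Spark Connector 10.0.x.
// Depending on the release, the artifact may carry a Scala-version suffix
// (for example mongo-spark-connector_2.12); verify against Maven Central.
libraryDependencies += "org.mongodb.spark" % "mongo-spark-connector" % "10.0.5"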

You can read from and write to MongoDB.

Parameters

| Parameter | Tab | Description |
|---|---|---|
| Username | Location | Username of your MongoDB instance. |
| Password | Location | Password of your MongoDB instance. |
| Driver | Location | Driver string for your MongoDB connection. Possible values are: mongodb or mongodb+srv. |
| Cluster IP Address and Options | Location | Cluster IP and options for your MongoDB connection. For example: cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority |
| Database | Location | Database that you want to read from and write to. |
| Collection | Location | Collection that you want to read from and write to. |
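Conceptually, the Location parameters combine into a single MongoDB connection URI. The sketch below shows one way they fit together; the variable names and values are illustrative placeholders, not part of the gem itself.

// Sketch: how the Location parameters map onto a MongoDB connection URI.
// All values below are placeholders.
val driver   = "mongodb+srv"
val username = "<username>"
val password = "<password>"
val cluster  = "cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority"

val connectionUri = s"$driver://$username:$password@$cluster"
// e.g. mongodb+srv://<username>:<password>@cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority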

Source

The Source gem reads data from MongoDB and allows you to optionally specify the following additional properties.

Source properties

| Property | Description | Default |
|---|---|---|
| Description | Description of your dataset. | None |
| Mongo client factory | MongoClientFactory configuration key. To specify a custom implementation, implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. | com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory |
| Partitioner class name | Full class name of the partitioner. To specify a custom implementation, implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface. | com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner |
| Partition field | Unique field to use for partitioning. | _id |
| Partition size | Size in MB of each partition. Smaller partition sizes create more partitions that contain fewer documents. | 64 |
| Number of samples per partition | Number of samples to take per partition. The total number of samples taken is: samples per partition * (count / number of documents per partition) | 10 |
| Minimum no. of Docs for Schema inference | Number of documents to sample from the collection when inferring the schema. | 1000 |
| Enable Map types when inferring schema | Whether to enable Map types when inferring the schema. If you enable this, the Source gem infers large compatible struct types as a MapType instead. | true |
| Minimum no. of a StructType for MapType inference | Minimum size of a StructType before inferring it as a MapType. | 250 |
| Pipeline aggregation | Custom aggregation pipeline to apply to the collection before sending the data to Spark. The value must be an extended JSON single document or list of documents. A single document should resemble the following: {"$match": {"closed": false}}. A list of documents should resemble the following: [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] | {"$match": {"closed": false}} |
| Enable AllowDiskUse aggregation | Whether to enable AllowDiskUse aggregation. | false |
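For orientation, these properties roughly correspond to read options on the MongoDB Spark Connector. The sketch below uses option keys as documented for connector v10; treat them as assumptions and verify them against the connector version in your pipeline.

// Sketch: Source properties expressed as Spark read options (keys assumed
// from MongoDB Spark Connector v10 documentation; verify for your version).
// Assumes an existing SparkSession named spark and a placeholder connection URI.
val listings = spark.read
  .format("mongodb")
  .option("connection.uri", "<connection-uri>")
  .option("database", "sample_airbnb")
  .option("collection", "listingsAndReviews")
  .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
  .option("partitioner.options.partition.field", "_id")
  .option("partitioner.options.partition.size", "64")
  .option("partitioner.options.samples.per.partition", "10")
  .option("aggregation.pipeline", """{"$match": {"closed": false}}""")
  .option("aggregation.allowDiskUse", "false")
  .load()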

Example

The following example configures a Source gem to read from the sample_airbnb.listingsAndReviews collection in MongoDB. After you configure the Source gem, view the schema by clicking Infer Schema in the Properties tab, and view the data by clicking Load in the Preview tab.

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

object input_mongodb {

  def apply(context: Context): DataFrame = {
    // Read the configured collection through the MongoDB Spark Connector.
    // The Location parameters are compiled into the connection.uri option.
    context.spark.read
      .format("mongodb")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test_input")
      .option("collection", "listAndReviews")
      .load()
  }

}

Target

The Target gem writes data to MongoDB and allows you to optionally specify the following additional properties.

Target properties

| Property | Description | Default |
|---|---|---|
| Description | Description of your dataset. | None |
| Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | overwrite |
| Mongo client factory | MongoClientFactory configuration key. To specify a custom implementation, implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. | com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory |
| Maximum batch size | Maximum number of operations to batch in bulk operations. | 512 |
| ordered | Whether to perform ordered bulk operations. | true |
| operationType | Type of write operation to perform. Possible values are: insert, replace, or update. | replace |
| List of id fields | Field or list of fields by which to organize the data. To specify more than one field, separate them with commas. For example: "fieldName1,fieldName2" | _id |
| writeConcern.w | Write concern option that specifies the level of acknowledgment requested for how far the change has propagated through the MongoDB replica set. Possible values are: MAJORITY, W1, W2, W3, ACKNOWLEDGED, or UNACKNOWLEDGED. | ACKNOWLEDGED |
| Enable Write journal | Whether to request acknowledgment that the data is committed to the on-disk journal, for the criteria specified in the w option. | false |
| Write timeout in MSec | Non-negative number of milliseconds to wait for write operations to complete before returning an error. | 0 |
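As with the Source gem, these properties roughly correspond to write options on the MongoDB Spark Connector. The option keys in the sketch below follow the connector v10 naming and are assumptions to confirm against your connector version.

// Sketch: Target properties expressed as Spark write options (keys assumed
// from MongoDB Spark Connector v10 documentation; verify for your version).
// Assumes an existing DataFrame named df and a placeholder connection URI.
df.write
  .format("mongodb")
  .mode("overwrite")                            // Write Mode
  .option("connection.uri", "<connection-uri>")
  .option("database", "sample_airbnb")
  .option("collection", "listingsAndReviews")
  .option("maxBatchSize", "512")
  .option("ordered", "true")
  .option("operationType", "replace")
  .option("idFieldList", "_id")
  .option("writeConcern.w", "ACKNOWLEDGED")
  .option("writeConcern.journal", "false")
  .option("writeConcern.wTimeoutMS", "0")
  .save()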

Supported write modes

| Write mode | Description |
|---|---|
| overwrite | If the data already exists, overwrite the data with the contents of the DataFrame. |
| append | If the data already exists, append the contents of the DataFrame. |
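In the compiled code, the write mode becomes the DataFrameWriter.mode(...) call. A minimal sketch, assuming df is the DataFrame to write and the connection values are placeholders:

// Sketch: the Write Mode property maps to DataFrameWriter.mode.
df.write
  .format("mongodb")
  .mode("append")                               // or "overwrite"
  .option("connection.uri", "<connection-uri>")
  .option("database", "<database>")
  .option("collection", "<collection>")
  .save()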

Example

The following example configures a Target gem to write data to the sample_airbnb.listingsAndReviews collection in MongoDB.

Compiled code

tip

To see the compiled code of your project, switch to the Code view in the project header.

object output_mongodb {

  def apply(context: Context, df: DataFrame): Unit = {
    // Write the incoming DataFrame to the configured collection. The mode and
    // operationType come from the Target gem's Write Mode and operationType properties.
    df.write
      .format("mongodb")
      .mode("overwrite")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test")
      .option("collection", "test_output")
      .option("ordered", "true")
      .option("operationType", "replace")
      .save()
  }

}