# MongoDB

Built on MongoDB Spark Connector v10.0. Add the mongodb-spark-connector jar as a dependency.

Allows read and write operations on MongoDB.
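For reference, here is a minimal sketch of declaring that dependency in an sbt build. The exact connector version (10.0.x) and the Scala binary version are assumptions and should match your cluster.

```scala
// build.sbt (sketch only): use the 10.0.x release that matches your Spark and Scala versions.
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "10.0.5"
```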
## Source

### Source Parameters
Parameter | Description | Required |
---|---|---|
Username | Username for MongoDB instance | True |
Password | Password for MongoDB instance | True |
Driver | Driver string for mongodb connection, e.g. mongodb or mongodb+srv | True |
Cluster IP Address and Options | Cluster IP and options (if required) for the MongoDB connection, e.g. cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority | True |
Database | Database from which we want to read the data. | True |
Collection | Collection from which we want to read the data. | True |
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | False |
partitioner | The partitioner full class name. You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface. Default: com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner | False |
partitioner.options.partition.field | The field to use for partitioning, which must be a unique field. Default: _id | False |
partitioner.options.partition.size | The size (in MB) for each partition. Smaller partition sizes create more partitions containing fewer documents. Default: 64 | False |
partitioner.options.samples.per.partition | The number of samples to take per partition. The total number of samples taken is: samples per partition * (count / number of documents per partition). Default: 10 | False |
sampleSize | The number of documents to sample from the collection when inferring the schema. Default: 1000 | False |
sql.inferSchema.mapTypes.enabled | Whether to enable Map types when inferring the schema. When enabled, large compatible struct types are inferred to a MapType instead. Default: true | False |
sql.inferSchema.mapTypes.minimum.key.size | Minimum size of a StructType before inferring as a MapType. Default: 250 | False |
aggregation.pipeline | Specifies a custom aggregation pipeline to apply to the collection before sending data to Spark. The value must be either an extended JSON single document or list of documents. A single document should resemble the following: {"$match": {"closed": false}} A list of documents should resemble the following: [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] | False |
aggregation.allowDiskUse | Specifies whether to allow storage to disk when running the aggregation. Default: true | False |
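The optional keys above map directly onto read options of the mongodb format. Below is a minimal sketch, not code generated by Prophecy, assuming a SparkSession named spark is in scope and a MONGODB_URI environment variable that holds the full connection string; the database and collection names are illustrative.

```scala
// Sketch only: reading with optional partitioning, schema-inference, and aggregation settings.
val df = spark.read
  .format("mongodb")
  .option("connection.uri", sys.env("MONGODB_URI"))
  .option("database", "test_input")
  .option("collection", "listAndReviews")
  // Optional partitioning tuning.
  .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
  .option("partitioner.options.partition.field", "_id")
  .option("partitioner.options.partition.size", "64")
  // Optional schema inference sample size.
  .option("sampleSize", "1000")
  // Optional: apply an aggregation pipeline in MongoDB before data reaches Spark.
  .option("aggregation.pipeline", """{"$match": {"closed": false}}""")
  .load()
```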
### Example

Below is an example of configuring the MongoDB Source Gem using the Prophecy IDE.
We will read the Airbnb public listingReviews dataset using the built-in MongoDB Source Gem.
After configuration, you can view the schema by clicking Infer Schema in the Properties tab and preview the data by clicking Load in the Preview tab.
### Generated Code
- Scala
```scala
object input_mongodb {

  def apply(context: Context): DataFrame = {
    context.spark.read
      .format("mongodb")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test_input")
      .option("collection", "listAndReviews")
      .load()
  }

}
```
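As a quick check, the generated object can be called like any function that returns a DataFrame. The snippet below is illustrative only and assumes a valid Context named context is available in the Pipeline.

```scala
// Illustrative usage of the generated Source Gem (not generated code).
val listings: DataFrame = input_mongodb(context)
listings.printSchema() // should match what Infer Schema shows in the Properties tab
listings.show(5)       // quick preview, similar to clicking Load in the Preview tab
```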
## Target

### Target Parameters
Parameter | Description | Required |
---|---|---|
Username | Username for MongoDB instance | True |
Password | Password for MongoDB instance | True |
Driver | Driver string for mongodb connection, e.g. mongodb or mongodb+srv | True |
Cluster IP Address and Options | Cluster IP and options (if required) for the MongoDB connection, e.g. cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority | True |
Database | Database to which we want to write the data. | True |
Collection | Collection to which we want to write the data. | True |
mongoClientFactory | MongoClientFactory configuration key. You can specify a custom implementation which must implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. Default: com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory | False |
maxBatchSize | Specifies the maximum number of operations to batch in bulk operations. Default: 512 | False |
ordered | Specifies whether to perform ordered bulk operations. Default: true | False |
operationType | Specifies the type of write operation to perform. You can set this to one of the following values: insert, replace, or update. Default: replace | False |
idFieldList | Field or list of fields by which to split the collection data. To specify more than one field, separate them using a comma as shown in the following example: "fieldName1,fieldName2". Default: _id | False |
writeConcern.w | Specifies w, a write concern option to acknowledge the level to which the change propagated in the MongoDB replica set. You can specify one of the following values: MAJORITY, W1, W2, W3, ACKNOWLEDGED, or UNACKNOWLEDGED. Default: ACKNOWLEDGED | False |
writeConcern.journal | Specifies j, a write concern option to request acknowledgment that the data is committed to the on-disk journal for the criteria specified in the w option. You can specify either true or false. | False |
writeConcern.wTimeoutMS | Specifies wTimeoutMS, a write concern option to return an error when a write operation exceeds the number of milliseconds. If you use this optional setting, you must specify a non-negative integer. | False |
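These optional keys are likewise passed as write options on the mongodb format. Below is a minimal sketch, not code generated by Prophecy, assuming a DataFrame df and a MONGODB_URI environment variable; the database, collection, and field names are illustrative.

```scala
// Sketch only: tuning the write with the optional settings from the table above.
df.write
  .format("mongodb")
  .mode("append")
  .option("connection.uri", sys.env("MONGODB_URI"))
  .option("database", "test")
  .option("collection", "test_output")
  .option("operationType", "update")              // insert, replace, or update
  .option("idFieldList", "fieldName1,fieldName2") // fields used to match existing documents
  .option("maxBatchSize", "512")
  .option("writeConcern.w", "MAJORITY")
  .save()
```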
### Supported Write Modes
Write Mode | Description |
---|---|
overwrite | If data already exists, overwrite the contents of the Collection with data. |
append | If data already exists, append the data on to the contents of the Collection. |
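The write mode maps directly onto Spark's DataFrameWriter mode, as in this short sketch (connection options omitted for brevity):

```scala
// Sketch only: overwrite replaces the collection contents, append adds to them.
df.write
  .format("mongodb")
  .mode("append") // or "overwrite"
  .option("database", "test")          // illustrative names; connection.uri omitted here
  .option("collection", "test_output")
  .save()
```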
### Example

Below is an example of configuring the MongoDB Target Gem using the Prophecy IDE.
We will write the Airbnb public listingReviews data back into a MongoDB collection using the built-in Target Gem.
### Generated Code
- Scala
```scala
object output_mongodb {

  def apply(context: Context, df: DataFrame): Unit = {
    df.write
      .format("mongodb")
      .mode("overwrite")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test")
      .option("collection", "test_output")
      .option("ordered", "true")
      .option("operationType", "replace")
      .save()
  }

}
```
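For completeness, here is an illustrative sketch of invoking the generated Target Gem and reading the collection back to verify the write; the listings DataFrame and the MONGODB_URI environment variable are assumptions, not part of the generated code.

```scala
// Sketch only: write with the generated Gem, then read the collection back to confirm.
output_mongodb(context, listings)

val written = context.spark.read
  .format("mongodb")
  .option("connection.uri", sys.env("MONGODB_URI"))
  .option("database", "test")
  .option("collection", "test_output")
  .load()

println(s"Documents written: ${written.count()}")
```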