MongoDB
Built on MongoDB Spark Connector v10.0.
To add the mongodb-spark-connector JAR as a dependency, see Spark dependencies.
You can read from and write to MongoDB.
Parameters
Parameter | Tab | Description |
---|---|---|
Username | Location | Username of your MongoDB instance. |
Password | Location | Password of your MongoDB instance. |
Driver | Location | Driver string for your MongoDB connection. Possible values are: mongodb or mongodb+srv |
Cluster IP Address and Options | Location | Cluster IP and options for your MongoDB connection. For example: cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority |
Database | Location | Database to read from and write to. |
Collection | Location | Collection to read from and write to. |
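The Location parameters combine into the single connection.uri option that the connector reads. The following is a minimal, hand-written sketch (not Prophecy-generated code) of that assembly; the username, password, cluster address, database, and collection values are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object MongoConnectionSketch {
  // Placeholder Location parameters; substitute your own values.
  val driver   = "mongodb+srv"                                                // Driver
  val username = "<username>"                                                 // Username
  val password = "<password>"                                                 // Password
  val cluster  = "cluster0.prophecy.mongodb.xyz/?retryWrites=true&w=majority" // Cluster IP Address and Options

  // The connector expects the pieces joined into a standard MongoDB connection string.
  val connectionUri = s"$driver://$username:$password@$cluster"

  def read(spark: SparkSession): DataFrame =
    spark.read
      .format("mongodb")
      .option("connection.uri", connectionUri)
      .option("database", "<database>")     // Database
      .option("collection", "<collection>") // Collection
      .load()
}
```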
Source
The Source gem reads data from MongoDB and allows you to optionally specify the following additional properties.
Source properties
Property | Description | Default |
---|---|---|
Description | Description of your dataset. | None |
Mongo client factory | MongoClientFactory configuration key. To specify a custom implementation, implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. | com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory |
Partitioner class name | Full class name of the partitioner. To specify a custom implementation, implement the com.mongodb.spark.sql.connector.read.partitioner.Partitioner interface. | com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner |
Partition field | Unique field to use for partitioning. | _id |
Partition size | Size in MB of each partition. Smaller partition sizes create more partitions that contain fewer documents. | 64 |
Number of samples per partition | Number of samples to take per partition. The total number of samples taken is: samples per partition * (count / number of documents per partition) | 10 |
Minimum no. of Docs for Schema inference | Number of documents to sample from the collection when inferring the schema. | 1000 |
Enable Map types when inferring schema | Whether to enable Map types when inferring the schema. If you enable this, the Source gem infers large compatible struct types as a MapType instead. | true |
Minimum no. of a StructType for MapType inference | Minimum size of a StructType before inferring it as a MapType. | 250 |
Pipeline aggregation | Custom aggregation pipeline to apply to the collection before sending the data to Spark. The value must be an extended JSON single document or list of documents. A single document resembles: {"$match": {"closed": false}} A list of documents resembles: [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] | {"$match": {"closed": false}} |
Enable AllowDiskUse aggregation | Whether to enable AllowDiskUse aggregation. | false |
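If you read with the connector directly, the properties above correspond to MongoDB Spark Connector v10 read options. The following is a minimal sketch that sets them explicitly using the defaults from the table; it assumes the gem forwards these properties as the connector's standard option keys, and the connection URI, database, and collection values are placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadOptionsSketch {
  // A sketch of a direct read with the optional Source properties set to the
  // defaults from the table above. Placeholders are marked with <...>.
  def read(spark: SparkSession): DataFrame =
    spark.read
      .format("mongodb")
      .option("connection.uri", "<connection-uri>")
      .option("database", "<database>")
      .option("collection", "<collection>")
      // Partitioner class name and its options
      .option("partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
      .option("partitioner.options.partition.field", "_id")      // Partition field
      .option("partitioner.options.partition.size", "64")        // Partition size in MB
      .option("partitioner.options.samples.per.partition", "10") // Number of samples per partition
      .option("sampleSize", "1000")                               // Docs sampled for schema inference
      .option("sql.inferSchema.mapTypes.enabled", "true")         // Enable Map types when inferring schema
      .option("sql.inferSchema.mapTypes.minimum.key.size", "250") // Minimum StructType size for MapType inference
      .option("aggregation.pipeline", """{"$match": {"closed": false}}""") // Pipeline aggregation
      .option("aggregation.allowDiskUse", "false")                // AllowDiskUse aggregation
      .load()
}
```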
Example
The following example configures a Source gem to read from the sample_airbnb.listingsAndReviews
collection in MongoDB.
After you configure the Source gem, view the schema by clicking Infer Schema in the Properties tab, and preview the data by clicking Load in the Preview tab.
Compiled code
To see the compiled code of your project, switch to the Code view in the project header.
- Scala
```scala
object input_mongodb {
  def apply(context: Context): DataFrame = {
    context.spark.read
      .format("mongodb")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test_input")
      .option("collection", "listAndReviews")
      .load()
  }
}
```
Target
The Target gem writes data to MongoDB and allows you to optionally specify the following additional properties.
Target properties
Property | Description | Default |
---|---|---|
Description | Description of your dataset. | None |
Write Mode | How to handle existing data. For a list of the possible values, see Supported write modes. | overwrite |
Mongo client factory | MongoClientFactory configuration key. To specify a custom implementation, implement the com.mongodb.spark.sql.connector.connection.MongoClientFactory interface. | com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory |
Maximum batch size | Maximum number of operations to batch in bulk operations. | 512 |
ordered | Whether to perform ordered bulk operations. | true |
operationType | Type of write operation to perform. Possible values are: insert, replace, or update | replace |
List of id fields | Field or list of fields to organize the data by. To specify more than one field, separate them with a comma. For example: "fieldName1,fieldName2" | _id |
writeConcern.w | Write concern option that requests acknowledgment of the level to which the change propagated in the MongoDB replica set. Possible values are: MAJORITY, W1, W2, W3, ACKNOWLEDGED, or UNACKNOWLEDGED | ACKNOWLEDGED |
Enable Write journal | Whether to request acknowledgment that the data is committed to the on-disk journal for the criteria specified in the w option. | false |
Write timeout in MSec | Non-negative number of milliseconds to wait for the write concern to be satisfied before returning an error. | 0 |
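If you write with the connector directly, the properties above correspond to MongoDB Spark Connector v10 write options. The following is a minimal sketch that sets them explicitly using the defaults from the table; it assumes the gem forwards these properties as the connector's standard option keys, and the connection URI, database, and collection values are placeholders.

```scala
import org.apache.spark.sql.DataFrame

object WriteOptionsSketch {
  // A sketch of a direct write with the optional Target properties set to the
  // defaults from the table above. Placeholders are marked with <...>.
  def write(df: DataFrame): Unit =
    df.write
      .format("mongodb")
      .mode("overwrite")
      .option("connection.uri", "<connection-uri>")
      .option("database", "<database>")
      .option("collection", "<collection>")
      .option("maxBatchSize", "512")            // Maximum batch size
      .option("ordered", "true")                // Ordered bulk operations
      .option("operationType", "replace")       // insert, replace, or update
      .option("idFieldList", "_id")             // List of id fields
      .option("writeConcern.w", "ACKNOWLEDGED") // writeConcern.w
      .option("writeConcern.journal", "false")  // Enable Write journal
      .option("writeConcern.wTimeoutMS", "0")   // Write timeout in MSec
      .save()
}
```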
Supported write modes
Write mode | Description |
---|---|
overwrite | If the data already exists, overwrite the data with the contents of the DataFrame . |
append | If the data already exists, append the contents of the DataFrame . |
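Switching between the two modes only changes the mode call on the writer. The following is a minimal sketch of an append-mode write; the connection URI, database, and collection values are placeholders.

```scala
import org.apache.spark.sql.DataFrame

object AppendModeSketch {
  // Append keeps the existing documents and adds the DataFrame's contents to the collection.
  def append(df: DataFrame): Unit =
    df.write
      .format("mongodb")
      .mode("append")
      .option("connection.uri", "<connection-uri>")
      .option("database", "<database>")
      .option("collection", "<collection>")
      .save()
}
```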
Example
The following example configures a Target gem to write data to the sample_airbnb.listingsAndReviews
collection in MongoDB.
Compiled code
To see the compiled code of your project, switch to the Code view in the project header.
- Scala
```scala
object output_mongodb {
  def apply(context: Context, df: DataFrame): Unit = {
    df.write
      .format("mongodb")
      .mode("overwrite")
      .option(
        "connection.uri",
        f"${"mongodb+srv"}://${"ashish_mongotrial"}:${"password"}@${"cluster0.zveltwx.mongodb.net/?retryWrites=true&w=majority"}".trim
      )
      .option("database", "test")
      .option("collection", "test_output")
      .option("ordered", "true")
      .option("operationType", "replace")
      .save()
  }
}
```