
Commit 183f789

#186 Add MongoDB writer (#188)
* #186 Add the implementation of the MongoDB writer.
* #186 Add unit tests for the MongoDB writer.
* #186 Add integration tests for the MongoDB writer.
* #186 Add foreachBatch() MongoDB writer workaround.
* Fix SonarCloud warnings.
* Make sure the checkpoint is set for the foreachBatch MongoDB writer.
* Fix the checkpoint location in integration tests.
* Add an integration test to ensure the checkpoint logic works as expected.
* Fix PR suggestions.
1 parent e7b1980 commit 183f789

16 files changed: 831 additions & 1 deletion


README.md

Lines changed: 19 additions & 0 deletions
@@ -293,6 +293,25 @@

| `writer.common.trigger.type` | No | See [Common writer properties](#common-writer-properties) |
| `writer.common.trigger.processing.time` | No | See [Common writer properties](#common-writer-properties) |

##### MongoDbStreamWriter

| Property Name | Required | Description |
| :--- | :---: | :--- |
| `writer.mongodb.uri` | Yes | Output MongoDB URI, e.g. `mongodb://host:port/database.collection`. |
| `writer.mongodb.database` | No | Database name (if not specified as part of the URI). |
| `writer.mongodb.collection` | No | Collection name (if not specified as part of the URI). |
| `writer.common.trigger.type` | No | See [Common writer properties](#common-writer-properties) |
| `writer.common.trigger.processing.time` | No | See [Common writer properties](#common-writer-properties) |

Any additional properties for the `DataStreamWriter` can be added with the prefix `writer.mongodb.options`, e.g. `writer.mongodb.options.key=value`.

Common MongoDB additional options:

| Property Name | Default | Description |
| :--- | :--- | :--- |
| `writer.mongodb.options.spark.mongodb.output.ordered` | `true` | When set to `false`, inserts are done in parallel, increasing performance, but the order of documents is not preserved. |
| `writer.mongodb.options.spark.mongodb.output.forceInsert` | `false` | Forces saves to use inserts, even if a Dataset contains `_id`. |

More on these options: https://docs.mongodb.com/spark-connector/current/configuration

#### Common writer properties

| Property Name | Required | Description |

ingestor-default/pom.xml

Lines changed: 18 additions & 0 deletions
@@ -52,6 +52,24 @@
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-${kafka.spark.version}_${scala.compat.version}</artifactId>
        </dependency>
        <!--MongoDb-->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_${scala.compat.version}</artifactId>
        </dependency>

        <!--Embedded Mongo for testing-->
        <dependency>
            <groupId>de.flapdoodle.embed</groupId>
            <artifactId>de.flapdoodle.embed.mongo</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- MongoDB driver for testing -->
        <dependency>
            <groupId>org.mongodb.scala</groupId>
            <artifactId>mongo-scala-driver_${scala.compat.version}</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Tests -->
        <dependency>

ingestor-default/src/main/resources/META-INF/services/za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterFactoryProvider

Lines changed: 1 addition & 0 deletions
@@ -14,3 +14,4 @@
#
za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriterLoader
za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriterLoader
za.co.absa.hyperdrive.ingestor.implementation.writer.mongodb.MongoDbStreamWriterLoader
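
The added line registers the new loader with the JDK `ServiceLoader` mechanism. As a minimal sketch (not part of this commit) of how such providers are discovered at runtime:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider}

// Every provider listed under META-INF/services is instantiated by ServiceLoader;
// each provider then hands back its factory object (MongoDbStreamWriter, etc.).
val writerFactories: Iterable[StreamWriterFactory] =
  ServiceLoader.load(classOf[StreamWriterFactoryProvider]).asScala
    .map(_.getComponentFactory)
```

The TestServiceProviderConfiguration change further down asserts exactly this discovery for the new writer.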
ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/mongodb/MongoDbStreamWriter.scala

Lines changed: 100 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.ingestor.implementation.writer.mongodb

import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import za.co.absa.hyperdrive.ingestor.api.utils.{ConfigUtils, StreamWriterUtil}
import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriter, StreamWriterFactory, StreamWriterProperties}

private[writer] class MongoDbStreamWriter(trigger: Trigger,
                                          checkpointLocation: String,
                                          uri: String,
                                          database: Option[String],
                                          collection: Option[String],
                                          val extraConfOptions: Map[String, String]) extends StreamWriter {
  private val logger = LogManager.getLogger
  if (!uri.toLowerCase.startsWith("mongodb://")) {
    throw new IllegalArgumentException(s"Invalid MongoDB URI: '$uri'. It should start with 'mongodb://'.")
  }

  override def write(dataFrame: DataFrame): StreamingQuery = {
    val options = extraConfOptions ++
      database.map(db => ("spark.mongodb.output.database", db)) ++
      collection.map(c => ("spark.mongodb.output.collection", c))

    logger.info(s"Writing to $uri")
    if (options.nonEmpty) {
      val optionsStr = options.map { case (k, v) => s"$k='$v'" }.mkString(", ")
      logger.info(s"Options: $optionsStr")
    }

    dataFrame.writeStream
      .trigger(trigger)
      .outputMode(OutputMode.Append())
      .option(StreamWriterProperties.CheckpointLocation, checkpointLocation)
      .options(options)
      .foreachBatch((df, batchId) => {
        logger.info(s"Writing batchId: $batchId")
        df.write
          .mode(SaveMode.Append)
          .format("mongo")
          .option("spark.mongodb.output.uri", uri)
          .options(options)
          .save()
      })
      .start()
  }

  def getDestination: String = uri
}

object MongoDbStreamWriter extends StreamWriterFactory with MongoDbStreamWriterAttributes {

  def apply(config: Configuration): StreamWriter = {
    val trigger = StreamWriterUtil.getTrigger(config)
    val checkpointLocation = StreamWriterUtil.getCheckpointLocation(config)

    val uri = getUri(config)
    val database = ConfigUtils.getOrNone(KEY_DATABASE, config)
    val collection = ConfigUtils.getOrNone(KEY_COLLECTION, config)

    val extraOptions = getExtraOptions(config)

    val dbOptions = Seq(
      database.map(db => ("database", db)),
      collection.map(c => ("collection", c))
    ).flatMap {
      case Some((k, v)) => Some(s"$k='$v'")
      case None => None
    }.mkString(", ", ", ", "")

    LogManager.getLogger.info(s"Going to create MongoDbStreamWriter instance using: " +
      s"trigger='$trigger', checkpointLocation='$checkpointLocation', url='$uri'$dbOptions, extra options='$extraOptions'")

    new MongoDbStreamWriter(trigger, checkpointLocation, uri, database, collection, extraOptions)
  }

  override def getExtraConfigurationPrefix: Option[String] = Some(KEY_EXTRA_CONFS_ROOT)

  private def getUri(configuration: Configuration): String =
    ConfigUtils.getOrThrow(KEY_URI, configuration, errorMessage = s"Output MongoDB URI is not specified. Is '$KEY_URI' defined?")

  private def getExtraOptions(configuration: Configuration): Map[String, String] =
    ConfigUtils.getPropertySubset(configuration, KEY_EXTRA_CONFS_ROOT)
}
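
For orientation, here is a rough sketch of how this factory might be exercised from code. The URI/database/collection/options keys mirror MongoDbStreamWriterAttributes below; the common checkpoint key is referenced symbolically because its literal value, and whatever else StreamWriterUtil.getTrigger/getCheckpointLocation may expect, are not visible in this commit. Treat it as an illustration, not project API documentation.

```scala
import org.apache.commons.configuration2.MapConfiguration
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes
import za.co.absa.hyperdrive.ingestor.implementation.writer.mongodb.MongoDbStreamWriter

import scala.collection.JavaConverters._

// Hypothetical configuration values; key names mirror MongoDbStreamWriterAttributes.
val config = new MapConfiguration(Map[String, AnyRef](
  "writer.mongodb.uri" -> "mongodb://localhost:27017",
  "writer.mongodb.database" -> "mydb",
  "writer.mongodb.collection" -> "mycollection",
  "writer.mongodb.options.spark.mongodb.output.ordered" -> "false",
  // Checkpoint key referenced via the shared constant because its literal value is not
  // shown in this diff; trigger keys are omitted and may fall back to a default.
  StreamWriterCommonAttributes.keyCheckpointBaseLocation -> "/tmp/checkpoints/mongodb-writer"
).asJava)

val writer = MongoDbStreamWriter(config)   // the factory apply() shown above
// val query = writer.write(streamingDf)   // streamingDf: a streaming DataFrame
```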
ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/mongodb/MongoDbStreamWriterAttributes.scala

Lines changed: 39 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.ingestor.implementation.writer.mongodb

import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriterCommonAttributes
import za.co.absa.hyperdrive.ingestor.api.{HasComponentAttributes, PropertyMetadata}

trait MongoDbStreamWriterAttributes extends HasComponentAttributes {
  private val rootFactoryConfKey = "writer.mongodb"
  val KEY_URI = s"$rootFactoryConfKey.uri"
  val KEY_DATABASE = s"$rootFactoryConfKey.database"
  val KEY_COLLECTION = s"$rootFactoryConfKey.collection"
  val KEY_EXTRA_CONFS_ROOT = s"$rootFactoryConfKey.options"

  override def getName: String = "MongoDB Stream Writer"

  override def getDescription: String = "This writer saves ingested data in MongoDB"

  override def getProperties: Map[String, PropertyMetadata] = Map(
    KEY_URI -> PropertyMetadata("Output MongoDB URI (should start with 'mongodb://')", Some("Should start with 'mongodb://', e.g. 'mongodb://127.0.0.1/'"), required = true),
    KEY_DATABASE -> PropertyMetadata("Database name", Some("Can be omitted if specified in the URL"), required = false),
    KEY_COLLECTION -> PropertyMetadata("Collection name", Some("Can be omitted if specified in the URL"), required = false),
    StreamWriterCommonAttributes.keyTriggerProcessingTime -> StreamWriterCommonAttributes.triggerProcessingTimeMetadata,
    StreamWriterCommonAttributes.keyCheckpointBaseLocation -> StreamWriterCommonAttributes.checkpointBaseLocation
  )
}
ingestor-default/src/main/scala/za/co/absa/hyperdrive/ingestor/implementation/writer/mongodb/MongoDbStreamWriterLoader.scala

Lines changed: 22 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.ingestor.implementation.writer.mongodb

import za.co.absa.hyperdrive.ingestor.api.writer.{StreamWriterFactory, StreamWriterFactoryProvider}

class MongoDbStreamWriterLoader extends StreamWriterFactoryProvider {
  override def getComponentFactory: StreamWriterFactory = MongoDbStreamWriter
}

ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/TestServiceProviderConfiguration.scala

Lines changed: 2 additions & 1 deletion
@@ -30,6 +30,7 @@
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.dateversion.AddDateVersionTransformer
 import za.co.absa.hyperdrive.ingestor.implementation.transformer.enceladus.columns.AddEnceladusColumnsTransformer
 import za.co.absa.hyperdrive.ingestor.implementation.writer.kafka.KafkaStreamWriter
+import za.co.absa.hyperdrive.ingestor.implementation.writer.mongodb.MongoDbStreamWriter
 import za.co.absa.hyperdrive.ingestor.implementation.writer.parquet.ParquetStreamWriter

 import scala.reflect.ClassTag
@@ -57,7 +58,7 @@

   it should "load StreamWriters" in {
     val factoryProviders = loadServices[StreamWriterFactoryProvider, StreamWriterFactory]()
-    factoryProviders should contain theSameElementsAs Seq(ParquetStreamWriter, KafkaStreamWriter)
+    factoryProviders should contain theSameElementsAs Seq(ParquetStreamWriter, KafkaStreamWriter, MongoDbStreamWriter)
   }

   private def loadServices[P <: ComponentFactoryProvider[F], F <: ComponentFactory[_]]()(implicit classTag: ClassTag[P]): Iterable[F] = {
ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/testutils/MemoryStreamFixture.scala

Lines changed: 39 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.ingestor.implementation.testutils

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.scalatest.Suite
import za.co.absa.commons.spark.SparkTestBase

trait MemoryStreamFixture {

  this: Suite with SparkTestBase =>

  def withStreamingData(inputDf: DataFrame)(f: DataFrame => Unit): Unit = {
    val memoryStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(inputDf.schema))
    val streamingDf: DataFrame = memoryStream.toDF()

    inputDf.collect().foreach(e => {
      memoryStream.addData(e)
    })

    f(streamingDf)
  }

}
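
A hypothetical test built on this fixture might look as follows; the suite name, input data, and the writer under test are placeholders, and it assumes SparkTestBase exposes the session as `spark` (as the fixture itself does).

```scala
import org.apache.spark.sql.DataFrame
import org.scalatest.{FlatSpec, Matchers}
import za.co.absa.commons.spark.SparkTestBase
import za.co.absa.hyperdrive.ingestor.api.writer.StreamWriter
import za.co.absa.hyperdrive.ingestor.implementation.testutils.MemoryStreamFixture

class ExampleWriterTest extends FlatSpec with Matchers with SparkTestBase with MemoryStreamFixture {

  "ExampleWriter" should "write every input row" in {
    import spark.implicits._
    val inputDf: DataFrame = Seq((1, "a"), (2, "b")).toDF("id", "value")

    val writerUnderTest: StreamWriter = ???   // placeholder, e.g. MongoDbStreamWriter(config)

    withStreamingData(inputDf) { streamingDf =>
      val query = writerUnderTest.write(streamingDf)
      query.processAllAvailable()   // drain everything queued in the MemoryStream
      query.stop()
    }
  }
}
```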
ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/testutils/mongodb/EmbeddedMongoDbSingleton.scala

Lines changed: 58 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.ingestor.implementation.testutils.mongodb

import de.flapdoodle.embed.mongo.config.{MongodConfigBuilder, Net, RuntimeConfigBuilder}
import de.flapdoodle.embed.mongo.distribution.Version
import de.flapdoodle.embed.mongo.{Command, MongodExecutable, MongodStarter}
import de.flapdoodle.embed.process.config.io.ProcessOutput
import de.flapdoodle.embed.process.runtime.Network
import org.apache.logging.log4j.LogManager

object EmbeddedMongoDbSingleton {
  private val log = LogManager.getLogger

  lazy val embeddedMongoDb: (MongodExecutable, Int) = startEmbeddedMongoDb()

  /**
   * Create and run a MongoDb instance.
   *
   * How to configure embedded MongoDB: https://github.com/flapdoodle-oss/de.flapdoodle.embed.mongo
   *
   * @return A pair: a MongoDb executable object to be used to stop it and the port number the embedded MongoDB listens to.
   */
  private def startEmbeddedMongoDb(): (MongodExecutable, Int) = {
    val mongoPort: Int = Network.getFreeServerPort()

    // Do not print Embedded MongoDB logs
    val runtimeConfig = new RuntimeConfigBuilder()
      .defaults(Command.MongoD)
      .processOutput(ProcessOutput.getDefaultInstanceSilent)
      .build()

    val starter = MongodStarter.getInstance(runtimeConfig)

    val mongodConfig = new MongodConfigBuilder()
      .version(Version.Main.V4_0)
      .net(new Net("localhost", mongoPort, Network.localhostIsIPv6()))
      .build()

    val executable = starter.prepare(mongodConfig)
    executable.start()
    (executable, mongoPort)
  }

}
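
Because the singleton exposes a lazy val, the embedded instance is started at most once per JVM and shared between integration tests. A hedged sketch of the surrounding test lifecycle (the database and collection in the URI are placeholders):

```scala
import za.co.absa.hyperdrive.ingestor.implementation.testutils.mongodb.EmbeddedMongoDbSingleton

// First access starts mongod on a free local port.
val (mongodExecutable, mongoPort) = EmbeddedMongoDbSingleton.embeddedMongoDb
val outputUri = s"mongodb://localhost:$mongoPort/testdb.testcollection"   // hypothetical target

try {
  // ... run an ingestion against `outputUri`, e.g. through MongoDbStreamWriter ...
} finally {
  mongodExecutable.stop()   // tear the embedded instance down once the suite is done
}
```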
ingestor-default/src/test/scala/za/co/absa/hyperdrive/ingestor/implementation/testutils/mongodb/MongoDbConnection.scala

Lines changed: 45 additions & 0 deletions

/*
 * Copyright 2018 ABSA Group Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.hyperdrive.ingestor.implementation.testutils.mongodb

import org.mongodb.scala.{MongoClient, MongoDatabase}

import scala.util.Try

trait MongoDbConnection extends AutoCloseable {
  def getDatabase: MongoDatabase

  def getConnectionString: String
}

object MongoDbConnection {
  def getConnection(connectionString: String, db: String): MongoDbConnection = {
    val mongoClient = MongoClient(connectionString)

    getConnection(mongoClient, connectionString, db)
  }

  def getConnection(mongoClient: MongoClient, connectionString: String, db: String): MongoDbConnection = {
    new MongoDbConnection {
      override def getDatabase: MongoDatabase = mongoClient.getDatabase(db)

      override def getConnectionString: String = connectionString

      override def close(): Unit = Try(mongoClient.close()) // Ignore all exceptions
    }
  }

}
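
A possible verification step on top of this helper, using the Scala driver's asynchronous API and reusing `mongoPort` from the sketch above; the database/collection names and the expected count are placeholders:

```scala
import org.mongodb.scala._   // brings the driver's Observable helpers (e.g. toFuture) into scope

import scala.concurrent.Await
import scala.concurrent.duration._

import za.co.absa.hyperdrive.ingestor.implementation.testutils.mongodb.MongoDbConnection

val connection = MongoDbConnection.getConnection(s"mongodb://localhost:$mongoPort", "testdb")
try {
  // find() returns an Observable; toFuture() collects all documents of the collection.
  val documents = Await.result(
    connection.getDatabase.getCollection("testcollection").find().toFuture(),
    30.seconds)
  assert(documents.size == 2)   // e.g. the two rows fed in through the MemoryStream fixture
} finally {
  connection.close()
}
```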
