Skip to content

Commit c643df9

Browse files
committed
MongoSync added
1 parent d5f0c71 commit c643df9

File tree

17 files changed

+305
-30
lines changed

17 files changed

+305
-30
lines changed

src/main/reference.conf

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
2+
com.sfxcode.nosql.mongo.sync {
3+
maxWait = 600
4+
syncColumnLastSync = "_lastSync"
5+
syncColumnLastUpdate = "_lastUpdate"
6+
}

src/main/scala/com/sfxcode/nosql/mongo/Field.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.sfxcode.nosql.mongo
22

33
import com.sfxcode.nosql.mongo.bson.BsonConverter
4+
import com.sfxcode.nosql.mongo.database.DatabaseProvider._
45
import org.mongodb.scala.bson.BsonValue
56
import org.mongodb.scala.model.Accumulators._
67
import org.mongodb.scala.model.BsonField
@@ -51,7 +52,7 @@ trait Field {
5152
"$" + name
5253
}
5354
}.toList
54-
BsonConverter.toBson(Map("_id" -> list))
55+
BsonConverter.toBson(Map(ObjectIdKey -> list))
5556
}
5657

5758
}

src/main/scala/com/sfxcode/nosql/mongo/database/DatabaseProvider.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ class DatabaseProvider(val config: MongoConfig, val registry: CodecRegistry) ext
129129
}
130130

131131
object DatabaseProvider {
132+
val ObjectIdKey = "_id"
132133
val CollectionSeparator = ":"
133134

134135
private val CustomRegistry = fromProviders(CustomCodecProvider())

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Metadata.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ abstract class Metadata(provider: DatabaseProvider, bucketName: String) extends
1919

2020
def updateMetadata(oid: ObjectId, value: Any): Observable[UpdateResult] = {
2121
val doc: BsonValue = BsonConverter.toBson(value)
22-
val result = Files.updateOne(equal("_id", oid), set("metadata", doc))
22+
val result = Files.updateOne(equal(DatabaseProvider.ObjectIdKey, oid), set("metadata", doc))
2323
result
2424
}
2525

@@ -36,10 +36,10 @@ abstract class Metadata(provider: DatabaseProvider, bucketName: String) extends
3636
}
3737

3838
def updateMetadataElements(oid: ObjectId, elements: Map[String, Any]): Observable[UpdateResult] =
39-
updateMetadataElements(equal("_id", oid), elements)
39+
updateMetadataElements(equal(DatabaseProvider.ObjectIdKey, oid), elements)
4040

4141
def updateMetadataElement(oid: ObjectId, key: String, value: Any): Observable[UpdateResult] =
42-
updateMetadataElements(equal("_id", oid), Map(key -> value))
42+
updateMetadataElements(equal(DatabaseProvider.ObjectIdKey, oid), Map(key -> value))
4343

4444
def updateMetadataElement(filter: Bson, key: String, value: Any): Observable[UpdateResult] =
4545
updateMetadataElements(filter, Map(key -> value))

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Search.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.sfxcode.nosql.mongo.gridfs
22

3+
import com.sfxcode.nosql.mongo.database.DatabaseProvider
34
import org.mongodb.scala.Document
45
import org.mongodb.scala.bson.ObjectId
56
import org.mongodb.scala.bson.conversions.Bson
@@ -16,7 +17,7 @@ abstract class Search extends Base {
1617
gridfsBucket.find(filter).sort(sort)
1718
}
1819

19-
def findById(oid: ObjectId): GridFSFindObservable = find(equal("_id", oid))
20+
def findById(oid: ObjectId): GridFSFindObservable = find(equal(DatabaseProvider.ObjectIdKey, oid))
2021

2122
def find(key: String, value: Any): GridFSFindObservable =
2223
find(equal(key, value))

src/main/scala/com/sfxcode/nosql/mongo/operation/Crud.scala

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package com.sfxcode.nosql.mongo.operation
22

3+
import com.sfxcode.nosql.mongo.database.DatabaseProvider
34
import com.sfxcode.nosql.mongo.{Converter, _}
45
import org.mongodb.scala.bson.conversions.Bson
56
import org.mongodb.scala.model.Filters._
6-
import org.mongodb.scala.model._
7+
import org.mongodb.scala.model.{BulkWriteOptions, _}
78
import org.mongodb.scala.result.{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult}
89
import org.mongodb.scala.{BulkWriteResult, Observable, SingleObservable}
910

@@ -26,29 +27,36 @@ abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
2627

2728
// bulk write
2829

30+
def bulkWrite(requests: List[WriteModel[_ <: A]], options: BulkWriteOptions): SingleObservable[BulkWriteResult] =
31+
coll.bulkWrite(requests, options)
32+
2933
def bulkWrite(requests: List[WriteModel[_ <: A]], ordered: Boolean = true): SingleObservable[BulkWriteResult] =
30-
coll.bulkWrite(requests, BulkWriteOptions().ordered(ordered))
34+
bulkWrite(requests, BulkWriteOptions().ordered(ordered))
35+
36+
def bulkWriteMany(values: Seq[A], options: BulkWriteOptions): SingleObservable[BulkWriteResult] = {
37+
val requests: ArrayBuffer[WriteModel[_ <: A]] = ArrayBuffer()
38+
values.foreach(value => requests.append(InsertOneModel(value)))
39+
bulkWrite(requests.toList, options)
40+
}
3141

3242
def bulkWriteMany(values: Seq[A], ordered: Boolean = true): SingleObservable[BulkWriteResult] = {
3343
val requests: ArrayBuffer[WriteModel[_ <: A]] = ArrayBuffer()
34-
values.foreach { value =>
35-
requests.append(InsertOneModel(value))
36-
}
44+
values.foreach(value => requests.append(InsertOneModel(value)))
3745
bulkWrite(requests.toList, ordered)
3846
}
3947

4048
// update
4149

4250
def replaceOne(value: A): Observable[UpdateResult] = {
4351
val document = Converter.toDocument(value)
44-
val oid = document.get("_id").get
45-
coll.replaceOne(equal("_id", oid), value)
52+
val oid = document.get(DatabaseProvider.ObjectIdKey).get
53+
coll.replaceOne(equal(DatabaseProvider.ObjectIdKey, oid), value)
4654
}
4755

4856
def replaceOne(value: A, options: ReplaceOptions): Observable[UpdateResult] = {
4957
val document = Converter.toDocument(value)
50-
val oid = document.get("_id").get
51-
coll.replaceOne(equal("_id", oid), value, options)
58+
val oid = document.get(DatabaseProvider.ObjectIdKey).get
59+
coll.replaceOne(equal(DatabaseProvider.ObjectIdKey, oid), value, options)
5260
}
5361

5462
def replaceOne(filter: Bson, value: A): Observable[UpdateResult] =
@@ -77,8 +85,8 @@ abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
7785
coll.deleteOne(filter, options)
7886

7987
def deleteOne(value: A): Observable[DeleteResult] = {
80-
val oid = Converter.toDocument(value).get("_id").get
81-
coll.deleteOne(equal("_id", oid))
88+
val oid = Converter.toDocument(value).get(DatabaseProvider.ObjectIdKey).get
89+
coll.deleteOne(equal(DatabaseProvider.ObjectIdKey, oid))
8290
}
8391

8492
def deleteMany(filter: Bson): Observable[DeleteResult] =

src/main/scala/com/sfxcode/nosql/mongo/operation/CrudObserver.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.sfxcode.nosql.mongo.operation
22

33
import com.sfxcode.nosql.mongo.Converter
4+
import com.sfxcode.nosql.mongo.database.DatabaseProvider
45
import com.typesafe.scalalogging.LazyLogging
56
import org.mongodb.scala.Observer
67
import org.mongodb.scala.bson.conversions.Bson
@@ -19,8 +20,8 @@ trait CrudObserver[A] extends Crud[A] {
1920
replaceOne(value).subscribe(observer)
2021

2122
def deleteValue(value: A, observer: Observer[DeleteResult] = new SimpleObserver[DeleteResult]): Unit = {
22-
val oid = Converter.toDocument(value).get("_id").get
23-
val filter = equal("_id", oid)
23+
val oid = Converter.toDocument(value).get(DatabaseProvider.ObjectIdKey).get
24+
val filter = equal(DatabaseProvider.ObjectIdKey, oid)
2425
deleteOne(filter).subscribe(observer)
2526
}
2627

src/main/scala/com/sfxcode/nosql/mongo/operation/Search.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.sfxcode.nosql.mongo.operation
22

33
import com.sfxcode.nosql.mongo._
44
import com.sfxcode.nosql.mongo.bson.BsonConverter._
5+
import com.sfxcode.nosql.mongo.database.DatabaseProvider
56
import org.bson.BsonValue
67
import org.mongodb.scala.bson.ObjectId
78
import org.mongodb.scala.bson.conversions.Bson
@@ -27,7 +28,7 @@ abstract class Search[A]()(implicit ct: ClassTag[A]) extends Base[A] {
2728
coll.find(filter).sort(sort).projection(projection)
2829
}
2930

30-
def findById(oid: ObjectId): FindObservable[A] = find(equal("_id", oid))
31+
def findById(oid: ObjectId): FindObservable[A] = find(equal(DatabaseProvider.ObjectIdKey, oid))
3132

3233
def find(name: String, value: Any): FindObservable[A] =
3334
find(equal(name, value))

src/main/scala/com/sfxcode/nosql/mongo/package.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.util.Date
55
import com.sfxcode.nosql.mongo.Converter
66
import com.sfxcode.nosql.mongo.bson.BsonConverter
77
import com.sfxcode.nosql.mongo.bson.convert.JsonDateTimeConverter
8-
import com.sfxcode.nosql.mongo.database.MongoConfig
8+
import com.sfxcode.nosql.mongo.database.{DatabaseProvider, MongoConfig}
99
import com.sfxcode.nosql.mongo.gridfs.GridFSStreamObserver
1010
import com.sfxcode.nosql.mongo.operation.ObservableIncludes
1111
import org.bson.BsonValue
@@ -135,5 +135,5 @@ trait DocumentIncludes {
135135
implicit def stringToObjectId(str: String): ObjectId = new ObjectId(str)
136136

137137
implicit def documentToObjectId(doc: Document): ObjectId =
138-
doc.getObjectId("_id")
138+
doc.getObjectId(DatabaseProvider.ObjectIdKey)
139139
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
package com.sfxcode.nosql.mongo.sync
2+
3+
import java.util.Date
4+
5+
import com.sfxcode.nosql.mongo._
6+
import com.sfxcode.nosql.mongo.database.{ConfigHelper, DatabaseProvider}
7+
import com.sfxcode.nosql.mongo.sync.SyncDirection.SyncDirection
8+
import com.sfxcode.nosql.mongo.sync.SyncStrategy.SyncStrategy
9+
import com.typesafe.scalalogging.LazyLogging
10+
import org.mongodb.scala.Document
11+
import org.mongodb.scala.bson.ObjectId
12+
import org.mongodb.scala.model.Projections._
13+
import org.mongodb.scala.model.Updates._
14+
15+
object SyncStrategy extends Enumeration {
16+
type SyncStrategy = Value
17+
val Replcace = Value
18+
}
19+
20+
object SyncDirection extends Enumeration {
21+
type SyncDirection = Value
22+
val SourceToTarget, TargetToSource, TwoWay = Value
23+
}
24+
25+
case class MongoSyncException(message: String) extends Exception(message)
26+
27+
case class MongoSyncOperation(
28+
collectionName: String,
29+
syncDirection: SyncDirection = SyncDirection.SourceToTarget,
30+
syncStrategy: SyncStrategy = SyncStrategy.Replcace,
31+
idColumnName: String = DatabaseProvider.ObjectIdKey
32+
) extends LazyLogging
33+
with Filter {
34+
val includes = include(idColumnName, MongoSyncOperation.SyncColumnLastSync, MongoSyncOperation.SyncColumnLastUpdate)
35+
36+
def excecute(source: DatabaseProvider, target: DatabaseProvider): List[MongoSyncResult] =
37+
try {
38+
val sourceInfos: Seq[Document] =
39+
source.dao(collectionName).find().projection(includes).results(MongoSyncOperation.MaxWait)
40+
val targetInfos: Seq[Document] =
41+
target.dao(collectionName).find().projection(includes).results(MongoSyncOperation.MaxWait)
42+
43+
if (SyncDirection.SourceToTarget == syncDirection) {
44+
val diff = sourceInfos.diff(targetInfos)
45+
List(syncInternal(source, target, targetInfos.size, diff))
46+
}
47+
else if (SyncDirection.TargetToSource == syncDirection) {
48+
val diff = targetInfos.diff(sourceInfos)
49+
List(syncInternal(target, source, sourceInfos.size, diff))
50+
}
51+
else if (SyncDirection.TwoWay == syncDirection) {
52+
List(
53+
syncInternal(source, target, targetInfos.size, sourceInfos.diff(targetInfos)),
54+
syncInternal(target, source, sourceInfos.size, targetInfos.diff(sourceInfos))
55+
)
56+
}
57+
else {
58+
List(MongoSyncResult(collectionName))
59+
}
60+
}
61+
catch {
62+
case e: Exception => {
63+
logger.error(e.getMessage, e)
64+
List(MongoSyncResult(collectionName, exception = Some(e)))
65+
}
66+
}
67+
68+
private def syncInternal(
69+
left: DatabaseProvider,
70+
right: DatabaseProvider,
71+
countBefore: Int,
72+
diff: Seq[Document]
73+
): MongoSyncResult = {
74+
val syncInfo: MongoSyncInfo = MongoSyncInfo()
75+
if (diff.nonEmpty) {
76+
val idSet: Set[ObjectId] = diff.map(doc => doc.getObjectId(idColumnName)).toSet
77+
val documentsToSync: Seq[Document] = left.dao(collectionName).find(valueFilter(idColumnName, idSet)).results()
78+
right.dao(collectionName).bulkWriteMany(documentsToSync).result()
79+
val update = combine(
80+
set(MongoSyncOperation.SyncColumnLastSync, syncInfo.syncDate),
81+
set(MongoSyncOperation.SyncColumnLastUpdate, syncInfo.updateDate)
82+
)
83+
left.dao(collectionName).updateMany(Map(), update).result()
84+
right.dao(collectionName).updateMany(Map(), update).result()
85+
}
86+
val countAfter: Int = right.dao(collectionName).count().result().toInt
87+
MongoSyncResult(collectionName, true, diff.size, countBefore, countAfter, syncInfo)
88+
}
89+
}
90+
91+
object MongoSyncOperation extends ConfigHelper {
92+
val MaxWaitDefault = 600
93+
val MaxWait: Int = intConfig(configPath = "com.sfxcode.nosql.mongo.sync", key = "maxWait", default = MaxWaitDefault)
94+
95+
val SyncColumnLastSync: String =
96+
stringConfig(configPath = "com.sfxcode.nosql.mongo.sync", key = "syncColumnLastSync", default = "_lastSync").get
97+
val SyncColumnLastUpdate: String =
98+
stringConfig(configPath = "com.sfxcode.nosql.mongo.sync", key = "syncColumnLastUpdate", default = "_lastUpdate").get
99+
100+
}
101+
102+
case class MongoSyncInfo(id: Any = new ObjectId(), syncDate: Date = new Date(), updateDate: Date = new Date())
103+
104+
case class MongoSyncResult(
105+
collectionName: String,
106+
acknowleged: Boolean = false,
107+
synced: Int = -1,
108+
countBefore: Int = -1,
109+
countAfter: Int = -1,
110+
syncInfo: MongoSyncInfo = MongoSyncInfo(),
111+
exception: Option[Exception] = None
112+
)

0 commit comments

Comments
 (0)