Skip to content

Commit 824b99b

Browse files
committed
MongoSync
1 parent c643df9 commit 824b99b

File tree

11 files changed

+119
-29
lines changed

11 files changed

+119
-29
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "4.0.3"
5959

6060
libraryDependencies += "org.xerial.snappy" % "snappy-java" % "1.1.7.5" % Provided
6161

62-
libraryDependencies += "com.github.luben" % "zstd-jni" % "1.4.4-11" % Provided
62+
libraryDependencies += "com.github.luben" % "zstd-jni" % "1.4.5-2" % Provided
6363

6464
libraryDependencies += "de.bwaldvogel" % "mongo-java-server" % "1.29.1" % Provided
6565

project/build.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
sbt.version=1.3.10
1+
sbt.version=1.3.11

src/main/reference.conf renamed to src/main/resources/reference.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,6 @@ com.sfxcode.nosql.mongo.sync {
33
maxWait = 600
44
syncColumnLastSync = "_lastSync"
55
syncColumnLastUpdate = "_lastUpdate"
6+
writeSyncLogOnMaster = false
7+
syncLogTableName = "mongo-sync-log"
68
}

src/main/scala/com/sfxcode/nosql/mongo/operation/Base.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.sfxcode.nosql.mongo.operation
22

3-
import com.sfxcode.nosql.mongo.database.MongoIndex
3+
import com.sfxcode.nosql.mongo.database.{ConfigHelper, MongoIndex}
44
import com.typesafe.scalalogging.LazyLogging
55
import org.mongodb.scala.bson.conversions.Bson
66
import org.mongodb.scala.model.Sorts._

src/main/scala/com/sfxcode/nosql/mongo/operation/Crud.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.sfxcode.nosql.mongo.operation
22

3+
import java.util.Date
4+
35
import com.sfxcode.nosql.mongo.database.DatabaseProvider
46
import com.sfxcode.nosql.mongo.{Converter, _}
57
import org.mongodb.scala.bson.conversions.Bson
@@ -10,6 +12,8 @@ import org.mongodb.scala.{BulkWriteResult, Observable, SingleObservable}
1012

1113
import scala.collection.mutable.ArrayBuffer
1214
import scala.reflect.ClassTag
15+
import Updates._
16+
import com.sfxcode.nosql.mongo.sync.MongoSyncOperation
1317

1418
abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
1519

@@ -77,6 +81,9 @@ abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
7781
def updateMany(filter: Bson, update: Bson, options: UpdateOptions): Observable[UpdateResult] =
7882
coll.updateMany(filter, update, options)
7983

84+
def touchInternal(filter: Bson): Observable[UpdateResult] =
85+
updateMany(filter, set(MongoSyncOperation.SyncColumnLastUpdate, new Date()))
86+
8087
// delete
8188

8289
def deleteOne(filter: Bson): Observable[DeleteResult] = coll.deleteOne(filter)

src/main/scala/com/sfxcode/nosql/mongo/sync/MongoSyncOperation.scala

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,29 @@ case class MongoSyncOperation(
7171
countBefore: Int,
7272
diff: Seq[Document]
7373
): MongoSyncResult = {
74-
val syncInfo: MongoSyncInfo = MongoSyncInfo()
74+
val start = System.currentTimeMillis()
75+
val syncDate = new Date()
7576
if (diff.nonEmpty) {
7677
val idSet: Set[ObjectId] = diff.map(doc => doc.getObjectId(idColumnName)).toSet
7778
val documentsToSync: Seq[Document] = left.dao(collectionName).find(valueFilter(idColumnName, idSet)).results()
7879
right.dao(collectionName).bulkWriteMany(documentsToSync).result()
7980
val update = combine(
80-
set(MongoSyncOperation.SyncColumnLastSync, syncInfo.syncDate),
81-
set(MongoSyncOperation.SyncColumnLastUpdate, syncInfo.updateDate)
81+
set(MongoSyncOperation.SyncColumnLastSync, syncDate),
82+
set(MongoSyncOperation.SyncColumnLastUpdate, syncDate)
8283
)
8384
left.dao(collectionName).updateMany(Map(), update).result()
8485
right.dao(collectionName).updateMany(Map(), update).result()
8586
}
8687
val countAfter: Int = right.dao(collectionName).count().result().toInt
87-
MongoSyncResult(collectionName, true, diff.size, countBefore, countAfter, syncInfo)
88+
MongoSyncResult(
89+
collectionName,
90+
syncDate,
91+
true,
92+
diff.size,
93+
countBefore,
94+
countAfter,
95+
(System.currentTimeMillis() - start)
96+
)
8897
}
8998
}
9099

@@ -97,16 +106,20 @@ object MongoSyncOperation extends ConfigHelper {
97106
val SyncColumnLastUpdate: String =
98107
stringConfig(configPath = "com.sfxcode.nosql.mongo.sync", key = "syncColumnLastUpdate", default = "_lastUpdate").get
99108

109+
val WriteSyncLogOnMaster = booleanConfig(configPath = "com.sfxcode.nosql.mongo.sync", key = "writeSyncLogOnMaster")
110+
val SyncLogTableName: String =
111+
stringConfig(configPath = "com.sfxcode.nosql.mongo.sync", key = "syncLogTableName", default = "mongo-sync-log").get
100112
}
101113

102-
case class MongoSyncInfo(id: Any = new ObjectId(), syncDate: Date = new Date(), updateDate: Date = new Date())
114+
//case class MongoSyncInfo(id: Any = new ObjectId(), syncDate: Date = new Date(), updateDate: Date = new Date())
103115

104116
case class MongoSyncResult(
105117
collectionName: String,
118+
syncDate: Date = new Date(),
106119
acknowleged: Boolean = false,
107120
synced: Int = -1,
108121
countBefore: Int = -1,
109122
countAfter: Int = -1,
110-
syncInfo: MongoSyncInfo = MongoSyncInfo(),
123+
syncTime: Long = -1,
111124
exception: Option[Exception] = None
112125
)

src/main/scala/com/sfxcode/nosql/mongo/sync/MongoSyncer.scala

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
11
package com.sfxcode.nosql.mongo.sync
22

3+
import com.sfxcode.nosql.mongo._
34
import com.sfxcode.nosql.mongo.database.{DatabaseProvider, MongoConfig}
5+
import org.bson.codecs.configuration.CodecRegistries.fromProviders
6+
import org.mongodb.scala.bson.codecs.Macros._
47

58
import scala.collection.mutable
69

710
case class MongoSyncer(sourceConfig: MongoConfig, targetConfig: MongoConfig) {
8-
val source: DatabaseProvider = DatabaseProvider(sourceConfig)
11+
private val registry = fromProviders(classOf[MongoSyncResult])
12+
13+
val source: DatabaseProvider = DatabaseProvider(sourceConfig, registry)
914
val target: DatabaseProvider = DatabaseProvider(targetConfig)
1015

16+
object MongoSyncResultDAO extends MongoDAO[MongoSyncResult](source, MongoSyncOperation.SyncLogTableName)
17+
1118
var terminated = false
1219

1320
private val operationMap = new mutable.HashMap[String, MongoSyncOperation]()
@@ -19,10 +26,16 @@ case class MongoSyncer(sourceConfig: MongoConfig, targetConfig: MongoConfig) {
1926
if (terminated) {
2027
throw MongoSyncException("MongoSyncer already terminated")
2128
}
22-
operationMap
29+
30+
val result = operationMap
2331
.get(collectionName)
2432
.map(op => op.excecute(source, target))
2533
.getOrElse(List(MongoSyncResult(collectionName)))
34+
35+
if (MongoSyncOperation.WriteSyncLogOnMaster) {
36+
MongoSyncResultDAO.insertMany(result).results()
37+
}
38+
result
2639
}
2740

2841
def syncAll(): List[MongoSyncResult] = operationMap.keys.flatMap(key => sync(key)).toList

src/test/resources/application.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
test.mongo.database = "test"
22

3+
com.sfxcode.nosql.mongo.sync.writeSyncLogOnMaster = true
4+
5+
36
unit.test.mongo {
47
database = "simple-mongo-unit-test"
58
host = "localhost"

src/test/scala/com/sfxcode/nosql/mongo/sync/SyncSpec.scala

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.sfxcode.nosql.mongo.sync
22

33
import com.sfxcode.nosql.mongo.server.LocalServer
4+
import com.sfxcode.nosql.mongo.sync.TestSync._
45
import com.sfxcode.nosql.mongo.test.UniversityDatabase
56
import org.specs2.mutable.Specification
67
import org.specs2.specification.{AfterAll, BeforeAll}
@@ -20,36 +21,69 @@ class SyncSpec extends Specification with BeforeAll with AfterAll {
2021
"Collection" should {
2122

2223
"be synced from source to target" in {
23-
var result: MongoSyncResult = TestSync.mongoSyncer.sync(TestSync.TestCollectionSourceTargetName).head
24+
var result: MongoSyncResult = TestSync.mongoSyncer.sync(TestCollectionSourceTargetName).head
2425
result.acknowleged must beTrue
25-
TestSync.insertIntoSource(500)
26-
result = TestSync.mongoSyncer.sync(TestSync.TestCollectionSourceTargetName).head
26+
TestSync.insertIntoSource(500, TestCollectionSourceTargetName)
27+
result = TestSync.mongoSyncer.sync(TestCollectionSourceTargetName).head
2728
result.acknowleged must beTrue
2829
result.synced mustEqual 500
2930
result.countBefore mustEqual 0
3031
result.countAfter mustEqual 500
31-
TestSync.targetCount mustEqual 500
32-
result = TestSync.mongoSyncer.sync(TestSync.TestCollectionSourceTargetName).head
32+
TestSync.targetCount(TestCollectionSourceTargetName) mustEqual 500
33+
result = TestSync.mongoSyncer.sync(TestCollectionSourceTargetName).head
3334
result.acknowleged must beTrue
3435
result.countBefore mustEqual 500
3536
result.synced mustEqual 0
3637

37-
TestSync.insertIntoSource(5)
38-
result = TestSync.mongoSyncer.sync(TestSync.TestCollectionSourceTargetName).head
38+
TestSync.insertIntoSource(5, TestCollectionSourceTargetName)
39+
result = TestSync.mongoSyncer.sync(TestCollectionSourceTargetName).head
3940
result.acknowleged must beTrue
4041
result.countBefore mustEqual 500
4142
result.countAfter mustEqual 505
4243
result.synced mustEqual 5
43-
TestSync.targetCount mustEqual 505
44+
TestSync.targetCount(TestCollectionSourceTargetName) mustEqual 505
4445

45-
TestSync.insertIntoTarget(5)
46-
TestSync.targetCount mustEqual 510
47-
val resultList = TestSync.mongoSyncer.sync(TestSync.TestCollectionSourceTargetName)
46+
TestSync.insertIntoTarget(5, TestCollectionSourceTargetName)
47+
TestSync.targetCount(TestCollectionSourceTargetName) mustEqual 510
48+
val resultList = TestSync.mongoSyncer.sync(TestCollectionSourceTargetName)
4849
result = resultList.head
4950
result.acknowleged must beTrue
5051
result.countBefore mustEqual 510
5152
result.synced mustEqual 0
52-
TestSync.sourceCount mustEqual 505
53+
TestSync.sourceCount(TestCollectionSourceTargetName) mustEqual 505
54+
}
55+
56+
"be synced two way" in {
57+
var result: MongoSyncResult = TestSync.mongoSyncer.sync(TestCollectionTwoWayName).head
58+
result.acknowleged must beTrue
59+
TestSync.insertIntoSource(500, TestCollectionTwoWayName)
60+
result = TestSync.mongoSyncer.sync(TestCollectionTwoWayName).head
61+
result.acknowleged must beTrue
62+
result.synced mustEqual 500
63+
result.countBefore mustEqual 0
64+
result.countAfter mustEqual 500
65+
TestSync.targetCount(TestCollectionTwoWayName) mustEqual 500
66+
result = TestSync.mongoSyncer.sync(TestCollectionTwoWayName).head
67+
result.acknowleged must beTrue
68+
result.countBefore mustEqual 500
69+
result.synced mustEqual 0
70+
71+
TestSync.insertIntoSource(5, TestCollectionTwoWayName)
72+
result = TestSync.mongoSyncer.sync(TestCollectionTwoWayName).head
73+
result.acknowleged must beTrue
74+
result.countBefore mustEqual 500
75+
result.countAfter mustEqual 505
76+
result.synced mustEqual 5
77+
TestSync.targetCount(TestCollectionTwoWayName) mustEqual 505
78+
79+
TestSync.insertIntoTarget(5, TestCollectionTwoWayName)
80+
TestSync.targetCount(TestCollectionTwoWayName) mustEqual 510
81+
val resultList = TestSync.mongoSyncer.sync(TestCollectionTwoWayName)
82+
result = resultList.head
83+
result.acknowleged must beTrue
84+
result.countBefore mustEqual 510
85+
result.synced mustEqual 0
86+
TestSync.sourceCount(TestCollectionTwoWayName) mustEqual 510
5387
}
5488

5589
}

src/test/scala/com/sfxcode/nosql/mongo/sync/TestSync.scala

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,34 +6,41 @@ import org.mongodb.scala.bson.collection.immutable.Document
66
object TestSync {
77

88
val TestCollectionSourceTargetName = "sync-test-source-target"
9+
val TestCollectionTwoWayName = "sync-test-two-way"
910

1011
val mongoSyncer: MongoSyncer =
1112
MongoSyncer.fromPath(sourceConfigPath = "unit.test.mongo", targetConfigPath = "unit.test.mongo.local")
1213

1314
mongoSyncer.addOperation(MongoSyncOperation(TestCollectionSourceTargetName, SyncDirection.SourceToTarget))
15+
mongoSyncer.addOperation(MongoSyncOperation(TestCollectionTwoWayName, SyncDirection.TwoWay))
1416
reset()
1517

16-
def insertIntoSource(count: Int = 1): Unit =
18+
def insertIntoSource(count: Int = 1, collectionName: String): Unit =
1719
(1 to count).foreach { _ =>
1820
mongoSyncer.source
19-
.collection(TestCollectionSourceTargetName)
21+
.collection(collectionName)
2022
.insertOne(Document("_id" -> new ObjectId(), "string" -> "Hallo", "long" -> 1))
2123
.result()
2224
}
2325

24-
def insertIntoTarget(count: Int = 1): Unit =
26+
def insertIntoTarget(count: Int = 1, collectionName: String): Unit =
2527
(1 to count).foreach { _ =>
2628
mongoSyncer.target
27-
.collection(TestCollectionSourceTargetName)
29+
.collection(collectionName)
2830
.insertOne(Document("_id" -> new ObjectId(), "string" -> "Hallo", "long" -> 1))
2931
.result()
3032
}
3133

32-
def sourceCount: Long = mongoSyncer.source.dao(TestCollectionSourceTargetName).count().result()
33-
def targetCount: Long = mongoSyncer.target.dao(TestCollectionSourceTargetName).count().result()
34+
def sourceCount(collectionName: String): Long =
35+
mongoSyncer.source.dao(collectionName).count().result()
36+
def targetCount(collectionName: String): Long =
37+
mongoSyncer.target.dao(collectionName).count().result()
3438

3539
def reset(): Unit = {
3640
mongoSyncer.source.collection(TestCollectionSourceTargetName).drop().result()
3741
mongoSyncer.target.collection(TestCollectionSourceTargetName).drop().result()
42+
43+
mongoSyncer.source.collection(TestCollectionTwoWayName).drop().result()
44+
mongoSyncer.target.collection(TestCollectionTwoWayName).drop().result()
3845
}
3946
}

0 commit comments

Comments
 (0)