Skip to content

Commit 42c13e0

Browse files
committed
add ChangeObserver
1 parent a0e2287 commit 42c13e0

File tree

5 files changed

+81
-31
lines changed

5 files changed

+81
-31
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package com.sfxcode.nosql.mongo.database
2+
3+
import com.typesafe.scalalogging.LazyLogging
4+
import org.mongodb.scala.model.changestream.ChangeStreamDocument
5+
import org.mongodb.scala.{ Observer, Subscription }
6+
7+
case class ChangeObserver[A](onChangeCallback: ChangeStreamDocument[A] => Unit)
8+
extends Observer[ChangeStreamDocument[A]]
9+
with LazyLogging {
10+
11+
override def onSubscribe(subscription: Subscription): Unit = subscription.request(Long.MaxValue) // Request data
12+
13+
override def onNext(changeDocument: ChangeStreamDocument[A]): Unit =
14+
onChangeCallback(changeDocument)
15+
16+
override def onError(throwable: Throwable): Unit =
17+
logger.error(throwable.getMessage, throwable)
18+
19+
override def onComplete(): Unit = {}
20+
21+
}

src/main/scala/com/sfxcode/nosql/mongo/database/DatabaseProvider.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ class DatabaseProvider(config: MongoConfig, registry: CodecRegistry) extends Ser
4040
cachedDatabaseMap(databaseName)
4141
}
4242

43+
def addChangeObserver(observer: ChangeObserver[Document]): ChangeObserver[Document] = {
44+
database().watch().subscribe(observer)
45+
observer
46+
}
47+
4348
def collection(collectionName: String): MongoCollection[Document] =
4449
dao(collectionName).collection
4550

@@ -60,7 +65,7 @@ class DatabaseProvider(config: MongoConfig, registry: CodecRegistry) extends Ser
6065
database(name).listCollectionNames()
6166

6267
case class DocumentDao(provider: DatabaseProvider, collectionName: String)
63-
extends MongoDAO[Document](this, collectionName)
68+
extends MongoDAO[Document](this, collectionName)
6469

6570
}
6671

@@ -75,9 +80,8 @@ object DatabaseProvider {
7580
def apply(config: MongoConfig, registry: CodecRegistry = codecRegistry): DatabaseProvider =
7681
new DatabaseProvider(config, fromRegistries(registry, CustomRegistry, DEFAULT_CODEC_REGISTRY))
7782

78-
def fromPath(
79-
configPath: String = MongoConfig.DefaultConfigPathPrefix,
80-
registry: CodecRegistry = codecRegistry): DatabaseProvider =
83+
def fromPath(configPath: String = MongoConfig.DefaultConfigPathPrefix,
84+
registry: CodecRegistry = codecRegistry): DatabaseProvider =
8185
apply(MongoConfig.fromPath(configPath), fromRegistries(registry, CustomRegistry, DEFAULT_CODEC_REGISTRY))
8286

8387
}
Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package com.sfxcode.nosql.mongo.operation
22

3-
import com.sfxcode.nosql.mongo.database.MongoIndex
3+
import com.sfxcode.nosql.mongo.database.{ ChangeObserver, MongoIndex }
44
import com.typesafe.scalalogging.LazyLogging
55
import org.mongodb.scala.bson.conversions.Bson
66
import org.mongodb.scala.model.Sorts._
@@ -19,26 +19,23 @@ abstract class Base[A]()(implicit ct: ClassTag[A]) extends LazyLogging {
1919

2020
def drop(): Observable[Void] = coll.drop()
2121

22-
def createIndexForField(
23-
fieldName: String,
24-
sortAscending: Boolean = true,
25-
options: IndexOptions = IndexOptions()): SingleObservable[String] =
22+
def createIndexForField(fieldName: String,
23+
sortAscending: Boolean = true,
24+
options: IndexOptions = IndexOptions()): SingleObservable[String] =
2625
if (sortAscending) {
2726
createIndex(ascending(fieldName), options)
2827
} else {
2928
createIndex(descending(fieldName), options)
3029
}
3130

32-
def createIndexForFieldWithName(
33-
fieldName: String,
34-
sortAscending: Boolean = true,
35-
name: String): SingleObservable[String] =
31+
def createIndexForFieldWithName(fieldName: String,
32+
sortAscending: Boolean = true,
33+
name: String): SingleObservable[String] =
3634
createIndexForField(fieldName, sortAscending, MongoIndex.indexOptionsWithName(Some(name)))
3735

38-
def createUniqueIndexForField(
39-
fieldName: String,
40-
sortAscending: Boolean = true,
41-
name: Option[String] = None): SingleObservable[String] =
36+
def createUniqueIndexForField(fieldName: String,
37+
sortAscending: Boolean = true,
38+
name: Option[String] = None): SingleObservable[String] =
4239
createIndexForField(fieldName, sortAscending, MongoIndex.indexOptionsWithName(name).unique(true))
4340

4441
def createHashedIndexForField(fieldName: String, options: IndexOptions = IndexOptions()): SingleObservable[String] =
@@ -47,15 +44,13 @@ abstract class Base[A]()(implicit ct: ClassTag[A]) extends LazyLogging {
4744
def createTextIndexForField(fieldName: String, options: IndexOptions = IndexOptions()): SingleObservable[String] =
4845
createIndex(Indexes.text(fieldName), options)
4946

50-
def createExpiringIndexForField(
51-
fieldName: String,
52-
duration: Duration,
53-
sortAscending: Boolean = true,
54-
name: Option[String] = None): SingleObservable[String] =
55-
createIndexForField(
56-
fieldName,
57-
sortAscending,
58-
MongoIndex.indexOptionsWithName(name).expireAfter(duration._1, duration._2))
47+
def createExpiringIndexForField(fieldName: String,
48+
duration: Duration,
49+
sortAscending: Boolean = true,
50+
name: Option[String] = None): SingleObservable[String] =
51+
createIndexForField(fieldName,
52+
sortAscending,
53+
MongoIndex.indexOptionsWithName(name).expireAfter(duration._1, duration._2))
5954

6055
def createIndex(key: Bson, options: IndexOptions = IndexOptions()): SingleObservable[String] =
6156
coll.createIndex(key, options)
@@ -72,4 +67,9 @@ abstract class Base[A]()(implicit ct: ClassTag[A]) extends LazyLogging {
7267

7368
def hasIndexForField(fieldName: String): Boolean = MongoIndex.hasIndexForFieldWithName(listIndexes, fieldName)
7469

70+
def addChangeObserver(observer: ChangeObserver[A]): ChangeObserver[A] = {
71+
coll.watch[A]().subscribe(observer)
72+
observer
73+
}
74+
7575
}

src/test/scala/com/sfxcode/nosql/mongo/TestDatabase.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,33 @@
11
package com.sfxcode.nosql.mongo
22

3-
import com.sfxcode.nosql.mongo.database.DatabaseProvider
3+
import com.mongodb.client.model.changestream.OperationType
4+
import com.sfxcode.nosql.mongo.database.{ ChangeObserver, DatabaseProvider }
45
import com.sfxcode.nosql.mongo.model._
6+
import com.typesafe.scalalogging.LazyLogging
57
import org.bson.codecs.configuration.CodecRegistries._
8+
import org.mongodb.scala.Document
69
import org.mongodb.scala.bson.codecs.Macros._
10+
import org.mongodb.scala.model.changestream.ChangeStreamDocument
711

8-
object TestDatabase {
12+
object TestDatabase extends LazyLogging {
913

1014
private val registry = fromProviders(classOf[Person], classOf[Friend], classOf[CodecTest])
1115
private val universityRegistry = fromProviders(classOf[Student], classOf[Score], classOf[Grade])
1216

1317
val provider =
1418
DatabaseProvider.fromPath(configPath = "unit.test.mongo", registry = fromRegistries(registry, universityRegistry))
1519

20+
provider.addChangeObserver(ChangeObserver(consumeDatabaseChanges))
21+
22+
def consumeDatabaseChanges(changeStreamDocument: ChangeStreamDocument[Document]): Unit =
23+
if (changeStreamDocument.getOperationType != OperationType.INSERT) {
24+
logger.info(
25+
"changed %s:%s with ID: %s".format(changeStreamDocument.getNamespace,
26+
changeStreamDocument.getOperationType,
27+
changeStreamDocument.getDocumentKey)
28+
)
29+
}
30+
1631
object PersonDAO extends MongoDAO[Person](provider, "people")
1732

1833
object BookDAO extends MongoDAO[Book](provider, "books")

src/test/scala/com/sfxcode/nosql/mongo/operation/CrudSpec.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,28 @@ package com.sfxcode.nosql.mongo.operation
33
import com.sfxcode.nosql.mongo.TestDatabase._
44
import com.sfxcode.nosql.mongo._
55
import com.sfxcode.nosql.mongo.dao.PersonSpecification
6+
import com.sfxcode.nosql.mongo.database.ChangeObserver
67
import com.sfxcode.nosql.mongo.model.CodecTest
7-
import org.specs2.mutable.Specification
8-
import org.specs2.specification.BeforeAll
8+
import com.typesafe.scalalogging.LazyLogging
9+
import org.mongodb.scala.model.changestream.ChangeStreamDocument
910

10-
class CrudSpec extends PersonSpecification {
11+
class CrudSpec extends PersonSpecification with LazyLogging {
1112

1213
sequential
1314

1415
override def beforeAll(): Unit = {
1516
super.beforeAll()
1617
CodecDao.drop().result()
18+
CodecDao.addChangeObserver(ChangeObserver(consumeCodecChanges))
1719
CodecDao.insertOne(CodecTest()).result()
20+
21+
def consumeCodecChanges(changeStreamDocument: ChangeStreamDocument[CodecTest]): Unit =
22+
logger.info(
23+
"codec changed %s:%s with ID: %s".format(changeStreamDocument.getNamespace,
24+
changeStreamDocument.getOperationType,
25+
changeStreamDocument.getDocumentKey)
26+
)
27+
1828
}
1929

2030
"Crud Operations" should {

0 commit comments

Comments
 (0)