Skip to content

Commit 4c04722

Browse files
authored
fix: grpc queries like eventTimestamp when starting from snapshot (#1412)
1 parent 994bbca commit 4c04722

File tree

5 files changed

+240
-100
lines changed

5 files changed

+240
-100
lines changed

akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/CurrentEventsByPersistenceIdQuerySpec.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ class CurrentEventsByPersistenceIdQuerySpec(testContainerConf: TestContainerConf
4444
private implicit val ec: ExecutionContext = system.executionContext
4545
private val entityType = nextEntityType()
4646
private val streamId = "stream_id_" + entityType
47+
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType
4748

4849
protected override def afterAll(): Unit = {
4950
super.afterAll()
@@ -63,6 +64,13 @@ class CurrentEventsByPersistenceIdQuerySpec(testContainerConf: TestContainerConf
6364
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
6465
.withTls(false),
6566
protobufDescriptors = Nil)
67+
68+
lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
69+
GrpcQuerySettings(streamIdFromSnapshot),
70+
GrpcClientSettings
71+
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
72+
.withTls(false),
73+
protobufDescriptors = Nil)
6674
}
6775

6876
override protected def beforeAll(): Unit = {
@@ -77,9 +85,12 @@ class CurrentEventsByPersistenceIdQuerySpec(testContainerConf: TestContainerConf
7785
})
7886

7987
val eventProducerSource = EventProducerSource(entityType, streamId, transformation, EventProducerSettings(system))
88+
val eventProducerSourceFromSnapshot =
89+
EventProducerSource(entityType, streamIdFromSnapshot, transformation, EventProducerSettings(system))
90+
.withStartingFromSnapshots[Any, Any](identity)
8091

8192
val eventProducerService =
82-
EventProducer.grpcServiceHandler(eventProducerSource)
93+
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))
8394

8495
val service: HttpRequest => Future[HttpResponse] =
8596
ServiceHandler.concatOrNotFound(eventProducerService)

akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,27 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
5252
private implicit val ec: ExecutionContext = system.executionContext
5353
private val entityType = nextEntityType()
5454
private val streamId = "stream_id_" + entityType
55+
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType
5556

5657
class TestFixture {
5758
val pid = nextPid(entityType)
59+
val pid2 = nextPid(entityType)
5860

5961
val replyProbe = createTestProbe[Done]()
6062

6163
lazy val entity = spawn(TestEntity(pid))
64+
lazy val entity2 = spawn(TestEntity(pid2))
6265

6366
lazy val grpcReadJournal = GrpcReadJournal(
6467
GrpcQuerySettings(streamId),
6568
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
6669
protobufDescriptors = Nil)
6770

71+
lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
72+
GrpcQuerySettings(streamIdFromSnapshot),
73+
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
74+
protobufDescriptors = Nil)
75+
6876
lazy val readJournal = PersistenceQuery(system).readJournalFor[EventTimestampQuery](R2dbcReadJournal.Identifier)
6977
}
7078

@@ -73,9 +81,12 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
7381

7482
val eventProducerSource =
7583
EventProducerSource(entityType, streamId, Transformation.identity, EventProducerSettings(system))
84+
val eventProducerSourceFromSnapshot =
85+
EventProducerSource(entityType, streamIdFromSnapshot, Transformation.identity, EventProducerSettings(system))
86+
.withStartingFromSnapshots[Any, Any](identity)
7687

7788
val eventProducerService =
78-
EventProducer.grpcServiceHandler(eventProducerSource)
89+
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))
7990

8091
val service: HttpRequest => Future[HttpResponse] =
8192
ServiceHandler.concatOrNotFound(eventProducerService)
@@ -110,6 +121,26 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
110121
timestampB.isAfter(timestampA) shouldBe true
111122
}
112123

124+
"lookup event timestamp when StartingFromSnapshots" in new TestFixture {
125+
entity2 ! TestEntity.Persist("C")
126+
entity2 ! TestEntity.Persist("D")
127+
entity2 ! TestEntity.Ping(replyProbe.ref)
128+
replyProbe.receiveMessage()
129+
130+
val timestampC =
131+
grpcReadJournal.timestampOf(pid2.id, sequenceNr = 1L).futureValue.get
132+
val expectedTimestampC = readJournal.timestampOf(pid2.id, sequenceNr = 1L).futureValue.get
133+
timestampC shouldBe expectedTimestampC
134+
135+
val timestampD =
136+
grpcReadJournal.timestampOf(pid2.id, sequenceNr = 2L).futureValue.get
137+
val expectedTimestampD = readJournal.timestampOf(pid2.id, sequenceNr = 2L).futureValue.get
138+
timestampD shouldBe expectedTimestampD
139+
140+
if (timestampD != timestampC)
141+
timestampD.isAfter(timestampC) shouldBe true
142+
}
143+
113144
"handle missing event as None" in new TestFixture {
114145
grpcReadJournal
115146
.timestampOf(pid.id, sequenceNr = 13L)

akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LatestEventTimestampQuerySpec.scala

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
5252
private implicit val ec: ExecutionContext = system.executionContext
5353
private val entityType = nextEntityType()
5454
private val streamId = "stream_id_" + entityType
55+
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType
5556

5657
class TestFixture {
5758
val pid = nextPid(entityType)
@@ -67,6 +68,11 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
6768
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
6869
protobufDescriptors = Nil)
6970

71+
lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
72+
GrpcQuerySettings(streamIdFromSnapshot),
73+
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
74+
protobufDescriptors = Nil)
75+
7076
lazy val readJournal =
7177
PersistenceQuery(system).readJournalFor[LatestEventTimestampQuery](R2dbcReadJournal.Identifier)
7278
}
@@ -76,9 +82,12 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
7682

7783
val eventProducerSource =
7884
EventProducerSource(entityType, streamId, Transformation.identity, EventProducerSettings(system))
85+
val eventProducerSourceFromSnapshot =
86+
EventProducerSource(entityType, streamIdFromSnapshot, Transformation.identity, EventProducerSettings(system))
87+
.withStartingFromSnapshots[Any, Any](identity)
7988

8089
val eventProducerService =
81-
EventProducer.grpcServiceHandler(eventProducerSource)
90+
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))
8291

8392
val service: HttpRequest => Future[HttpResponse] =
8493
ServiceHandler.concatOrNotFound(eventProducerService)
@@ -99,18 +108,41 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
99108
entity ! TestEntity.Ping(replyProbe.ref)
100109
replyProbe.receiveMessage()
101110

102-
val timestampA =
111+
val timestamp =
103112
grpcReadJournal.latestEventTimestamp(streamId, slice, slice).futureValue.get
104-
val expectedTimestampA = readJournal.latestEventTimestamp(entityType, slice, slice).futureValue.get
105-
timestampA shouldBe expectedTimestampA
113+
val expectedTimestamp = readJournal.latestEventTimestamp(entityType, slice, slice).futureValue.get
114+
timestamp shouldBe expectedTimestamp
106115

107-
grpcReadJournal.latestEventTimestamp(streamId, 0, 1023).futureValue.get shouldBe expectedTimestampA
116+
grpcReadJournal.latestEventTimestamp(streamId, 0, 1023).futureValue.get shouldBe expectedTimestamp
108117

109118
val wrongSlice =
110119
if (slice == 0) 1023 else (slice - 1)
111120
grpcReadJournal.latestEventTimestamp(streamId, wrongSlice, wrongSlice).futureValue shouldBe None
112121
}
113122

123+
"lookup event timestamp when StartingFromSnapshots" in new TestFixture {
124+
entity ! TestEntity.Persist("c")
125+
entity ! TestEntity.Persist("d")
126+
entity ! TestEntity.Ping(replyProbe.ref)
127+
replyProbe.receiveMessage()
128+
129+
val timestamp =
130+
grpcReadJournalFromSnapshot.latestEventTimestamp(streamIdFromSnapshot, slice, slice).futureValue.get
131+
val expectedTimestamp = readJournal.latestEventTimestamp(entityType, slice, slice).futureValue.get
132+
timestamp shouldBe expectedTimestamp
133+
134+
grpcReadJournalFromSnapshot
135+
.latestEventTimestamp(streamIdFromSnapshot, 0, 1023)
136+
.futureValue
137+
.get shouldBe expectedTimestamp
138+
139+
val wrongSlice =
140+
if (slice == 0) 1023 else (slice - 1)
141+
grpcReadJournalFromSnapshot
142+
.latestEventTimestamp(streamIdFromSnapshot, wrongSlice, wrongSlice)
143+
.futureValue shouldBe None
144+
}
145+
114146
"handle missing event as None" in new TestFixture {
115147
grpcReadJournal
116148
.timestampOf(pid.id, sequenceNr = 13L)

akka-projection-grpc-integration/src/test/scala/akka/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
4646
private implicit val ec: ExecutionContext = system.executionContext
4747
private val entityType = nextEntityType()
4848
private val streamId = "stream_id_" + entityType
49+
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType
4950

5051
protected override def afterAll(): Unit = {
5152
super.afterAll()
@@ -56,15 +57,25 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
5657

5758
val replyProbe = createTestProbe[Done]()
5859
val pid = nextPid(entityType)
60+
val pid2 = nextPid(entityType)
5961

6062
lazy val entity = spawn(TestEntity(pid))
63+
lazy val entity2 = spawn(TestEntity(pid2))
6164

6265
lazy val grpcReadJournal = GrpcReadJournal(
6366
GrpcQuerySettings(streamId),
6467
GrpcClientSettings
6568
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
6669
.withTls(false),
6770
protobufDescriptors = Nil)
71+
72+
lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
73+
GrpcQuerySettings(streamIdFromSnapshot),
74+
GrpcClientSettings
75+
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
76+
.withTls(false),
77+
protobufDescriptors = Nil)
78+
6879
}
6980

7081
override protected def beforeAll(): Unit = {
@@ -79,9 +90,12 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
7990
})
8091

8192
val eventProducerSource = EventProducerSource(entityType, streamId, transformation, EventProducerSettings(system))
93+
val eventProducerSourceFromSnapshot =
94+
EventProducerSource(entityType, streamIdFromSnapshot, transformation, EventProducerSettings(system))
95+
.withStartingFromSnapshots[Any, Any](identity)
8296

8397
val eventProducerService =
84-
EventProducer.grpcServiceHandler(eventProducerSource)
98+
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))
8599

86100
val service: HttpRequest => Future[HttpResponse] =
87101
ServiceHandler.concatOrNotFound(eventProducerService)
@@ -113,6 +127,23 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
113127
.event shouldBe "B"
114128
}
115129

130+
"load event when StartingFromSnapshots" in new TestFixture {
131+
entity2 ! TestEntity.Persist("c")
132+
entity2 ! TestEntity.Persist("d")
133+
entity2 ! TestEntity.Ping(replyProbe.ref)
134+
replyProbe.receiveMessage()
135+
136+
grpcReadJournalFromSnapshot
137+
.loadEnvelope[String](pid2.id, sequenceNr = 1L)
138+
.futureValue
139+
.event shouldBe "C"
140+
141+
grpcReadJournalFromSnapshot
142+
.loadEnvelope[String](pid2.id, sequenceNr = 2L)
143+
.futureValue
144+
.event shouldBe "D"
145+
}
146+
116147
"load filtered event" in new TestFixture {
117148
entity ! TestEntity.Persist("a*")
118149
entity ! TestEntity.Ping(replyProbe.ref)

0 commit comments

Comments
 (0)