Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class CurrentEventsByPersistenceIdQuerySpec(testContainerConf: TestContainerConf
private implicit val ec: ExecutionContext = system.executionContext
private val entityType = nextEntityType()
private val streamId = "stream_id_" + entityType
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType

protected override def afterAll(): Unit = {
super.afterAll()
Expand All @@ -63,6 +64,13 @@ class CurrentEventsByPersistenceIdQuerySpec(testContainerConf: TestContainerConf
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false),
protobufDescriptors = Nil)

lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
GrpcQuerySettings(streamIdFromSnapshot),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false),
protobufDescriptors = Nil)
}

override protected def beforeAll(): Unit = {
Expand All @@ -77,9 +85,12 @@ class CurrentEventsByPersistenceIdQuerySpec(testContainerConf: TestContainerConf
})

val eventProducerSource = EventProducerSource(entityType, streamId, transformation, EventProducerSettings(system))
val eventProducerSourceFromSnapshot =
EventProducerSource(entityType, streamIdFromSnapshot, transformation, EventProducerSettings(system))
.withStartingFromSnapshots[Any, Any](identity)

val eventProducerService =
EventProducer.grpcServiceHandler(eventProducerSource)
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))

val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(eventProducerService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,27 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
private implicit val ec: ExecutionContext = system.executionContext
private val entityType = nextEntityType()
private val streamId = "stream_id_" + entityType
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType

class TestFixture {
val pid = nextPid(entityType)
val pid2 = nextPid(entityType)

val replyProbe = createTestProbe[Done]()

lazy val entity = spawn(TestEntity(pid))
lazy val entity2 = spawn(TestEntity(pid2))

lazy val grpcReadJournal = GrpcReadJournal(
GrpcQuerySettings(streamId),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
protobufDescriptors = Nil)

lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
GrpcQuerySettings(streamIdFromSnapshot),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
protobufDescriptors = Nil)

lazy val readJournal = PersistenceQuery(system).readJournalFor[EventTimestampQuery](R2dbcReadJournal.Identifier)
}

Expand All @@ -73,9 +81,12 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)

val eventProducerSource =
EventProducerSource(entityType, streamId, Transformation.identity, EventProducerSettings(system))
val eventProducerSourceFromSnapshot =
EventProducerSource(entityType, streamIdFromSnapshot, Transformation.identity, EventProducerSettings(system))
.withStartingFromSnapshots[Any, Any](identity)

val eventProducerService =
EventProducer.grpcServiceHandler(eventProducerSource)
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))

val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(eventProducerService)
Expand Down Expand Up @@ -110,6 +121,26 @@ class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
timestampB.isAfter(timestampA) shouldBe true
}

"lookup event timestamp when StartingFromSnapshots" in new TestFixture {
entity2 ! TestEntity.Persist("C")
entity2 ! TestEntity.Persist("D")
entity2 ! TestEntity.Ping(replyProbe.ref)
replyProbe.receiveMessage()

val timestampC =
grpcReadJournal.timestampOf(pid2.id, sequenceNr = 1L).futureValue.get
val expectedTimestampC = readJournal.timestampOf(pid2.id, sequenceNr = 1L).futureValue.get
timestampC shouldBe expectedTimestampC

val timestampD =
grpcReadJournal.timestampOf(pid2.id, sequenceNr = 2L).futureValue.get
val expectedTimestampD = readJournal.timestampOf(pid2.id, sequenceNr = 2L).futureValue.get
timestampD shouldBe expectedTimestampD

if (timestampD != timestampC)
timestampD.isAfter(timestampC) shouldBe true
}

"handle missing event as None" in new TestFixture {
grpcReadJournal
.timestampOf(pid.id, sequenceNr = 13L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
private implicit val ec: ExecutionContext = system.executionContext
private val entityType = nextEntityType()
private val streamId = "stream_id_" + entityType
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType

class TestFixture {
val pid = nextPid(entityType)
Expand All @@ -67,6 +68,11 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
protobufDescriptors = Nil)

lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
GrpcQuerySettings(streamIdFromSnapshot),
GrpcClientSettings.fromConfig(system.settings.config.getConfig("akka.projection.grpc.consumer.client")),
protobufDescriptors = Nil)

lazy val readJournal =
PersistenceQuery(system).readJournalFor[LatestEventTimestampQuery](R2dbcReadJournal.Identifier)
}
Expand All @@ -76,9 +82,12 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)

val eventProducerSource =
EventProducerSource(entityType, streamId, Transformation.identity, EventProducerSettings(system))
val eventProducerSourceFromSnapshot =
EventProducerSource(entityType, streamIdFromSnapshot, Transformation.identity, EventProducerSettings(system))
.withStartingFromSnapshots[Any, Any](identity)

val eventProducerService =
EventProducer.grpcServiceHandler(eventProducerSource)
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))

val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(eventProducerService)
Expand All @@ -99,18 +108,41 @@ class LatestEventTimestampQuerySpec(testContainerConf: TestContainerConf)
entity ! TestEntity.Ping(replyProbe.ref)
replyProbe.receiveMessage()

val timestampA =
val timestamp =
grpcReadJournal.latestEventTimestamp(streamId, slice, slice).futureValue.get
val expectedTimestampA = readJournal.latestEventTimestamp(entityType, slice, slice).futureValue.get
timestampA shouldBe expectedTimestampA
val expectedTimestamp = readJournal.latestEventTimestamp(entityType, slice, slice).futureValue.get
timestamp shouldBe expectedTimestamp

grpcReadJournal.latestEventTimestamp(streamId, 0, 1023).futureValue.get shouldBe expectedTimestampA
grpcReadJournal.latestEventTimestamp(streamId, 0, 1023).futureValue.get shouldBe expectedTimestamp

val wrongSlice =
if (slice == 0) 1023 else (slice - 1)
grpcReadJournal.latestEventTimestamp(streamId, wrongSlice, wrongSlice).futureValue shouldBe None
}

"lookup event timestamp when StartingFromSnapshots" in new TestFixture {
entity ! TestEntity.Persist("c")
entity ! TestEntity.Persist("d")
entity ! TestEntity.Ping(replyProbe.ref)
replyProbe.receiveMessage()

val timestamp =
grpcReadJournalFromSnapshot.latestEventTimestamp(streamIdFromSnapshot, slice, slice).futureValue.get
val expectedTimestamp = readJournal.latestEventTimestamp(entityType, slice, slice).futureValue.get
timestamp shouldBe expectedTimestamp

grpcReadJournalFromSnapshot
.latestEventTimestamp(streamIdFromSnapshot, 0, 1023)
.futureValue
.get shouldBe expectedTimestamp

val wrongSlice =
if (slice == 0) 1023 else (slice - 1)
grpcReadJournalFromSnapshot
.latestEventTimestamp(streamIdFromSnapshot, wrongSlice, wrongSlice)
.futureValue shouldBe None
}

"handle missing event as None" in new TestFixture {
grpcReadJournal
.timestampOf(pid.id, sequenceNr = 13L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
private implicit val ec: ExecutionContext = system.executionContext
private val entityType = nextEntityType()
private val streamId = "stream_id_" + entityType
private val streamIdFromSnapshot = "stream_id_from_snapshot" + entityType

protected override def afterAll(): Unit = {
super.afterAll()
Expand All @@ -56,15 +57,25 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)

val replyProbe = createTestProbe[Done]()
val pid = nextPid(entityType)
val pid2 = nextPid(entityType)

lazy val entity = spawn(TestEntity(pid))
lazy val entity2 = spawn(TestEntity(pid2))

lazy val grpcReadJournal = GrpcReadJournal(
GrpcQuerySettings(streamId),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false),
protobufDescriptors = Nil)

lazy val grpcReadJournalFromSnapshot = GrpcReadJournal(
GrpcQuerySettings(streamIdFromSnapshot),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
.withTls(false),
protobufDescriptors = Nil)

}

override protected def beforeAll(): Unit = {
Expand All @@ -79,9 +90,12 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
})

val eventProducerSource = EventProducerSource(entityType, streamId, transformation, EventProducerSettings(system))
val eventProducerSourceFromSnapshot =
EventProducerSource(entityType, streamIdFromSnapshot, transformation, EventProducerSettings(system))
.withStartingFromSnapshots[Any, Any](identity)

val eventProducerService =
EventProducer.grpcServiceHandler(eventProducerSource)
EventProducer.grpcServiceHandler(Set(eventProducerSource, eventProducerSourceFromSnapshot))

val service: HttpRequest => Future[HttpResponse] =
ServiceHandler.concatOrNotFound(eventProducerService)
Expand Down Expand Up @@ -113,6 +127,23 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
.event shouldBe "B"
}

"load event when StartingFromSnapshots" in new TestFixture {
entity2 ! TestEntity.Persist("c")
entity2 ! TestEntity.Persist("d")
entity2 ! TestEntity.Ping(replyProbe.ref)
replyProbe.receiveMessage()

grpcReadJournalFromSnapshot
.loadEnvelope[String](pid2.id, sequenceNr = 1L)
.futureValue
.event shouldBe "C"

grpcReadJournalFromSnapshot
.loadEnvelope[String](pid2.id, sequenceNr = 2L)
.futureValue
.event shouldBe "D"
}

"load filtered event" in new TestFixture {
entity ! TestEntity.Persist("a*")
entity ! TestEntity.Ping(replyProbe.ref)
Expand Down
Loading