Skip to content

Commit 264e5fd

Browse files
authored
feat: Offset validation of heartbeat envelopes (#1413)
* accept in same way as snapshots and AllowSeqNrGapsMetadata * simplified by handling all non-strict in same way * corresponding for dynamodb * mima
1 parent 4c04722 commit 264e5fd

File tree

8 files changed

+242
-34
lines changed

8 files changed

+242
-34
lines changed

akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,32 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
152152
filtered = true,
153153
env.source)
154154

155+
def snapshotEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
156+
new EventEnvelope[String](
157+
env.offset,
158+
env.persistenceId,
159+
sequenceNr = env.sequenceNr,
160+
env.eventOption,
161+
env.timestamp,
162+
None,
163+
env.entityType,
164+
env.slice,
165+
filtered = false,
166+
source = EnvelopeOrigin.SourceSnapshot)
167+
168+
def heartbeatEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
169+
new EventEnvelope[String](
170+
env.offset,
171+
env.persistenceId,
172+
sequenceNr = env.sequenceNr,
173+
None,
174+
env.timestamp,
175+
None,
176+
env.entityType,
177+
env.slice,
178+
filtered = true,
179+
source = EnvelopeOrigin.SourceHeartbeat)
180+
155181
def createUpdatedDurableState(
156182
pid: Pid,
157183
revision: SeqNr,
@@ -678,6 +704,82 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
678704
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
679705
}
680706

707+
"accept snapshots" in {
708+
val projectionId = genRandomProjectionId()
709+
val eventTimestampQueryClock = TestClock.nowMicros()
710+
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
711+
712+
val p1 = "p-08071" // slice 101
713+
714+
val startTime = TestClock.nowMicros().instant()
715+
716+
val env1 = snapshotEnvelope(createEnvelope(p1, 1L, startTime.plusMillis(1), "s1"))
717+
offsetStore.validate(env1).futureValue shouldBe Accepted
718+
719+
// seqNr gap is allowed
720+
val env2 = snapshotEnvelope(createEnvelope(p1, 3L, startTime.plusMillis(2), "s3"))
721+
offsetStore.validate(env2).futureValue shouldBe Accepted
722+
723+
offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
724+
// duplicate
725+
offsetStore.validate(env1).futureValue shouldBe Duplicate
726+
offsetStore.validate(env2).futureValue shouldBe Duplicate
727+
728+
offsetStore.getState().byPid(p1).seqNr shouldBe env2.sequenceNr
729+
730+
val env3 = snapshotEnvelope(createEnvelope(p1, 10L, startTime.plusMillis(3), "s10"))
731+
offsetStore.validate(env3).futureValue shouldBe Accepted
732+
offsetStore.addInflight(env3)
733+
offsetStore.getInflight() shouldBe Map(p1 -> env3.sequenceNr)
734+
offsetStore.validate(env3).futureValue shouldBe Duplicate
735+
736+
offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
737+
offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr
738+
offsetStore.getInflight() shouldBe Map.empty
739+
740+
offsetStore.validate(env1).futureValue shouldBe Duplicate
741+
offsetStore.validate(env2).futureValue shouldBe Duplicate
742+
offsetStore.validate(env3).futureValue shouldBe Duplicate
743+
}
744+
745+
"accept heartbeats" in {
746+
val projectionId = genRandomProjectionId()
747+
val eventTimestampQueryClock = TestClock.nowMicros()
748+
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
749+
750+
val hbPid1 = "test-entity|_hb-26498e81-66ad-4aa9-b985-b34602c5f32d-1" // slice 359
751+
752+
val startTime = TestClock.nowMicros().instant()
753+
754+
val env1 = heartbeatEnvelope(createEnvelope(hbPid1, 1L, startTime.plusMillis(1), ""))
755+
offsetStore.validate(env1).futureValue shouldBe Accepted
756+
757+
// seqNr gap is allowed
758+
val env2 = heartbeatEnvelope(createEnvelope(hbPid1, 3L, startTime.plusMillis(2), ""))
759+
offsetStore.validate(env2).futureValue shouldBe Accepted
760+
761+
offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
762+
// duplicate
763+
offsetStore.validate(env1).futureValue shouldBe Duplicate
764+
offsetStore.validate(env2).futureValue shouldBe Duplicate
765+
766+
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env2.sequenceNr
767+
768+
val env3 = heartbeatEnvelope(createEnvelope(hbPid1, 10L, startTime.plusMillis(3), ""))
769+
offsetStore.validate(env3).futureValue shouldBe Accepted
770+
offsetStore.addInflight(env3)
771+
offsetStore.getInflight() shouldBe Map(hbPid1 -> env3.sequenceNr)
772+
offsetStore.validate(env3).futureValue shouldBe Duplicate
773+
774+
offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
775+
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr
776+
offsetStore.getInflight() shouldBe Map.empty
777+
778+
offsetStore.validate(env1).futureValue shouldBe Duplicate
779+
offsetStore.validate(env2).futureValue shouldBe Duplicate
780+
offsetStore.validate(env3).futureValue shouldBe Duplicate
781+
}
782+
681783
"update inflight on error and re-accept element" in {
682784
val projectionId = genRandomProjectionId()
683785
val offsetStore = createOffsetStore(projectionId)
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# snapshots, heartbeats, internals
2+
ProblemFilters.exclude[Problem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#RecordWithOffset*")
3+
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore$RecordWithOffset$")

akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,7 @@ private[projection] object DynamoDBOffsetStore {
6969
offset: TimestampOffset,
7070
strictSeqNr: Boolean,
7171
fromBacktracking: Boolean,
72-
fromPubSub: Boolean,
73-
fromSnapshot: Boolean)
72+
fromPubSub: Boolean)
7473

7574
object State {
7675
val empty: State = State(Map.empty, Map.empty, Map.empty)
@@ -707,9 +706,6 @@ private[projection] class DynamoDBOffsetStore(
707706
seqNr,
708707
pid)
709708
FutureDuplicate
710-
} else if (recordWithOffset.fromSnapshot) {
711-
// snapshots will mean we are starting from some arbitrary offset after last seen offset
712-
FutureAccepted
713709
} else if (!recordWithOffset.fromBacktracking) {
714710
logUnexpected()
715711
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
@@ -725,22 +721,20 @@ private[projection] class DynamoDBOffsetStore(
725721
} else if (seqNr == 1) {
726722
// always accept first event if no other event for that pid has been seen
727723
FutureAccepted
728-
} else if (recordWithOffset.fromSnapshot) {
729-
// always accept starting from snapshots when there was no previous event seen
730-
FutureAccepted
731724
} else {
732725
validateEventTimestamp(recordWithOffset)
733726
}
734727
} else {
735-
// strictSeqNr == false is for durable state where each revision might not be visible
728+
// strictSeqNr == false is for durable state, AllowSeqNrGapsMetadata, snapshots, or heartbeats
729+
// where each revision might not be visible
736730
val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
737731
val ok = seqNr > prevSeqNr
738732

739733
if (ok) {
740734
FutureAccepted
741735
} else {
742736
logger.trace(
743-
"{} Filtering out earlier revision [{}] for pid [{}], previous revision [{}]",
737+
"{} Filtering out duplicate sequence number [{}] for pid [{}], previous sequence number [{}]",
744738
logPrefix,
745739
seqNr,
746740
pid,
@@ -867,16 +861,18 @@ private[projection] class DynamoDBOffsetStore(
867861
case eventEnvelope: EventEnvelope[_] if eventEnvelope.offset.isInstanceOf[TimestampOffset] =>
868862
val timestampOffset = eventEnvelope.offset.asInstanceOf[TimestampOffset]
869863
val slice = persistenceExt.sliceForPersistenceId(eventEnvelope.persistenceId)
864+
val fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope)
865+
val fromHeartbeat = EnvelopeOrigin.fromHeartbeat(eventEnvelope)
870866
// Allow gaps if envelope has AllowSeqNrGapsMetadata
871-
val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty
867+
val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty &&
868+
!fromSnapshot && !fromHeartbeat
872869
Some(
873870
RecordWithOffset(
874871
Record(slice, eventEnvelope.persistenceId, eventEnvelope.sequenceNr, timestampOffset.timestamp),
875872
timestampOffset,
876873
strictSeqNr,
877874
fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope),
878-
fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope),
879-
fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope)))
875+
fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope)))
880876
case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
881877
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
882878
val slice = persistenceExt.sliceForPersistenceId(change.persistenceId)
@@ -886,8 +882,7 @@ private[projection] class DynamoDBOffsetStore(
886882
timestampOffset,
887883
strictSeqNr = false,
888884
fromBacktracking = EnvelopeOrigin.fromBacktracking(change),
889-
fromPubSub = false,
890-
fromSnapshot = false))
885+
fromPubSub = false))
891886
case change: DeletedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
892887
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
893888
val slice = persistenceExt.sliceForPersistenceId(change.persistenceId)
@@ -897,8 +892,7 @@ private[projection] class DynamoDBOffsetStore(
897892
timestampOffset,
898893
strictSeqNr = false,
899894
fromBacktracking = false,
900-
fromPubSub = false,
901-
fromSnapshot = false))
895+
fromPubSub = false))
902896
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
903897
// in case additional types are added
904898
throw new IllegalArgumentException(

akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,7 @@ private[projection] object DynamoDBProjectionImpl {
987987
else if (EnvelopeOrigin.fromPubSub(env)) "pubsub"
988988
else if (EnvelopeOrigin.fromBacktracking(env)) "backtracking"
989989
else if (EnvelopeOrigin.fromSnapshot(env)) "snapshot"
990+
else if (EnvelopeOrigin.fromHeartbeat(env)) "heartbeat"
990991
else env.source
991992
case _ => "unknown"
992993
}

akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,32 @@ class R2dbcTimestampOffsetStoreSpec
117117
filtered = true,
118118
env.source)
119119

120+
def snapshotEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
121+
new EventEnvelope[String](
122+
env.offset,
123+
env.persistenceId,
124+
sequenceNr = env.sequenceNr,
125+
env.eventOption,
126+
env.timestamp,
127+
None,
128+
env.entityType,
129+
env.slice,
130+
filtered = false,
131+
source = EnvelopeOrigin.SourceSnapshot)
132+
133+
def heartbeatEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
134+
new EventEnvelope[String](
135+
env.offset,
136+
env.persistenceId,
137+
sequenceNr = env.sequenceNr,
138+
None,
139+
env.timestamp,
140+
None,
141+
env.entityType,
142+
env.slice,
143+
filtered = true,
144+
source = EnvelopeOrigin.SourceHeartbeat)
145+
120146
def createUpdatedDurableState(
121147
pid: Pid,
122148
revision: SeqNr,
@@ -628,6 +654,90 @@ class R2dbcTimestampOffsetStoreSpec
628654
offsetStore.validate(env2).futureValue shouldBe Accepted
629655
}
630656

657+
"accept snapshots" in {
658+
import R2dbcOffsetStore.Validation._
659+
val projectionId = genRandomProjectionId()
660+
val eventTimestampQueryClock = TestClock.nowMicros()
661+
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
662+
663+
val p1 = "p-08071" // slice 101
664+
665+
val startTime = TestClock.nowMicros().instant()
666+
667+
val env1 = snapshotEnvelope(createEnvelope(p1, 1L, startTime.plusMillis(1), "s1"))
668+
offsetStore.validate(env1).futureValue shouldBe Accepted
669+
670+
// seqNr gap is allowed
671+
val env2 = snapshotEnvelope(createEnvelope(p1, 3L, startTime.plusMillis(2), "s3"))
672+
offsetStore.validate(env2).futureValue shouldBe Accepted
673+
674+
offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
675+
// duplicate
676+
offsetStore.validate(env1).futureValue shouldBe Duplicate
677+
offsetStore.validate(env2).futureValue shouldBe Duplicate
678+
679+
offsetStore.readOffset[TimestampOffset]().futureValue
680+
offsetStore.getState().byPid(p1).seqNr shouldBe env2.sequenceNr
681+
682+
val env3 = snapshotEnvelope(createEnvelope(p1, 10L, startTime.plusMillis(3), "s10"))
683+
offsetStore.validate(env3).futureValue shouldBe Accepted
684+
offsetStore.addInflight(env3)
685+
offsetStore.getInflight() shouldBe Map(p1 -> env3.sequenceNr)
686+
offsetStore.validate(env3).futureValue shouldBe Duplicate
687+
688+
offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
689+
offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr
690+
offsetStore.getInflight() shouldBe Map.empty
691+
offsetStore.readOffset[TimestampOffset]().futureValue
692+
offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr
693+
694+
offsetStore.validate(env1).futureValue shouldBe Duplicate
695+
offsetStore.validate(env2).futureValue shouldBe Duplicate
696+
offsetStore.validate(env3).futureValue shouldBe Duplicate
697+
}
698+
699+
"accept heartbeats" in {
700+
import R2dbcOffsetStore.Validation._
701+
val projectionId = genRandomProjectionId()
702+
val eventTimestampQueryClock = TestClock.nowMicros()
703+
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)
704+
705+
val hbPid1 = "test-entity|_hb-26498e81-66ad-4aa9-b985-b34602c5f32d-1" // slice 359
706+
707+
val startTime = TestClock.nowMicros().instant()
708+
709+
val env1 = heartbeatEnvelope(createEnvelope(hbPid1, 1L, startTime.plusMillis(1), ""))
710+
offsetStore.validate(env1).futureValue shouldBe Accepted
711+
712+
// seqNr gap is allowed
713+
val env2 = heartbeatEnvelope(createEnvelope(hbPid1, 3L, startTime.plusMillis(2), ""))
714+
offsetStore.validate(env2).futureValue shouldBe Accepted
715+
716+
offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
717+
// duplicate
718+
offsetStore.validate(env1).futureValue shouldBe Duplicate
719+
offsetStore.validate(env2).futureValue shouldBe Duplicate
720+
721+
offsetStore.readOffset[TimestampOffset]().futureValue
722+
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env2.sequenceNr
723+
724+
val env3 = heartbeatEnvelope(createEnvelope(hbPid1, 10L, startTime.plusMillis(3), ""))
725+
offsetStore.validate(env3).futureValue shouldBe Accepted
726+
offsetStore.addInflight(env3)
727+
offsetStore.getInflight() shouldBe Map(hbPid1 -> env3.sequenceNr)
728+
offsetStore.validate(env3).futureValue shouldBe Duplicate
729+
730+
offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
731+
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr
732+
offsetStore.getInflight() shouldBe Map.empty
733+
offsetStore.readOffset[TimestampOffset]().futureValue
734+
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr
735+
736+
offsetStore.validate(env1).futureValue shouldBe Duplicate
737+
offsetStore.validate(env2).futureValue shouldBe Duplicate
738+
offsetStore.validate(env3).futureValue shouldBe Duplicate
739+
}
740+
631741
"update inflight on error and re-accept element" in {
632742
import R2dbcOffsetStore.Validation._
633743
val projectionId = genRandomProjectionId()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# snapshots, heartbeats, internals
2+
ProblemFilters.exclude[Problem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset*")
3+
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$")

0 commit comments

Comments
 (0)