Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,32 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
filtered = true,
env.source)

def snapshotEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
new EventEnvelope[String](
env.offset,
env.persistenceId,
sequenceNr = env.sequenceNr,
env.eventOption,
env.timestamp,
None,
env.entityType,
env.slice,
filtered = false,
source = EnvelopeOrigin.SourceSnapshot)

def heartbeatEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
new EventEnvelope[String](
env.offset,
env.persistenceId,
sequenceNr = env.sequenceNr,
None,
env.timestamp,
None,
env.entityType,
env.slice,
filtered = true,
source = EnvelopeOrigin.SourceHeartbeat)

def createUpdatedDurableState(
pid: Pid,
revision: SeqNr,
Expand Down Expand Up @@ -678,6 +704,82 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L)
}

"accept snapshots" in {
val projectionId = genRandomProjectionId()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

val p1 = "p-08071" // slice 101

val startTime = TestClock.nowMicros().instant()

val env1 = snapshotEnvelope(createEnvelope(p1, 1L, startTime.plusMillis(1), "s1"))
offsetStore.validate(env1).futureValue shouldBe Accepted

// seqNr gap is allowed
val env2 = snapshotEnvelope(createEnvelope(p1, 3L, startTime.plusMillis(2), "s3"))
offsetStore.validate(env2).futureValue shouldBe Accepted

offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
// duplicate
offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate

offsetStore.getState().byPid(p1).seqNr shouldBe env2.sequenceNr

val env3 = snapshotEnvelope(createEnvelope(p1, 10L, startTime.plusMillis(3), "s10"))
offsetStore.validate(env3).futureValue shouldBe Accepted
offsetStore.addInflight(env3)
offsetStore.getInflight() shouldBe Map(p1 -> env3.sequenceNr)
offsetStore.validate(env3).futureValue shouldBe Duplicate

offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr
offsetStore.getInflight() shouldBe Map.empty

offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate
offsetStore.validate(env3).futureValue shouldBe Duplicate
}

"accept heartbeats" in {
val projectionId = genRandomProjectionId()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

val hbPid1 = "test-entity|_hb-26498e81-66ad-4aa9-b985-b34602c5f32d-1" // slice 359

val startTime = TestClock.nowMicros().instant()

val env1 = heartbeatEnvelope(createEnvelope(hbPid1, 1L, startTime.plusMillis(1), ""))
offsetStore.validate(env1).futureValue shouldBe Accepted

// seqNr gap is allowed
val env2 = heartbeatEnvelope(createEnvelope(hbPid1, 3L, startTime.plusMillis(2), ""))
offsetStore.validate(env2).futureValue shouldBe Accepted

offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
// duplicate
offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate

offsetStore.getState().byPid(hbPid1).seqNr shouldBe env2.sequenceNr

val env3 = heartbeatEnvelope(createEnvelope(hbPid1, 10L, startTime.plusMillis(3), ""))
offsetStore.validate(env3).futureValue shouldBe Accepted
offsetStore.addInflight(env3)
offsetStore.getInflight() shouldBe Map(hbPid1 -> env3.sequenceNr)
offsetStore.validate(env3).futureValue shouldBe Duplicate

offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr
offsetStore.getInflight() shouldBe Map.empty

offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate
offsetStore.validate(env3).futureValue shouldBe Duplicate
}

"update inflight on error and re-accept element" in {
val projectionId = genRandomProjectionId()
val offsetStore = createOffsetStore(projectionId)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# snapshots, heartbeats, internals
ProblemFilters.exclude[Problem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#RecordWithOffset*")
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore$RecordWithOffset$")
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ private[projection] object DynamoDBOffsetStore {
offset: TimestampOffset,
strictSeqNr: Boolean,
fromBacktracking: Boolean,
fromPubSub: Boolean,
fromSnapshot: Boolean)
fromPubSub: Boolean)

object State {
val empty: State = State(Map.empty, Map.empty, Map.empty)
Expand Down Expand Up @@ -707,9 +706,6 @@ private[projection] class DynamoDBOffsetStore(
seqNr,
pid)
FutureDuplicate
} else if (recordWithOffset.fromSnapshot) {
// snapshots will mean we are starting from some arbitrary offset after last seen offset
FutureAccepted
} else if (!recordWithOffset.fromBacktracking) {
logUnexpected()
// Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled
Expand All @@ -725,22 +721,20 @@ private[projection] class DynamoDBOffsetStore(
} else if (seqNr == 1) {
// always accept first event if no other event for that pid has been seen
FutureAccepted
} else if (recordWithOffset.fromSnapshot) {
// always accept starting from snapshots when there was no previous event seen
FutureAccepted
} else {
validateEventTimestamp(recordWithOffset)
}
} else {
// strictSeqNr == false is for durable state where each revision might not be visible
// strictSeqNr == false is for durable state, AllowSeqNrGapsMetadata, snapshots, or heartbeats
// where each revision might not be visible
val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L))
val ok = seqNr > prevSeqNr

if (ok) {
FutureAccepted
} else {
logger.trace(
"{} Filtering out earlier revision [{}] for pid [{}], previous revision [{}]",
"{} Filtering out duplicate sequence number [{}] for pid [{}], previous sequence number [{}]",
logPrefix,
seqNr,
pid,
Expand Down Expand Up @@ -867,16 +861,18 @@ private[projection] class DynamoDBOffsetStore(
case eventEnvelope: EventEnvelope[_] if eventEnvelope.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = eventEnvelope.offset.asInstanceOf[TimestampOffset]
val slice = persistenceExt.sliceForPersistenceId(eventEnvelope.persistenceId)
val fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope)
val fromHeartbeat = EnvelopeOrigin.fromHeartbeat(eventEnvelope)
// Allow gaps if envelope has AllowSeqNrGapsMetadata
val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty
val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty &&
!fromSnapshot && !fromHeartbeat
Some(
RecordWithOffset(
Record(slice, eventEnvelope.persistenceId, eventEnvelope.sequenceNr, timestampOffset.timestamp),
timestampOffset,
strictSeqNr,
fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope),
fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope),
fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope)))
fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope)))
case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
val slice = persistenceExt.sliceForPersistenceId(change.persistenceId)
Expand All @@ -886,8 +882,7 @@ private[projection] class DynamoDBOffsetStore(
timestampOffset,
strictSeqNr = false,
fromBacktracking = EnvelopeOrigin.fromBacktracking(change),
fromPubSub = false,
fromSnapshot = false))
fromPubSub = false))
case change: DeletedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] =>
val timestampOffset = change.offset.asInstanceOf[TimestampOffset]
val slice = persistenceExt.sliceForPersistenceId(change.persistenceId)
Expand All @@ -897,8 +892,7 @@ private[projection] class DynamoDBOffsetStore(
timestampOffset,
strictSeqNr = false,
fromBacktracking = false,
fromPubSub = false,
fromSnapshot = false))
fromPubSub = false))
case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] =>
// in case additional types are added
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,7 @@ private[projection] object DynamoDBProjectionImpl {
else if (EnvelopeOrigin.fromPubSub(env)) "pubsub"
else if (EnvelopeOrigin.fromBacktracking(env)) "backtracking"
else if (EnvelopeOrigin.fromSnapshot(env)) "snapshot"
else if (EnvelopeOrigin.fromHeartbeat(env)) "heartbeat"
else env.source
case _ => "unknown"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ class R2dbcTimestampOffsetStoreSpec
filtered = true,
env.source)

def snapshotEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
new EventEnvelope[String](
env.offset,
env.persistenceId,
sequenceNr = env.sequenceNr,
env.eventOption,
env.timestamp,
None,
env.entityType,
env.slice,
filtered = false,
source = EnvelopeOrigin.SourceSnapshot)

def heartbeatEnvelope(env: EventEnvelope[String]): EventEnvelope[String] =
new EventEnvelope[String](
env.offset,
env.persistenceId,
sequenceNr = env.sequenceNr,
None,
env.timestamp,
None,
env.entityType,
env.slice,
filtered = true,
source = EnvelopeOrigin.SourceHeartbeat)

def createUpdatedDurableState(
pid: Pid,
revision: SeqNr,
Expand Down Expand Up @@ -628,6 +654,90 @@ class R2dbcTimestampOffsetStoreSpec
offsetStore.validate(env2).futureValue shouldBe Accepted
}

"accept snapshots" in {
import R2dbcOffsetStore.Validation._
val projectionId = genRandomProjectionId()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

val p1 = "p-08071" // slice 101

val startTime = TestClock.nowMicros().instant()

val env1 = snapshotEnvelope(createEnvelope(p1, 1L, startTime.plusMillis(1), "s1"))
offsetStore.validate(env1).futureValue shouldBe Accepted

// seqNr gap is allowed
val env2 = snapshotEnvelope(createEnvelope(p1, 3L, startTime.plusMillis(2), "s3"))
offsetStore.validate(env2).futureValue shouldBe Accepted

offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
// duplicate
offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate

offsetStore.readOffset[TimestampOffset]().futureValue
offsetStore.getState().byPid(p1).seqNr shouldBe env2.sequenceNr

val env3 = snapshotEnvelope(createEnvelope(p1, 10L, startTime.plusMillis(3), "s10"))
offsetStore.validate(env3).futureValue shouldBe Accepted
offsetStore.addInflight(env3)
offsetStore.getInflight() shouldBe Map(p1 -> env3.sequenceNr)
offsetStore.validate(env3).futureValue shouldBe Duplicate

offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr
offsetStore.getInflight() shouldBe Map.empty
offsetStore.readOffset[TimestampOffset]().futureValue
offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr

offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate
offsetStore.validate(env3).futureValue shouldBe Duplicate
}

"accept heartbeats" in {
import R2dbcOffsetStore.Validation._
val projectionId = genRandomProjectionId()
val eventTimestampQueryClock = TestClock.nowMicros()
val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock)

val hbPid1 = "test-entity|_hb-26498e81-66ad-4aa9-b985-b34602c5f32d-1" // slice 359

val startTime = TestClock.nowMicros().instant()

val env1 = heartbeatEnvelope(createEnvelope(hbPid1, 1L, startTime.plusMillis(1), ""))
offsetStore.validate(env1).futureValue shouldBe Accepted

// seqNr gap is allowed
val env2 = heartbeatEnvelope(createEnvelope(hbPid1, 3L, startTime.plusMillis(2), ""))
offsetStore.validate(env2).futureValue shouldBe Accepted

offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue
// duplicate
offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate

offsetStore.readOffset[TimestampOffset]().futureValue
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env2.sequenceNr

val env3 = heartbeatEnvelope(createEnvelope(hbPid1, 10L, startTime.plusMillis(3), ""))
offsetStore.validate(env3).futureValue shouldBe Accepted
offsetStore.addInflight(env3)
offsetStore.getInflight() shouldBe Map(hbPid1 -> env3.sequenceNr)
offsetStore.validate(env3).futureValue shouldBe Duplicate

offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr
offsetStore.getInflight() shouldBe Map.empty
offsetStore.readOffset[TimestampOffset]().futureValue
offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr

offsetStore.validate(env1).futureValue shouldBe Duplicate
offsetStore.validate(env2).futureValue shouldBe Duplicate
offsetStore.validate(env3).futureValue shouldBe Duplicate
}

"update inflight on error and re-accept element" in {
import R2dbcOffsetStore.Validation._
val projectionId = genRandomProjectionId()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# snapshots, heartbeats, internals
ProblemFilters.exclude[Problem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset*")
ProblemFilters.exclude[MissingTypesProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$")
Loading
Loading