diff --git a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala index 13388e048..4e72f5bf8 100644 --- a/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala +++ b/akka-projection-dynamodb-integration/src/test/scala/akka/projection/dynamodb/DynamoDBTimestampOffsetStoreSpec.scala @@ -152,6 +152,32 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) filtered = true, env.source) + def snapshotEnvelope(env: EventEnvelope[String]): EventEnvelope[String] = + new EventEnvelope[String]( + env.offset, + env.persistenceId, + sequenceNr = env.sequenceNr, + env.eventOption, + env.timestamp, + None, + env.entityType, + env.slice, + filtered = false, + source = EnvelopeOrigin.SourceSnapshot) + + def heartbeatEnvelope(env: EventEnvelope[String]): EventEnvelope[String] = + new EventEnvelope[String]( + env.offset, + env.persistenceId, + sequenceNr = env.sequenceNr, + None, + env.timestamp, + None, + env.entityType, + env.slice, + filtered = true, + source = EnvelopeOrigin.SourceHeartbeat) + def createUpdatedDurableState( pid: Pid, revision: SeqNr, @@ -678,6 +704,82 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config) offsetStore.getInflight() shouldBe Map("p1" -> 4L, "p3" -> 8L) } + "accept snapshots" in { + val projectionId = genRandomProjectionId() + val eventTimestampQueryClock = TestClock.nowMicros() + val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + + val p1 = "p-08071" // slice 101 + + val startTime = TestClock.nowMicros().instant() + + val env1 = snapshotEnvelope(createEnvelope(p1, 1L, startTime.plusMillis(1), "s1")) + offsetStore.validate(env1).futureValue shouldBe Accepted + + // seqNr gap is allowed + val env2 = snapshotEnvelope(createEnvelope(p1, 3L, startTime.plusMillis(2), "s3")) + offsetStore.validate(env2).futureValue shouldBe Accepted + + offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue + // duplicate + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + + offsetStore.getState().byPid(p1).seqNr shouldBe env2.sequenceNr + + val env3 = snapshotEnvelope(createEnvelope(p1, 10L, startTime.plusMillis(3), "s10")) + offsetStore.validate(env3).futureValue shouldBe Accepted + offsetStore.addInflight(env3) + offsetStore.getInflight() shouldBe Map(p1 -> env3.sequenceNr) + offsetStore.validate(env3).futureValue shouldBe Duplicate + + offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue + offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr + offsetStore.getInflight() shouldBe Map.empty + + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + offsetStore.validate(env3).futureValue shouldBe Duplicate + } + + "accept heartbeats" in { + val projectionId = genRandomProjectionId() + val eventTimestampQueryClock = TestClock.nowMicros() + val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + + val hbPid1 = "test-entity|_hb-26498e81-66ad-4aa9-b985-b34602c5f32d-1" // slice 359 + + val startTime = TestClock.nowMicros().instant() + + val env1 = heartbeatEnvelope(createEnvelope(hbPid1, 1L, startTime.plusMillis(1), "")) + offsetStore.validate(env1).futureValue shouldBe Accepted + + // seqNr gap is allowed + val env2 = heartbeatEnvelope(createEnvelope(hbPid1, 3L, startTime.plusMillis(2), "")) + offsetStore.validate(env2).futureValue shouldBe Accepted + + offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue + // duplicate + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + + offsetStore.getState().byPid(hbPid1).seqNr shouldBe env2.sequenceNr + + val env3 = heartbeatEnvelope(createEnvelope(hbPid1, 10L, startTime.plusMillis(3), "")) + offsetStore.validate(env3).futureValue shouldBe Accepted + offsetStore.addInflight(env3) + offsetStore.getInflight() shouldBe Map(hbPid1 -> env3.sequenceNr) + offsetStore.validate(env3).futureValue shouldBe Duplicate + + offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue + offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr + offsetStore.getInflight() shouldBe Map.empty + + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + offsetStore.validate(env3).futureValue shouldBe Duplicate + } + "update inflight on error and re-accept element" in { val projectionId = genRandomProjectionId() val offsetStore = createOffsetStore(projectionId) diff --git a/akka-projection-dynamodb/src/main/mima-filters/1.6.19.backwards.excludes/heartbeat-source.excludes b/akka-projection-dynamodb/src/main/mima-filters/1.6.19.backwards.excludes/heartbeat-source.excludes new file mode 100644 index 000000000..e785dac60 --- /dev/null +++ b/akka-projection-dynamodb/src/main/mima-filters/1.6.19.backwards.excludes/heartbeat-source.excludes @@ -0,0 +1,3 @@ +# snapshots, heartbeats, internals +ProblemFilters.exclude[Problem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore#RecordWithOffset*") +ProblemFilters.exclude[MissingTypesProblem]("akka.projection.dynamodb.internal.DynamoDBOffsetStore$RecordWithOffset$") diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala index c37e724f7..fc05461ca 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBOffsetStore.scala @@ -69,8 +69,7 @@ private[projection] object DynamoDBOffsetStore { offset: TimestampOffset, strictSeqNr: Boolean, fromBacktracking: Boolean, - fromPubSub: Boolean, - fromSnapshot: Boolean) + fromPubSub: Boolean) object State { val empty: State = State(Map.empty, Map.empty, Map.empty) @@ -707,9 +706,6 @@ private[projection] class DynamoDBOffsetStore( seqNr, pid) FutureDuplicate - } else if (recordWithOffset.fromSnapshot) { - // snapshots will mean we are starting from some arbitrary offset after last seen offset - FutureAccepted } else if (!recordWithOffset.fromBacktracking) { logUnexpected() // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled @@ -725,14 +721,12 @@ private[projection] class DynamoDBOffsetStore( } else if (seqNr == 1) { // always accept first event if no other event for that pid has been seen FutureAccepted - } else if (recordWithOffset.fromSnapshot) { - // always accept starting from snapshots when there was no previous event seen - FutureAccepted } else { validateEventTimestamp(recordWithOffset) } } else { - // strictSeqNr == false is for durable state where each revision might not be visible + // strictSeqNr == false is for durable state, AllowSeqNrGapsMetadata, snapshots, or heartbeats + // where each revision might not be visible val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L)) val ok = seqNr > prevSeqNr @@ -740,7 +734,7 @@ private[projection] class DynamoDBOffsetStore( FutureAccepted } else { logger.trace( - "{} Filtering out earlier revision [{}] for pid [{}], previous revision [{}]", + "{} Filtering out duplicate sequence number [{}] for pid [{}], previous sequence number [{}]", logPrefix, seqNr, pid, @@ -867,16 +861,18 @@ private[projection] class DynamoDBOffsetStore( case eventEnvelope: EventEnvelope[_] if eventEnvelope.offset.isInstanceOf[TimestampOffset] => val timestampOffset = eventEnvelope.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(eventEnvelope.persistenceId) + val fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope) + val fromHeartbeat = EnvelopeOrigin.fromHeartbeat(eventEnvelope) // Allow gaps if envelope has AllowSeqNrGapsMetadata - val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty + val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty && + !fromSnapshot && !fromHeartbeat Some( RecordWithOffset( Record(slice, eventEnvelope.persistenceId, eventEnvelope.sequenceNr, timestampOffset.timestamp), timestampOffset, strictSeqNr, fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope), - fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope), - fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope))) + fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope))) case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] => val timestampOffset = change.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(change.persistenceId) @@ -886,8 +882,7 @@ private[projection] class DynamoDBOffsetStore( timestampOffset, strictSeqNr = false, fromBacktracking = EnvelopeOrigin.fromBacktracking(change), - fromPubSub = false, - fromSnapshot = false)) + fromPubSub = false)) case change: DeletedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] => val timestampOffset = change.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(change.persistenceId) @@ -897,8 +892,7 @@ private[projection] class DynamoDBOffsetStore( timestampOffset, strictSeqNr = false, fromBacktracking = false, - fromPubSub = false, - fromSnapshot = false)) + fromPubSub = false)) case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] => // in case additional types are added throw new IllegalArgumentException( diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala index 6d9d5496c..49360f2f2 100644 --- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala +++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/DynamoDBProjectionImpl.scala @@ -987,6 +987,7 @@ private[projection] object DynamoDBProjectionImpl { else if (EnvelopeOrigin.fromPubSub(env)) "pubsub" else if (EnvelopeOrigin.fromBacktracking(env)) "backtracking" else if (EnvelopeOrigin.fromSnapshot(env)) "snapshot" + else if (EnvelopeOrigin.fromHeartbeat(env)) "heartbeat" else env.source case _ => "unknown" } diff --git a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala index 052c24db5..165aafa00 100644 --- a/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala +++ b/akka-projection-r2dbc-integration/src/test/scala/akka/projection/r2dbc/R2dbcTimestampOffsetStoreSpec.scala @@ -117,6 +117,32 @@ class R2dbcTimestampOffsetStoreSpec filtered = true, env.source) + def snapshotEnvelope(env: EventEnvelope[String]): EventEnvelope[String] = + new EventEnvelope[String]( + env.offset, + env.persistenceId, + sequenceNr = env.sequenceNr, + env.eventOption, + env.timestamp, + None, + env.entityType, + env.slice, + filtered = false, + source = EnvelopeOrigin.SourceSnapshot) + + def heartbeatEnvelope(env: EventEnvelope[String]): EventEnvelope[String] = + new EventEnvelope[String]( + env.offset, + env.persistenceId, + sequenceNr = env.sequenceNr, + None, + env.timestamp, + None, + env.entityType, + env.slice, + filtered = true, + source = EnvelopeOrigin.SourceHeartbeat) + def createUpdatedDurableState( pid: Pid, revision: SeqNr, @@ -628,6 +654,90 @@ class R2dbcTimestampOffsetStoreSpec offsetStore.validate(env2).futureValue shouldBe Accepted } + "accept snapshots" in { + import R2dbcOffsetStore.Validation._ + val projectionId = genRandomProjectionId() + val eventTimestampQueryClock = TestClock.nowMicros() + val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + + val p1 = "p-08071" // slice 101 + + val startTime = TestClock.nowMicros().instant() + + val env1 = snapshotEnvelope(createEnvelope(p1, 1L, startTime.plusMillis(1), "s1")) + offsetStore.validate(env1).futureValue shouldBe Accepted + + // seqNr gap is allowed + val env2 = snapshotEnvelope(createEnvelope(p1, 3L, startTime.plusMillis(2), "s3")) + offsetStore.validate(env2).futureValue shouldBe Accepted + + offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue + // duplicate + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + + offsetStore.readOffset[TimestampOffset]().futureValue + offsetStore.getState().byPid(p1).seqNr shouldBe env2.sequenceNr + + val env3 = snapshotEnvelope(createEnvelope(p1, 10L, startTime.plusMillis(3), "s10")) + offsetStore.validate(env3).futureValue shouldBe Accepted + offsetStore.addInflight(env3) + offsetStore.getInflight() shouldBe Map(p1 -> env3.sequenceNr) + offsetStore.validate(env3).futureValue shouldBe Duplicate + + offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue + offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr + offsetStore.getInflight() shouldBe Map.empty + offsetStore.readOffset[TimestampOffset]().futureValue + offsetStore.getState().byPid(p1).seqNr shouldBe env3.sequenceNr + + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + offsetStore.validate(env3).futureValue shouldBe Duplicate + } + + "accept heartbeats" in { + import R2dbcOffsetStore.Validation._ + val projectionId = genRandomProjectionId() + val eventTimestampQueryClock = TestClock.nowMicros() + val offsetStore = createOffsetStore(projectionId, eventTimestampQueryClock = eventTimestampQueryClock) + + val hbPid1 = "test-entity|_hb-26498e81-66ad-4aa9-b985-b34602c5f32d-1" // slice 359 + + val startTime = TestClock.nowMicros().instant() + + val env1 = heartbeatEnvelope(createEnvelope(hbPid1, 1L, startTime.plusMillis(1), "")) + offsetStore.validate(env1).futureValue shouldBe Accepted + + // seqNr gap is allowed + val env2 = heartbeatEnvelope(createEnvelope(hbPid1, 3L, startTime.plusMillis(2), "")) + offsetStore.validate(env2).futureValue shouldBe Accepted + + offsetStore.saveOffset(OffsetPidSeqNr(env2.offset, env2.persistenceId, env2.sequenceNr)).futureValue + // duplicate + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + + offsetStore.readOffset[TimestampOffset]().futureValue + offsetStore.getState().byPid(hbPid1).seqNr shouldBe env2.sequenceNr + + val env3 = heartbeatEnvelope(createEnvelope(hbPid1, 10L, startTime.plusMillis(3), "")) + offsetStore.validate(env3).futureValue shouldBe Accepted + offsetStore.addInflight(env3) + offsetStore.getInflight() shouldBe Map(hbPid1 -> env3.sequenceNr) + offsetStore.validate(env3).futureValue shouldBe Duplicate + + offsetStore.saveOffset(OffsetPidSeqNr(env3.offset, env3.persistenceId, env3.sequenceNr)).futureValue + offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr + offsetStore.getInflight() shouldBe Map.empty + offsetStore.readOffset[TimestampOffset]().futureValue + offsetStore.getState().byPid(hbPid1).seqNr shouldBe env3.sequenceNr + + offsetStore.validate(env1).futureValue shouldBe Duplicate + offsetStore.validate(env2).futureValue shouldBe Duplicate + offsetStore.validate(env3).futureValue shouldBe Duplicate + } + "update inflight on error and re-accept element" in { import R2dbcOffsetStore.Validation._ val projectionId = genRandomProjectionId() diff --git a/akka-projection-r2dbc/src/main/mima-filters/1.6.19.backwards.excludes/heartbeat-source.excludes b/akka-projection-r2dbc/src/main/mima-filters/1.6.19.backwards.excludes/heartbeat-source.excludes new file mode 100644 index 000000000..489e998ff --- /dev/null +++ b/akka-projection-r2dbc/src/main/mima-filters/1.6.19.backwards.excludes/heartbeat-source.excludes @@ -0,0 +1,3 @@ +# snapshots, heartbeats, internals +ProblemFilters.exclude[Problem]("akka.projection.r2dbc.internal.R2dbcOffsetStore#RecordWithOffset*") +ProblemFilters.exclude[MissingTypesProblem]("akka.projection.r2dbc.internal.R2dbcOffsetStore$RecordWithOffset$") diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 48ec3f825..a535c8903 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -77,8 +77,7 @@ private[projection] object R2dbcOffsetStore { offset: TimestampOffset, strictSeqNr: Boolean, fromBacktracking: Boolean, - fromPubSub: Boolean, - fromSnapshot: Boolean) + fromPubSub: Boolean) final case class RecordWithProjectionKey(record: Record, projectionKey: String) object State { @@ -934,9 +933,6 @@ private[projection] class R2dbcOffsetStore( seqNr, pid) FutureDuplicate - } else if (recordWithOffset.fromSnapshot) { - // snapshots will mean we are starting from some arbitrary offset after last seen offset - FutureAccepted } else if (!recordWithOffset.fromBacktracking) { // Rejected will trigger replay of missed events, if replay-on-rejected-sequence-numbers is enabled // and SourceProvider supports it. @@ -953,14 +949,12 @@ private[projection] class R2dbcOffsetStore( } else if (seqNr == 1) { // always accept first event if no other event for that pid has been seen FutureAccepted - } else if (recordWithOffset.fromSnapshot) { - // always accept starting from snapshots when there was no previous event seen - FutureAccepted } else { validateEventTimestamp(currentState, recordWithOffset) } } else { - // strictSeqNr == false is for durable state where each revision might not be visible + // strictSeqNr == false is for durable state, AllowSeqNrGapsMetadata, snapshots, or heartbeats + // where each revision might not be visible val prevSeqNr = currentInflight.getOrElse(pid, currentState.byPid.get(pid).map(_.seqNr).getOrElse(0L)) val ok = seqNr > prevSeqNr @@ -968,7 +962,7 @@ private[projection] class R2dbcOffsetStore( FutureAccepted } else { logger.trace( - "{} Filtering out earlier revision [{}] for pid [{}], previous revision [{}]", + "{} Filtering out duplicate sequence number [{}] for pid [{}], previous sequence number [{}]", logPrefix, seqNr, pid, @@ -1364,16 +1358,18 @@ private[projection] class R2dbcOffsetStore( case eventEnvelope: EventEnvelope[_] if eventEnvelope.offset.isInstanceOf[TimestampOffset] => val timestampOffset = eventEnvelope.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(eventEnvelope.persistenceId) + val fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope) + val fromHeartbeat = EnvelopeOrigin.fromHeartbeat(eventEnvelope) // Allow gaps if envelope has AllowSeqNrGapsMetadata - val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty + val strictSeqNr = eventEnvelope.metadata[AllowSeqNrGapsMetadata.type].isEmpty && + !fromSnapshot && !fromHeartbeat Some( RecordWithOffset( Record(slice, eventEnvelope.persistenceId, eventEnvelope.sequenceNr, timestampOffset.timestamp), timestampOffset, strictSeqNr, fromBacktracking = EnvelopeOrigin.fromBacktracking(eventEnvelope), - fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope), - fromSnapshot = EnvelopeOrigin.fromSnapshot(eventEnvelope))) + fromPubSub = EnvelopeOrigin.fromPubSub(eventEnvelope))) case change: UpdatedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] => val timestampOffset = change.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(change.persistenceId) @@ -1383,8 +1379,7 @@ private[projection] class R2dbcOffsetStore( timestampOffset, strictSeqNr = false, fromBacktracking = EnvelopeOrigin.fromBacktracking(change), - fromPubSub = false, - fromSnapshot = false)) + fromPubSub = false)) case change: DeletedDurableState[_] if change.offset.isInstanceOf[TimestampOffset] => val timestampOffset = change.offset.asInstanceOf[TimestampOffset] val slice = persistenceExt.sliceForPersistenceId(change.persistenceId) @@ -1394,8 +1389,7 @@ private[projection] class R2dbcOffsetStore( timestampOffset, strictSeqNr = false, fromBacktracking = false, - fromPubSub = false, - fromSnapshot = false)) + fromPubSub = false)) case change: DurableStateChange[_] if change.offset.isInstanceOf[TimestampOffset] => // in case additional types are added throw new IllegalArgumentException( diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index 12fffcdfa..f2a9f06a2 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -1063,6 +1063,7 @@ private[projection] object R2dbcProjectionImpl { else if (EnvelopeOrigin.fromPubSub(env)) "pubsub" else if (EnvelopeOrigin.fromBacktracking(env)) "backtracking" else if (EnvelopeOrigin.fromSnapshot(env)) "snapshot" + else if (EnvelopeOrigin.fromHeartbeat(env)) "heartbeat" else env.source case _ => "unknown" }