@@ -6,6 +6,7 @@ import dev.helight.krescent.source.EventSourcingStrategy
66import dev.helight.krescent.source.StreamingEventSource
77import dev.helight.krescent.source.StreamingToken
88import kotlinx.datetime.Clock
9+ import java.util.logging.Logger
910
1011class CheckpointingEventSourceConsumer (
1112 val namespace : String ,
@@ -18,6 +19,8 @@ class CheckpointingEventSourceConsumer(
1819 val rebuildOnInvalidCheckpoint : Boolean = true ,
1920) : EventSourceConsumer {
2021
22+ private val logger = Logger .getLogger(" CheckpointingEventSourceConsumer" )
23+
2124 @Suppress(" UNCHECKED_CAST" )
2225 override suspend fun strategy (strategy : EventSourcingStrategy ) {
2326 var lastCheckpoint = loadLastCheckpoint()
@@ -55,18 +58,23 @@ class CheckpointingEventSourceConsumer(
5558
5659 // Invalidate outdated checkpoints
5760 if (lastCheckpoint != null && lastCheckpoint.version != version) {
61+ logger.info(" Checkpoint version mismatch, expected '$version ' but found '${lastCheckpoint.version} '" )
5862 lastCheckpoint = null
63+ if (! rebuildOnInvalidCheckpoint) throw CheckpointValidationException ()
5964 }
6065
6166 if (lastCheckpoint != null && ! validateCheckpoint(lastCheckpoint)) {
67+ logger.info(" Checkpoint validation failed for namespace '$namespace '" )
6268 lastCheckpoint = null
6369 if (! rebuildOnInvalidCheckpoint) throw CheckpointValidationException ()
6470 }
6571
6672 if (lastCheckpoint != null ) {
73+ logger.info(" Valid checkpoint found for namespace '$namespace ', restoring state" )
6774 loadCheckpoint(lastCheckpoint)
6875 consumer.forwardSystemEvent(SystemStreamRestoredEvent )
6976 } else {
77+ logger.info(" No valid checkpoint found for namespace '$namespace ', starting from scratch" )
7078 consumer.forwardSystemEvent(SystemStreamHeadEvent )
7179 }
7280 return lastCheckpoint
0 commit comments