Skip to content

Commit 01b0ecb

Browse files
committed
Add cancellation handling in CheckpointingEventSourceConsumer for graceful termination
1 parent e47dd33 commit 01b0ecb

File tree

1 file changed

+29
-16
lines changed

1 file changed

+29
-16
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointingEventSourceConsumer.kt

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ import dev.helight.krescent.source.EventSourceConsumer
55
import dev.helight.krescent.source.EventSourcingStrategy
66
import dev.helight.krescent.source.StreamingEventSource
77
import dev.helight.krescent.source.StreamingToken
8+
import kotlinx.coroutines.CancellationException
9+
import kotlinx.coroutines.NonCancellable
10+
import kotlinx.coroutines.withContext
811
import kotlinx.datetime.Clock
912
import java.util.logging.Logger
1013

@@ -27,25 +30,35 @@ class CheckpointingEventSourceConsumer(
2730
var lastPosition: StreamingToken<*>? = lastCheckpoint?.position?.let {
2831
source.deserializeToken(it)
2932
}
30-
strategy.source(source, lastPosition, object : EventMessageStreamProcessor {
31-
override suspend fun process(
32-
message: EventMessage,
33-
position: StreamingToken<*>,
34-
) {
35-
consumer.process(message, position)
36-
val tickerResult = checkpointStrategy.tick(message, lastCheckpoint)
37-
if (tickerResult) {
38-
val checkpoint = checkpoint(position)
39-
checkpointStorage.storeCheckpoint(checkpoint)
40-
lastCheckpoint = checkpoint // TODO: Why is not used per lint? This isn't closed and should work
33+
try {
34+
strategy.source(source, lastPosition, object : EventMessageStreamProcessor {
35+
override suspend fun process(
36+
message: EventMessage,
37+
position: StreamingToken<*>,
38+
) {
39+
consumer.process(message, position)
40+
val tickerResult = checkpointStrategy.tick(message, lastCheckpoint)
41+
if (tickerResult) {
42+
val checkpoint = checkpoint(position)
43+
checkpointStorage.storeCheckpoint(checkpoint)
44+
lastCheckpoint = checkpoint // TODO: Why is not used per lint? This isn't closed and should work
45+
}
46+
lastPosition = position
4147
}
42-
lastPosition = position
43-
}
4448

45-
override suspend fun forwardSystemEvent(event: Event) {
46-
consumer.forwardSystemEvent(event)
49+
override suspend fun forwardSystemEvent(event: Event) {
50+
consumer.forwardSystemEvent(event)
51+
}
52+
})
53+
} catch (e: CancellationException) {
54+
withContext(NonCancellable) {
55+
if (checkpointStrategy.tickGracefulTermination() && lastPosition != null) {
56+
val checkpoint = checkpoint(lastPosition)
57+
checkpointStorage.storeCheckpoint(checkpoint)
58+
}
4759
}
48-
})
60+
throw e
61+
}
4962

5063
if (checkpointStrategy.tickGracefulTermination() && lastPosition != null) {
5164
val checkpoint = checkpoint(lastPosition)

0 commit comments

Comments
 (0)