Skip to content

Commit 83850e0

Browse files
committed
Add CheckpointingSituations test for graceful stream interruption with terminal checkpointing
1 parent 01b0ecb commit 83850e0

File tree

2 files changed

+59
-1
lines changed

2 files changed

+59
-1
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointStrategy.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,22 @@ object NoCheckpointStrategy : CheckpointStrategy {
2323
@OptIn(ExperimentalAtomicApi::class)
2424
class FixedEventRateCheckpointStrategy(
2525
private val checkpoint: Long,
26+
val checkpointOnGracefulTermination: Boolean = true,
2627
) : CheckpointStrategy {
2728

2829
private val counter = AtomicLong(0)
2930

31+
override suspend fun tickGracefulTermination(): Boolean {
32+
return checkpointOnGracefulTermination
33+
}
34+
3035
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
3136
return counter.incrementAndFetch() % checkpoint == 0L
3237
}
3338
}
3439

3540
@Suppress("unused")
36-
class ManualCheckpointStrategy() : CheckpointStrategy {
41+
class ManualCheckpointStrategy : CheckpointStrategy {
3742

3843
private var shouldCheckpoint = false
3944

@@ -54,8 +59,13 @@ class ManualCheckpointStrategy() : CheckpointStrategy {
5459
@Suppress("unused")
5560
class FixedTimeRateCheckpointStrategy(
5661
private val rate: Duration,
62+
val checkpointOnGracefulTermination: Boolean = true,
5763
) : CheckpointStrategy {
5864

65+
override suspend fun tickGracefulTermination(): Boolean {
66+
return checkpointOnGracefulTermination
67+
}
68+
5969
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
6070
if (lastCheckpoint == null) return true
6171
val currentTime = Clock.System.now()
@@ -66,6 +76,7 @@ class FixedTimeRateCheckpointStrategy(
6676

6777
@Suppress("unused")
6878
object AlwaysCheckpointStrategy : CheckpointStrategy {
79+
6980
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
7081
return true
7182
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package dev.helight.krescent.situations
2+
3+
import dev.helight.krescent.bookstore.BookAddedEvent
4+
import dev.helight.krescent.bookstore.bookstoreEventCatalog
5+
import dev.helight.krescent.checkpoint.FixedEventRateCheckpointStrategy
6+
import dev.helight.krescent.checkpoint.impl.InMemoryCheckpointStorage
7+
import dev.helight.krescent.model.buildEventModel
8+
import dev.helight.krescent.source.impl.InMemoryEventStore
9+
import kotlinx.coroutines.async
10+
import kotlinx.coroutines.cancelAndJoin
11+
import kotlinx.coroutines.delay
12+
import kotlinx.coroutines.runBlocking
13+
import kotlin.test.Test
14+
import kotlin.test.assertEquals
15+
16+
class CheckpointingSituations {
17+
18+
@Test
19+
fun `Graceful stream interruption with terminal checkpointing`() = runBlocking {
20+
val store = InMemoryEventStore()
21+
val checkpoints = InMemoryCheckpointStorage()
22+
var counter = 0
23+
val job = async {
24+
store.buildEventModel("my-model", 1, bookstoreEventCatalog) {
25+
useCheckpoints(checkpoints, FixedEventRateCheckpointStrategy(100))
26+
handler {
27+
if (it is BookAddedEvent) counter++
28+
}
29+
}.stream()
30+
}
31+
store.publish(bookstoreEventCatalog.create(BookAddedEvent("1", "", "", 1.0, 1)))
32+
store.publish(bookstoreEventCatalog.create(BookAddedEvent("1", "", "", 1.0, 1)))
33+
delay(50)
34+
job.cancelAndJoin()
35+
store.publish(bookstoreEventCatalog.create(BookAddedEvent("1", "", "", 1.0, 1)))
36+
assertEquals(counter, 2)
37+
store.buildEventModel("my-model", 1, bookstoreEventCatalog) {
38+
useCheckpoints(checkpoints, FixedEventRateCheckpointStrategy(100))
39+
handler {
40+
if (it is BookAddedEvent) counter++
41+
}
42+
}.catchup()
43+
assertEquals(counter, 3)
44+
}
45+
46+
47+
}

0 commit comments

Comments
 (0)