Skip to content

Commit adba43d

Browse files
committed
Rename checkpoint strategy methods for clarity and consistency and add TerminationCheckpointStrategy
1 parent b07b67c commit adba43d

File tree

2 files changed

+28
-16
lines changed

2 files changed

+28
-16
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointStrategy.kt

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,31 +8,43 @@ import kotlin.concurrent.atomics.incrementAndFetch
88
import kotlin.time.Duration
99

1010
interface CheckpointStrategy {
11-
suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean
12-
suspend fun tickGracefulTermination(): Boolean = false
11+
suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean
12+
suspend fun afterTermination(): Boolean = false
1313
}
1414

1515
@Suppress("unused")
1616
object NoCheckpointStrategy : CheckpointStrategy {
17-
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
17+
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
1818
return false
1919
}
2020
}
2121

22+
@Suppress("unused")
23+
object TerminationCheckpointStrategy : CheckpointStrategy {
24+
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
25+
return false
26+
}
27+
28+
override suspend fun afterTermination(): Boolean {
29+
return true
30+
}
31+
}
32+
33+
2234
@Suppress("unused")
2335
@OptIn(ExperimentalAtomicApi::class)
2436
class FixedEventRateCheckpointStrategy(
2537
private val checkpoint: Long,
26-
val checkpointOnGracefulTermination: Boolean = true,
38+
val checkpointOnTermination: Boolean = true,
2739
) : CheckpointStrategy {
2840

2941
private val counter = AtomicLong(0)
3042

31-
override suspend fun tickGracefulTermination(): Boolean {
32-
return checkpointOnGracefulTermination
43+
override suspend fun afterTermination(): Boolean {
44+
return checkpointOnTermination
3345
}
3446

35-
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
47+
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
3648
return counter.incrementAndFetch() % checkpoint == 0L
3749
}
3850
}
@@ -46,7 +58,7 @@ class ManualCheckpointStrategy : CheckpointStrategy {
4658
shouldCheckpoint = true
4759
}
4860

49-
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
61+
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
5062
if (shouldCheckpoint) {
5163
shouldCheckpoint = false
5264
return true
@@ -59,14 +71,14 @@ class ManualCheckpointStrategy : CheckpointStrategy {
5971
@Suppress("unused")
6072
class FixedTimeRateCheckpointStrategy(
6173
private val rate: Duration,
62-
val checkpointOnGracefulTermination: Boolean = true,
74+
val checkpointOnTermination: Boolean = true,
6375
) : CheckpointStrategy {
6476

65-
override suspend fun tickGracefulTermination(): Boolean {
66-
return checkpointOnGracefulTermination
77+
override suspend fun afterTermination(): Boolean {
78+
return checkpointOnTermination
6779
}
6880

69-
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
81+
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
7082
if (lastCheckpoint == null) return true
7183
val currentTime = Clock.System.now()
7284
val duration = currentTime - (lastCheckpoint.timestamp)
@@ -77,7 +89,7 @@ class FixedTimeRateCheckpointStrategy(
7789
@Suppress("unused")
7890
object AlwaysCheckpointStrategy : CheckpointStrategy {
7991

80-
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
92+
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
8193
return true
8294
}
8395
}

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointingEventSourceConsumer.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class CheckpointingEventSourceConsumer(
3737
position: StreamingToken<*>,
3838
) {
3939
consumer.process(message, position)
40-
val tickerResult = checkpointStrategy.tick(message, lastCheckpoint)
40+
val tickerResult = checkpointStrategy.afterMessage(message, lastCheckpoint)
4141
if (tickerResult) {
4242
val checkpoint = checkpoint(position)
4343
checkpointStorage.storeCheckpoint(checkpoint)
@@ -52,15 +52,15 @@ class CheckpointingEventSourceConsumer(
5252
})
5353
} catch (e: CancellationException) {
5454
withContext(NonCancellable) {
55-
if (checkpointStrategy.tickGracefulTermination() && lastPosition != null) {
55+
if (checkpointStrategy.afterTermination() && lastPosition != null) {
5656
val checkpoint = checkpoint(lastPosition)
5757
checkpointStorage.storeCheckpoint(checkpoint)
5858
}
5959
}
6060
throw e
6161
}
6262

63-
if (checkpointStrategy.tickGracefulTermination() && lastPosition != null) {
63+
if (checkpointStrategy.afterTermination() && lastPosition != null) {
6464
val checkpoint = checkpoint(lastPosition)
6565
checkpointStorage.storeCheckpoint(checkpoint)
6666
}

0 commit comments

Comments
 (0)