Skip to content

Commit 83e31cb

Browse files
committed
Add MinimizedCheckpointStrategy and refactor checkpointing logic in event sourcing
1 parent adba43d commit 83e31cb

File tree

7 files changed

+127
-50
lines changed

7 files changed

+127
-50
lines changed
Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,61 @@
11
package dev.helight.krescent.checkpoint
22

33
import dev.helight.krescent.event.EventMessage
4+
import dev.helight.krescent.source.StreamingToken
45
import kotlinx.datetime.Clock
56
import kotlin.concurrent.atomics.AtomicLong
67
import kotlin.concurrent.atomics.ExperimentalAtomicApi
78
import kotlin.concurrent.atomics.incrementAndFetch
89
import kotlin.time.Duration
910

1011
interface CheckpointStrategy {
11-
suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean
12-
suspend fun afterTermination(): Boolean = false
12+
suspend fun afterMessage(eventMessage: EventMessage, context: CheckpointContext): Boolean = false
13+
suspend fun afterTermination(context: CheckpointContext): Boolean = false
14+
suspend fun afterCallback(context: CheckpointContext): Boolean = false
1315
}
1416

17+
data class CheckpointContext(
18+
val lastCheckpoint: StoredCheckpoint?,
19+
val lastPosition: StreamingToken<*>?,
20+
)
21+
1522
@Suppress("unused")
16-
object NoCheckpointStrategy : CheckpointStrategy {
17-
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
18-
return false
19-
}
20-
}
23+
object NoCheckpointStrategy : CheckpointStrategy
2124

2225
@Suppress("unused")
2326
object TerminationCheckpointStrategy : CheckpointStrategy {
24-
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
27+
override suspend fun afterMessage(eventMessage: EventMessage, context: CheckpointContext): Boolean {
2528
return false
2629
}
2730

28-
override suspend fun afterTermination(): Boolean {
31+
override suspend fun afterTermination(context: CheckpointContext): Boolean {
2932
return true
3033
}
3134
}
3235

36+
@Suppress("unused")
37+
class MinimizedCheckpointStrategy : CheckpointStrategy {
38+
39+
private var lastPosition: StreamingToken<*>? = null
40+
41+
override suspend fun afterTermination(context: CheckpointContext): Boolean {
42+
return checkpointIfChanged(context)
43+
}
44+
45+
override suspend fun afterCallback(context: CheckpointContext): Boolean {
46+
return checkpointIfChanged(context)
47+
}
48+
49+
private fun checkpointIfChanged(context: CheckpointContext): Boolean {
50+
if (context.lastPosition == null) return false
51+
if (lastPosition == null) {
52+
lastPosition = context.lastPosition
53+
return true
54+
}
55+
return lastPosition!!.compareUnsafe(context.lastPosition) != 0
56+
}
57+
}
58+
3359

3460
@Suppress("unused")
3561
@OptIn(ExperimentalAtomicApi::class)
@@ -40,11 +66,11 @@ class FixedEventRateCheckpointStrategy(
4066

4167
private val counter = AtomicLong(0)
4268

43-
override suspend fun afterTermination(): Boolean {
69+
override suspend fun afterTermination(context: CheckpointContext): Boolean {
4470
return checkpointOnTermination
4571
}
4672

47-
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
73+
override suspend fun afterMessage(eventMessage: EventMessage, context: CheckpointContext): Boolean {
4874
return counter.incrementAndFetch() % checkpoint == 0L
4975
}
5076
}
@@ -58,7 +84,7 @@ class ManualCheckpointStrategy : CheckpointStrategy {
5884
shouldCheckpoint = true
5985
}
6086

61-
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
87+
override suspend fun afterMessage(eventMessage: EventMessage, context: CheckpointContext): Boolean {
6288
if (shouldCheckpoint) {
6389
shouldCheckpoint = false
6490
return true
@@ -74,22 +100,22 @@ class FixedTimeRateCheckpointStrategy(
74100
val checkpointOnTermination: Boolean = true,
75101
) : CheckpointStrategy {
76102

77-
override suspend fun afterTermination(): Boolean {
103+
override suspend fun afterTermination(context: CheckpointContext): Boolean {
78104
return checkpointOnTermination
79105
}
80106

81-
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
82-
if (lastCheckpoint == null) return true
107+
override suspend fun afterMessage(eventMessage: EventMessage, context: CheckpointContext): Boolean {
108+
if (context.lastCheckpoint == null) return true
83109
val currentTime = Clock.System.now()
84-
val duration = currentTime - (lastCheckpoint.timestamp)
110+
val duration = currentTime - (context.lastCheckpoint.timestamp)
85111
return duration >= rate
86112
}
87113
}
88114

89115
@Suppress("unused")
90116
object AlwaysCheckpointStrategy : CheckpointStrategy {
91117

92-
override suspend fun afterMessage(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
118+
override suspend fun afterMessage(eventMessage: EventMessage, context: CheckpointContext): Boolean {
93119
return true
94120
}
95121
}

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointingEventSourceConsumer.kt

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package dev.helight.krescent.checkpoint
22

33
import dev.helight.krescent.event.*
4-
import dev.helight.krescent.source.EventSourceConsumer
5-
import dev.helight.krescent.source.EventSourcingStrategy
6-
import dev.helight.krescent.source.StreamingEventSource
7-
import dev.helight.krescent.source.StreamingToken
4+
import dev.helight.krescent.source.*
85
import kotlinx.coroutines.CancellationException
96
import kotlinx.coroutines.NonCancellable
107
import kotlinx.coroutines.withContext
@@ -23,47 +20,51 @@ class CheckpointingEventSourceConsumer(
2320
) : EventSourceConsumer {
2421

2522
private val logger = LoggerFactory.getLogger(CheckpointingEventSourceConsumer::class.java)
23+
private var lastCheckpoint: StoredCheckpoint? = null
24+
private var lastPosition: StreamingToken<*>? = null
25+
private val context: CheckpointContext
26+
get() = CheckpointContext(
27+
lastCheckpoint = lastCheckpoint,
28+
lastPosition = lastPosition
29+
)
2630

2731
@Suppress("UNCHECKED_CAST")
2832
override suspend fun strategy(strategy: EventSourcingStrategy) {
29-
var lastCheckpoint = loadLastCheckpoint()
30-
var lastPosition: StreamingToken<*>? = lastCheckpoint?.position?.let {
31-
source.deserializeToken(it)
33+
loadLastCheckpoint()
34+
if (strategy is CallbackEventSourcingStrategy) {
35+
strategy.addThenChain {
36+
if (checkpointStrategy.afterCallback(context)) tryCheckpoint()
37+
}
3238
}
39+
3340
try {
3441
strategy.source(source, lastPosition, object : EventMessageStreamProcessor {
3542
override suspend fun process(
3643
message: EventMessage,
3744
position: StreamingToken<*>,
3845
) {
3946
consumer.process(message, position)
40-
val tickerResult = checkpointStrategy.afterMessage(message, lastCheckpoint)
41-
if (tickerResult) {
42-
val checkpoint = checkpoint(position)
43-
checkpointStorage.storeCheckpoint(checkpoint)
44-
lastCheckpoint = checkpoint // TODO: Why is not used per lint? This isn't closed and should work
45-
}
4647
lastPosition = position
48+
if (checkpointStrategy.afterMessage(message, context)) tryCheckpoint()
4749
}
4850

4951
override suspend fun forwardSystemEvent(event: Event) {
5052
consumer.forwardSystemEvent(event)
5153
}
5254
})
5355
} catch (e: CancellationException) {
54-
withContext(NonCancellable) {
55-
if (checkpointStrategy.afterTermination() && lastPosition != null) {
56-
val checkpoint = checkpoint(lastPosition)
57-
checkpointStorage.storeCheckpoint(checkpoint)
58-
}
59-
}
56+
withContext(NonCancellable) { if (checkpointStrategy.afterTermination(context)) tryCheckpoint() }
6057
throw e
6158
}
6259

63-
if (checkpointStrategy.afterTermination() && lastPosition != null) {
64-
val checkpoint = checkpoint(lastPosition)
65-
checkpointStorage.storeCheckpoint(checkpoint)
66-
}
60+
if (checkpointStrategy.afterTermination(context)) tryCheckpoint()
61+
}
62+
63+
private suspend fun tryCheckpoint() {
64+
if (lastPosition == null) return
65+
val checkpoint = checkpoint(lastPosition!!)
66+
checkpointStorage.storeCheckpoint(checkpoint)
67+
lastCheckpoint = checkpoint
6768
}
6869

6970
private suspend fun loadLastCheckpoint(): StoredCheckpoint? {
@@ -90,6 +91,8 @@ class CheckpointingEventSourceConsumer(
9091
logger.info("No valid checkpoint found for namespace '$namespace', starting from scratch")
9192
consumer.forwardSystemEvent(SystemStreamHeadEvent)
9293
}
94+
this.lastCheckpoint = lastCheckpoint
95+
this.lastPosition = lastCheckpoint?.position?.let { source.deserializeToken(it) }
9396
return lastCheckpoint
9497
}
9598

krescent-core/src/main/kotlin/dev/helight/krescent/model/EventModelBuilder.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package dev.helight.krescent.model
22

3-
import dev.helight.krescent.checkpoint.CheckpointStorage
4-
import dev.helight.krescent.checkpoint.CheckpointStrategy
5-
import dev.helight.krescent.checkpoint.CheckpointSupport
6-
import dev.helight.krescent.checkpoint.CheckpointingEventSourceConsumer
3+
import dev.helight.krescent.checkpoint.*
74
import dev.helight.krescent.event.EventCatalog
85
import dev.helight.krescent.event.EventStreamProcessor
96
import dev.helight.krescent.event.VirtualEventIngest
@@ -105,7 +102,7 @@ class EventModelBuilder(
105102
*/
106103
fun useCheckpoints(
107104
checkpointStorage: CheckpointStorage,
108-
strategy: CheckpointStrategy,
105+
strategy: CheckpointStrategy = MinimizedCheckpointStrategy(),
109106
) {
110107
checkpointConfig = CheckpointConfiguration(checkpointStorage, strategy)
111108
}

krescent-core/src/main/kotlin/dev/helight/krescent/source/EventSourcingStrategy.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,16 @@ interface EventSourcingStrategy {
1515
) = consumer.process(this.first, this.second)
1616
}
1717

18-
interface WriteCompatibleEventSourcingStrategy {
18+
interface CallbackEventSourcingStrategy {
1919
var then: suspend () -> Unit
20-
}
20+
21+
fun addThenChain(callback: suspend () -> Unit) {
22+
val previous = then
23+
then = {
24+
previous()
25+
callback()
26+
}
27+
}
28+
}
29+
30+
interface WriteCompatibleEventSourcingStrategy : CallbackEventSourcingStrategy

krescent-core/src/main/kotlin/dev/helight/krescent/source/StreamingToken.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,7 @@ interface StreamingToken<SELF : StreamingToken<SELF>> : Comparable<SELF> {
1111
* @return A string representing the serialized token.
1212
*/
1313
fun serialize(): String
14+
15+
@Suppress("UNCHECKED_CAST")
16+
fun compareUnsafe(other: StreamingToken<*>) = this.compareTo(other as SELF)
1417
}

krescent-core/src/main/kotlin/dev/helight/krescent/source/strategy/StreamingSourcingStrategy.kt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.helight.krescent.source.strategy
22

33
import dev.helight.krescent.event.*
4+
import dev.helight.krescent.source.CallbackEventSourcingStrategy
45
import dev.helight.krescent.source.EventSourcingStrategy
56
import dev.helight.krescent.source.StreamingEventSource
67
import dev.helight.krescent.source.StreamingToken
@@ -21,8 +22,8 @@ import dev.helight.krescent.source.StreamingToken
2122
* - [SystemHintEndTransactionEvent] at the end of the catch-up phase, after the live stream has started.
2223
*/
2324
class StreamingSourcingStrategy(
24-
val afterCatchup: suspend () -> Unit = {},
25-
) : EventSourcingStrategy {
25+
override var then: suspend () -> Unit = {},
26+
) : EventSourcingStrategy, CallbackEventSourcingStrategy {
2627
override suspend fun source(
2728
source: StreamingEventSource,
2829
startToken: StreamingToken<*>?,
@@ -39,8 +40,8 @@ class StreamingSourcingStrategy(
3940
consumer.forwardSystemEvent(SystemStreamCaughtUpEvent)
4041
} finally {
4142
consumer.forwardSystemEvent(SystemHintEndTransactionEvent)
42-
afterCatchup()
4343
}
44+
then()
4445

4546
// Begin streaming events until interrupted
4647
source.streamEvents(lastToken).collect {

krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookCountReadModelTest.kt

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package dev.helight.krescent.bookstore
22

33
import dev.helight.krescent.checkpoint.AlwaysCheckpointStrategy
4+
import dev.helight.krescent.checkpoint.MinimizedCheckpointStrategy
45
import dev.helight.krescent.checkpoint.impl.InMemoryCheckpointStorage
56
import dev.helight.krescent.event.Event
67
import dev.helight.krescent.model.EventModelBase.Extension.withConfiguration
78
import dev.helight.krescent.model.ReadModelBase.Extension.catchup
89
import dev.helight.krescent.model.ReadModelBase.Extension.restoreOnly
10+
import dev.helight.krescent.model.ReadModelBase.Extension.stream
911
import dev.helight.krescent.model.ReducingReadModel
1012
import dev.helight.krescent.source.impl.InMemoryEventStore
13+
import kotlinx.coroutines.async
14+
import kotlinx.coroutines.cancelAndJoin
15+
import kotlinx.coroutines.delay
1116
import kotlinx.coroutines.runBlocking
1217
import kotlinx.serialization.Serializable
1318
import java.util.concurrent.atomic.AtomicInteger
@@ -47,6 +52,38 @@ class BookCountReadModelTest {
4752
assertNotNull(late)
4853
assertEquals(9, late.target["1"])
4954
}
55+
56+
@Test
57+
fun `Test minimized checkpointing`(): Unit = runBlocking {
58+
val source = InMemoryEventStore()
59+
val checkpointStorage = InMemoryCheckpointStorage()
60+
source.publishAll(bookstoreSimulatedEventStream)
61+
62+
val initial = BooksAvailableReadModel().withConfiguration {
63+
useCheckpoints(checkpointStorage, MinimizedCheckpointStrategy())
64+
}.restoreOnly()
65+
assertNull(initial)
66+
assertNull(checkpointStorage.getLatestCheckpoint("books.counts"))
67+
68+
// Check if it happens at termination / catchup
69+
val caughtUp = BooksAvailableReadModel().withConfiguration {
70+
useCheckpoints(checkpointStorage, MinimizedCheckpointStrategy())
71+
}.catchup(source)
72+
assertEquals(9, caughtUp.target["1"])
73+
assertNotNull(checkpointStorage.getLatestCheckpoint("books.counts"))
74+
75+
// Check if it happens at the callback as well
76+
checkpointStorage.clearCheckpoints()
77+
assertNull(checkpointStorage.getLatestCheckpoint("books.counts"))
78+
val job = async {
79+
BooksAvailableReadModel().withConfiguration {
80+
useCheckpoints(checkpointStorage, MinimizedCheckpointStrategy())
81+
}.stream(source)
82+
}
83+
delay(200)
84+
assertNotNull(checkpointStorage.getLatestCheckpoint("books.counts"))
85+
job.cancelAndJoin()
86+
}
5087
}
5188

5289
class BooksAvailableReadModel(

0 commit comments

Comments
 (0)