Skip to content

Commit e582f0f

Browse files
committed
Refactor BalancedReadModelJob to use only a single phase with callback to prevent unnecessary checkpointing roundtrips
1 parent 5bc3c50 commit e582f0f

File tree

2 files changed

+10
-13
lines changed

2 files changed

+10
-13
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJob.kt

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package dev.helight.krescent.supervisor
22

33
import dev.helight.krescent.model.ReadModelBase
4-
import dev.helight.krescent.model.ReadModelBase.Extension.catchup
4+
import dev.helight.krescent.model.ReadModelBase.Extension.strategy
55
import dev.helight.krescent.model.ReadModelBase.Extension.stream
66
import dev.helight.krescent.source.StreamingEventSource
7+
import dev.helight.krescent.source.strategy.StreamingSourcingStrategy
78
import org.slf4j.LoggerFactory
89
import kotlin.math.min
910
import kotlin.math.pow
@@ -46,19 +47,15 @@ class BalancedReadModelJob(
4647
}
4748

4849
override suspend fun run(supervisor: ModelSupervisor) {
49-
if (preventParallelCatchup) try {
50-
val source = sourceSupplier()
51-
val model = modelSupplier()
52-
logger.debug("BalancedReadModelJob starting catchup phase for {}.", model)
53-
model.catchup(source)
54-
} finally {
55-
supervisor.startupMutex.unlock(this)
56-
}
57-
5850
val source = sourceSupplier()
5951
val model = modelSupplier()
60-
logger.debug("BalancedReadModelJob starting streaming phase for {}.", model)
61-
model.stream(source)
52+
if (preventParallelCatchup) {
53+
model.strategy(source, StreamingSourcingStrategy {
54+
supervisor.startupMutex.unlock(this)
55+
})
56+
} else {
57+
model.stream(source)
58+
}
6259
}
6360

6461
override suspend fun onExited(supervisor: ModelSupervisor) {

krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJobTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class BalancedReadModelJobTest {
4343

4444
// Expect book 1 -> 9 copies, book 2 -> 5 copies (as per simulated stream)
4545
assertEquals(mapOf("1" to 9, "2" to 5), target)
46-
assertEquals(3, creationCounter)
46+
assertEquals(2, creationCounter)
4747
}
4848
}
4949

0 commit comments

Comments
 (0)