File tree Expand file tree Collapse file tree 1 file changed +9
-4
lines changed
krescent-core/src/main/kotlin/dev/helight/krescent/supervisor Expand file tree Collapse file tree 1 file changed +9
-4
lines changed Original file line number Diff line number Diff line change @@ -29,6 +29,7 @@ class BalancedReadModelJob(
2929 val sourceSupplier : () -> StreamingEventSource = {
3030 source ? : error("BalancedReadModelJob requires a source or sourceSupplier to be provided")
3131 },
32+ val preventParallelCatchup : Boolean = true ,
3233) : ModelJob {
3334
3435 private var currentAttempt = 0
@@ -40,20 +41,24 @@ class BalancedReadModelJob(
4041 }
4142
4243 override suspend fun onBefore (supervisor : ModelSupervisor ) {
43- supervisor.startupMutex.lock(this )
44+ if (preventParallelCatchup) supervisor.startupMutex.lock(this )
4445 lastStart = System .currentTimeMillis()
4546 }
4647
4748 override suspend fun run (supervisor : ModelSupervisor ) {
48- try {
49+ if (preventParallelCatchup) try {
4950 val source = sourceSupplier()
50- modelSupplier().catchup(source)
51+ val model = modelSupplier()
52+ logger.debug(" BalancedReadModelJob starting catchup phase for {}." , model)
53+ model.catchup(source)
5154 } finally {
5255 supervisor.startupMutex.unlock(this )
5356 }
5457
5558 val source = sourceSupplier()
56- modelSupplier().stream(source)
59+ val model = modelSupplier()
60+ logger.debug(" BalancedReadModelJob starting streaming phase for {}." , model)
61+ model.stream(source)
5762 }
5863
5964 override suspend fun onExited (supervisor : ModelSupervisor ) {
You can’t perform that action at this time.
0 commit comments