Skip to content

Commit 5bc3c50

Browse files
committed
Add afterCatchup callback to StreamingSourcingStrategy to execute post-catchup actions
1 parent 4df2e13 commit 5bc3c50

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/source/strategy/StreamingSourcingStrategy.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ import dev.helight.krescent.source.StreamingToken
2020
* - [SystemHintBeginTransactionEvent] at the start of the catch-up phase.
2121
* - [SystemHintEndTransactionEvent] at the end of the catch-up phase, after the live stream has started.
2222
*/
23-
class StreamingSourcingStrategy: EventSourcingStrategy {
23+
class StreamingSourcingStrategy(
24+
val afterCatchup: suspend () -> Unit = {},
25+
) : EventSourcingStrategy {
2426
override suspend fun source(
2527
source: StreamingEventSource,
2628
startToken: StreamingToken<*>?,
@@ -37,9 +39,9 @@ class StreamingSourcingStrategy: EventSourcingStrategy {
3739
consumer.forwardSystemEvent(SystemStreamCaughtUpEvent)
3840
} finally {
3941
consumer.forwardSystemEvent(SystemHintEndTransactionEvent)
42+
afterCatchup()
4043
}
4144

42-
4345
// Begin streaming events until interrupted
4446
source.streamEvents(lastToken).collect {
4547
it.forwardTo(consumer)

0 commit comments

Comments
 (0)