Skip to content

Commit 4d94dcc

Browse files
committed
Refactor ReadModelBase extension methods for improved readability and return-type consistency.
1 parent 3d0d626 commit 4d94dcc

File tree

1 file changed

+24
-9
lines changed

1 file changed

+24
-9
lines changed

krescent-core/src/main/kotlin/dev/helight/krescent/model/ReadModelBase.kt

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
package dev.helight.krescent.model
22

33
import dev.helight.krescent.event.EventCatalog
4+
import dev.helight.krescent.event.SystemStreamRestoredEvent
5+
import dev.helight.krescent.model.EventModelBase.Extension.withConfiguration
46
import dev.helight.krescent.source.EventSourcingStrategy
57
import dev.helight.krescent.source.StoredEventSource
68
import dev.helight.krescent.source.StreamingEventSource
79
import dev.helight.krescent.source.UpgradingStreamingEventSource
810
import dev.helight.krescent.source.impl.InMemoryEventStore
9-
import dev.helight.krescent.source.strategy.CatchupSourcingStrategy
10-
import dev.helight.krescent.source.strategy.NoSourcingStrategy
11-
import dev.helight.krescent.source.strategy.StreamingSourcingStrategy
1211

1312
/**
1413
* Base class for read models that provides common extension functions.
@@ -30,14 +29,30 @@ abstract class ReadModelBase(
3029
model.strategy(strategy)
3130
}
3231

33-
suspend fun <M : ReadModelBase> M.catchup(source: StoredEventSource) =
34-
this.strategy(UpgradingStreamingEventSource(source), CatchupSourcingStrategy())
32+
suspend fun <M : ReadModelBase> M.catchup(source: StoredEventSource): M {
33+
val model = build(UpgradingStreamingEventSource(source))
34+
model.catchup()
35+
return this
36+
}
3537

36-
suspend fun <M : ReadModelBase> M.stream(source: StreamingEventSource) =
37-
this.strategy(source, StreamingSourcingStrategy())
38+
suspend fun <M : ReadModelBase> M.stream(source: StreamingEventSource) {
39+
val model = build(source)
40+
model.stream()
41+
}
3842

39-
suspend fun <M : ReadModelBase> M.restoreOnly() =
40-
this.strategy(InMemoryEventStore(), NoSourcingStrategy())
43+
suspend fun <M : ReadModelBase> M.restoreOnly(): M? {
44+
var hasRestored = false
45+
withConfiguration {
46+
registerProcessor {
47+
if (it is SystemStreamRestoredEvent) {
48+
hasRestored = true
49+
}
50+
}
51+
}
52+
val model = build(InMemoryEventStore())
53+
model.restore()
54+
return if (hasRestored) this else null
55+
}
4156

4257
}
4358
}

0 commit comments

Comments
 (0)