Skip to content

Commit e9c061b

Browse files
committed
Refactor event processing and add bookstore read models
Refactored event handling architecture by restructuring packages, introducing dedicated processor classes, and implementing in-memory checkpoint storage. Added services for building read models, including simulation tools for bookstore event streams and their processing, enabling efficient projection and testing scenarios.
1 parent e6653fa commit e9c061b

File tree

46 files changed

+1085
-313
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1085
-313
lines changed

.idea/dictionaries/project.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

README.md

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,27 @@
11
# krescent
22

3-
This project uses [Gradle](https://gradle.org/).
4-
To build and run the application, use the *Gradle* tool window by clicking the Gradle icon in the right-hand toolbar,
5-
or run it directly from the terminal:
3+
Krescent is a Kotlin library for working with event-sourced systems in an extendable and easy to use and
4+
slightly opinionated manner.
65

7-
* Run `./gradlew run` to build and run the application.
8-
* Run `./gradlew build` to only build the application.
9-
* Run `./gradlew check` to run all checks, including tests.
10-
* Run `./gradlew clean` to clean all build outputs.
6+
The following event sources and projector targets are supported:
117

12-
Note the usage of the Gradle Wrapper (`./gradlew`).
13-
This is the suggested way to use Gradle in production projects.
8+
- KurrentDB (Event Source)
9+
- MongoDB (Projection, CheckpointStore)
1410

15-
[Learn more about the Gradle Wrapper](https://docs.gradle.org/current/userguide/gradle_wrapper.html).
11+
## Installation
1612

17-
[Learn more about Gradle tasks](https://docs.gradle.org/current/userguide/command_line_interface.html#common_tasks).
13+
Add jitpack as a repository in your `build.gradle.kts` file:
1814

19-
This project follows the suggested multi-module setup and consists of the `app` and `utils` subprojects.
20-
The shared build logic was extracted to a convention plugin located in `buildSrc`.
15+
```kotlin
16+
repositories {
17+
maven { url = uri("https://jitpack.io") }
18+
}
19+
```
2120

22-
This project uses a version catalog (see `gradle/libs.versions.toml`) to declare and version dependencies
23-
and both a build cache and a configuration cache (see `gradle.properties`).
21+
Then, add the dependency to your `build.gradle.kts` file:
22+
23+
```kotlin
24+
dependencies {
25+
implementation("com.github.helightdev.krescent:krescent-core:main-SNAPSHOT")
26+
}
27+
```

krescent-core/src/main/kotlin/dev/helight/krescent/EventMessageStreamProcessor.kt

Lines changed: 0 additions & 43 deletions
This file was deleted.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package dev.helight.krescent
2+
3+
import java.util.function.Consumer
4+
5+
interface HandlerChainParticipant {
6+
7+
fun accept(visitor: Consumer<HandlerChainParticipant>) {
8+
visitor.accept(this)
9+
}
10+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package dev.helight.krescent.checkpoint
2+
3+
import kotlinx.serialization.json.JsonElement
4+
import kotlinx.serialization.json.JsonObject
5+
6+
class CheckpointBucket(
7+
val buffer: MutableMap<String, JsonElement>,
8+
) {
9+
10+
operator fun set(key: String, value: JsonElement) {
11+
buffer.put(key, value)
12+
}
13+
14+
operator fun get(key: String): JsonElement? {
15+
return buffer[key]
16+
}
17+
18+
19+
fun buildJsonObject(): JsonElement = JsonObject(buffer)
20+
21+
override fun toString(): String = "CheckpointBucket{buffer=$buffer}"
22+
23+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package dev.helight.krescent.checkpoint
2+
3+
interface CheckpointStorage {
4+
suspend fun storeCheckpoint(checkpoint: StoredCheckpoint)
5+
suspend fun getLatestCheckpoint(namespace: String): StoredCheckpoint?
6+
suspend fun clearCheckpoints()
7+
}

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoints/CheckpointStrategy.kt renamed to krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointStrategy.kt

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
package dev.helight.krescent.checkpoints
1+
package dev.helight.krescent.checkpoint
22

3-
import dev.helight.krescent.EventMessage
3+
import dev.helight.krescent.event.EventMessage
44
import java.time.Instant
55
import kotlin.concurrent.atomics.AtomicLong
66
import kotlin.concurrent.atomics.ExperimentalAtomicApi
@@ -11,12 +11,14 @@ interface CheckpointStrategy {
1111
suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean
1212
}
1313

14-
class NoCheckpointStrategy : CheckpointStrategy {
14+
@Suppress("unused")
15+
object NoCheckpointStrategy : CheckpointStrategy {
1516
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
1617
return false
1718
}
1819
}
1920

21+
@Suppress("unused")
2022
@OptIn(ExperimentalAtomicApi::class)
2123
class FixedEventRateCheckpointStrategy(
2224
private val checkpoint: Long,
@@ -29,8 +31,28 @@ class FixedEventRateCheckpointStrategy(
2931
}
3032
}
3133

34+
@Suppress("unused")
35+
class ManualCheckpointStrategy() : CheckpointStrategy {
36+
37+
private var shouldCheckpoint = false
38+
39+
fun mark() {
40+
shouldCheckpoint = true
41+
}
42+
43+
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
44+
if (shouldCheckpoint) {
45+
shouldCheckpoint = false
46+
return true
47+
}
48+
return false
49+
}
50+
51+
}
52+
53+
@Suppress("unused")
3254
class FixedTimeRateCheckpointStrategy(
33-
private val rate: Duration
55+
private val rate: Duration,
3456
) : CheckpointStrategy {
3557

3658
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
@@ -39,4 +61,11 @@ class FixedTimeRateCheckpointStrategy(
3961
val duration = currentTime.minusMillis(lastCheckpoint.timestamp.toEpochMilli()).toEpochMilli()
4062
return duration >= rate.inWholeMilliseconds
4163
}
64+
}
65+
66+
@Suppress("unused")
67+
object AlwaysCheckpointStrategy : CheckpointStrategy {
68+
override suspend fun tick(eventMessage: EventMessage, lastCheckpoint: StoredCheckpoint?): Boolean {
69+
return true
70+
}
4271
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package dev.helight.krescent.checkpoint
2+
3+
import dev.helight.krescent.HandlerChainParticipant
4+
5+
interface CheckpointSupport {
6+
suspend fun createCheckpoint(bucket: CheckpointBucket)
7+
suspend fun restoreCheckpoint(bucket: CheckpointBucket)
8+
9+
companion object {
10+
suspend fun storagePass(participant: HandlerChainParticipant, bucket: CheckpointBucket) {
11+
if (participant is CheckpointSupport) {
12+
participant.createCheckpoint(bucket)
13+
}
14+
}
15+
16+
suspend fun restorePass(participant: HandlerChainParticipant, bucket: CheckpointBucket) {
17+
if (participant is CheckpointSupport) {
18+
participant.restoreCheckpoint(bucket)
19+
}
20+
}
21+
}
22+
}

krescent-core/src/main/kotlin/dev/helight/krescent/checkpoints/CheckpointingEventSourceConsumer.kt renamed to krescent-core/src/main/kotlin/dev/helight/krescent/checkpoint/CheckpointingEventSourceConsumer.kt

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,25 @@
1-
package dev.helight.krescent.checkpoints
1+
package dev.helight.krescent.checkpoint
22

3-
import dev.helight.krescent.EventMessage
4-
import dev.helight.krescent.EventMessageStreamProcessor
5-
import dev.helight.krescent.StreamingEventSource
6-
import dev.helight.krescent.StreamingToken
7-
import dev.helight.krescent.event.EventSourceConsumer
3+
import dev.helight.krescent.event.EventMessage
4+
import dev.helight.krescent.event.EventMessageStreamProcessor
5+
import dev.helight.krescent.event.SystemStreamHeadEvent
6+
import dev.helight.krescent.event.SystemStreamRestoredEvent
7+
import dev.helight.krescent.source.EventSourceConsumer
8+
import dev.helight.krescent.source.StreamingEventSource
9+
import dev.helight.krescent.source.StreamingToken
810
import kotlinx.coroutines.flow.Flow
11+
import kotlinx.coroutines.flow.emptyFlow
912
import kotlinx.serialization.json.jsonObject
1013
import java.time.Instant
1114

12-
class CheckpointingEventSourceConsumer<T: StreamingToken<T>> (
15+
class CheckpointingEventSourceConsumer<T : StreamingToken<T>>(
1316
val namespace: String,
14-
val revision: Long,
17+
val revision: Int,
1518
val strategy: CheckpointStrategy,
1619
val source: StreamingEventSource<T>,
1720
val checkpointStorage: CheckpointStorage,
18-
val consumer: EventMessageStreamProcessor
21+
val additionalCheckpoints: List<CheckpointSupport>,
22+
val consumer: EventMessageStreamProcessor,
1923
) : EventSourceConsumer {
2024

2125
override suspend fun stream() {
@@ -34,6 +38,12 @@ class CheckpointingEventSourceConsumer<T: StreamingToken<T>> (
3438
handlerLoop(lastCheckpoint, flow)
3539
}
3640

41+
override suspend fun restore() {
42+
val lastCheckpoint = loadLastCheckpoint()
43+
handlerLoop(lastCheckpoint, emptyFlow())
44+
45+
}
46+
3747
private suspend fun loadLastCheckpoint(): StoredCheckpoint? {
3848
var lastCheckpoint = checkpointStorage.getLatestCheckpoint(namespace)
3949

@@ -44,6 +54,9 @@ class CheckpointingEventSourceConsumer<T: StreamingToken<T>> (
4454

4555
if (lastCheckpoint != null) {
4656
load(lastCheckpoint)
57+
consumer.forwardSystemEvent(SystemStreamRestoredEvent())
58+
} else {
59+
consumer.forwardSystemEvent(SystemStreamHeadEvent())
4760
}
4861
return lastCheckpoint
4962
}
@@ -65,6 +78,7 @@ class CheckpointingEventSourceConsumer<T: StreamingToken<T>> (
6578
private suspend fun checkpoint(position: T): StoredCheckpoint {
6679
val bucket = CheckpointBucket(mutableMapOf())
6780
CheckpointSupport.storagePass(consumer, bucket)
81+
additionalCheckpoints.forEach { it.createCheckpoint(bucket) }
6882
return StoredCheckpoint(
6983
namespace = namespace,
7084
revision = revision,
@@ -76,6 +90,7 @@ class CheckpointingEventSourceConsumer<T: StreamingToken<T>> (
7690

7791
private suspend fun load(storedCheckpoint: StoredCheckpoint) {
7892
val bucket = CheckpointBucket(storedCheckpoint.data.jsonObject.toMutableMap())
93+
additionalCheckpoints.forEach { it.restoreCheckpoint(bucket) }
7994
CheckpointSupport.restorePass(consumer, bucket)
8095
}
8196

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package dev.helight.krescent.checkpoint
2+
3+
import dev.helight.krescent.serialization.InstantSerializer
4+
import kotlinx.serialization.Serializable
5+
import kotlinx.serialization.json.JsonElement
6+
import java.time.Instant
7+
8+
@Serializable
9+
data class StoredCheckpoint(
10+
val namespace: String,
11+
val revision: Int,
12+
val position: String,
13+
@Serializable(with = InstantSerializer::class)
14+
val timestamp: Instant,
15+
val data: JsonElement,
16+
)

0 commit comments

Comments
 (0)