Skip to content

Commit 93aac71

Browse files
committed
Add Support for Postgres and modernize gradle setup
1 parent fc3c8b9 commit 93aac71

File tree

24 files changed

+748
-131
lines changed

24 files changed

+748
-131
lines changed

.idea/copilot.data.migration.agent.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/copilot.data.migration.edit.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/dataSources.xml

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/gradle.xml

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.idea/sqldialects.xml

Lines changed: 6 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gradle/libs.versions.toml

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,14 @@
1-
# Version catalog is a central place for you to declare and version dependencies
2-
# https://docs.gradle.org/current/userguide/platforms.html#sub:version-catalog
3-
# https://docs.gradle.org/current/userguide/platforms.html#sub::toml-dependencies-format
4-
51
[versions]
62
kotlin = "2.1.20"
73
kotlinxDatetime = "0.6.1"
84
kotlinxSerialization = "1.7.3"
9-
kotlinxCoroutines = "1.9.0"
5+
kotlinxCoroutines = "1.10.2"
106
kurrentClient = "1.0.1"
117
orgReactiveStream = "1.0.4"
128
mongoDriver = "5.5.0"
139
testContainers = "1.21.0"
10+
exposed = "1.0.0-rc-1"
11+
postgresql = "42.7.3"
1412

1513
[libraries]
1614
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
@@ -24,12 +22,19 @@ mongoDriver = { module = "org.mongodb:mongodb-driver-kotlin-coroutine", version.
2422
mongoBsonX = { module = "org.mongodb:bson-kotlinx", version.ref = "mongoDriver" }
2523
testContainersJunit = { module = "org.testcontainers:junit-jupiter", version.ref = "testContainers" }
2624
testContainers = { module = "org.testcontainers:testcontainers", version.ref = "testContainers" }
25+
exposedCore = { module = "org.jetbrains.exposed:exposed-core", version.ref = "exposed" }
26+
exposedDao = { module = "org.jetbrains.exposed:exposed-dao", version.ref = "exposed" }
27+
exposedJdbc = { module = "org.jetbrains.exposed:exposed-jdbc", version.ref = "exposed" }
28+
exposedJson = { module = "org.jetbrains.exposed:exposed-json", version.ref = "exposed" }
29+
exposedKotlinDatetime = { module = "org.jetbrains.exposed:exposed-kotlin-datetime", version.ref = "exposed" }
30+
postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" }
2731

2832
# Libraries can be bundled together for easier import
2933
[bundles]
3034
kotlinxEcosystem = ["kotlinxDatetime", "kotlinxSerializationJson", "kotlinxSerializationCbor", "kotlinxCoroutines"]
3135
kurrent = ["kurrentClient", "orgReactiveStream"]
3236
mongo = ["mongoDriver", "mongoBsonX"]
37+
exposed = ["exposedCore", "exposedDao", "exposedJdbc", "exposedJson", "exposedKotlinDatetime"]
3338
testContainers = ["testContainersJunit", "testContainers"]
3439

3540
[plugins]

krescent-core/src/main/kotlin/dev/helight/krescent/CoroutineUtils.kt

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,14 @@ class FlowBuffer<T> {
6767
* @param first The first flow to collect
6868
* @param second The second flow to collect
6969
* @param activateCallback A callback to be called when both flows are collecting values
70+
* @param liveCallback A callback to be called when the flow switches to live mode
7071
*/
7172
@OptIn(ExperimentalAtomicApi::class)
7273
fun <T> joinSequentialFlows(
7374
first: Flow<T>,
7475
second: Flow<T>,
7576
activateCallback: () -> Unit = { },
77+
liveCallback: () -> Unit = { },
7678
): Flow<T> = channelFlow {
7779
val mutex = Mutex()
7880
val secondBuffer = ArrayDeque<T>()
@@ -101,10 +103,70 @@ fun <T> joinSequentialFlows(
101103
send(secondBuffer.removeFirst())
102104
}
103105
switchToLive = true
106+
liveCallback()
104107
}
105108
secondJob.join()
106109
}
107110

111+
/**
112+
* Creates a backfilling flow with a live tail from two flows.
113+
* The `live` flow will be buffered until the `catchup` flow is fully collected.
114+
* Once the `catchup` flow is complete, the buffered values from the `live` flow will be sent,
115+
* followed by any new values from the `live` flow.
116+
*
117+
* @param catchup The flow to backfill from
118+
* @param live The live flow to collect from
119+
* @param comparator A comparator used for deduplication of values by age or position
120+
* @param activateCallback A callback to be called when both flows are collecting values
121+
* @param liveCallback A callback to be called when the flow switches to live mode
122+
*/
123+
fun <T> createCatchupFlow(
124+
catchup: Flow<T>,
125+
live: Flow<T>,
126+
comparator: Comparator<T>,
127+
activateCallback: () -> Unit = { },
128+
liveCallback: () -> Unit = { },
129+
): Flow<T> = channelFlow {
130+
val mutex = Mutex()
131+
val liveBuffer = mutableListOf<T>()
132+
var lastCatchup: T? = null
133+
var switchToLive = false
134+
135+
val catchupJob = launch {
136+
catchup.collect { value ->
137+
send(value)
138+
lastCatchup = value
139+
}
140+
}
141+
val liveJob = launch {
142+
live.collect { value ->
143+
mutex.withLock {
144+
if (switchToLive) {
145+
send(value)
146+
} else {
147+
liveBuffer.add(value)
148+
}
149+
}
150+
}
151+
}
152+
activateCallback()
153+
catchupJob.join()
154+
mutex.withLock {
155+
if (liveBuffer.isNotEmpty()) {
156+
if (lastCatchup != null) {
157+
liveBuffer.removeAll { comparator.compare(it, lastCatchup) <= 0 }
158+
}
159+
}
160+
for (value in liveBuffer) {
161+
send(value)
162+
}
163+
liveBuffer.clear()
164+
switchToLive = true
165+
liveCallback()
166+
}
167+
liveJob.join()
168+
}
169+
108170
/**
109171
* Runs a sequence of tasks where each task is executed in order without interruption due to exceptions.
110172
* If any task throws an exception, it collects all exceptions and throws a single `UninterruptedChainException` with a list of errors.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package dev.helight.krescent.source
2+
3+
import dev.helight.krescent.event.EventMessage
4+
import kotlinx.coroutines.coroutineScope
5+
import kotlinx.coroutines.delay
6+
import kotlinx.coroutines.flow.Flow
7+
import kotlinx.coroutines.flow.flow
8+
9+
class PollingEventSource<T : StoredEventSource>(
10+
val delegate: T,
11+
val pollingLimit: Int? = null,
12+
val pollingDelay: Long = 500L,
13+
) : StreamingEventSource, StoredEventSource by delegate {
14+
15+
override suspend fun streamEvents(startToken: StreamingToken<*>?): Flow<Pair<EventMessage, StreamingToken<*>>> =
16+
coroutineScope {
17+
var currentToken: StreamingToken<*> = startToken ?: delegate.getHeadToken()
18+
return@coroutineScope flow {
19+
while (true) {
20+
val subFlow = fetchEventsAfter(currentToken, pollingLimit)
21+
var counter = 0
22+
subFlow.collect {
23+
emit(it)
24+
currentToken = it.second
25+
counter++
26+
}
27+
val reachedEnd = counter < (pollingLimit ?: Int.MAX_VALUE)
28+
if (reachedEnd) {
29+
delay(pollingDelay)
30+
}
31+
}
32+
}
33+
}
34+
35+
}
36+
37+
fun <T : StoredEventSource> T.asPollingSource(
38+
pollingLimit: Int? = null,
39+
pollingDelay: Long = 500L,
40+
): PollingEventSource<T> = PollingEventSource(
41+
delegate = this,
42+
pollingLimit = pollingLimit,
43+
pollingDelay = pollingDelay,
44+
)

krescent-core/src/main/kotlin/dev/helight/krescent/source/StreamingEventSource.kt

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@ import kotlinx.coroutines.flow.Flow
55
import kotlinx.datetime.Instant
66

77
/**
8-
* Interface for an event source that provides a replayable stream of events.
8+
* Interface for an event source that provides access to stored events.
99
* Supports token-based positioning for fetching events.
1010
*/
11-
interface StreamingEventSource {
12-
11+
interface StoredEventSource {
1312
/**
1413
* Gets a token for the head of the stream, which is *before* the first event in the stream.
1514
* Fetching after the head token will return the first event in the stream.
@@ -40,8 +39,18 @@ interface StreamingEventSource {
4039
* @param limit Maximum number of events to fetch, null for no limit
4140
* @return A pair of the flow of events and the updated token for the next fetch
4241
*/
43-
suspend fun fetchEventsAfter(token: StreamingToken<*>? = null, limit: Int? = null): Flow<Pair<EventMessage, StreamingToken<*>>>
42+
suspend fun fetchEventsAfter(
43+
token: StreamingToken<*>? = null,
44+
limit: Int? = null,
45+
): Flow<Pair<EventMessage, StreamingToken<*>>>
4446

47+
}
48+
49+
/**
50+
* Interface for an event source that provides a replayable stream of events.
51+
* Supports token-based positioning for fetching events.
52+
*/
53+
interface StreamingEventSource : StoredEventSource {
4554
/**
4655
* Creates a flow of all events in the stream starting from the specified token.
4756
* Then it will continue to stream new events as they are published.
@@ -52,7 +61,7 @@ interface StreamingEventSource {
5261
suspend fun streamEvents(startToken: StreamingToken<*>? = null): Flow<Pair<EventMessage, StreamingToken<*>>>
5362
}
5463

55-
interface ExtendedQueryableStreamingEventSource : StreamingEventSource {
64+
interface ExtendedQueryableStoredEventSource : StoredEventSource {
5665
/**
5766
* Gets a token pointing before the first event at or after the specified timestamp.
5867
*
@@ -67,4 +76,5 @@ interface ExtendedQueryableStreamingEventSource : StreamingEventSource {
6776
*/
6877
suspend fun getTokenForEventId(eventId: String): StreamingToken<*>?
6978

70-
}
79+
}
80+

krescent-core/src/main/kotlin/dev/helight/krescent/source/impl/InMemoryEventStore.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ package dev.helight.krescent.source.impl
22

33
import dev.helight.krescent.event.EventMessage
44
import dev.helight.krescent.joinSequentialFlows
5-
import dev.helight.krescent.source.EventPublisher
6-
import dev.helight.krescent.source.ExtendedQueryableStreamingEventSource
7-
import dev.helight.krescent.source.StreamingToken
8-
import dev.helight.krescent.source.SubscribingEventSource
5+
import dev.helight.krescent.source.*
96
import kotlinx.coroutines.flow.*
107
import kotlinx.coroutines.sync.Mutex
118
import kotlinx.coroutines.sync.withLock
@@ -17,7 +14,7 @@ import java.nio.charset.StandardCharsets
1714

1815
class InMemoryEventStore(
1916
private val events: MutableList<EventMessage> = mutableListOf(),
20-
) : ExtendedQueryableStreamingEventSource, SubscribingEventSource, EventPublisher {
17+
) : StreamingEventSource, ExtendedQueryableStoredEventSource, SubscribingEventSource, EventPublisher {
2118

2219
constructor(vararg events: EventMessage) : this(events.toMutableList())
2320

0 commit comments

Comments
 (0)