Skip to content

Commit 972986b

Browse files
committed
Add support for $all-stream-based event retrieval for KurrentDB.
1 parent 660a076 commit 972986b

File tree

6 files changed

+491
-70
lines changed

6 files changed

+491
-70
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package dev.helight.krescent.kurrent
2+
3+
import dev.helight.krescent.event.EventMessage
4+
import dev.helight.krescent.source.StreamingEventSource
5+
import dev.helight.krescent.source.StreamingToken
6+
import io.kurrent.dbclient.*
7+
import kotlinx.coroutines.*
8+
import kotlinx.coroutines.flow.Flow
9+
import kotlinx.coroutines.flow.channelFlow
10+
import kotlinx.coroutines.flow.take
11+
import kotlinx.coroutines.future.await
12+
import org.intellij.lang.annotations.Language
13+
14+
/**
15+
* Represents an event source for streaming events from the $all stream in KurrentDB.
16+
*
17+
* You can set up server-side filtering using the [filter] builder block, which allows you to specify
18+
* a custom filter for the events being streamed, see [Server-Side-Filtering](https://docs.kurrent.io/clients/grpc/subscriptions.html#server-side-filtering).
19+
*
20+
* Beware that the $all stream will contain events from streams that are technically already deleted or truncated. While
21+
* in practice this will not be that large of an issue, you can influence the events that are actually passed on to
22+
* the event handlers by specifying an additional client-side [streamFilter].
23+
*
24+
* @property client The KurrentDBClient used for interacting with the database.
25+
* @property credentials Optional user credentials for authentication.
26+
* @property streamFilter Optional client-side filter for the events being streamed.
27+
* @property filter Optional server-side filter for the events being streamed.
28+
*/
29+
class AllStreamKurrentEventSource(
30+
val client: KurrentDBClient,
31+
val credentials: UserCredentials? = null,
32+
val streamFilter: KurrentSubscriptionStreamFilter? = null,
33+
val filter: (SubscriptionFilterBuilder.() -> Unit)? = null,
34+
) : StreamingEventSource {
35+
36+
companion object {
37+
/**
38+
* @see [AllStreamKurrentEventSource]
39+
*/
40+
fun eventTypePrefix(
41+
client: KurrentDBClient,
42+
vararg prefix: String,
43+
credentials: UserCredentials? = null,
44+
streamFilter: KurrentSubscriptionStreamFilter? = null,
45+
): AllStreamKurrentEventSource = AllStreamKurrentEventSource(client, credentials, streamFilter, filter = {
46+
prefix.forEach { addEventTypePrefix(it) }
47+
})
48+
49+
/**
50+
* @see [AllStreamKurrentEventSource]
51+
*/
52+
fun streamNamePrefix(
53+
client: KurrentDBClient,
54+
vararg prefix: String,
55+
credentials: UserCredentials? = null,
56+
streamFilter: KurrentSubscriptionStreamFilter? = null,
57+
): AllStreamKurrentEventSource = AllStreamKurrentEventSource(client, credentials, streamFilter, filter = {
58+
prefix.forEach { addStreamNamePrefix(it) }
59+
})
60+
61+
/**
62+
* @see [AllStreamKurrentEventSource]
63+
*/
64+
fun streamNameRegex(
65+
client: KurrentDBClient,
66+
@Language("RegExp") regex: String,
67+
credentials: UserCredentials? = null,
68+
streamFilter: KurrentSubscriptionStreamFilter? = null,
69+
): AllStreamKurrentEventSource = AllStreamKurrentEventSource(client, credentials, streamFilter, filter = {
70+
withStreamNameRegularExpression(regex)
71+
})
72+
73+
/**
74+
* @see [AllStreamKurrentEventSource]
75+
*/
76+
fun eventTypeRegex(
77+
client: KurrentDBClient,
78+
@Language("RegExp") regex: String,
79+
credentials: UserCredentials? = null,
80+
streamFilter: KurrentSubscriptionStreamFilter? = null,
81+
): AllStreamKurrentEventSource = AllStreamKurrentEventSource(client, credentials, streamFilter, filter = {
82+
withEventTypeRegularExpression(regex)
83+
})
84+
}
85+
86+
override suspend fun getHeadToken(): KurrentLogPositionToken = KurrentLogPositionToken.HeadToken()
87+
88+
override suspend fun getTailToken(): KurrentLogPositionToken = KurrentLogPositionToken.TailToken()
89+
90+
override suspend fun deserializeToken(encoded: String): KurrentLogPositionToken = when (encoded) {
91+
"HEAD" -> KurrentLogPositionToken.HeadToken()
92+
"TAIL" -> KurrentLogPositionToken.TailToken()
93+
else -> KurrentLogPositionToken.decodePosition(encoded)
94+
}
95+
96+
override suspend fun fetchEventsAfter(
97+
token: StreamingToken<*>?,
98+
limit: Int?,
99+
): Flow<Pair<EventMessage, StreamingToken<*>>> {
100+
return when (limit) {
101+
null -> subscribeToAll(token, cancelOnCaughtUp = true)
102+
else -> subscribeToAll(token, cancelOnCaughtUp = true).take(limit)
103+
}
104+
}
105+
106+
override suspend fun streamEvents(startToken: StreamingToken<*>?): Flow<Pair<EventMessage, StreamingToken<*>>> {
107+
return subscribeToAll(startToken)
108+
}
109+
110+
private suspend fun subscribeToAll(
111+
startToken: StreamingToken<*>?,
112+
cancelOnCaughtUp: Boolean = false,
113+
): Flow<Pair<EventMessage, StreamingToken<*>>> {
114+
if (startToken != null && startToken !is KurrentLogPositionToken) {
115+
throw IllegalArgumentException("Token must be of type KurrentStreamingToken")
116+
}
117+
118+
val token = startToken ?: getHeadToken()
119+
val subscribeToAllOptions = SubscribeToAllOptions.get()
120+
if (credentials != null) subscribeToAllOptions.authenticated(credentials)
121+
token.applyTo(subscribeToAllOptions)
122+
if (filter != null) {
123+
subscribeToAllOptions.filter(SubscriptionFilter.newBuilder().apply {
124+
filter()
125+
}.build())
126+
}
127+
128+
val startPosition = if (token is KurrentLogPositionToken.PositionToken) token.position else null
129+
return channelFlow {
130+
launch(Dispatchers.IO) {
131+
val subscription = client.subscribeToAll(object : SubscriptionListener() {
132+
override fun onEvent(
133+
subscription: Subscription,
134+
event: ResolvedEvent,
135+
) {
136+
val evt = event.originalEvent
137+
val token = KurrentLogPositionToken.PositionToken(evt.position)
138+
if (startPosition == token.position) return
139+
if (streamFilter != null && !streamFilter.filter(event)) return
140+
val message = KurrentMessageFactory.decode(evt)
141+
runBlocking {
142+
send(message to token)
143+
}
144+
}
145+
146+
override fun onCancelled(subscription: Subscription, exception: Throwable) {
147+
cancel("Subscription error", exception)
148+
}
149+
150+
override fun onCaughtUp(subscription: Subscription?) {
151+
if (cancelOnCaughtUp) {
152+
cancel("Caught up to the stream", null)
153+
}
154+
}
155+
}, subscribeToAllOptions).await()
156+
try {
157+
delay(Long.MAX_VALUE)
158+
} finally {
159+
subscription.stop()
160+
}
161+
}
162+
}
163+
}
164+
}
165+

krescent-kurrent/src/main/kotlin/dev/helight/krescent/kurrent/KurrentEventSource.kt

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,15 @@ class KurrentEventSource(
3333

3434
override suspend fun deserializeToken(encoded: String): KurrentStreamingToken {
3535
return when (encoded) {
36-
"HEAD" -> KurrentStreamingToken.HeadStreamingToken()
37-
"TAIL" -> KurrentStreamingToken.TailStreamingToken()
38-
else -> KurrentStreamingToken.RevisionStreamingToken(encoded.toLong())
36+
"HEAD" -> KurrentStreamingToken.HeadToken()
37+
"TAIL" -> KurrentStreamingToken.TailToken()
38+
else -> KurrentStreamingToken.RevisionToken(encoded.toLong())
3939
}
4040
}
4141

42-
override suspend fun getHeadToken(): KurrentStreamingToken {
43-
return KurrentStreamingToken.HeadStreamingToken()
44-
}
42+
override suspend fun getHeadToken(): KurrentStreamingToken = KurrentStreamingToken.HeadToken()
4543

46-
override suspend fun getTailToken(): KurrentStreamingToken {
47-
return KurrentStreamingToken.TailStreamingToken()
48-
}
44+
override suspend fun getTailToken(): KurrentStreamingToken = KurrentStreamingToken.TailToken()
4945

5046
override suspend fun fetchEventsAfter(
5147
token: StreamingToken<*>?,
@@ -55,20 +51,20 @@ class KurrentEventSource(
5551
throw IllegalArgumentException("Token must be of type KurrentStreamingToken")
5652
}
5753

58-
var readStreamOptions = ReadStreamOptions.get().forwards()
59-
if (credentials != null) readStreamOptions = readStreamOptions.authenticated(credentials)
54+
val readStreamOptions = ReadStreamOptions.get().forwards()
55+
if (credentials != null) readStreamOptions.authenticated(credentials)
6056
if (limit != null) readStreamOptions.maxCount(limit.toLong())
61-
if (resolvedLinks) readStreamOptions = readStreamOptions.resolveLinkTos()
62-
readStreamOptions = (token ?: getHeadToken()).applyToReadOption(readStreamOptions)
63-
val startRevision = if (token is KurrentStreamingToken.RevisionStreamingToken) token.revision else null
57+
if (resolvedLinks) readStreamOptions.resolveLinkTos()
58+
(token ?: getHeadToken()).applyTo(readStreamOptions)
59+
val startRevision = if (token is KurrentStreamingToken.RevisionToken) token.revision else null
6460

6561
return channelFlow {
6662
launch(Dispatchers.IO) {
6763
try {
6864
val result = client.readStream(streamId, readStreamOptions).await()
6965
for (t in result.events) {
7066
val evt = t.originalEvent
71-
val token = KurrentStreamingToken.RevisionStreamingToken(evt.revision)
67+
val token = KurrentStreamingToken.RevisionToken(evt.revision)
7268
val message = KurrentMessageFactory.decode(evt)
7369
// The docs say the revision is exclusive, but in my testing it seems inclusive.
7470
// So we just check if the revision is the same, and if it is, we skip it.
@@ -88,11 +84,11 @@ class KurrentEventSource(
8884
}
8985

9086
val token = startToken ?: getHeadToken()
91-
var subscribeToStreamOptions = SubscribeToStreamOptions.get()
92-
if (credentials != null) subscribeToStreamOptions = subscribeToStreamOptions.authenticated(credentials)
93-
if (resolvedLinks) subscribeToStreamOptions = subscribeToStreamOptions.resolveLinkTos()
94-
subscribeToStreamOptions = token.applyToSubscribeOption(subscribeToStreamOptions)
95-
val startRevision = if (token is KurrentStreamingToken.RevisionStreamingToken) token.revision else null
87+
val subscribeToStreamOptions = SubscribeToStreamOptions.get()
88+
if (credentials != null) subscribeToStreamOptions.authenticated(credentials)
89+
if (resolvedLinks) subscribeToStreamOptions.resolveLinkTos()
90+
token.applyTo(subscribeToStreamOptions)
91+
val startRevision = if (token is KurrentStreamingToken.RevisionToken) token.revision else null
9692

9793
return channelFlow {
9894
// I move this to the IO dispatcher since I don't trust the kurrentdb client after the fromRevision incident.
@@ -103,9 +99,9 @@ class KurrentEventSource(
10399
event: ResolvedEvent,
104100
) {
105101
val evt = event.originalEvent
106-
val token = KurrentStreamingToken.RevisionStreamingToken(evt.revision)
107-
val message = KurrentMessageFactory.decode(evt)
102+
val token = KurrentStreamingToken.RevisionToken(evt.revision)
108103
if (startRevision == token.revision) return
104+
val message = KurrentMessageFactory.decode(evt)
109105
runBlocking {
110106
send(message to token)
111107
}
@@ -127,19 +123,20 @@ class KurrentEventSource(
127123
override suspend fun publish(event: EventMessage): Unit = coroutineScope {
128124
sendMutex.withLock {
129125
val eventData = KurrentMessageFactory.encode(event)
130-
var appendOptions = AppendToStreamOptions.get().streamState(StreamState.AnyStreamState())
131-
if (credentials != null) appendOptions = appendOptions.authenticated(credentials)
126+
val appendOptions = AppendToStreamOptions.get().streamState(StreamState.AnyStreamState())
127+
if (credentials != null) appendOptions.authenticated(credentials)
132128
client.appendToStream(streamId, appendOptions, eventData).await()
133129
}
134130
}
135131

136132
override suspend fun publishAll(events: List<EventMessage>) {
137133
sendMutex.withLock {
138134
val eventData = events.map { KurrentMessageFactory.encode(it) }
139-
var appendOptions = AppendToStreamOptions.get().streamState(StreamState.AnyStreamState())
140-
if (credentials != null) appendOptions = appendOptions.authenticated(credentials)
135+
val appendOptions = AppendToStreamOptions.get().streamState(StreamState.AnyStreamState())
136+
if (credentials != null) appendOptions.authenticated(credentials)
141137
client.appendToStream(streamId, appendOptions, *eventData.toTypedArray()).await()
142138
}
143139
}
144140
}
145141

142+
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package dev.helight.krescent.kurrent
2+
3+
import dev.helight.krescent.source.StreamingToken
4+
import io.kurrent.dbclient.Position
5+
import io.kurrent.dbclient.SubscribeToAllOptions
6+
7+
sealed class KurrentLogPositionToken : StreamingToken<KurrentLogPositionToken> {
8+
9+
fun applyTo(subscribeToAllOptions: SubscribeToAllOptions): SubscribeToAllOptions = when (this) {
10+
is PositionToken -> subscribeToAllOptions.fromPosition(position)
11+
is TailToken -> subscribeToAllOptions.fromEnd()
12+
is HeadToken -> subscribeToAllOptions.fromStart()
13+
}
14+
15+
override fun serialize(): String = when (this) {
16+
is PositionToken -> "${position.commitUnsigned}/${position.prepareUnsigned}"
17+
is TailToken -> "TAIL"
18+
is HeadToken -> "HEAD"
19+
}
20+
21+
override fun compareTo(other: KurrentLogPositionToken): Int {
22+
return when (this) {
23+
is PositionToken -> when (other) {
24+
is PositionToken -> position.compareTo(other.position)
25+
is TailToken -> -1
26+
is HeadToken -> 1
27+
}
28+
29+
is TailToken -> when (other) {
30+
is PositionToken -> 1
31+
is TailToken -> 0
32+
is HeadToken -> 1
33+
}
34+
35+
is HeadToken -> when (other) {
36+
is PositionToken -> -1
37+
is TailToken -> -1
38+
is HeadToken -> 0
39+
}
40+
}
41+
}
42+
43+
class TailToken : KurrentLogPositionToken()
44+
class HeadToken : KurrentLogPositionToken()
45+
data class PositionToken(val position: Position) : KurrentLogPositionToken()
46+
47+
companion object {
48+
fun decodePosition(encoded: String): KurrentLogPositionToken {
49+
val split = encoded.split('/')
50+
if (split.size != 2) {
51+
throw IllegalArgumentException("Invalid position format: $encoded")
52+
}
53+
val position = Position(
54+
split[0].toLong(),
55+
split[1].toLong()
56+
)
57+
return PositionToken(position)
58+
}
59+
}
60+
}

0 commit comments

Comments
 (0)