11package dev.helight.krescent.exposed
22
33import dev.helight.krescent.event.EventMessage
4- import dev.helight.krescent.source.StreamingEventSource
4+ import dev.helight.krescent.source.StoredEventSource
55import dev.helight.krescent.source.StreamingToken
6- import kotlinx.coroutines.coroutineScope
7- import kotlinx.coroutines.delay
86import kotlinx.coroutines.flow.Flow
97import kotlinx.coroutines.flow.channelFlow
10- import kotlinx.coroutines.flow.emptyFlow
11- import kotlinx.coroutines.flow.flow
12- import org.jetbrains.exposed.sql.Database
13- import org.jetbrains.exposed.sql.Query
14- import org.jetbrains.exposed.sql.ResultRow
15- import org.jetbrains.exposed.sql.SortOrder
8+ import org.jetbrains.exposed.sql.*
9+ import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
10+ import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList
11+ import org.jetbrains.exposed.sql.SqlExpressionBuilder.like
12+ import org.jetbrains.exposed.sql.SqlExpressionBuilder.regexp
1613import kotlin.math.min
1714import kotlin.time.ExperimentalTime
1815
1916@OptIn(ExperimentalTime ::class )
20- class StreamingExposedEventSource (
21- val table : KrescentTable ,
17+ class ExposedEventSource (
2218 val database : Database ,
19+ val table : KrescentEventsTable = KrescentEventsTable (),
2320 val streamId : String? = null ,
2421 val streamIdMatcher : StreamIdMatcher = StreamIdMatcher .EQ ,
22+ val eventFilter : StreamEventFilter ? = null ,
2523 val batchSize : Int = 20 ,
26- val pollingDelay : Long = 500L ,
27- ) : StreamingEventSource {
24+ ) : StoredEventSource {
2825 override suspend fun getHeadToken (): StreamingToken <* > {
2926 return ExposedStreamingToken .HeadToken ()
3027 }
3128
32- override suspend fun getTailToken (): StreamingToken <* > {
33- return ExposedStreamingToken .TailToken ()
34- }
29+ override suspend fun getTailToken (): StreamingToken <* > = peakEnd()
3530
3631 override suspend fun deserializeToken (encoded : String ): StreamingToken <* > {
3732 return when (encoded) {
3833 " HEAD" -> ExposedStreamingToken .HeadToken ()
39- " TAIL" -> ExposedStreamingToken .TailToken ()
4034 else -> ExposedStreamingToken .PositionToken (encoded.toLong())
4135 }
4236 }
4337
44- private fun Query.withStreamIdFilter (): Query {
45- return when (streamId) {
46- null -> this
38+ private fun Query.withFilterClause (): Query {
39+ when (streamId) {
40+ null -> null
4741 else -> when (streamIdMatcher) {
48- StreamIdMatcher .EQ -> this .where { table.streamId eq streamId }
49- StreamIdMatcher .LIKE -> this .where { table.streamId like streamId }
50- StreamIdMatcher .REGEX -> this .where { table.streamId regexp streamId }
42+ StreamIdMatcher .EQ -> table.streamId eq streamId
43+ StreamIdMatcher .LIKE -> table.streamId like streamId
44+ StreamIdMatcher .REGEX -> table.streamId regexp streamId
5145 }
52- }
46+ }?.let { andWhere { it } }
47+
48+ when (eventFilter) {
49+ null -> null
50+ else -> table.type inList eventFilter.eventNames
51+ }?.let { andWhere { it } }
52+
53+ return this
5354 }
5455
5556 private suspend fun peakEnd (): ExposedStreamingToken {
5657 return jdbcSuspendTransaction(database) {
5758 val last = table
5859 .select(table.id)
5960 .orderBy(table.id, SortOrder .DESC )
60- .withStreamIdFilter ()
61+ .withFilterClause ()
6162 .limit(1 )
6263 .firstOrNull()
6364 when (last) {
@@ -85,17 +86,25 @@ class StreamingExposedEventSource(
8586 }
8687 val list = jdbcSuspendTransaction(database) {
8788 token.begin(table)
88- .withStreamIdFilter ()
89+ .withFilterClause ()
8990 .limit(actualBatchSize)
9091 .map(::mapRowToPair)
9192 .toList()
9293 }
93- val endToken = list.lastOrNull()?.second ? : peakEnd()
94- return BatchResult (
95- events = list,
96- endToken = endToken,
97- reachedEnd = list.size < actualBatchSize || endToken is ExposedStreamingToken .TailToken ,
98- )
94+ val endToken = list.lastOrNull()?.second
95+ return if (endToken == null ) {
96+ BatchResult (
97+ events = list,
98+ endToken = peakEnd(),
99+ reachedEnd = true
100+ )
101+ } else {
102+ BatchResult (
103+ events = list,
104+ endToken = endToken,
105+ reachedEnd = list.size < actualBatchSize
106+ )
107+ }
99108 }
100109
101110 override suspend fun fetchEventsAfter (
@@ -107,7 +116,6 @@ class StreamingExposedEventSource(
107116 is ExposedStreamingToken -> token
108117 else -> throw IllegalArgumentException (" Token must be of type ExposedStreamingToken" )
109118 }
110- if (parsedToken is ExposedStreamingToken .TailToken ) return emptyFlow()
111119 var currentToken = parsedToken
112120 var remaining = limit?.toLong() ? : Long .MAX_VALUE
113121 return channelFlow {
@@ -123,29 +131,6 @@ class StreamingExposedEventSource(
123131 }
124132 }
125133
126- override suspend fun streamEvents (startToken : StreamingToken <* >? ): Flow <Pair <EventMessage , StreamingToken <* >>> =
127- coroutineScope {
128- val parsedToken = when (startToken) {
129- null -> ExposedStreamingToken .HeadToken ()
130- is ExposedStreamingToken .TailToken -> peakEnd()
131- is ExposedStreamingToken -> startToken
132- else -> throw IllegalArgumentException (" Token must be of type ExposedStreamingToken" )
133- }
134- var currentToken: ExposedStreamingToken = parsedToken
135- return @coroutineScope flow {
136- while (true ) {
137- val batch = fetchBatch(currentToken)
138- for (event in batch.events) {
139- emit(event)
140- }
141- currentToken = batch.endToken
142- if (batch.reachedEnd) {
143- delay(pollingDelay)
144- }
145- }
146- }
147- }
148-
149134 private data class BatchResult (
150135 val events : List <Pair <EventMessage , StreamingToken <* >>>,
151136 val endToken : ExposedStreamingToken ,
0 commit comments