Skip to content

Commit d12a1d4

Browse files
committed
Add StreamId filtering with EQ, LIKE, and REGEX matchers and corresponding test cases
1 parent 196eea8 commit d12a1d4

File tree

3 files changed

+88
-23
lines changed

3 files changed

+88
-23
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package dev.helight.krescent.exposed
2+
3+
enum class StreamIdMatcher {
4+
EQ,
5+
LIKE,
6+
REGEX
7+
}

krescent-exposed/src/main/kotlin/dev/helight/krescent/exposed/StreamingExposedEventSource.kt

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import kotlinx.coroutines.flow.channelFlow
1010
import kotlinx.coroutines.flow.emptyFlow
1111
import kotlinx.coroutines.flow.flow
1212
import org.jetbrains.exposed.sql.Database
13+
import org.jetbrains.exposed.sql.Query
1314
import org.jetbrains.exposed.sql.ResultRow
1415
import org.jetbrains.exposed.sql.SortOrder
1516
import kotlin.math.min
@@ -20,6 +21,7 @@ class StreamingExposedEventSource(
2021
val table: KrescentTable,
2122
val database: Database,
2223
val streamId: String? = null,
24+
val streamIdMatcher: StreamIdMatcher = StreamIdMatcher.EQ,
2325
val batchSize: Int = 20,
2426
val pollingDelay: Long = 500L,
2527
) : StreamingEventSource {
@@ -39,17 +41,23 @@ class StreamingExposedEventSource(
3941
}
4042
}
4143

44+
private fun Query.withStreamIdFilter(): Query {
45+
return when (streamId) {
46+
null -> this
47+
else -> when (streamIdMatcher) {
48+
StreamIdMatcher.EQ -> this.where { table.streamId eq streamId }
49+
StreamIdMatcher.LIKE -> this.where { table.streamId like streamId }
50+
StreamIdMatcher.REGEX -> this.where { table.streamId regexp streamId }
51+
}
52+
}
53+
}
54+
4255
private suspend fun peakEnd(): ExposedStreamingToken {
4356
return jdbcSuspendTransaction(database) {
4457
val last = table
4558
.select(table.id)
4659
.orderBy(table.id, SortOrder.DESC)
47-
.let {
48-
when (streamId) {
49-
null -> it
50-
else -> it.where { table.streamId eq streamId }
51-
}
52-
}
60+
.withStreamIdFilter()
5361
.limit(1)
5462
.firstOrNull()
5563
when (last) {
@@ -76,12 +84,11 @@ class StreamingExposedEventSource(
7684
else -> min(batchSize, maxSize)
7785
}
7886
val list = jdbcSuspendTransaction(database) {
79-
token.begin(table).let {
80-
when (streamId) {
81-
null -> it
82-
else -> it.where { table.streamId eq streamId }
83-
}
84-
}.limit(actualBatchSize).map(::mapRowToPair).toList()
87+
token.begin(table)
88+
.withStreamIdFilter()
89+
.limit(actualBatchSize)
90+
.map(::mapRowToPair)
91+
.toList()
8592
}
8693
val endToken = list.lastOrNull()?.second ?: peakEnd()
8794
return BatchResult(

krescent-exposed/src/test/kotlin/dev/helight/krescent/exposed/PostgresStreamingSource.kt

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,23 @@
11
package dev.helight.krescent.exposed
22

3+
import dev.helight.krescent.event.EventMessage
34
import dev.helight.krescent.source.EventPublisher
45
import dev.helight.krescent.source.StreamingEventSource
56
import dev.helight.krescent.test.StreamingEventSourceContract
67
import kotlinx.coroutines.CoroutineScope
78
import kotlinx.coroutines.delay
9+
import kotlinx.coroutines.flow.toList
810
import kotlinx.coroutines.runBlocking
11+
import kotlinx.serialization.json.buildJsonObject
12+
import kotlinx.serialization.json.put
913
import org.jetbrains.exposed.sql.Database
1014
import org.testcontainers.containers.GenericContainer
1115
import org.testcontainers.junit.jupiter.Container
1216
import org.testcontainers.junit.jupiter.Testcontainers
1317
import java.time.Duration
1418
import java.util.*
19+
import kotlin.test.Test
20+
import kotlin.test.assertEquals
1521

1622
@Testcontainers
1723
class PostgresStreamingSource : StreamingEventSourceContract {
@@ -35,20 +41,65 @@ class PostgresStreamingSource : StreamingEventSourceContract {
3541
)
3642
}
3743

44+
fun execWithTable(block: suspend CoroutineScope.(Database, KrescentTable) -> Unit) = runBlocking {
45+
val db = connect()
46+
val tableName = "krescent_${UUID.randomUUID()}"
47+
val table = KrescentTable(tableName)
48+
table.create(db)
49+
try {
50+
this.block(db, table)
51+
delay(300)
52+
} finally {
53+
runCatching { table.drop(db) }
54+
}
55+
}
56+
3857
override fun execWithStreamingSource(block: suspend CoroutineScope.(StreamingEventSource, EventPublisher) -> Unit) =
39-
runBlocking {
40-
val db = connect()
41-
val tableName = "krescent_${UUID.randomUUID()}"
42-
val table = KrescentTable(tableName)
43-
table.create(db)
58+
execWithTable { db, table ->
4459
val source = StreamingExposedEventSource(table, db, pollingDelay = 100L)
4560
val publisher = ExposedEventPublisher(table, db, "default")
46-
try {
47-
this.block(source, publisher)
48-
} finally {
49-
runCatching { table.drop(db) }
50-
}
51-
delay(300)
61+
this.block(source, publisher)
5262
}
5363

64+
65+
@Test
66+
fun `Like matching event stream`() = execWithTable { db, table ->
67+
ExposedEventPublisher(table, db, "user-ALICE-conversation-1")
68+
.publish(EventMessage(type = "created", payload = buildJsonObject { put("number", 1) }))
69+
70+
ExposedEventPublisher(table, db, "user-BOB-conversation-1")
71+
.publish(EventMessage(type = "created", payload = buildJsonObject { put("number", 2) }))
72+
73+
ExposedEventPublisher(table, db, "user-ALICE-conversation-2")
74+
.publish(EventMessage(type = "created", payload = buildJsonObject { put("number", 3) }))
75+
76+
val aliceEvents = StreamingExposedEventSource(table, db, "user-ALICE-conversation-%", StreamIdMatcher.LIKE)
77+
.fetchEventsAfter().toList()
78+
val bobEvents = StreamingExposedEventSource(table, db, "user-BOB-conversation-%", StreamIdMatcher.LIKE)
79+
.fetchEventsAfter().toList()
80+
81+
assertEquals(2, aliceEvents.size)
82+
assertEquals(1, bobEvents.size)
83+
}
84+
85+
@Test
86+
fun `Regex matching event stream`() = execWithTable { db, table ->
87+
ExposedEventPublisher(table, db, "user-ALICE-conversation-1")
88+
.publish(EventMessage(type = "created", payload = buildJsonObject { put("number", 1) }))
89+
90+
ExposedEventPublisher(table, db, "user-BOB-conversation-1")
91+
.publish(EventMessage(type = "created", payload = buildJsonObject { put("number", 2) }))
92+
93+
ExposedEventPublisher(table, db, "user-ALICE-conversation-2")
94+
.publish(EventMessage(type = "created", payload = buildJsonObject { put("number", 3) }))
95+
96+
val aliceEvents = StreamingExposedEventSource(table, db, "^user-ALICE-conversation-.*", StreamIdMatcher.REGEX)
97+
.fetchEventsAfter().toList()
98+
val bobEvents = StreamingExposedEventSource(table, db, "^user-BOB-conversation-.*", StreamIdMatcher.REGEX)
99+
.fetchEventsAfter().toList()
100+
101+
assertEquals(2, aliceEvents.size)
102+
assertEquals(1, bobEvents.size)
103+
}
104+
54105
}

0 commit comments

Comments
 (0)