Skip to content

Commit 3ba6527

Browse files
committed
Add payload filtering to ExposedEventSource and introduce StreamPayloadFilter class.
Also converter json to jsonb field and make it indexed
1 parent 774e188 commit 3ba6527

File tree

6 files changed

+41
-8
lines changed

6 files changed

+41
-8
lines changed

.idea/gradle.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

krescent-exposed/src/main/kotlin/dev/helight/krescent/exposed/ExposedEventSource.kt

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import kotlinx.coroutines.flow.Flow
77
import kotlinx.coroutines.flow.channelFlow
88
import org.jetbrains.exposed.sql.*
99
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
10-
import org.jetbrains.exposed.sql.SqlExpressionBuilder.inList
1110
import org.jetbrains.exposed.sql.SqlExpressionBuilder.like
1211
import org.jetbrains.exposed.sql.SqlExpressionBuilder.regexp
12+
import org.jetbrains.exposed.sql.json.contains
1313
import kotlin.math.min
1414
import kotlin.time.ExperimentalTime
1515

@@ -20,6 +20,7 @@ class ExposedEventSource(
2020
val table: KrescentEventLogTable = KrescentEventLogTable(),
2121
val streamIdMatcher: StreamIdMatcher = StreamIdMatcher.EQ,
2222
val eventFilter: StreamEventFilter? = null,
23+
val payloadFilter: StreamPayloadFilter? = null,
2324
val batchSize: Int = 500,
2425
) : StoredEventSource {
2526
override suspend fun getHeadToken(): StreamingToken<*> {
@@ -45,10 +46,8 @@ class ExposedEventSource(
4546
}
4647
}?.let { andWhere { it } }
4748

48-
when (eventFilter) {
49-
null -> null
50-
else -> table.type inList eventFilter.eventNames
51-
}?.let { andWhere { it } }
49+
if (eventFilter != null) andWhere { table.type inList eventFilter.eventNames }
50+
if (payloadFilter != null) andWhere { table.data.contains(payloadFilter.toQuery()) }
5251

5352
return this
5453
}

krescent-exposed/src/main/kotlin/dev/helight/krescent/exposed/KrescentEventLogTable.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import org.jetbrains.exposed.sql.Database
99
import org.jetbrains.exposed.sql.SchemaUtils
1010
import org.jetbrains.exposed.sql.Table
1111
import org.jetbrains.exposed.sql.json.json
12+
import org.jetbrains.exposed.sql.json.jsonb
1213
import org.jetbrains.exposed.sql.kotlin.datetime.KotlinInstantColumnType
1314
import org.jetbrains.exposed.sql.upsert
1415
import kotlin.time.ExperimentalTime
@@ -20,11 +21,11 @@ class KrescentEventLogTable(tableName: String = "krescent") : LongIdTable(tableN
2021
val type = text("type").index()
2122
val timestamp = registerColumn("timestamp", KotlinInstantColumnType()).index()
2223

23-
val data = json("data", {
24+
val data = jsonb("data", {
2425
Json.encodeToString(it)
2526
}, {
2627
Json.decodeFromString(JsonElement.serializer(), it)
27-
})
28+
}).index()
2829

2930
suspend fun create(database: Database) {
3031
jdbcSuspendTransaction(database) { SchemaUtils.create(this@KrescentEventLogTable) }

krescent-exposed/src/main/kotlin/dev/helight/krescent/exposed/StreamIdMatcher.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ class StreamEventFilter(
2727
return StreamEventFilter(includes)
2828
}
2929
}
30+
}
3031

31-
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package dev.helight.krescent.exposed
2+
3+
import kotlinx.serialization.json.Json
4+
import kotlinx.serialization.json.JsonElement
5+
import kotlinx.serialization.json.JsonPrimitive
6+
7+
class StreamPayloadFilter(
8+
val values: Map<String, JsonElement>,
9+
) {
10+
fun toQuery(): String = Json.encodeToString(values)
11+
12+
companion object {
13+
fun field(key: String, value: String) = StreamPayloadFilter(mapOf(key to JsonPrimitive(value)))
14+
fun field(key: String, value: Number) = StreamPayloadFilter(mapOf(key to JsonPrimitive(value)))
15+
fun field(key: String, value: Boolean) = StreamPayloadFilter(mapOf(key to JsonPrimitive(value)))
16+
}
17+
}

krescent-exposed/src/test/kotlin/dev/helight/krescent/exposed/PostgresStreamingSource.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,21 @@ class PostgresStreamingSource : StreamingEventSourceContract {
127127
assertEquals(3, listReceived.count())
128128
}
129129

130+
@Test
131+
fun `Simple event payload filter for string`() = execWithTable { db, table ->
132+
val publisher = ExposedEventPublisher(db, "event-stream", table)
133+
publisher.publish(exampleCatalog.create(EventA(1)))
134+
publisher.publish(exampleCatalog.create(EventB(2)))
135+
publisher.publish(exampleCatalog.create(EventC(2)))
136+
publisher.publish(exampleCatalog.create(EventC(3)))
137+
138+
val payloadFilterReceived = ExposedEventSource(
139+
db, "event-stream", table,
140+
payloadFilter = StreamPayloadFilter.field("num", 2)
141+
).fetchEventsAfter().toList()
142+
assertEquals(2, payloadFilterReceived.count())
143+
}
144+
130145
val exampleCatalog = buildEventCatalog(1) {
131146
event<EventA>("main.a")
132147
event<EventB>("main.b")

0 commit comments

Comments
 (0)