Skip to content

Commit e40ee24

Browse files
committed
Add RedisNotifier and RedisNotifierStreaming for event notification handling
1 parent 01164c5 commit e40ee24

File tree

4 files changed

+162
-1
lines changed

4 files changed

+162
-1
lines changed

krescent-redisson/src/main/kotlin/dev/helight/krescent/redisson/RedisLockProvider.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,5 @@ class RedisLockProvider(
6363
lock.unlockAsync(claimId).asDeferred().await()
6464
}
6565
}
66-
}
66+
}
67+
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.event.EventMessage
4+
import dev.helight.krescent.source.EventPublisher
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.flow.FlowCollector
7+
import kotlinx.coroutines.future.asDeferred
8+
import org.redisson.api.RedissonClient
9+
import org.redisson.api.listener.MessageListener
10+
import kotlin.time.Duration
11+
12+
class RedisNotifier(
13+
val client: RedissonClient,
14+
val channel: String = "krescent:notifications",
15+
) {
16+
suspend fun notify() {
17+
val topic = client.getTopic(channel)
18+
topic.publishAsync("").asDeferred().await()
19+
}
20+
21+
fun wrap(publisher: EventPublisher): EventPublisher {
22+
return object : EventPublisher {
23+
override suspend fun publish(event: EventMessage) {
24+
publisher.publish(event)
25+
notify()
26+
}
27+
28+
override suspend fun publishAll(events: List<EventMessage>) {
29+
publisher.publishAll(events)
30+
notify()
31+
}
32+
}
33+
}
34+
35+
fun receiveTo(target: FlowCollector<Unit>): Job = CoroutineScope(Dispatchers.Default).launch {
36+
val topic = client.getTopic(this@RedisNotifier.channel)
37+
val listenerId = topic.addListenerAsync(String::class.java, MessageListener { _, _ ->
38+
runBlocking {
39+
target.emit(Unit)
40+
}
41+
}).asDeferred().await()
42+
try {
43+
delay(Duration.INFINITE)
44+
} finally {
45+
withContext(NonCancellable) {
46+
topic.removeListenerAsync(listenerId).asDeferred().await()
47+
}
48+
}
49+
}
50+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package dev.helight.krescent.redisson
2+
3+
import kotlinx.coroutines.delay
4+
import kotlinx.coroutines.flow.MutableSharedFlow
5+
import kotlinx.coroutines.flow.produceIn
6+
import kotlinx.coroutines.launch
7+
import kotlinx.coroutines.runBlocking
8+
import org.redisson.Redisson
9+
import org.redisson.api.RedissonClient
10+
import org.redisson.config.Config
11+
import org.testcontainers.containers.GenericContainer
12+
import org.testcontainers.junit.jupiter.Container
13+
import org.testcontainers.junit.jupiter.Testcontainers
14+
import java.time.Duration
15+
import kotlin.test.Test
16+
import kotlin.test.assertFalse
17+
import kotlin.test.assertTrue
18+
19+
@Testcontainers
20+
class RedisNotifierTest {
21+
companion object {
22+
23+
@Container
24+
private val valkey = GenericContainer("valkey/valkey:latest")
25+
.withExposedPorts(6379)
26+
.withStartupTimeout(Duration.ofSeconds(60))
27+
28+
private val connectionString get() = "redis://${valkey.host}:${valkey.getMappedPort(6379)}"
29+
30+
fun connect(): RedissonClient = Redisson.create(Config().apply {
31+
useSingleServer().setAddress(connectionString)
32+
})
33+
}
34+
35+
@Test
36+
fun `Receive and publish notifications`(): Unit = runBlocking {
37+
val client = connect()
38+
val notifier = RedisNotifier(client)
39+
val flow = MutableSharedFlow<Unit>(0)
40+
val job = notifier.receiveTo(flow)
41+
var hasReceived = false
42+
launch {
43+
val channel = flow.produceIn(this)
44+
channel.receive()
45+
hasReceived = true
46+
channel.cancel()
47+
}
48+
delay(50)
49+
assertFalse(hasReceived)
50+
notifier.notify()
51+
delay(50)
52+
MutableSharedFlow<Unit>(0)
53+
assertTrue(hasReceived)
54+
job.cancel()
55+
}
56+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.source.EventPublisher
4+
import dev.helight.krescent.source.PollingStreamingEventSource.Companion.pollingWithNotifications
5+
import dev.helight.krescent.source.StreamingEventSource
6+
import dev.helight.krescent.source.impl.InMemoryEventStore
7+
import dev.helight.krescent.test.StreamingEventSourceContract
8+
import kotlinx.coroutines.CoroutineScope
9+
import kotlinx.coroutines.flow.MutableSharedFlow
10+
import kotlinx.coroutines.runBlocking
11+
import org.redisson.Redisson
12+
import org.redisson.api.RedissonClient
13+
import org.redisson.config.Config
14+
import org.testcontainers.containers.GenericContainer
15+
import org.testcontainers.junit.jupiter.Container
16+
import org.testcontainers.junit.jupiter.Testcontainers
17+
import java.time.Duration
18+
import java.util.*
19+
20+
@Testcontainers
21+
class RedisNotifierStreamingTest : StreamingEventSourceContract {
22+
23+
companion object {
24+
25+
@Container
26+
private val valkey = GenericContainer("valkey/valkey:latest")
27+
.withExposedPorts(6379)
28+
.withStartupTimeout(Duration.ofSeconds(60))
29+
30+
private val connectionString get() = "redis://${valkey.host}:${valkey.getMappedPort(6379)}"
31+
32+
fun connect(): RedissonClient = Redisson.create(Config().apply {
33+
useSingleServer().setAddress(connectionString)
34+
})
35+
}
36+
37+
override fun execWithStreamingSource(block: suspend CoroutineScope.(StreamingEventSource, EventPublisher) -> Unit) {
38+
val source = InMemoryEventStore()
39+
val redis = connect()
40+
val notifier = RedisNotifier(redis, "krescent:notifications:${UUID.randomUUID()}")
41+
val publisher = notifier.wrap(source)
42+
43+
val notifications = MutableSharedFlow<Unit>(0)
44+
val job = notifier.receiveTo(notifications)
45+
val streaming = source.pollingWithNotifications(notifications)
46+
try {
47+
runBlocking {
48+
this.block(streaming, publisher)
49+
}
50+
} finally {
51+
job.cancel()
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)