Skip to content

Commit adbe8ef

Browse files
authored
Merge pull request #4 from SLNE-Development/copilot/add-spring-and-redis-connection
Add global Redis connection and Redis Streams support
2 parents cd8393d + 94fe07a commit adbe8ef

File tree

5 files changed

+549
-0
lines changed

5 files changed

+549
-0
lines changed

README.md

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ runBlocking {
3636
- 🎯 Annotation-based event subscription
3737
- 🔧 Type-safe event handling with Kotlin Serialization
3838
- ⚡ High-performance event invocation using MethodHandles
39+
- 🔄 **Redis Streams support** for reliable event delivery with persistence
40+
- 🎛️ **Global Redis connection** management via RedisApi
3941

4042
## Requirements
4143

@@ -129,6 +131,65 @@ Examples:
129131
- `redis://password@localhost:6379` - Local Redis with password
130132
- `redis://[email protected]:6379/0` - Remote Redis with password and database
131133

134+
## Global Redis Connection (RedisApi)
135+
136+
surf-redis provides a centralized way to manage Redis connections via the `RedisApi` singleton:
137+
138+
```kotlin
139+
import de.slne.redis.RedisApi
140+
141+
// Initialize global connection
142+
RedisApi.init(url = "redis://localhost:6379")
143+
144+
// Alternative syntax
145+
RedisApi(url = "redis://localhost:6379").connect()
146+
147+
// Check connection status
148+
if (RedisApi.isConnected()) {
149+
println("Connected to: ${RedisApi.getUrl()}")
150+
}
151+
152+
// Create connections
153+
val connection = RedisApi.createConnection()
154+
val pubSubConnection = RedisApi.createPubSubConnection()
155+
156+
// Close connection when done
157+
RedisApi.disconnect()
158+
```
159+
160+
The `RedisApi` automatically initializes with a default URL (`redis://localhost:6379`) if not explicitly configured.
161+
162+
## Redis Streams
163+
164+
For more reliable event delivery with message persistence, use `RedisStreamEventBus`:
165+
166+
### Benefits of Redis Streams
167+
168+
- 📦 **Message Persistence**: Events are stored and not lost if no consumer is online
169+
- 👥 **Consumer Groups**: Multiple instances can share the load
170+
-**Message Acknowledgment**: Ensures events are processed
171+
- 🔄 **Reprocessing**: Failed events can be reprocessed
172+
173+
### Usage
174+
175+
```kotlin
176+
import de.slne.redis.stream.RedisStreamEventBus
177+
178+
// Create stream-based event bus
179+
val streamBus = RedisStreamEventBus(
180+
streamName = "my-events",
181+
consumerGroup = "my-app",
182+
consumerName = "instance-1"
183+
)
184+
185+
// Use exactly like RedisEventBus
186+
streamBus.registerListener(MyListener())
187+
streamBus.publish(MyEvent())
188+
streamBus.close()
189+
```
190+
191+
Redis Streams provide stronger guarantees than pub/sub, making them ideal for critical events that must not be lost.
192+
132193
## How It Works
133194

134195
1. **Event Publishing**: When you call `eventBus.publish(event)`, the event is serialized using Kotlin Serialization and published asynchronously to a Redis channel using coroutines.

build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ dependencies {
1515
// Lettuce Redis client
1616
implementation("io.lettuce:lettuce-core:6.3.0.RELEASE")
1717

18+
// Logging
19+
implementation("org.slf4j:slf4j-api:2.0.9")
20+
1821
// Kotlin
1922
implementation(kotlin("stdlib"))
2023
implementation(kotlin("reflect"))
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package de.slne.redis
2+
3+
import io.lettuce.core.RedisClient
4+
import io.lettuce.core.api.StatefulRedisConnection
5+
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
6+
7+
/**
8+
* Global Redis API for managing Redis connections.
9+
* Provides a centralized way to initialize and access Redis connections.
10+
*
11+
* Usage:
12+
* ```
13+
* RedisApi.init(url = "redis://localhost:6379")
14+
* // or
15+
* RedisApi(url = "redis://localhost:6379").connect()
16+
* ```
17+
*/
18+
object RedisApi {
19+
private var redisClient: RedisClient? = null
20+
private var redisUrl: String = "redis://localhost:6379"
21+
private var isInitialized = false
22+
23+
/**
24+
* Initialize the Redis API with a connection URL.
25+
* This should be called once at application startup.
26+
*
27+
* @param url The Redis connection URL (e.g., "redis://localhost:6379")
28+
* @return This RedisApi instance for chaining
29+
*/
30+
fun init(url: String = "redis://localhost:6379"): RedisApi {
31+
if (isInitialized && redisUrl == url) {
32+
return this
33+
}
34+
35+
// Close existing connection if any
36+
if (isInitialized) {
37+
disconnect()
38+
}
39+
40+
redisUrl = url
41+
connect()
42+
return this
43+
}
44+
45+
/**
46+
* Alternative syntax for initialization.
47+
*
48+
* @param url The Redis connection URL
49+
* @return This RedisApi instance
50+
*/
51+
operator fun invoke(url: String): RedisApi {
52+
return init(url)
53+
}
54+
55+
/**
56+
* Connect to Redis using the configured URL.
57+
* Called automatically by init().
58+
*
59+
* @return This RedisApi instance for chaining
60+
*/
61+
fun connect(): RedisApi {
62+
if (!isInitialized) {
63+
redisClient = RedisClient.create(redisUrl)
64+
isInitialized = true
65+
}
66+
return this
67+
}
68+
69+
/**
70+
* Disconnect from Redis and clean up resources.
71+
*/
72+
fun disconnect() {
73+
redisClient?.shutdown()
74+
redisClient = null
75+
isInitialized = false
76+
}
77+
78+
/**
79+
* Get the Redis client instance.
80+
* Automatically initializes with default URL if not already initialized.
81+
*
82+
* @return The RedisClient instance
83+
*/
84+
fun getClient(): RedisClient {
85+
if (!isInitialized) {
86+
init()
87+
}
88+
return redisClient ?: throw IllegalStateException("Redis client not initialized")
89+
}
90+
91+
/**
92+
* Get the configured Redis URL.
93+
*
94+
* @return The Redis connection URL
95+
*/
96+
fun getUrl(): String = redisUrl
97+
98+
/**
99+
* Check if Redis is initialized and connected.
100+
*
101+
* @return True if initialized, false otherwise
102+
*/
103+
fun isConnected(): Boolean = isInitialized && redisClient != null
104+
105+
/**
106+
* Create a new stateful connection for Redis commands.
107+
*
108+
* @return A new StatefulRedisConnection
109+
*/
110+
fun createConnection(): StatefulRedisConnection<String, String> {
111+
return getClient().connect()
112+
}
113+
114+
/**
115+
* Create a new stateful pub/sub connection for Redis pub/sub.
116+
*
117+
* @return A new StatefulRedisPubSubConnection
118+
*/
119+
fun createPubSubConnection(): StatefulRedisPubSubConnection<String, String> {
120+
return getClient().connectPubSub()
121+
}
122+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package de.slne.redis.example
2+
3+
import de.slne.redis.RedisApi
4+
import de.slne.redis.event.RedisEventBus
5+
import de.slne.redis.event.Subscribe
6+
import de.slne.redis.stream.RedisStreamEventBus
7+
import kotlinx.coroutines.delay
8+
import kotlinx.coroutines.runBlocking
9+
10+
/**
11+
* Example demonstrating the new features:
12+
* 1. Global RedisApi connection
13+
* 2. Redis Streams support
14+
* 3. Direct usage without Spring
15+
*/
16+
17+
fun main() = runBlocking {
18+
println("=== surf-redis Feature Demo ===\n")
19+
20+
// Feature 1: Global Redis Connection via RedisApi
21+
println("1. Initializing global Redis connection via RedisApi")
22+
RedisApi.init(url = "redis://localhost:6379")
23+
println(" Redis connected: ${RedisApi.isConnected()}")
24+
println(" Redis URL: ${RedisApi.getUrl()}\n")
25+
26+
// Alternative syntax as requested
27+
// RedisApi(url = "redis://localhost:6379").connect()
28+
29+
// Feature 2: Traditional Event Bus (pub/sub)
30+
println("2. Using traditional RedisEventBus (pub/sub)")
31+
val eventBus = RedisEventBus("redis://localhost:6379")
32+
val listener1 = DemoListener("Listener1")
33+
eventBus.registerListener(listener1)
34+
35+
eventBus.publish(PlayerJoinEvent("Alice", "uuid-1", "Server-1"))
36+
delay(100) // Give time for async handling
37+
println()
38+
39+
// Feature 3: Redis Streams for reliable delivery
40+
println("3. Using RedisStreamEventBus (Redis Streams)")
41+
val streamBus = RedisStreamEventBus(
42+
streamName = "demo-events",
43+
consumerGroup = "demo-group",
44+
consumerName = "demo-consumer-1"
45+
)
46+
val listener2 = DemoListener("StreamListener")
47+
streamBus.registerListener(listener2)
48+
49+
// Give time for consumer to start
50+
delay(500)
51+
52+
streamBus.publish(PlayerJoinEvent("Bob", "uuid-2", "Server-2"))
53+
delay(100) // Give time for async handling
54+
println()
55+
56+
// Feature 4: Multiple events
57+
println("4. Publishing multiple events")
58+
eventBus.publish(ChatMessageEvent("Alice", "Hello everyone!", "Server-1"))
59+
streamBus.publish(ChatMessageEvent("Bob", "Hi Alice!", "Server-2"))
60+
delay(100)
61+
println()
62+
63+
// Clean up
64+
println("5. Cleaning up...")
65+
eventBus.close()
66+
streamBus.close()
67+
RedisApi.disconnect()
68+
println("Demo completed successfully!")
69+
}
70+
71+
class DemoListener(private val name: String) {
72+
@Subscribe
73+
fun onPlayerJoin(event: PlayerJoinEvent) {
74+
println(" [$name] Player ${event.playerName} joined ${event.serverName}")
75+
}
76+
77+
@Subscribe
78+
fun onChatMessage(event: ChatMessageEvent) {
79+
println(" [$name] [${event.serverName}] ${event.playerName}: ${event.message}")
80+
}
81+
}

0 commit comments

Comments
 (0)