Skip to content

Commit c550deb

Browse files
committed
Add Redis-based checkpoint storage and locking provider implementations
1 parent 6644192 commit c550deb

File tree

11 files changed

+341
-0
lines changed

11 files changed

+341
-0
lines changed

.idea/gradle.xml

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

gradle/libs.versions.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ testContainers = "1.21.0"
1010
exposed = "0.59.0"
1111
postgresql = "42.7.3"
1212
slf4j = "2.0.9"
13+
redisson = "3.52.0"
1314

1415
[libraries]
1516
kotlinGradlePlugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" }
@@ -31,6 +32,7 @@ exposedKotlinDatetime = { module = "org.jetbrains.exposed:exposed-kotlin-datetim
3132
postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" }
3233
slf4jApi = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
3334
slf4jSimple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
35+
redisson = { module = "org.redisson:redisson", version.ref = "redisson" }
3436

3537
# Libraries can be bundled together for easier import
3638
[bundles]

krescent-redisson/build.gradle.kts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
plugins {
2+
// Apply the shared build logic from a convention plugin.
3+
// The shared code is located in `buildSrc/src/main/kotlin/kotlin-jvm.gradle.kts`.
4+
id("buildsrc.convention.kotlin-jvm")
5+
// Apply Kotlin Serialization plugin from `gradle/libs.versions.toml`.
6+
alias(libs.plugins.kotlinPluginSerialization)
7+
}
8+
9+
dependencies {
10+
// Apply the kotlinx bundle of dependencies from the version catalog (`gradle/libs.versions.toml`).
11+
implementation(libs.bundles.kotlinxEcosystem)
12+
api(libs.redisson)
13+
implementation(project(":krescent-core"))
14+
15+
testImplementation(kotlin("test"))
16+
testImplementation(libs.bundles.testContainers)
17+
testImplementation(libs.slf4jSimple)
18+
testImplementation(project(":krescent-test"))
19+
20+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.checkpoint.CheckpointStorage
4+
import dev.helight.krescent.checkpoint.StoredCheckpoint
5+
import kotlinx.coroutines.future.asDeferred
6+
import org.redisson.api.RedissonClient
7+
import java.util.concurrent.TimeUnit
8+
import kotlin.time.Duration
9+
import kotlin.time.DurationUnit
10+
import kotlin.time.toDuration
11+
12+
class CachedRedisCheckpointStorage(
13+
val client: RedissonClient,
14+
val cacheName: String = "krescent:checkpoints:cache",
15+
val ttl: Duration = 30.toDuration(DurationUnit.DAYS),
16+
) : CheckpointStorage {
17+
18+
override suspend fun storeCheckpoint(checkpoint: StoredCheckpoint) {
19+
val millis = ttl.inWholeMilliseconds
20+
client.getMapCache<String, RedisCheckpointSnapshot>(cacheName)
21+
.putAsync(
22+
checkpoint.namespace,
23+
RedisCheckpointSnapshot.fromStoredCheckpoint(checkpoint),
24+
millis,
25+
TimeUnit.MILLISECONDS
26+
)
27+
.asDeferred().await()
28+
}
29+
30+
override suspend fun getLatestCheckpoint(namespace: String): StoredCheckpoint? {
31+
val snapshot = client.getMapCache<String, RedisCheckpointSnapshot>(cacheName)
32+
.getAsync(namespace)
33+
.asDeferred().await()
34+
return snapshot?.toStoredCheckpoint(namespace)
35+
}
36+
37+
override suspend fun deleteCheckpoint(namespace: String) {
38+
client.getMapCache<String, RedisCheckpointSnapshot>(cacheName)
39+
.removeAsync(namespace)
40+
.asDeferred().await()
41+
}
42+
43+
override suspend fun clearCheckpoints() {
44+
client.getMapCache<String, RedisCheckpointSnapshot>(cacheName)
45+
.clearAsync()
46+
.asDeferred().await()
47+
}
48+
}
49+
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.checkpoint.CheckpointStorage
4+
import dev.helight.krescent.checkpoint.StoredCheckpoint
5+
import kotlinx.coroutines.future.asDeferred
6+
import org.redisson.api.RedissonClient
7+
8+
class PersistentRedisCheckpointStorage(
9+
val client: RedissonClient,
10+
val mapName: String = "krescent:checkpoints",
11+
) : CheckpointStorage {
12+
override suspend fun storeCheckpoint(checkpoint: StoredCheckpoint) {
13+
client.getMap<String, RedisCheckpointSnapshot>(mapName)
14+
.putAsync(checkpoint.namespace, RedisCheckpointSnapshot.fromStoredCheckpoint(checkpoint))
15+
.asDeferred().await()
16+
}
17+
18+
override suspend fun getLatestCheckpoint(namespace: String): StoredCheckpoint? {
19+
val snapshot = client.getMap<String, RedisCheckpointSnapshot>(mapName)
20+
.getAsync(namespace)
21+
.asDeferred().await()
22+
return snapshot?.toStoredCheckpoint(namespace)
23+
}
24+
25+
override suspend fun deleteCheckpoint(namespace: String) {
26+
client.getMap<String, RedisCheckpointSnapshot>(mapName)
27+
.removeAsync(namespace)
28+
.asDeferred().await()
29+
}
30+
31+
override suspend fun clearCheckpoints() {
32+
client.getMap<String, RedisCheckpointSnapshot>(mapName)
33+
.clearAsync()
34+
.asDeferred().await()
35+
}
36+
}
37+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.checkpoint.CheckpointBucket
4+
import dev.helight.krescent.checkpoint.StoredCheckpoint
5+
import kotlinx.datetime.Instant
6+
import java.io.Serializable
7+
8+
internal open class RedisCheckpointSnapshot(
9+
open val version: String,
10+
open val position: String,
11+
open val timestamp: Long,
12+
open val data: ByteArray,
13+
) : Serializable {
14+
fun toStoredCheckpoint(namespace: String): StoredCheckpoint {
15+
return StoredCheckpoint(
16+
version = version,
17+
position = position,
18+
timestamp = Instant.fromEpochMilliseconds(timestamp),
19+
namespace = namespace,
20+
data = CheckpointBucket.fromByteArray(data)
21+
)
22+
}
23+
24+
companion object {
25+
fun fromStoredCheckpoint(checkpoint: StoredCheckpoint): RedisCheckpointSnapshot {
26+
return RedisCheckpointSnapshot(
27+
version = checkpoint.version,
28+
position = checkpoint.position,
29+
timestamp = checkpoint.timestamp.toEpochMilliseconds(),
30+
data = checkpoint.data.encodeToByteArray()
31+
)
32+
}
33+
}
34+
}
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.synchronization.KrescentLock
4+
import dev.helight.krescent.synchronization.KrescentLockProvider
5+
import kotlinx.coroutines.future.asDeferred
6+
import kotlinx.coroutines.withTimeout
7+
import org.redisson.api.RLock
8+
import org.redisson.api.RedissonClient
9+
import java.util.concurrent.TimeUnit
10+
import java.util.concurrent.atomic.AtomicLong
11+
import kotlin.time.Duration
12+
13+
class RedisLockProvider(
14+
val client: RedissonClient,
15+
val prefix: String = "krescent:locks:",
16+
val leaseTime: Duration = Duration.INFINITE,
17+
) : KrescentLockProvider {
18+
19+
private var lockIdCounter: AtomicLong = AtomicLong(0)
20+
21+
override suspend fun getLock(identity: String): KrescentLock {
22+
val rlock = client.getLock(prefix + identity)
23+
return RedisLockImpl(rlock, lockIdCounter.getAndIncrement())
24+
}
25+
26+
override suspend fun getMultiLock(identities: Collection<String>): KrescentLock {
27+
val lockList = identities.map { identity ->
28+
client.getLock(prefix + identity)
29+
}.toTypedArray()
30+
val multiLock = client.getMultiLock(*lockList)
31+
return RedisLockImpl(multiLock, lockIdCounter.getAndIncrement())
32+
}
33+
34+
private inner class RedisLockImpl(
35+
val lock: RLock,
36+
val lockId: Long,
37+
) : KrescentLock {
38+
39+
override suspend fun acquire(timeout: Duration?) {
40+
if (timeout != null) {
41+
withTimeout(timeout) { acquire() }
42+
} else {
43+
acquire()
44+
}
45+
}
46+
47+
private suspend fun acquire() {
48+
lock.lockAsync(
49+
when {
50+
leaseTime.isInfinite() -> -1L
51+
else -> leaseTime.inWholeMilliseconds
52+
}, TimeUnit.MILLISECONDS, lockId
53+
).asDeferred().await()
54+
}
55+
56+
override suspend fun release() {
57+
lock.unlockAsync(lockId).asDeferred().await()
58+
}
59+
60+
override fun equals(other: Any?): Boolean {
61+
if (this === other) return true
62+
if (javaClass != other?.javaClass) return false
63+
other as RedisLockImpl
64+
return lockId == other.lockId
65+
}
66+
67+
override fun hashCode(): Int {
68+
return lockId.hashCode()
69+
}
70+
}
71+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.checkpoint.CheckpointStorage
4+
import dev.helight.krescent.test.CheckpointStoreContract
5+
import kotlinx.coroutines.CoroutineScope
6+
import kotlinx.coroutines.delay
7+
import kotlinx.coroutines.runBlocking
8+
import org.redisson.Redisson
9+
import org.redisson.api.RedissonClient
10+
import org.redisson.config.Config
11+
import org.testcontainers.containers.GenericContainer
12+
import org.testcontainers.junit.jupiter.Container
13+
import org.testcontainers.junit.jupiter.Testcontainers
14+
import java.time.Duration
15+
import java.util.*
16+
17+
@Testcontainers
18+
class CachedRedisCheckpointStorageTest : CheckpointStoreContract {
19+
20+
companion object {
21+
22+
@Container
23+
private val valkey = GenericContainer("valkey/valkey:latest")
24+
.withExposedPorts(6379)
25+
.withStartupTimeout(Duration.ofSeconds(60))
26+
27+
private val connectionString get() = "redis://${valkey.host}:${valkey.getMappedPort(6379)}"
28+
29+
fun connect(): RedissonClient = Redisson.create(Config().apply {
30+
useSingleServer().setAddress(connectionString)
31+
})
32+
}
33+
34+
override fun withCheckpointStorage(block: suspend CoroutineScope.(CheckpointStorage) -> Unit) = runBlocking {
35+
val db = connect()
36+
val mapName = "storage:cached:${UUID.randomUUID()}"
37+
val storage = CachedRedisCheckpointStorage(db, mapName)
38+
try {
39+
this.block(storage)
40+
delay(300)
41+
} finally {
42+
storage.clearCheckpoints()
43+
}
44+
}
45+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.checkpoint.CheckpointStorage
4+
import dev.helight.krescent.test.CheckpointStoreContract
5+
import kotlinx.coroutines.CoroutineScope
6+
import kotlinx.coroutines.delay
7+
import kotlinx.coroutines.runBlocking
8+
import org.redisson.Redisson
9+
import org.redisson.api.RedissonClient
10+
import org.redisson.config.Config
11+
import org.testcontainers.containers.GenericContainer
12+
import org.testcontainers.junit.jupiter.Container
13+
import org.testcontainers.junit.jupiter.Testcontainers
14+
import java.time.Duration
15+
import java.util.*
16+
17+
@Testcontainers
18+
class PersistentRedisCheckpointStorageTest : CheckpointStoreContract {
19+
20+
companion object {
21+
22+
@Container
23+
private val valkey = GenericContainer("valkey/valkey:latest")
24+
.withExposedPorts(6379)
25+
.withStartupTimeout(Duration.ofSeconds(60))
26+
27+
private val connectionString get() = "redis://${valkey.host}:${valkey.getMappedPort(6379)}"
28+
29+
fun connect(): RedissonClient = Redisson.create(Config().apply {
30+
useSingleServer().setAddress(connectionString)
31+
})
32+
}
33+
34+
override fun withCheckpointStorage(block: suspend CoroutineScope.(CheckpointStorage) -> Unit) = runBlocking {
35+
val db = connect()
36+
val mapName = "storage:${UUID.randomUUID()}"
37+
val storage = PersistentRedisCheckpointStorage(db, mapName)
38+
try {
39+
this.block(storage)
40+
delay(300)
41+
} finally {
42+
storage.clearCheckpoints()
43+
}
44+
}
45+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package dev.helight.krescent.redisson
2+
3+
import dev.helight.krescent.synchronization.KrescentLockProvider
4+
import dev.helight.krescent.test.KrescentLockProviderContract
5+
import org.redisson.Redisson
6+
import org.redisson.api.RedissonClient
7+
import org.redisson.config.Config
8+
import org.testcontainers.containers.GenericContainer
9+
import org.testcontainers.junit.jupiter.Container
10+
import org.testcontainers.junit.jupiter.Testcontainers
11+
import java.time.Duration
12+
import java.util.*
13+
14+
@Testcontainers
15+
class RedisLockProviderTest : KrescentLockProviderContract {
16+
17+
companion object {
18+
19+
@Container
20+
private val valkey = GenericContainer("valkey/valkey:latest")
21+
.withExposedPorts(6379)
22+
.withStartupTimeout(Duration.ofSeconds(60))
23+
24+
private val connectionString get() = "redis://${valkey.host}:${valkey.getMappedPort(6379)}"
25+
26+
fun connect(): RedissonClient = Redisson.create(Config().apply {
27+
useSingleServer().setAddress(connectionString)
28+
})
29+
}
30+
31+
override fun getProvider(): KrescentLockProvider {
32+
val db = connect()
33+
val uid = UUID.randomUUID().toString()
34+
return RedisLockProvider(db, "krescent:locks:$uid:")
35+
}
36+
}

0 commit comments

Comments
 (0)