Skip to content

Commit 069c059

Browse files
committed
Add PostgresLockProvider implementation and related tests for KrescentLock in new package
1 parent 21bf08b commit 069c059

File tree

5 files changed

+152
-1
lines changed

5 files changed

+152
-1
lines changed

krescent-exposed/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ plugins {
88

99
dependencies {
1010
// Apply the kotlinx bundle of dependencies from the version catalog (`gradle/libs.versions.toml`).
11-
implementation(libs.bundles.exposed)
1211
implementation(libs.bundles.kotlinxEcosystem)
1312
implementation(project(":krescent-core"))
13+
api(libs.bundles.exposed)
1414

1515
testImplementation(kotlin("test"))
1616
testImplementation(project(":krescent-test"))

krescent-postgres/build.gradle.kts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
plugins {
2+
// Apply the shared build logic from a convention plugin.
3+
// The shared code is located in `buildSrc/src/main/kotlin/kotlin-jvm.gradle.kts`.
4+
id("buildsrc.convention.kotlin-jvm")
5+
// Apply Kotlin Serialization plugin from `gradle/libs.versions.toml`.
6+
alias(libs.plugins.kotlinPluginSerialization)
7+
}
8+
9+
dependencies {
10+
// Apply the kotlinx bundle of dependencies from the version catalog (`gradle/libs.versions.toml`).
11+
implementation(libs.bundles.exposed)
12+
implementation(libs.bundles.kotlinxEcosystem)
13+
implementation(project(":krescent-core"))
14+
implementation(project(":krescent-exposed"))
15+
implementation(libs.postgresql)
16+
17+
testImplementation(kotlin("test"))
18+
testImplementation(project(":krescent-test"))
19+
testImplementation(libs.bundles.testContainers)
20+
21+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package dev.helight.krescent.postgres
2+
3+
import dev.helight.krescent.synchronization.KrescentLock
4+
import dev.helight.krescent.synchronization.KrescentLockProvider
5+
import kotlinx.coroutines.*
6+
import kotlinx.coroutines.selects.onTimeout
7+
import kotlinx.coroutines.selects.select
8+
import org.jetbrains.exposed.sql.Database
9+
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
10+
import java.nio.ByteBuffer
11+
import java.security.MessageDigest
12+
import java.util.*
13+
import kotlin.time.Duration
14+
15+
class PostgresLockProvider(
16+
val database: Database,
17+
val leaseTime: Duration = Duration.INFINITE,
18+
) : KrescentLockProvider {
19+
20+
override suspend fun getLock(identity: String): KrescentLock {
21+
return PostgresLock(listOf(identity))
22+
}
23+
24+
override suspend fun getMultiLock(identities: Collection<String>): KrescentLock {
25+
return PostgresLock(identities)
26+
}
27+
28+
private inner class PostgresLock(
29+
val identities: Collection<String>,
30+
) : KrescentLock {
31+
32+
private val open = Stack<CompletableDeferred<Unit>>()
33+
34+
override suspend fun acquire(timeout: Duration?) {
35+
val releaser = CompletableDeferred<Unit>()
36+
launchPostgresLockJob(database, identities, releaser, timeout ?: Duration.INFINITE, leaseTime).await()
37+
open.push(releaser)
38+
}
39+
40+
override suspend fun release() {
41+
val releaser = open.pop() ?: return
42+
releaser.complete(Unit)
43+
}
44+
}
45+
}
46+
47+
private fun hashKey(key: String): Long {
48+
val bytes = key.toByteArray()
49+
val digest = MessageDigest.getInstance("MD5").digest(bytes)
50+
return ByteBuffer.wrap(digest.sliceArray(0..7)).long
51+
}
52+
53+
@OptIn(ExperimentalCoroutinesApi::class)
54+
private fun launchPostgresLockJob(
55+
database: Database,
56+
identities: Collection<String>,
57+
releaser: CompletableDeferred<Unit>,
58+
timeout: Duration,
59+
leaseTime: Duration,
60+
): Deferred<Unit> {
61+
val started = CompletableDeferred<Unit>()
62+
CoroutineScope(Dispatchers.IO).launch {
63+
newSuspendedTransaction(db = database) {
64+
try {
65+
val setLockTimeout = when (timeout) {
66+
Duration.INFINITE -> "SET LOCAL lock_timeout = 0"
67+
else -> "SET LOCAL lock_timeout = ${timeout.inWholeMilliseconds}"
68+
}
69+
exec(
70+
identities.joinToString(
71+
prefix = "DO $$ BEGIN $setLockTimeout; ",
72+
separator = ";",
73+
postfix = "; END $$"
74+
) { "PERFORM pg_advisory_xact_lock(${hashKey(it)})" })
75+
started.complete(Unit)
76+
} catch (e: Exception) {
77+
started.completeExceptionally(e)
78+
return@newSuspendedTransaction
79+
}
80+
select {
81+
releaser.onAwait {}
82+
onTimeout(leaseTime) {}
83+
}
84+
}
85+
}
86+
return started
87+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package dev.helight.krescent.postgres
2+
3+
import dev.helight.krescent.synchronization.KrescentLockProvider
4+
import dev.helight.krescent.test.KrescentLockProviderContract
5+
import org.jetbrains.exposed.sql.Database
6+
import org.testcontainers.containers.GenericContainer
7+
import org.testcontainers.junit.jupiter.Container
8+
import org.testcontainers.junit.jupiter.Testcontainers
9+
import java.time.Duration
10+
11+
@Testcontainers
12+
class PostgresLockTest : KrescentLockProviderContract {
13+
14+
companion object {
15+
16+
@Container
17+
private val postgres = GenericContainer("postgres:latest")
18+
.withExposedPorts(5432)
19+
.withEnv("POSTGRES_USER", "root")
20+
.withEnv("POSTGRES_PASSWORD", "example")
21+
.withStartupTimeout(Duration.ofSeconds(60))
22+
23+
private val connectionString get() = "jdbc:postgresql://${postgres.host}:${postgres.getMappedPort(5432)}/root"
24+
25+
fun connect(): Database = Database.connect(
26+
url = connectionString,
27+
driver = "org.postgresql.Driver",
28+
user = "root",
29+
password = "example"
30+
)
31+
}
32+
33+
override val latency: Long
34+
get() = 50L
35+
36+
override fun getProvider(): KrescentLockProvider {
37+
38+
return PostgresLockProvider(
39+
database = connect()
40+
)
41+
}
42+
}

settings.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ include(":krescent-core")
2323
include(":krescent-kurrent")
2424
include(":krescent-mongo")
2525
include(":krescent-exposed")
26+
include(":krescent-postgres")
2627
include(":krescent-redisson")
2728

2829
include(":krescent-test")

0 commit comments

Comments
 (0)