Skip to content

Commit 6f3c2c9

Browse files
committed
use pipelining of r2dbc to improve perf
1 parent d39cb08 commit 6f3c2c9

File tree

4 files changed

+106
-88
lines changed

4 files changed

+106
-88
lines changed

frameworks/Kotlin/ktor/ktor-r2dbc/build.gradle.kts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,13 @@ plugins {
1111
group = "org.jetbrains.ktor"
1212
version = "1.0-SNAPSHOT"
1313

14-
val ktorVersion = "3.1.3"
14+
val ktorVersion = "3.3.3"
1515
val serializationVersion = "1.8.1"
1616
val kotlinxHtmlVersion = "0.12.0"
1717
val coroutinesVersion = "1.10.1"
1818
val logbackVersion = "1.5.13"
19-
val reactorVersion = "3.7.1"
20-
val r2dbcVersion = "1.0.7.RELEASE"
19+
val reactorVersion = "3.8.0"
20+
val r2dbcPstgrsVersion = "1.1.1.RELEASE"
2121
val r2dbcPoolVersion = "1.0.2.RELEASE"
2222
val postgresqlVersion = "42.7.5"
2323

@@ -40,7 +40,7 @@ dependencies {
4040
implementation("io.ktor:ktor-server-html-builder-jvm:$ktorVersion")
4141
implementation("io.ktor:ktor-server-netty-jvm:$ktorVersion")
4242

43-
implementation("org.postgresql:r2dbc-postgresql:$r2dbcVersion")
43+
implementation("org.postgresql:r2dbc-postgresql:$r2dbcPstgrsVersion")
4444
implementation("io.r2dbc:r2dbc-pool:$r2dbcPoolVersion")
4545
implementation("io.projectreactor:reactor-core:$reactorVersion")
4646

frameworks/Kotlin/ktor/ktor-r2dbc/src/main/kotlin/org/jetbrains/ktor/benchmarks/Hello.kt

Lines changed: 54 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@ import io.r2dbc.postgresql.PostgresqlConnectionFactory
1515
import io.r2dbc.postgresql.client.SSLMode
1616
import io.r2dbc.spi.Connection
1717
import io.r2dbc.spi.ConnectionFactory
18-
import kotlinx.coroutines.async
19-
import kotlinx.coroutines.awaitAll
20-
import kotlinx.coroutines.coroutineScope
21-
import kotlinx.coroutines.flow.Flow
22-
import kotlinx.coroutines.flow.flow
2318
import kotlinx.coroutines.reactive.awaitFirst
2419
import kotlinx.coroutines.reactive.awaitFirstOrNull
2520
import kotlinx.coroutines.reactor.awaitSingle
@@ -28,7 +23,7 @@ import reactor.core.publisher.Flux
2823
import reactor.core.publisher.Mono
2924
import java.time.Duration
3025
import java.util.concurrent.ThreadLocalRandom
31-
import kotlin.random.Random
26+
import kotlin.math.min
3227

3328
const val HELLO_WORLD = "Hello, World!"
3429
const val WORLD_QUERY = "SELECT id, randomnumber FROM world WHERE id = $1"
@@ -54,34 +49,18 @@ fun Application.main() {
5449
}
5550

5651
get("/db") {
57-
val request = getWorld(dbConnFactory)
58-
val result = request.awaitFirstOrNull()
59-
60-
call.respondJson(result)
52+
val world = dbConnFactory.fetchWorld()
53+
call.respondJson(world)
6154
}
6255

6356
get("/queries") {
6457
val queries = call.queries()
65-
66-
val result = fetchWorldsConcurrently(dbConnFactory, queries)
67-
68-
call.respondJson(result)
58+
val worlds = dbConnFactory.fetchWorlds(queries)
59+
call.respondJson(worlds)
6960
}
7061

7162
get("/fortunes") {
72-
val result = mutableListOf<Fortune>()
73-
74-
val request = Flux.usingWhen(dbConnFactory.create(), { connection ->
75-
Flux.from(connection.createStatement(FORTUNES_QUERY).execute()).flatMap { r ->
76-
Flux.from(r.map { row, _ ->
77-
Fortune(
78-
row.get(0, Int::class.java)!!, row.get(1, String::class.java)!!
79-
)
80-
})
81-
}
82-
}, { connection -> connection.close() })
83-
84-
request.collectList().awaitFirstOrNull()?.let { result.addAll(it) }
63+
val result = dbConnFactory.fetchFortunes().toMutableList()
8564

8665
result.add(Fortune(0, "Additional fortune added at request time."))
8766
result.sortBy { it.message }
@@ -107,7 +86,7 @@ fun Application.main() {
10786
get("/updates") {
10887
val queries = call.queries()
10988

110-
val worlds = fetchWorldsConcurrently(dbConnFactory, queries)
89+
val worlds = dbConnFactory.fetchWorlds(queries)
11190
val updatedWorlds = worlds.map {
11291
it.copy(randomNumber = ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1))
11392
}.sortedBy { it.id }
@@ -137,30 +116,55 @@ fun Application.main() {
137116
}
138117
}
139118

140-
private fun getWorld(
141-
dbConnFactory: ConnectionFactory, random: ThreadLocalRandom = ThreadLocalRandom.current()
142-
): Mono<World> = Mono.usingWhen(dbConnFactory.create(), { connection ->
143-
Mono.from(connection.createStatement(WORLD_QUERY)
144-
.bind("$1", random.nextInt(DB_ROWS) + 1)
145-
.execute())
146-
.flatMap { result ->
147-
Mono.from(result.map { row, _ ->
148-
World(
149-
row.get(0, Int::class.java)
150-
?: error("id is null"),
151-
row.get(1, Int::class.java)
152-
?: error("randomNumber is null")
153-
)
154-
})
155-
}
156-
}, Connection::close)
119+
private suspend fun ConnectionFactory.fetchWorld(): World =
120+
Mono.usingWhen(create(), { connection ->
121+
selectWorld(connection)
122+
}, Connection::close).awaitSingle()
123+
124+
private suspend fun ConnectionFactory.fetchWorlds(
125+
count: Int
126+
): List<World> {
127+
if (count <= 0) return emptyList()
128+
val concurrency = min(count, 32)
129+
return Mono.usingWhen(create(), { connection ->
130+
Flux.range(0, count)
131+
.flatMap({ selectWorldPublisher(connection) }, concurrency)
132+
.collectList()
133+
}, Connection::close).awaitSingle()
134+
}
157135

158-
suspend fun fetchWorldsConcurrently(factory: ConnectionFactory, count: Int): List<World> =
159-
coroutineScope {
160-
(0 until count).map {
161-
async { getWorld(factory).awaitSingle() }
162-
}.awaitAll()
136+
private fun selectWorld(connection: Connection): Mono<World> =
137+
selectWorldPublisher(connection)
138+
139+
private fun selectWorldPublisher(connection: Connection): Mono<World> {
140+
val worldId = ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1)
141+
return Mono.from(
142+
connection.createStatement(WORLD_QUERY)
143+
.bind("$1", worldId)
144+
.execute()
145+
).flatMap { result ->
146+
Mono.from(result.map { row, _ ->
147+
World(
148+
row.get(0, Int::class.java) ?: error("id is null"),
149+
row.get(1, Int::class.java) ?: error("randomNumber is null")
150+
)
151+
})
163152
}
153+
}
154+
155+
private suspend fun ConnectionFactory.fetchFortunes(): List<Fortune> =
156+
Mono.usingWhen(create(), { connection ->
157+
Flux.from(connection.createStatement(FORTUNES_QUERY).execute())
158+
.flatMap { result ->
159+
Flux.from(result.map { row, _ ->
160+
Fortune(
161+
row.get(0, Int::class.java) ?: error("id is null"),
162+
row.get(1, String::class.java) ?: error("message is null")
163+
)
164+
})
165+
}
166+
.collectList()
167+
}, Connection::close).awaitSingle()
164168

165169
private fun configurePostgresR2DBC(config: ApplicationConfig): ConnectionFactory {
166170
val cfo = PostgresqlConnectionConfiguration.builder()

frameworks/Kotlin/ktor/ktor/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ plugins {
1111
group = "org.jetbrains.ktor"
1212
version = "1.0-SNAPSHOT"
1313

14-
val ktorVersion = "3.1.3"
14+
val ktorVersion = "3.3.3"
1515
val serializationVersion = "1.8.1"
1616
val kotlinxHtmlVersion = "0.12.0"
1717
val hikariVersion = "5.1.0"

frameworks/Kotlin/ktor/ktor/src/main/kotlin/org/jetbrains/ktor/benchmarks/main.kt

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import kotlinx.coroutines.Dispatchers
1313
import kotlinx.coroutines.ExperimentalCoroutinesApi
1414
import kotlinx.coroutines.withContext
1515
import kotlinx.html.*
16-
import java.sql.Connection
1716
import java.util.StringJoiner
18-
import kotlin.random.Random
17+
import java.util.concurrent.ThreadLocalRandom
18+
import kotlinx.coroutines.CoroutineDispatcher
1919

2020
const val HELLO_WORLD = "Hello, World!"
2121
const val WORLD_QUERY = "SELECT id, randomNumber FROM World WHERE id = ?"
@@ -30,7 +30,6 @@ fun Application.main() {
3030
// Create a dedicated dispatcher for database operations
3131
val databaseDispatcher = Dispatchers.IO.limitedParallelism(poolSize)
3232
val helloWorldContent = TextContent(HELLO_WORLD, ContentType.Text.Plain)
33-
val random = Random.Default
3433

3534
install(DefaultHeaders)
3635

@@ -44,36 +43,13 @@ fun Application.main() {
4443
}
4544

4645
get("/db") {
47-
val world = withContext(databaseDispatcher) {
48-
pool.connection.use { connection ->
49-
connection.prepareStatement(WORLD_QUERY).use { statement ->
50-
statement.setInt(1, random.nextInt(DB_ROWS) + 1)
51-
statement.executeQuery().use { rs ->
52-
rs.next()
53-
World(rs.getInt(1), rs.getInt(2))
54-
}
55-
}
56-
}
57-
}
46+
val world = fetchWorld(pool, databaseDispatcher)
5847
call.respondJson(world)
5948
}
6049

61-
fun Connection.selectWorlds(queries: Int): Array<World> =
62-
prepareStatement(WORLD_QUERY).use { statement ->
63-
Array<World>(queries) { i ->
64-
statement.setInt(1, random.nextInt(DB_ROWS) + 1)
65-
statement.executeQuery().use { rs ->
66-
rs.next()
67-
World(rs.getInt(1), rs.getInt(2))
68-
}
69-
}
70-
}
71-
7250
get("/queries") {
7351
val queries = call.queries()
74-
val result = withContext(databaseDispatcher) {
75-
pool.connection.use { it.selectWorlds(queries) }
76-
}
52+
val result = fetchWorlds(pool, queries, databaseDispatcher)
7753
call.respondJson(result)
7854
}
7955

@@ -113,20 +89,18 @@ fun Application.main() {
11389

11490
get("/updates") {
11591
val queries = call.queries()
116-
val result: Array<World>
92+
val result = fetchWorlds(pool, queries, databaseDispatcher)
11793

11894
withContext(databaseDispatcher) {
11995
pool.connection.use { connection ->
120-
result = connection.selectWorlds(queries)
121-
12296
val updateSql = StringJoiner(
12397
", ",
12498
"UPDATE World SET randomNumber = temp.randomNumber FROM (VALUES ",
12599
" ORDER BY 1) AS temp(id, randomNumber) WHERE temp.id = World.id"
126100
)
127101

128-
for (i in result.indices) {
129-
result[i].randomNumber = random.nextInt(DB_ROWS) + 1
102+
for (world in result) {
103+
world.randomNumber = ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1)
130104
updateSql.add("(?, ?)")
131105
}
132106

@@ -146,6 +120,46 @@ fun Application.main() {
146120
}
147121
}
148122

123+
suspend fun fetchWorld(
124+
pool: HikariDataSource,
125+
dispatcher: CoroutineDispatcher
126+
): World = withContext(dispatcher) {
127+
pool.connection.use { connection ->
128+
fetchWorld(connection)
129+
}
130+
}
131+
132+
private fun fetchWorld(connection: java.sql.Connection): World =
133+
connection.prepareStatement(WORLD_QUERY).use { statement ->
134+
statement.setInt(1, ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1))
135+
statement.executeQuery().use { rs ->
136+
rs.next()
137+
World(rs.getInt(1), rs.getInt(2))
138+
}
139+
}
140+
141+
suspend fun fetchWorlds(
142+
pool: HikariDataSource,
143+
queries: Int,
144+
dispatcher: CoroutineDispatcher
145+
): Array<World> = withContext(dispatcher) {
146+
if (queries <= 0) {
147+
emptyArray()
148+
} else {
149+
pool.connection.use { connection ->
150+
connection.prepareStatement(WORLD_QUERY).use { statement ->
151+
Array(queries) {
152+
statement.setInt(1, ThreadLocalRandom.current().nextInt(1, DB_ROWS + 1))
153+
statement.executeQuery().use { rs ->
154+
rs.next()
155+
World(rs.getInt(1), rs.getInt(2))
156+
}
157+
}
158+
}
159+
}
160+
}
161+
}
162+
149163

150164
fun ApplicationCall.queries() =
151165
request.queryParameters["queries"]?.toIntOrNull()?.coerceIn(1, 500) ?: 1

0 commit comments

Comments
 (0)