Skip to content

Commit 15a1faa

Browse files
Ilya NemtsevIlya Nemtsev
authored andcommitted
improved updated/queries via Flows
1 parent 83abea6 commit 15a1faa

File tree

1 file changed

+19
-14
lines changed
  • frameworks/Kotlin/ktor/ktor-r2dbc/src/main/kotlin/org/jetbrains/ktor/benchmarks

1 file changed

+19
-14
lines changed

frameworks/Kotlin/ktor/ktor-r2dbc/src/main/kotlin/org/jetbrains/ktor/benchmarks/Hello.kt

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import io.r2dbc.spi.ConnectionFactories
2020
import io.r2dbc.spi.ConnectionFactory
2121
import io.r2dbc.spi.ConnectionFactoryOptions
2222
import kotlinx.coroutines.*
23+
import kotlinx.coroutines.flow.*
24+
import kotlinx.coroutines.reactive.asFlow
25+
import kotlinx.coroutines.reactive.awaitFirst
2326
import kotlinx.coroutines.reactive.awaitFirstOrNull
2427
import kotlinx.html.*
2528
import kotlinx.serialization.encodeToString
@@ -62,24 +65,21 @@ fun Application.main() {
6265
call.respondText(Json.encodeToString(result), ContentType.Application.Json)
6366
}
6467

65-
suspend fun selectWorlds(queries: Int, random: Random): List<World> = coroutineScope {
66-
val result = ArrayList<Deferred<World?>>(queries)
67-
68+
fun selectWorlds(queries: Int, random: Random): Flow<World> = flow {
6869
repeat(queries) {
69-
val deferred = async {
70-
getWorld(dbConnFactory, random).awaitFirstOrNull()
71-
}
72-
result.add(deferred)
70+
emit(getWorld(dbConnFactory, random).awaitFirst())
7371
}
74-
75-
result.awaitAll().filterNotNull()
7672
}
7773

7874
get("/queries") {
7975
val queries = call.queries()
8076
val random = Random.Default
8177

82-
val result = selectWorlds(queries, random)
78+
val result = buildList {
79+
selectWorlds(queries, random).collect {
80+
add(it)
81+
}
82+
}
8383

8484
call.respondText(Json.encodeToString(result), ContentType.Application.Json)
8585
}
@@ -127,9 +127,14 @@ fun Application.main() {
127127
val result = coroutineScope {
128128
val worlds = selectWorlds(queries, random)
129129

130-
worlds.forEach { it.randomNumber = random.nextInt(DB_ROWS) + 1 }
130+
val worldsUpdated = buildList {
131+
worlds.collect {
132+
it.randomNumber = random.nextInt(DB_ROWS) + 1
133+
add(it)
134+
}
135+
}
131136

132-
val updateRequests = worlds.map { world ->
137+
val updateRequests = worldsUpdated.map { world ->
133138
Mono.usingWhen(dbConnFactory.create(), { connection ->
134139
Mono.from(
135140
connection.createStatement(UPDATE_QUERY).bind(0, world.randomNumber).bind(1, world.id)
@@ -138,8 +143,8 @@ fun Application.main() {
138143
}, Connection::close)
139144
}
140145

141-
Flux.merge(updateRequests).collectList().awaitFirstOrNull()
142-
worlds
146+
Flux.merge(updateRequests).collectList().awaitFirst()
147+
worldsUpdated
143148
}
144149

145150
call.respondText(Json.encodeToString(result), ContentType.Application.Json)

0 commit comments

Comments
 (0)