@@ -60,20 +60,10 @@ fun Application.main() {
6060 call.respondJson(result)
6161 }
6262
63- fun selectWorlds (queries : Int ): Flow <World > = flow {
64- repeat(queries) {
65- emit(getWorld(dbConnFactory).awaitFirst())
66- }
67- }
68-
6963 get(" /queries" ) {
7064 val queries = call.queries()
7165
72- val result = buildList {
73- selectWorlds(queries).collect {
74- add(it)
75- }
76- }
66+ val result = fetchWorldsConcurrently(dbConnFactory, queries)
7767
7868 call.respondJson(result)
7969 }
@@ -123,19 +113,24 @@ fun Application.main() {
123113 }.sortedBy { it.id }
124114
125115 Mono .usingWhen(dbConnFactory.create(), { connection ->
126- connection.beginTransaction()
127- val statement = connection.createStatement(UPDATE_QUERY )
128- updatedWorlds.forEach { world ->
129- statement.bind(0 , world.randomNumber).bind(1 , world.id).add()
130- }
131- Mono .from(statement.execute())
132- .flatMap { Mono .from(it.rowsUpdated) }
116+ Mono .from(connection.beginTransaction())
117+ .thenMany(
118+ Flux .fromIterable(updatedWorlds)
119+ .concatMap { world ->
120+ Mono .from(
121+ connection.createStatement(UPDATE_QUERY )
122+ .bind(" $1" , world.randomNumber)
123+ .bind(" $2" , world.id)
124+ .execute()
125+ ).flatMap { Mono .from(it.rowsUpdated) }
126+ }
127+ )
133128 .then(Mono .from(connection.commitTransaction()))
134129 },
135130 Connection ::close,
136131 { connection, _ -> connection.rollbackTransaction() },
137132 { connection -> connection.rollbackTransaction() }
138- ).awaitSingle ()
133+ ).awaitFirstOrNull ()
139134
140135 call.respondJson(updatedWorlds)
141136 }
@@ -163,7 +158,7 @@ private fun getWorld(
163158suspend fun fetchWorldsConcurrently (factory : ConnectionFactory , count : Int ): List <World > =
164159 coroutineScope {
165160 (0 until count).map {
166- async { getWorld(factory, ThreadLocalRandom .current() ).awaitSingle() }
161+ async { getWorld(factory).awaitSingle() }
167162 }.awaitAll()
168163 }
169164
0 commit comments