Skip to content

Commit f607f4e

Browse files
committed
Merge branch 'kotlin-vertx-benchmarks-revamp-coroutine-router-support' into kotlin-vertx-benchmarks-revamp
2 parents 11a2473 + bfdcf67 commit f607f4e

File tree

1 file changed

+20
-25
lines changed

1 file changed

+20
-25
lines changed

frameworks/Kotlin/vertx-web-kotlinx/src/main/kotlin/MainVerticle.kt

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import io.vertx.ext.web.Route
88
import io.vertx.ext.web.Router
99
import io.vertx.ext.web.RoutingContext
1010
import io.vertx.kotlin.core.http.httpServerOptionsOf
11+
import io.vertx.kotlin.coroutines.CoroutineRouterSupport
1112
import io.vertx.kotlin.coroutines.CoroutineVerticle
1213
import io.vertx.kotlin.coroutines.coAwait
1314
import io.vertx.kotlin.pgclient.pgConnectOptionsOf
@@ -17,7 +18,6 @@ import io.vertx.sqlclient.Row
1718
import io.vertx.sqlclient.RowSet
1819
import io.vertx.sqlclient.Tuple
1920
import kotlinx.coroutines.Dispatchers
20-
import kotlinx.coroutines.launch
2121
import kotlinx.html.*
2222
import kotlinx.html.stream.appendHTML
2323
import kotlinx.io.buffered
@@ -29,21 +29,7 @@ import kotlinx.serialization.json.io.encodeToSink
2929
import java.time.ZonedDateTime
3030
import java.time.format.DateTimeFormatter
3131

32-
class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
33-
inline fun Route.checkedCoroutineHandlerUnconfined(crossinline requestHandler: suspend (RoutingContext) -> Unit): Route =
34-
handler { ctx ->
35-
/* Some conclusions from the Plaintext test results with trailing `await()`s:
36-
1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
37-
1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
38-
launch(Dispatchers.Unconfined) {
39-
try {
40-
requestHandler(ctx)
41-
} catch (t: Throwable) {
42-
ctx.fail(t)
43-
}
44-
}
45-
}
46-
32+
class MainVerticle(val hasDb: Boolean) : CoroutineVerticle(), CoroutineRouterSupport {
4733
// `PgConnection`s as used in the "vertx" portion offers better performance than `PgPool`s.
4834
lateinit var pgConnection: PgConnection
4935
lateinit var date: String
@@ -118,10 +104,18 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
118104
putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
119105
}
120106

121-
inline fun <reified T : Any> Route.jsonResponseHandler(
122-
serializer: SerializationStrategy<T>, crossinline requestHandler: suspend (RoutingContext) -> @Serializable T
107+
108+
fun Route.coHandlerUnconfined(requestHandler: suspend (RoutingContext) -> Unit): Route =
109+
/* Some conclusions from the Plaintext test results with trailing `await()`s:
110+
1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
111+
1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
112+
coHandler(Dispatchers.Unconfined, requestHandler)
113+
114+
inline fun <reified T : Any> Route.jsonResponseCoHandler(
115+
serializer: SerializationStrategy<T>,
116+
crossinline requestHandler: suspend (RoutingContext) -> @Serializable T
123117
) =
124-
checkedCoroutineHandlerUnconfined {
118+
coHandlerUnconfined {
125119
it.response().run {
126120
putJsonResponseHeader()
127121

@@ -149,6 +143,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
149143
}
150144
}
151145

146+
152147
suspend fun selectRandomWorlds(queries: Int): List<World> {
153148
val rowSets = List(queries) {
154149
selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000()))
@@ -157,21 +152,21 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
157152
}
158153

159154
fun Router.routes() {
160-
get("/json").jsonResponseHandler(Serializers.message) {
155+
get("/json").jsonResponseCoHandler(Serializers.message) {
161156
jsonSerializationMessage
162157
}
163158

164-
get("/db").jsonResponseHandler(Serializers.world) {
159+
get("/db").jsonResponseCoHandler(Serializers.world) {
165160
val rowSet = selectWorldQuery.execute(Tuple.of(randomIntBetween1And10000())).coAwait()
166161
rowSet.single().toWorld()
167162
}
168163

169-
get("/queries").jsonResponseHandler(Serializers.worlds) {
164+
get("/queries").jsonResponseCoHandler(Serializers.worlds) {
170165
val queries = it.request().getQueries()
171166
selectRandomWorlds(queries)
172167
}
173168

174-
get("/fortunes").checkedCoroutineHandlerUnconfined {
169+
get("/fortunes").coHandlerUnconfined {
175170
val fortunes = mutableListOf<Fortune>()
176171
selectFortuneQuery.execute().coAwait()
177172
.mapTo(fortunes) { it.toFortune() }
@@ -208,7 +203,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
208203
}
209204
}
210205

211-
get("/updates").jsonResponseHandler(Serializers.worlds) {
206+
get("/updates").jsonResponseCoHandler(Serializers.worlds) {
212207
val queries = it.request().getQueries()
213208
val worlds = selectRandomWorlds(queries)
214209
val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
@@ -228,7 +223,7 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
228223
updatedWorlds
229224
}
230225

231-
get("/plaintext").checkedCoroutineHandlerUnconfined {
226+
get("/plaintext").coHandlerUnconfined {
232227
it.response().run {
233228
putCommonHeaders()
234229
putHeader(HttpHeaders.CONTENT_TYPE, "text/plain")

0 commit comments

Comments
 (0)