@@ -11,7 +11,6 @@ import io.vertx.kotlin.core.http.httpServerOptionsOf
1111import io.vertx.kotlin.coroutines.CoroutineRouterSupport
1212import io.vertx.kotlin.coroutines.CoroutineVerticle
1313import io.vertx.kotlin.coroutines.coAwait
14- import io.vertx.kotlin.coroutines.coroutineRouter
1514import io.vertx.kotlin.pgclient.pgConnectOptionsOf
1615import io.vertx.pgclient.PgConnection
1716import io.vertx.sqlclient.PreparedQuery
@@ -30,7 +29,7 @@ import kotlinx.serialization.json.io.encodeToSink
3029import java.time.ZonedDateTime
3130import java.time.format.DateTimeFormatter
3231
33- class MainVerticle (val hasDb : Boolean ) : CoroutineVerticle() {
32+ class MainVerticle (val hasDb : Boolean ) : CoroutineVerticle(), CoroutineRouterSupport {
3433 // `PgConnection`s as used in the "vertx" portion offers better performance than `PgPool`s.
3534 lateinit var pgConnection: PgConnection
3635 lateinit var date: String
@@ -105,47 +104,45 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
105104 putHeader(HttpHeaders .CONTENT_TYPE , " application/json" )
106105 }
107106
108- inner /* value*/ class CoroutineRouterSupportExt (val coroutineRouterSupport : CoroutineRouterSupport ) {
109- fun Route.coHandlerUnconfined (requestHandler : suspend (RoutingContext ) -> Unit ): Route =
110- with (coroutineRouterSupport) {
111- /* Some conclusions from the Plaintext test results with trailing `await()`s:
112- 1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
113- 1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
114- coHandler(Dispatchers .Unconfined , requestHandler)
115- }
116107
117- inline fun <reified T : Any > Route.jsonResponseCoHandler (
118- serializer : SerializationStrategy <T >,
119- crossinline requestHandler : suspend (RoutingContext ) -> @Serializable T
120- ) =
121- coHandlerUnconfined {
122- it.response().run {
123- putJsonResponseHeader()
124-
125- /*
126- // approach 1
127- end(Json.encodeToString(serializer, requestHandler(it)))/*.coAwait()*/
128- */
129-
130- /*
131- // approach 2
132- // java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
108+ fun Route.coHandlerUnconfined (requestHandler : suspend (RoutingContext ) -> Unit ): Route =
109+ /* Some conclusions from the Plaintext test results with trailing `await()`s:
110+ 1. `launch { /*...*/ }` < `launch(start = CoroutineStart.UNDISPATCHED) { /*...*/ }` < `launch(Dispatchers.Unconfined) { /*...*/ }`.
111+ 1. `launch { /*...*/ }` without `context` or `start` lead to `io.netty.channel.StacklessClosedChannelException` and `io.netty.channel.unix.Errors$NativeIoException: sendAddress(..) failed: Connection reset by peer`. */
112+ coHandler(Dispatchers .Unconfined , requestHandler)
113+
114+ inline fun <reified T : Any > Route.jsonResponseCoHandler (
115+ serializer : SerializationStrategy <T >,
116+ crossinline requestHandler : suspend (RoutingContext ) -> @Serializable T
117+ ) =
118+ coHandlerUnconfined {
119+ it.response().run {
120+ putJsonResponseHeader()
121+
122+ /*
123+ // approach 1
124+ end(Json.encodeToString(serializer, requestHandler(it)))/*.coAwait()*/
125+ */
126+
127+ /*
128+ // approach 2
129+ // java.lang.IllegalStateException: You must set the Content-Length header to be the total size of the message body BEFORE sending any data if you are not using HTTP chunked encoding.
130+ toRawSink().buffered().use { bufferedSink ->
131+ @OptIn(ExperimentalSerializationApi::class)
132+ Json.encodeToSink(serializer, requestHandler(it), bufferedSink)
133+ }
134+ */
135+
136+ // approach 3
137+ end(Buffer .buffer().apply {
133138 toRawSink().buffered().use { bufferedSink ->
134139 @OptIn(ExperimentalSerializationApi ::class )
135140 Json .encodeToSink(serializer, requestHandler(it), bufferedSink)
136141 }
137- */
138-
139- // approach 3
140- end(Buffer .buffer().apply {
141- toRawSink().buffered().use { bufferedSink ->
142- @OptIn(ExperimentalSerializationApi ::class )
143- Json .encodeToSink(serializer, requestHandler(it), bufferedSink)
144- }
145- })
146- }
142+ })
147143 }
148- }
144+ }
145+
149146
150147 suspend fun selectRandomWorlds (queries : Int ): List <World > {
151148 val rowSets = List (queries) {
@@ -154,85 +151,83 @@ class MainVerticle(val hasDb: Boolean) : CoroutineVerticle() {
154151 return rowSets.map { it.single().toWorld() }
155152 }
156153
157- fun Router.routes () = coroutineRouter {
158- with (CoroutineRouterSupportExt (this )) {
159- get(" /json" ).jsonResponseCoHandler(Serializers .message) {
160- jsonSerializationMessage
161- }
154+ fun Router.routes () {
155+ get(" /json" ).jsonResponseCoHandler(Serializers .message) {
156+ jsonSerializationMessage
157+ }
162158
163- get(" /db" ).jsonResponseCoHandler(Serializers .world) {
164- val rowSet = selectWorldQuery.execute(Tuple .of(randomIntBetween1And10000())).coAwait()
165- rowSet.single().toWorld()
166- }
159+ get(" /db" ).jsonResponseCoHandler(Serializers .world) {
160+ val rowSet = selectWorldQuery.execute(Tuple .of(randomIntBetween1And10000())).coAwait()
161+ rowSet.single().toWorld()
162+ }
167163
168- get(" /queries" ).jsonResponseCoHandler(Serializers .worlds) {
169- val queries = it.request().getQueries()
170- selectRandomWorlds(queries)
171- }
164+ get(" /queries" ).jsonResponseCoHandler(Serializers .worlds) {
165+ val queries = it.request().getQueries()
166+ selectRandomWorlds(queries)
167+ }
172168
173- get(" /fortunes" ).coHandlerUnconfined {
174- val fortunes = mutableListOf<Fortune >()
175- selectFortuneQuery.execute().coAwait()
176- .mapTo(fortunes) { it.toFortune() }
169+ get(" /fortunes" ).coHandlerUnconfined {
170+ val fortunes = mutableListOf<Fortune >()
171+ selectFortuneQuery.execute().coAwait()
172+ .mapTo(fortunes) { it.toFortune() }
177173
178- fortunes.add(Fortune (0 , " Additional fortune added at request time." ))
179- fortunes.sortBy { it.message }
174+ fortunes.add(Fortune (0 , " Additional fortune added at request time." ))
175+ fortunes.sortBy { it.message }
180176
181- val htmlString = buildString {
182- append(" <!DOCTYPE html>" )
183- appendHTML(false ).html {
184- head {
185- title(" Fortunes" )
186- }
187- body {
188- table {
177+ val htmlString = buildString {
178+ append(" <!DOCTYPE html>" )
179+ appendHTML(false ).html {
180+ head {
181+ title(" Fortunes" )
182+ }
183+ body {
184+ table {
185+ tr {
186+ th { + " id" }
187+ th { + " message" }
188+ }
189+ for (fortune in fortunes)
189190 tr {
190- th { + " id " }
191- th { + " message" }
191+ td { + fortune.id.toString() }
192+ td { + fortune. message }
192193 }
193- for (fortune in fortunes)
194- tr {
195- td { + fortune.id.toString() }
196- td { + fortune.message }
197- }
198- }
199194 }
200195 }
201196 }
197+ }
202198
203- it.response().run {
204- putCommonHeaders()
205- putHeader(HttpHeaders .CONTENT_TYPE , " text/html; charset=utf-8" )
206- end(htmlString)/* .coAwait()*/
207- }
199+ it.response().run {
200+ putCommonHeaders()
201+ putHeader(HttpHeaders .CONTENT_TYPE , " text/html; charset=utf-8" )
202+ end(htmlString)/* .coAwait()*/
208203 }
204+ }
209205
210- get(" /updates" ).jsonResponseCoHandler(Serializers .worlds) {
211- val queries = it.request().getQueries()
212- val worlds = selectRandomWorlds(queries)
213- val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
206+ get(" /updates" ).jsonResponseCoHandler(Serializers .worlds) {
207+ val queries = it.request().getQueries()
208+ val worlds = selectRandomWorlds(queries)
209+ val updatedWorlds = worlds.map { it.copy(randomNumber = randomIntBetween1And10000()) }
214210
215- // Approach 1
216- // The updated worlds need to be sorted first to avoid deadlocks.
217- updateWordQuery
218- .executeBatch(updatedWorlds.sortedBy { it.id }.map { Tuple .of(it.randomNumber, it.id) }).coAwait()
211+ // Approach 1
212+ // The updated worlds need to be sorted first to avoid deadlocks.
213+ updateWordQuery
214+ .executeBatch(updatedWorlds.sortedBy { it.id }.map { Tuple .of(it.randomNumber, it.id) }).coAwait()
219215
220- /*
221- // Approach 2, worse performance
222- updatedWorlds.map {
223- pgPool.preparedQuery(UPDATE_WORLD_SQL).execute(Tuple.of(it.randomNumber, it.id))
224- }.awaitAll()
225- */
216+ /*
217+ // Approach 2, worse performance
218+ updatedWorlds.map {
219+ pgPool.preparedQuery(UPDATE_WORLD_SQL).execute(Tuple.of(it.randomNumber, it.id))
220+ }.awaitAll()
221+ */
226222
227- updatedWorlds
228- }
223+ updatedWorlds
224+ }
229225
230- get(" /plaintext" ).coHandlerUnconfined {
231- it.response().run {
232- putCommonHeaders()
233- putHeader(HttpHeaders .CONTENT_TYPE , " text/plain" )
234- end(" Hello, World!" )/* .coAwait()*/
235- }
226+ get(" /plaintext" ).coHandlerUnconfined {
227+ it.response().run {
228+ putCommonHeaders()
229+ putHeader(HttpHeaders .CONTENT_TYPE , " text/plain" )
230+ end(" Hello, World!" )/* .coAwait()*/
236231 }
237232 }
238233 }
0 commit comments