@@ -62,17 +62,13 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
6262 ctx.setTimeout(asyncTimeoutMillis)
6363 // Must be done on the container thread for Tomcat's sake when using async I/O.
6464 val bodyWriter = servletIo.initWriter(servletResponse)
65- val result = F
66- .attempt(
67- toRequest(servletRequest) .fold(
68- onParseFailure(_, servletResponse, bodyWriter ),
65+ val result =
66+ toRequest(servletRequest)
67+ .fold(
68+ onParseFailure(_, servletResponse),
6969 handleRequest(ctx, _, bodyWriter),
7070 )
71- )
72- .flatMap {
73- case Right (()) => F .delay(ctx.complete)
74- case Left (t) => errorHandler(servletRequest, servletResponse)(t)
75- }
71+ .recoverWith(errorHandler(servletRequest, servletResponse))
7672 dispatcher.unsafeRunAndForget(result)
7773 } catch errorHandler(servletRequest, servletResponse).andThen(dispatcher.unsafeRunSync _)
7874
@@ -87,17 +83,23 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
8783 // It is an error to add a listener to an async context that is
8884 // already completed, so we must take care to add the listener
8985 // before the response can complete.
90-
9186 val timeout =
92- F .async[Response [ F ] ](cb =>
87+ F .async[Unit ](cb =>
9388 gate.complete(ctx.addListener(new AsyncTimeoutHandler (cb))).as(noopCancelToken)
9489 )
9590 val response =
9691 gate.get *>
9792 F .defer(serviceFn(request))
9893 .recoverWith(serviceErrorHandler(request))
99- val servletResponse = ctx.getResponse.asInstanceOf [HttpServletResponse ]
100- F .race(timeout, response).flatMap(r => renderResponse(r.merge, servletResponse, bodyWriter))
94+ F .race(timeout, response).flatMap {
95+ case Left (_) =>
96+ // In Jetty, if onTimeout is called, we need to complete on the
97+ // listener's own thread.
98+ F .unit
99+ case Right (resp) =>
100+ val servletResponse = ctx.getResponse.asInstanceOf [HttpServletResponse ]
101+ renderResponse(resp, servletResponse, bodyWriter) *> F .delay(ctx.complete())
102+ }
101103 }
102104
103105 private def errorHandler (
@@ -124,11 +126,19 @@ class AsyncHttp4sServlet[F[_]] @deprecated("Use AsyncHttp4sServlet.builder", "0.
124126 }
125127 }
126128
127- private class AsyncTimeoutHandler (cb : Callback [Response [ F ] ]) extends AbstractAsyncListener {
129+ private class AsyncTimeoutHandler (cb : Callback [Unit ]) extends AbstractAsyncListener {
128130 override def onTimeout (event : AsyncEvent ): Unit = {
131+ // In Jetty, we must complete on the same thread as the timeout
132+ // handler. This triggers a cancellation of the service so we
133+ // can take over.
134+ cb(Right (()))
135+
136+ val ctx = event.getAsyncContext
129137 val req = event.getAsyncContext.getRequest.asInstanceOf [HttpServletRequest ]
130138 logger.info(s " Request timed out: ${req.getMethod} ${req.getServletPath}${req.getPathInfo}" )
131- cb(Right (Response .timeout[F ]))
139+ val resp = event.getAsyncContext.getResponse.asInstanceOf [HttpServletResponse ]
140+ resp.sendError(Response .timeout.status.code, " Response timed out" )
141+ ctx.complete()
132142 }
133143 }
134144}
0 commit comments