Skip to content

Commit 10ede54

Browse files
Use mayInterruptIfRunning=true on CompletableFuture cancellation to prevent resource leaks
Closes #1188
1 parent bc78e0f commit 10ede54

File tree

4 files changed

+141
-77
lines changed

4 files changed

+141
-77
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package org.http4s.jdkhttpclient
2+
3+
import cats.effect.Async
4+
import cats.effect.kernel.Cont
5+
import cats.effect.kernel.MonadCancelThrow
6+
import cats.~>
7+
8+
import java.util.concurrent.CompletableFuture
9+
import java.util.concurrent.CompletionException
10+
11+
private[jdkhttpclient] object CancelableAsync {
12+
13+
/** This is a direct copy-paste of [[cats.effect.kernel.AsyncPlatform.fromCompletableFuture]] with
14+
* `mayInterruptIfRunning` set to `true` instead of `false`.
15+
*
16+
* This is *really* important for JDK HTTP Client, since if you cancel a `CompletableFuture` with
17+
* `mayInterruptIfRunning` set to `false`, it will not cancel the actual HTTP request but the
18+
* CompletableFuture will still report a successful cancellation, leaking the connection which
19+
* will never be cleaned up, taking up resources until the JVM exits and blocking the graceful
20+
* finalization of the JDK HTTP Client.
21+
*/
22+
def fromCompletableFuture[F[_], A](fut: F[CompletableFuture[A]])(implicit F: Async[F]): F[A] =
23+
F.cont {
24+
new Cont[F, A, A] {
25+
def apply[G[_]](implicit
26+
G: MonadCancelThrow[G]
27+
): (Either[Throwable, A] => Unit, G[A], F ~> G) => G[A] = { (resume, get, lift) =>
28+
G.uncancelable { poll =>
29+
G.flatMap(poll(lift(fut))) { cf =>
30+
val go = F.delay {
31+
cf.handle[Unit] {
32+
case (a, null) => resume(Right(a))
33+
case (_, t) =>
34+
resume(Left(t match {
35+
case e: CompletionException if e.getCause ne null => e.getCause
36+
case _ => t
37+
}))
38+
}
39+
}
40+
41+
val await = G.onCancel(
42+
poll(get),
43+
// if cannot cancel, fallback to get
44+
G.ifM(lift(F.delay(cf.cancel(true))))(G.unit, G.void(get))
45+
)
46+
47+
G.productR(lift(go))(await)
48+
}
49+
}
50+
}
51+
}
52+
}
53+
}

core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ object JdkHttpClient {
240240
Client[F] { req =>
241241
for {
242242
req <- convertRequest(req)
243-
res = F.fromCompletableFuture(
243+
res = CancelableAsync.fromCompletableFuture(
244244
F.delay(jdkHttpClient.sendAsync(req, BodyHandlers.ofPublisher))
245245
)
246246
res <- convertResponse(res)

core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ object JdkWSClient {
105105
}
106106
}
107107
webSocket <- poll(
108-
F.fromCompletableFuture(
108+
CancelableAsync.fromCompletableFuture(
109109
F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener))
110110
)
111111
)

core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala

Lines changed: 86 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,18 @@ import java.util.concurrent.TimeUnit
3636
import scala.concurrent.duration._
3737

3838
final class CompletableFutureTerminationTest extends CatsEffectSuite {
39+
3940
import CompletableFutureTerminationTest._
4041

4142
private val duration: FiniteDuration =
4243
FiniteDuration(50L, TimeUnit.MILLISECONDS)
4344

45+
/** If test took longer than serverTimeout, it means we leaked a connection, and the client held
46+
* onto it even after the cancellation attempt, blocking http client finalization. It got
47+
* eventually released after the server force terminated all connections.
48+
*/
49+
override def munitIOTimeout: Duration = serverTimeout
50+
4451
// This test ensures that converting from a
4552
// java.util.concurrent.CompletableFuture to an effect type, such as IO,
4653
// will properly terminate the CompletableFuture if the resulting effect is
@@ -65,81 +72,87 @@ final class CompletableFutureTerminationTest extends CatsEffectSuite {
6572
//
6673
// See: https://docs.oracle.com/en/java/javase/14/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscriber.html
6774
test("Terminating an effect generated from a CompletableFuture") {
68-
(Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled
69-
.flatMap { case (stallServer, observation, gotRequest) =>
70-
// Acquire the `stallServer` semaphore so that the server will not
71-
// return _any_ bytes until we release a permit.
72-
stallServer.acquire *>
73-
// Acquire the `gotRequest` semaphore. The server will release this
74-
// once it gets our Request. We wait until this happens to start our
75-
// timeout logic.
76-
gotRequest.acquire *>
77-
// Start a Http4s Server, it will be terminated at the conclusion of
78-
// this test.
79-
stallingServerR[IO](stallServer, gotRequest).use { (server: Server) =>
80-
// Call the server, using the JDK client. We call directly with
81-
// the JDK client because we need to have low level control over
82-
// the result to observe whether or not the
83-
// java.util.concurrent.CompletableFuture is still executing (and
84-
// holding on to resources).
85-
callServer[IO](server).flatMap((cf: CompletableFuture[HttpResponse[String]]) =>
86-
// Attach a handler onto the result. This will populate our
87-
// `observation` Deferred value when the CompletableFuture
88-
// finishes for any reason.
89-
//
90-
// We start executing this in the background, so that we
91-
// asynchronously populate our Observation.
92-
observeCompletableFuture(observation, cf).start.flatMap(fiber =>
93-
// Wait until we are sure the Http4s Server has received the
94-
// request.
95-
gotRequest.acquire *>
96-
// Lift the CompletableFuture to a IO value and attach a
97-
// (short) timeout to the termination.
75+
JdkHttpClient.defaultHttpClientResource[IO].use { client =>
76+
(Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled
77+
.flatMap { case (stallServer, observation, gotRequest) =>
78+
// Acquire the `stallServer` semaphore so that the server will not
79+
// return _any_ bytes until we release a permit.
80+
stallServer.acquire *>
81+
// Acquire the `gotRequest` semaphore. The server will release this
82+
// once it gets our Request. We wait until this happens to start our
83+
// timeout logic.
84+
gotRequest.acquire *>
85+
// Start a Http4s Server, it will be terminated at the conclusion of
86+
// this test.
87+
stallingServerR[IO](stallServer, gotRequest).use { (server: Server) =>
88+
// Call the server, using the JDK client. We call directly with
89+
// the JDK client because we need to have low level control over
90+
// the result to observe whether or not the
91+
// java.util.concurrent.CompletableFuture is still executing (and
92+
// holding on to resources).
93+
callServer[IO](client, server).flatMap(
94+
(cf: CompletableFuture[HttpResponse[String]]) =>
95+
// Attach a handler onto the result. This will populate our
96+
// `observation` Deferred value when the CompletableFuture
97+
// finishes for any reason.
9898
//
99-
// Important! The IO result _must_ be terminated via the
100-
// timeout _before any bytes_ have been received by the JDK
101-
// HttpClient in order to validate resource safety. Once we
102-
// start getting bytes back, the CompletableFuture _is
103-
// complete_ and we are in a different context.
104-
//
105-
// Notice that we release stallServer _after_ the
106-
// timeout. _This is the crux of this entire test_. Once
107-
// we release `stallServer`, the Http4s Server will
108-
// attempt to send back an Http Response to our JDK
109-
// client. If the CompletableFuture and associated
110-
// resources were properly cleaned up after the
111-
// timeoutTo terminated the running effect, then the JDK
112-
// client connection will either be closed, or the
113-
// attempt to invoke `complete` on the
114-
// `CompletableFuture` will fail, in both cases
115-
// releasing any resources being held. If not, then it
116-
// will still receive bytes, meaning there is a resource
117-
// leak.
118-
IO.fromCompletableFuture(IO(cf))
119-
.void
120-
.timeoutTo(duration, stallServer.release) *>
121-
// After the timeout has triggered, wait for the observation to complete.
122-
fiber.join *>
123-
// Check our observation. Whether or not there is an exception
124-
// is not actually relevant to the success case. What _is_
125-
// important is that there is no result. If there is a result,
126-
// then that means that _after_ `timeoutTo` released
127-
// `stallServer` the CompletableFuture for the Http response
128-
// body still processed data, which indicates a resource leak.
129-
observation.get.flatMap {
130-
case Observation(None, _) => IO.pure(true)
131-
case otherwise =>
132-
IO.raiseError(new AssertionError(s"Expected no result, got $otherwise"))
133-
}
99+
// We start executing this in the background, so that we
100+
// asynchronously populate our Observation.
101+
observeCompletableFuture(observation, cf).start.flatMap(fiber =>
102+
// Wait until we are sure the Http4s Server has received the
103+
// request.
104+
gotRequest.acquire *>
105+
// Lift the CompletableFuture to a IO value and attach a
106+
// (short) timeout to the termination.
107+
//
108+
// Important! The IO result _must_ be terminated via the
109+
// timeout _before any bytes_ have been received by the JDK
110+
// HttpClient in order to validate resource safety. Once we
111+
// start getting bytes back, the CompletableFuture _is
112+
// complete_ and we are in a different context.
113+
//
114+
// Notice that we release stallServer _after_ the
115+
// timeout. _This is the crux of this entire test_. Once
116+
// we release `stallServer`, the Http4s Server will
117+
// attempt to send back an Http Response to our JDK
118+
// client. If the CompletableFuture and associated
119+
// resources were properly cleaned up after the
120+
// timeoutTo terminated the running effect, then the JDK
121+
// client connection will either be closed, or the
122+
// attempt to invoke `complete` on the
123+
// `CompletableFuture` will fail, in both cases
124+
// releasing any resources being held. If not, then it
125+
// will still receive bytes, meaning there is a resource
126+
// leak.
127+
CancelableAsync
128+
.fromCompletableFuture(IO(cf))
129+
.void
130+
.timeoutTo(duration, stallServer.release) *>
131+
// After the timeout has triggered, wait for the observation to complete.
132+
fiber.join *>
133+
// Check our observation. Whether or not there is an exception
134+
// is not actually relevant to the success case. What _is_
135+
// important is that there is no result. If there is a result,
136+
// then that means that _after_ `timeoutTo` released
137+
// `stallServer` the CompletableFuture for the Http response
138+
// body still processed data, which indicates a resource leak.
139+
observation.get.flatMap {
140+
case Observation(None, _) => IO.pure(true)
141+
case otherwise =>
142+
IO.raiseError(new AssertionError(s"Expected no result, got $otherwise"))
143+
}
144+
)
134145
)
135-
)
136-
}
137-
}
146+
}
147+
}
148+
}
138149
}
139150
}
140151

141152
object CompletableFutureTerminationTest {
142153

154+
private val serverTimeout = 5.seconds
155+
143156
/** ADT to contain the result of an invocation to
144157
* [[java.util.concurrent.CompletionStage#handleAsync]]
145158
*
@@ -179,14 +192,12 @@ object CompletableFutureTerminationTest {
179192
EmberServerBuilder
180193
.default[F]
181194
.withHttpApp(
182-
Kleisli(
183-
Function.const(
184-
gotRequest.release *>
185-
semaphore.permit.use(_ => F.pure(Response[F]()))
186-
)
195+
Kleisli.liftF(
196+
gotRequest.release *>
197+
semaphore.permit.use(_ => F.pure(Response[F]()))
187198
)
188199
)
189-
.withShutdownTimeout(1.second)
200+
.withShutdownTimeout(serverTimeout)
190201
.withPort(port"0")
191202
.build
192203

@@ -218,11 +229,11 @@ object CompletableFutureTerminationTest {
218229
* in a [[java.util.concurrent.CompletableFuture]].
219230
*/
220231
private def callServer[F[_]](
232+
client: HttpClient,
221233
server: Server
222234
)(implicit F: Sync[F]): F[CompletableFuture[HttpResponse[String]]] =
223235
for {
224236
jURI <- F.catchNonFatal(new URI(server.baseUri.renderString))
225-
client <- F.delay(HttpClient.newHttpClient)
226237
result <- F.delay(
227238
client.sendAsync(HttpRequest.newBuilder(jURI).build(), HttpResponse.BodyHandlers.ofString)
228239
)

0 commit comments

Comments
 (0)