diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala index 61aa71d1..65a6b34e 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkHttpClient.scala @@ -178,7 +178,10 @@ object JdkHttpClient { ): Resource[F, Response[F]] = Resource .makeFull { (poll: Poll[F]) => - (Deferred[F, Unit], poll(responseF)).tupled + val maybeCancelableResponseF = + if (JdkVersion.supportsCancellation) poll(responseF) + else responseF + (Deferred[F, Unit], maybeCancelableResponseF).tupled } { case (subscription, response) => subscription.tryGet.flatMap { case None => @@ -274,7 +277,7 @@ object JdkHttpClient { F.delay { val builder = HttpClient.newBuilder() // workaround for https://github.com/http4s/http4s-jdk-http-client/issues/200 - if (Runtime.version().feature() == 11) { + if (JdkVersion.tls13TriggersDeadlock) { val params = javax.net.ssl.SSLContext.getDefault().getDefaultSSLParameters() params.setProtocols(params.getProtocols().filter(_ != "TLSv1.3")) val _ = builder.sslParameters(params) diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkVersion.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkVersion.scala new file mode 100644 index 00000000..d082d139 --- /dev/null +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkVersion.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2019 http4s.org + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.http4s.jdkhttpclient + +private object JdkVersion { + val supportsCancellation: Boolean = Runtime.version().feature() >= 16 + + // see https://github.com/http4s/http4s-jdk-http-client/issues/200 + val tls13TriggersDeadlock: Boolean = Runtime.version().feature() == 11 +} diff --git a/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala b/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala index ab744632..8605cc2a 100644 --- a/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala +++ b/core/src/main/scala/org/http4s/jdkhttpclient/JdkWSClient.scala @@ -104,11 +104,12 @@ object JdkWSClient { handleReceive(error.asLeft); () } } - webSocket <- poll( - F.fromCompletableFuture( - F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener)) - ) + webSocketF = F.fromCompletableFuture( + F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener)) ) + webSocket <- + if (JdkVersion.supportsCancellation) poll(webSocketF) + else webSocketF sendSem <- Semaphore[F](1L) } yield (webSocket, queue, closedDef, sendSem) } { case (webSocket, queue, _, _) => diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala b/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala index c0c5e07b..b7d7419d 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/CompletableFutureTerminationTest.scala @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ final class CompletableFutureTerminationTest extends CatsEffectSuite { + import CompletableFutureTerminationTest._ private val duration: FiniteDuration = @@ -65,81 +66,105 @@ final class CompletableFutureTerminationTest extends CatsEffectSuite { // // See: https://docs.oracle.com/en/java/javase/14/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscriber.html test("Terminating an effect generated from a CompletableFuture") { - (Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled - .flatMap { case (stallServer, observation, gotRequest) => - // Acquire the `stallServer` semaphore so that the server will not - // return _any_ bytes until we release a permit. - stallServer.acquire *> - // Acquire the `gotRequest` semaphore. The server will release this - // once it gets our Request. We wait until this happens to start our - // timeout logic. - gotRequest.acquire *> - // Start a Http4s Server, it will be terminated at the conclusion of - // this test. - stallingServerR[IO](stallServer, gotRequest).use { (server: Server) => - // Call the server, using the JDK client. We call directly with - // the JDK client because we need to have low level control over - // the result to observe whether or not the - // java.util.concurrent.CompletableFuture is still executing (and - // holding on to resources). - callServer[IO](server).flatMap((cf: CompletableFuture[HttpResponse[String]]) => - // Attach a handler onto the result. This will populate our - // `observation` Deferred value when the CompletableFuture - // finishes for any reason. - // - // We start executing this in the background, so that we - // asynchronously populate our Observation. - observeCompletableFuture(observation, cf).start.flatMap(fiber => - // Wait until we are sure the Http4s Server has received the - // request. - gotRequest.acquire *> - // Lift the CompletableFuture to a IO value and attach a - // (short) timeout to the termination. - // - // Important! The IO result _must_ be terminated via the - // timeout _before any bytes_ have been received by the JDK - // HttpClient in order to validate resource safety. Once we - // start getting bytes back, the CompletableFuture _is - // complete_ and we are in a different context. - // - // Notice that we release stallServer _after_ the - // timeout. _This is the crux of this entire test_. Once - // we release `stallServer`, the Http4s Server will - // attempt to send back an Http Response to our JDK - // client. If the CompletableFuture and associated - // resources were properly cleaned up after the - // timeoutTo terminated the running effect, then the JDK - // client connection will either be closed, or the - // attempt to invoke `complete` on the - // `CompletableFuture` will fail, in both cases - // releasing any resources being held. If not, then it - // will still receive bytes, meaning there is a resource - // leak. - IO.fromCompletableFuture(IO(cf)) - .void - .timeoutTo(duration, stallServer.release) *> - // After the timeout has triggered, wait for the observation to complete. - fiber.join *> - // Check our observation. Whether or not there is an exception - // is not actually relevant to the success case. What _is_ - // important is that there is no result. If there is a result, - // then that means that _after_ `timeoutTo` released - // `stallServer` the CompletableFuture for the Http response - // body still processed data, which indicates a resource leak. - observation.get.flatMap { - case Observation(None, _) => IO.pure(true) - case otherwise => - IO.raiseError(new AssertionError(s"Expected no result, got $otherwise")) + assume( + JdkVersion.supportsCancellation, + "This test checks cancellation behavior, which was only introduced in JDK 16." + ) + + JdkHttpClient.defaultHttpClientResource[IO].use { client => + (Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled + .flatMap { case (stallServer, observation, gotRequest) => + // Acquire the `stallServer` semaphore so that the server will not + // return _any_ bytes until we release a permit. + stallServer.acquire *> + // Acquire the `gotRequest` semaphore. The server will release this + // once it gets our Request. We wait until this happens to start our + // timeout logic. + gotRequest.acquire *> + // Start a Http4s Server, it will be terminated at the conclusion of + // this test. + Async[IO].bracket(stallingServerR[IO](stallServer, gotRequest).allocated) { + case (server: Server, _) => + // Call the server, using the JDK client. We call directly with + // the JDK client because we need to have low level control over + // the result to observe whether or not the + // java.util.concurrent.CompletableFuture is still executing (and + // holding on to resources). + callServer[IO](client, server).flatMap( + (cf: CompletableFuture[HttpResponse[String]]) => + // Attach a handler onto the result. This will populate our + // `observation` Deferred value when the CompletableFuture + // finishes for any reason. + // + // We start executing this in the background, so that we + // asynchronously populate our Observation. + observeCompletableFuture(observation, cf).start.flatMap(fiber => + // Wait until we are sure the Http4s Server has received the + // request. + gotRequest.acquire *> + // Lift the CompletableFuture to a IO value and attach a + // (short) timeout to the termination. + // + // Important! The IO result _must_ be terminated via the + // timeout _before any bytes_ have been received by the JDK + // HttpClient in order to validate resource safety. Once we + // start getting bytes back, the CompletableFuture _is + // complete_ and we are in a different context. + // + // Notice that we release stallServer _after_ the + // timeout. _This is the crux of this entire test_. Once + // we release `stallServer`, the Http4s Server will + // attempt to send back an Http Response to our JDK + // client. If the CompletableFuture and associated + // resources were properly cleaned up after the + // timeoutTo terminated the running effect, then the JDK + // client connection will either be closed, or the + // attempt to invoke `complete` on the + // `CompletableFuture` will fail, in both cases + // releasing any resources being held. If not, then it + // will still receive bytes, meaning there is a resource + // leak. + IO.fromCompletableFuture(IO(cf)) + .void + .timeoutTo(duration, stallServer.release) *> + // After the timeout has triggered, wait for the observation to complete. + fiber.join *> + // Check our observation. Whether or not there is an exception + // is not actually relevant to the success case. What _is_ + // important is that there is no result. If there is a result, + // then that means that _after_ `timeoutTo` released + // `stallServer` the CompletableFuture for the Http response + // body still processed data, which indicates a resource leak. + observation.get.flatMap { + case Observation(None, _) => IO.pure(true) + case otherwise => + IO.raiseError(new AssertionError(s"Expected no result, got $otherwise")) + } + ) + ) + } { case (_, release) => + release.timed + .flatMap { case (duration, _) => + IO { + assert( + clue(duration) < serverTimeout, + "Finalization didn't complete until server shutdown timeout was reached, a connection is likely leaked by the client" + ) } - ) - ) - } - } + } + + } + } + } } } object CompletableFutureTerminationTest { + val a: Boolean = () < 5.seconds + + private val serverTimeout = 5.seconds + /** ADT to contain the result of an invocation to * [[java.util.concurrent.CompletionStage#handleAsync]] * @@ -179,14 +204,12 @@ object CompletableFutureTerminationTest { EmberServerBuilder .default[F] .withHttpApp( - Kleisli( - Function.const( - gotRequest.release *> - semaphore.permit.use(_ => F.pure(Response[F]())) - ) + Kleisli.liftF( + gotRequest.release *> + semaphore.permit.use(_ => F.pure(Response[F]())) ) ) - .withShutdownTimeout(1.second) + .withShutdownTimeout(serverTimeout) .withPort(port"0") .build @@ -219,11 +242,11 @@ object CompletableFutureTerminationTest { * in a [[java.util.concurrent.CompletableFuture]]. */ private def callServer[F[_]]( + client: HttpClient, server: Server )(implicit F: Sync[F]): F[CompletableFuture[HttpResponse[String]]] = for { jURI <- F.catchNonFatal(new URI(server.baseUri.renderString)) - client <- F.delay(HttpClient.newHttpClient) result <- F.delay( client.sendAsync(HttpRequest.newBuilder(jURI).build(), HttpResponse.BodyHandlers.ofString) ) diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala b/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala index d471ca44..4d4a731b 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/DeadlockWorkaround.scala @@ -27,13 +27,16 @@ import javax.net.ssl.SSLHandshakeException class DeadlockWorkaround extends CatsEffectSuite { test("fail to connect via TLSv1.3 on Java 11") { - if (Runtime.version().feature() > 11) IO.pure(true) - else - (JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) => - def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException] - testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *> - testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit)) - } + assume( + JdkVersion.tls13TriggersDeadlock, + "Test only applies to JDK 11, which has a deadlock issue that is triggered by using TLSv1.3" + ) + + (JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) => + def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException] + testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *> + testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit)) + } } } diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala index 77ef29b7..a6189c1e 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkHttpClientSpec.scala @@ -42,6 +42,11 @@ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") { } test("timeout request") { + assume( + JdkVersion.supportsCancellation, + "This test checks cancellation behavior, which was only introduced in JDK 16." + ) + val address = server().addresses.head val path = GetRoutes.DelayedPath // 1s delay before response val uri = Uri.fromString(s"http://$address$path").toOption.get @@ -54,4 +59,23 @@ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") { } } } + + test("uncancelable request") { + assume( + !JdkVersion.supportsCancellation, + "This test checks behavior when cancellation is not available, which is pre JDK 16." + ) + + val address = server().addresses.head + val path = GetRoutes.DelayedPath // 1s delay before response + val uri = Uri.fromString(s"http://$address$path").toOption.get + val req = Request[IO](uri = uri) + val res = client().expect[String](req) + res.as(false).timeoutTo(100.millis, IO.pure(true)).timed.flatMap { case (duration, result) => + IO { + assert(clue(duration) >= 1.second) + assert(result) + } + } + } } diff --git a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala index d6604b7f..4b62c7f4 100644 --- a/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala +++ b/core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala @@ -203,6 +203,11 @@ class JdkWSClientSpec extends CatsEffectSuite { } test("connect timeout") { + assume( + JdkVersion.supportsCancellation, + "This test checks cancellation behavior, which was only introduced in JDK 16." + ) + webSocket() .connectHighLevel(WSRequest(echoServerUri() / "delayed")) .use_ @@ -217,6 +222,26 @@ class JdkWSClientSpec extends CatsEffectSuite { } } + test("uncancelable connect") { + assume( + !JdkVersion.supportsCancellation, + "This test checks behavior when cancellation is not available, which is pre JDK 16." + ) + + webSocket() + .connectHighLevel(WSRequest(echoServerUri() / "delayed")) + .use_ + .as(false) + .timeoutTo(100.millis, IO.pure(true)) + .timed + .flatMap { case (duration, result) => + IO { + assert(clue(duration) >= 1.second) + assert(result) + } + } + } + def httpToWsUri(uri: Uri): Uri = uri.copy(scheme = scheme"ws".some) }