Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@ object JdkHttpClient {
): Resource[F, Response[F]] =
Resource
.makeFull { (poll: Poll[F]) =>
(Deferred[F, Unit], poll(responseF)).tupled
val maybeCancelableResponseF =
if (JdkVersion.supportsCancellation) poll(responseF)
else responseF
(Deferred[F, Unit], maybeCancelableResponseF).tupled
} { case (subscription, response) =>
subscription.tryGet.flatMap {
case None =>
Expand Down Expand Up @@ -274,7 +277,7 @@ object JdkHttpClient {
F.delay {
val builder = HttpClient.newBuilder()
// workaround for https://github.com/http4s/http4s-jdk-http-client/issues/200
if (Runtime.version().feature() == 11) {
if (JdkVersion.tls13TriggersDeadlock) {
val params = javax.net.ssl.SSLContext.getDefault().getDefaultSSLParameters()
params.setProtocols(params.getProtocols().filter(_ != "TLSv1.3"))
val _ = builder.sslParameters(params)
Expand Down
24 changes: 24 additions & 0 deletions core/src/main/scala/org/http4s/jdkhttpclient/JdkVersion.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2019 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.jdkhttpclient

private object JdkVersion {
val supportsCancellation: Boolean = Runtime.version().feature() >= 16

// see https://github.com/http4s/http4s-jdk-http-client/issues/200
val tls13TriggersDeadlock: Boolean = Runtime.version().feature() == 11
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ object JdkWSClient {
handleReceive(error.asLeft); ()
}
}
webSocket <- poll(
F.fromCompletableFuture(
F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener))
)
webSocketF = F.fromCompletableFuture(
F.delay(wsBuilder.buildAsync(URI.create(req.uri.renderString), wsListener))
)
webSocket <-
if (JdkVersion.supportsCancellation) poll(webSocketF)
else webSocketF
sendSem <- Semaphore[F](1L)
} yield (webSocket, queue, closedDef, sendSem)
} { case (webSocket, queue, _, _) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit
import scala.concurrent.duration._

final class CompletableFutureTerminationTest extends CatsEffectSuite {

import CompletableFutureTerminationTest._

private val duration: FiniteDuration =
Expand Down Expand Up @@ -65,81 +66,105 @@ final class CompletableFutureTerminationTest extends CatsEffectSuite {
//
// See: https://docs.oracle.com/en/java/javase/14/docs/api/java.net.http/java/net/http/HttpResponse.BodySubscriber.html
test("Terminating an effect generated from a CompletableFuture") {
(Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled
.flatMap { case (stallServer, observation, gotRequest) =>
// Acquire the `stallServer` semaphore so that the server will not
// return _any_ bytes until we release a permit.
stallServer.acquire *>
// Acquire the `gotRequest` semaphore. The server will release this
// once it gets our Request. We wait until this happens to start our
// timeout logic.
gotRequest.acquire *>
// Start a Http4s Server, it will be terminated at the conclusion of
// this test.
stallingServerR[IO](stallServer, gotRequest).use { (server: Server) =>
// Call the server, using the JDK client. We call directly with
// the JDK client because we need to have low level control over
// the result to observe whether or not the
// java.util.concurrent.CompletableFuture is still executing (and
// holding on to resources).
callServer[IO](server).flatMap((cf: CompletableFuture[HttpResponse[String]]) =>
// Attach a handler onto the result. This will populate our
// `observation` Deferred value when the CompletableFuture
// finishes for any reason.
//
// We start executing this in the background, so that we
// asynchronously populate our Observation.
observeCompletableFuture(observation, cf).start.flatMap(fiber =>
// Wait until we are sure the Http4s Server has received the
// request.
gotRequest.acquire *>
// Lift the CompletableFuture to a IO value and attach a
// (short) timeout to the termination.
//
// Important! The IO result _must_ be terminated via the
// timeout _before any bytes_ have been received by the JDK
// HttpClient in order to validate resource safety. Once we
// start getting bytes back, the CompletableFuture _is
// complete_ and we are in a different context.
//
// Notice that we release stallServer _after_ the
// timeout. _This is the crux of this entire test_. Once
// we release `stallServer`, the Http4s Server will
// attempt to send back an Http Response to our JDK
// client. If the CompletableFuture and associated
// resources were properly cleaned up after the
// timeoutTo terminated the running effect, then the JDK
// client connection will either be closed, or the
// attempt to invoke `complete` on the
// `CompletableFuture` will fail, in both cases
// releasing any resources being held. If not, then it
// will still receive bytes, meaning there is a resource
// leak.
IO.fromCompletableFuture(IO(cf))
.void
.timeoutTo(duration, stallServer.release) *>
// After the timeout has triggered, wait for the observation to complete.
fiber.join *>
// Check our observation. Whether or not there is an exception
// is not actually relevant to the success case. What _is_
// important is that there is no result. If there is a result,
// then that means that _after_ `timeoutTo` released
// `stallServer` the CompletableFuture for the Http response
// body still processed data, which indicates a resource leak.
observation.get.flatMap {
case Observation(None, _) => IO.pure(true)
case otherwise =>
IO.raiseError(new AssertionError(s"Expected no result, got $otherwise"))
assume(
JdkVersion.supportsCancellation,
"This test checks cancellation behavior, which was only introduced in JDK 16."
)

JdkHttpClient.defaultHttpClientResource[IO].use { client =>
(Semaphore[IO](1L), Deferred[IO, Observation[HttpResponse[String]]], Semaphore[IO](1L)).tupled
.flatMap { case (stallServer, observation, gotRequest) =>
// Acquire the `stallServer` semaphore so that the server will not
// return _any_ bytes until we release a permit.
stallServer.acquire *>
// Acquire the `gotRequest` semaphore. The server will release this
// once it gets our Request. We wait until this happens to start our
// timeout logic.
gotRequest.acquire *>
// Start a Http4s Server, it will be terminated at the conclusion of
// this test.
Async[IO].bracket(stallingServerR[IO](stallServer, gotRequest).allocated) {
case (server: Server, _) =>
// Call the server, using the JDK client. We call directly with
// the JDK client because we need to have low level control over
// the result to observe whether or not the
// java.util.concurrent.CompletableFuture is still executing (and
// holding on to resources).
callServer[IO](client, server).flatMap(
(cf: CompletableFuture[HttpResponse[String]]) =>
// Attach a handler onto the result. This will populate our
// `observation` Deferred value when the CompletableFuture
// finishes for any reason.
//
// We start executing this in the background, so that we
// asynchronously populate our Observation.
observeCompletableFuture(observation, cf).start.flatMap(fiber =>
// Wait until we are sure the Http4s Server has received the
// request.
gotRequest.acquire *>
// Lift the CompletableFuture to a IO value and attach a
// (short) timeout to the termination.
//
// Important! The IO result _must_ be terminated via the
// timeout _before any bytes_ have been received by the JDK
// HttpClient in order to validate resource safety. Once we
// start getting bytes back, the CompletableFuture _is
// complete_ and we are in a different context.
//
// Notice that we release stallServer _after_ the
// timeout. _This is the crux of this entire test_. Once
// we release `stallServer`, the Http4s Server will
// attempt to send back an Http Response to our JDK
// client. If the CompletableFuture and associated
// resources were properly cleaned up after the
// timeoutTo terminated the running effect, then the JDK
// client connection will either be closed, or the
// attempt to invoke `complete` on the
// `CompletableFuture` will fail, in both cases
// releasing any resources being held. If not, then it
// will still receive bytes, meaning there is a resource
// leak.
IO.fromCompletableFuture(IO(cf))
.void
.timeoutTo(duration, stallServer.release) *>
// After the timeout has triggered, wait for the observation to complete.
fiber.join *>
// Check our observation. Whether or not there is an exception
// is not actually relevant to the success case. What _is_
// important is that there is no result. If there is a result,
// then that means that _after_ `timeoutTo` released
// `stallServer` the CompletableFuture for the Http response
// body still processed data, which indicates a resource leak.
observation.get.flatMap {
case Observation(None, _) => IO.pure(true)
case otherwise =>
IO.raiseError(new AssertionError(s"Expected no result, got $otherwise"))
}
)
)
} { case (_, release) =>
release.timed
.flatMap { case (duration, _) =>
IO {
assert(
clue(duration) < serverTimeout,
"Finalization didn't complete until server shutdown timeout was reached, a connection is likely leaked by the client"
)
}
)
)
}
}
}

}
}
}
}
}

object CompletableFutureTerminationTest {

val a: Boolean = () < 5.seconds

private val serverTimeout = 5.seconds

/** ADT to contain the result of an invocation to
* [[java.util.concurrent.CompletionStage#handleAsync]]
*
Expand Down Expand Up @@ -179,14 +204,12 @@ object CompletableFutureTerminationTest {
EmberServerBuilder
.default[F]
.withHttpApp(
Kleisli(
Function.const(
gotRequest.release *>
semaphore.permit.use(_ => F.pure(Response[F]()))
)
Kleisli.liftF(
gotRequest.release *>
semaphore.permit.use(_ => F.pure(Response[F]()))
)
)
.withShutdownTimeout(1.second)
.withShutdownTimeout(serverTimeout)
.withPort(port"0")
.build

Expand Down Expand Up @@ -219,11 +242,11 @@ object CompletableFutureTerminationTest {
* in a [[java.util.concurrent.CompletableFuture]].
*/
private def callServer[F[_]](
client: HttpClient,
server: Server
)(implicit F: Sync[F]): F[CompletableFuture[HttpResponse[String]]] =
for {
jURI <- F.catchNonFatal(new URI(server.baseUri.renderString))
client <- F.delay(HttpClient.newHttpClient)
result <- F.delay(
client.sendAsync(HttpRequest.newBuilder(jURI).build(), HttpResponse.BodyHandlers.ofString)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ import javax.net.ssl.SSLHandshakeException
class DeadlockWorkaround extends CatsEffectSuite {

test("fail to connect via TLSv1.3 on Java 11") {
if (Runtime.version().feature() > 11) IO.pure(true)
else
(JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) =>
def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException]
testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *>
testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit))
}
assume(
JdkVersion.tls13TriggersDeadlock,
"Test only applies to JDK 11, which has a deadlock issue that is triggered by using TLSv1.3"
)

(JdkHttpClient.simple[IO], JdkWSClient.simple[IO]).tupled.use { case (http, ws) =>
def testSSLFailure(r: IO[Unit]) = r.intercept[SSLHandshakeException]
testSSLFailure(http.expect[Unit](uri"https://tls13.1d.pw")) *>
testSSLFailure(ws.connectHighLevel(WSRequest(uri"wss://tls13.1d.pw")).use(_ => IO.unit))
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") {
}

test("timeout request") {
assume(
JdkVersion.supportsCancellation,
"This test checks cancellation behavior, which was only introduced in JDK 16."
)

val address = server().addresses.head
val path = GetRoutes.DelayedPath // 1s delay before response
val uri = Uri.fromString(s"http://$address$path").toOption.get
Expand All @@ -54,4 +59,23 @@ class JdkHttpClientSpec extends ClientRouteTestBattery("JdkHttpClient") {
}
}
}

test("uncancelable request") {
assume(
!JdkVersion.supportsCancellation,
"This test checks behavior when cancellation is not available, which is pre JDK 16."
)

val address = server().addresses.head
val path = GetRoutes.DelayedPath // 1s delay before response
val uri = Uri.fromString(s"http://$address$path").toOption.get
val req = Request[IO](uri = uri)
val res = client().expect[String](req)
res.as(false).timeoutTo(100.millis, IO.pure(true)).timed.flatMap { case (duration, result) =>
IO {
assert(clue(duration) >= 1.second)
assert(result)
}
}
}
}
25 changes: 25 additions & 0 deletions core/src/test/scala/org/http4s/jdkhttpclient/JdkWSClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ class JdkWSClientSpec extends CatsEffectSuite {
}

test("connect timeout") {
assume(
JdkVersion.supportsCancellation,
"This test checks cancellation behavior, which was only introduced in JDK 16."
)

webSocket()
.connectHighLevel(WSRequest(echoServerUri() / "delayed"))
.use_
Expand All @@ -217,6 +222,26 @@ class JdkWSClientSpec extends CatsEffectSuite {
}
}

test("uncancelable connect") {
assume(
!JdkVersion.supportsCancellation,
"This test checks behavior when cancellation is not available, which is pre JDK 16."
)

webSocket()
.connectHighLevel(WSRequest(echoServerUri() / "delayed"))
.use_
.as(false)
.timeoutTo(100.millis, IO.pure(true))
.timed
.flatMap { case (duration, result) =>
IO {
assert(clue(duration) >= 1.second)
assert(result)
}
}
}

def httpToWsUri(uri: Uri): Uri = uri.copy(scheme = scheme"ws".some)

}
Loading