Skip to content

Commit e991b56

Browse files
authored
Merge pull request #1303 from UdashFramework/close-stale-jetty-connections-on-monix-timeout
close jetty connections on monix timeout (or other cancellation)
2 parents b8d59a4 + c149256 commit e991b56

File tree

6 files changed

+171
-97
lines changed

6 files changed

+171
-97
lines changed

rest/.jvm/src/test/scala/io/udash/rest/ServletBasedRestApiTest.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,11 @@ package rest
33

44
import org.eclipse.jetty.ee8.servlet.{ServletContextHandler, ServletHolder}
55
import org.eclipse.jetty.server.Server
6-
import org.eclipse.jetty.ee8.servlet.{ServletHandler, ServletHolder}
76

8-
import scala.concurrent.duration._
7+
import scala.concurrent.duration.*
98

109
abstract class ServletBasedRestApiTest extends RestApiTest with UsesHttpServer {
11-
override implicit def patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
10+
override implicit val patienceConfig: PatienceConfig = PatienceConfig(10.seconds)
1211

1312
def maxPayloadSize: Int = 1024 * 1024
1413
def serverTimeout: FiniteDuration = 10.seconds

rest/.jvm/src/test/scala/io/udash/rest/SttpRestCallTest.scala

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,25 @@ package rest
33

44
import io.udash.rest.raw.HttpErrorException
55
import io.udash.rest.raw.RawRest.HandleRequest
6-
import sttp.client3.SttpBackend
6+
import sttp.client3.{HttpClientFutureBackend, SttpBackend}
77

8-
import scala.concurrent.duration._
8+
import java.net.http.HttpClient
9+
import java.time.Duration as JDuration
10+
import scala.concurrent.duration.*
911
import scala.concurrent.{Await, Future}
1012

1113
trait SttpClientRestTest extends ServletBasedRestApiTest {
12-
implicit val backend: SttpBackend[Future, Any] = SttpRestClient.defaultBackend()
14+
/**
15+
* Similar to the defaultHttpClient, but with a connection timeout
16+
* significantly exceeding the value of the CallTimeout
17+
*/
18+
implicit val backend: SttpBackend[Future, Any] = HttpClientFutureBackend.usingClient(
19+
HttpClient
20+
.newBuilder()
21+
.connectTimeout(JDuration.ofMillis(IdleTimout.toMillis))
22+
.followRedirects(HttpClient.Redirect.NEVER)
23+
.build()
24+
)
1325

1426
def clientHandle: HandleRequest =
1527
SttpRestClient.asHandleRequest[Future](s"$baseUrl/api")
@@ -21,22 +33,27 @@ trait SttpClientRestTest extends ServletBasedRestApiTest {
2133
}
2234

2335
class SttpRestCallTest extends SttpClientRestTest with RestApiTestScenarios {
24-
test("too large binary request") {
25-
val future = proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5))
26-
val exception = future.failed.futureValue
27-
assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)"))
36+
"too large binary request" in {
37+
proxy.binaryEcho(Array.fill[Byte](maxPayloadSize + 1)(5))
38+
.failed
39+
.map { exception =>
40+
assert(exception == HttpErrorException.plain(413, "Payload is larger than maximum 1048576 bytes (1048577)"))
41+
}
2842
}
2943
}
3044

3145
class ServletTimeoutTest extends SttpClientRestTest {
3246
override def serverTimeout: FiniteDuration = 500.millis
3347

34-
test("rest method timeout") {
35-
val exception = proxy.neverGet.failed.futureValue
36-
assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds"))
48+
"rest method timeout" in {
49+
proxy.neverGet
50+
.failed
51+
.map { exception =>
52+
assert(exception == HttpErrorException.plain(500, "server operation timed out after 500 milliseconds"))
53+
}
3754
}
3855

39-
test("subsequent requests with timeout") {
56+
"subsequent requests with timeout" in {
4057
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
4158
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
4259
assertThrows[HttpErrorException](Await.result(proxy.wait(600), Duration.Inf))
Lines changed: 54 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package io.udash
22
package rest.jetty
33

4-
import com.avsystem.commons._
4+
import com.avsystem.commons.*
55
import com.avsystem.commons.annotation.explicitGenerics
6-
import io.udash.rest.raw._
6+
import io.udash.rest.raw.*
77
import io.udash.utils.URLEncoder
88
import monix.eval.Task
9-
import org.eclipse.jetty.client.{BufferingResponseListener, BytesRequestContent, HttpClient, Result, StringRequestContent}
9+
import monix.execution.Callback
10+
import org.eclipse.jetty.client.*
1011
import org.eclipse.jetty.http.{HttpCookie, HttpHeader, MimeTypes}
1112

1213
import java.nio.charset.Charset
13-
import scala.concurrent.duration._
14-
import scala.util.{Failure, Success}
14+
import scala.concurrent.CancellationException
15+
import scala.concurrent.duration.*
1516

1617
object JettyRestClient {
1718
final val DefaultMaxResponseLength = 2 * 1024 * 1024
@@ -31,55 +32,57 @@ object JettyRestClient {
3132
maxResponseLength: Int = DefaultMaxResponseLength,
3233
timeout: Duration = DefaultTimeout
3334
): RawRest.HandleRequest =
34-
request => Task.async { callback =>
35-
val path = baseUrl + PlainValue.encodePath(request.parameters.path)
36-
val httpReq = client.newRequest(baseUrl).method(request.method.name)
35+
request => Task(client.newRequest(baseUrl).method(request.method.name)).flatMap { httpReq =>
36+
Task.async { (callback: Callback[Throwable, RestResponse]) =>
37+
val path = baseUrl + PlainValue.encodePath(request.parameters.path)
3738

38-
httpReq.path(path)
39-
request.parameters.query.entries.foreach {
40-
case (name, PlainValue(value)) => httpReq.param(name, value)
41-
}
42-
request.parameters.headers.entries.foreach {
43-
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
44-
}
45-
request.parameters.cookies.entries.foreach {
46-
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
47-
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
48-
}
49-
50-
request.body match {
51-
case HttpBody.Empty =>
52-
case tb: HttpBody.Textual =>
53-
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
54-
case bb: HttpBody.Binary =>
55-
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
56-
}
39+
httpReq.path(path)
40+
request.parameters.query.entries.foreach {
41+
case (name, PlainValue(value)) => httpReq.param(name, value)
42+
}
43+
request.parameters.headers.entries.foreach {
44+
case (name, PlainValue(value)) => httpReq.headers(headers => headers.add(name, value))
45+
}
46+
request.parameters.cookies.entries.foreach {
47+
case (name, PlainValue(value)) => httpReq.cookie(HttpCookie.build(
48+
URLEncoder.encode(name, spaceAsPlus = true), URLEncoder.encode(value, spaceAsPlus = true)).build())
49+
}
5750

58-
timeout match {
59-
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
60-
case _ =>
61-
}
51+
request.body match {
52+
case HttpBody.Empty =>
53+
case tb: HttpBody.Textual =>
54+
httpReq.body(new StringRequestContent(tb.contentType, tb.content, Charset.forName(tb.charset)))
55+
case bb: HttpBody.Binary =>
56+
httpReq.body(new BytesRequestContent(bb.contentType, bb.bytes))
57+
}
6258

63-
httpReq.send(new BufferingResponseListener(maxResponseLength) {
64-
override def onComplete(result: Result): Unit =
65-
if (result.isSucceeded) {
66-
val httpResp = result.getResponse
67-
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
68-
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
69-
val body = (contentTypeOpt, charsetOpt) match {
70-
case (Opt(contentType), Opt(charset)) =>
71-
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
72-
case (Opt(contentType), Opt.Empty) =>
73-
HttpBody.binary(getContent, contentType)
74-
case _ =>
75-
HttpBody.Empty
76-
}
77-
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
78-
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
79-
callback(Success(response))
80-
} else {
81-
callback(Failure(result.getFailure))
59+
timeout match {
60+
case fd: FiniteDuration => httpReq.timeout(fd.length, fd.unit)
61+
case _ =>
8262
}
83-
})
63+
64+
httpReq.send(new BufferingResponseListener(maxResponseLength) {
65+
override def onComplete(result: Result): Unit =
66+
if (result.isSucceeded) {
67+
val httpResp = result.getResponse
68+
val contentTypeOpt = httpResp.getHeaders.get(HttpHeader.CONTENT_TYPE).opt
69+
val charsetOpt = contentTypeOpt.map(MimeTypes.getCharsetFromContentType)
70+
val body = (contentTypeOpt, charsetOpt) match {
71+
case (Opt(contentType), Opt(charset)) =>
72+
HttpBody.textual(getContentAsString, MimeTypes.getContentTypeWithoutCharset(contentType), charset)
73+
case (Opt(contentType), Opt.Empty) =>
74+
HttpBody.binary(getContent, contentType)
75+
case _ =>
76+
HttpBody.Empty
77+
}
78+
val headers = httpResp.getHeaders.asScala.iterator.map(h => (h.getName, PlainValue(h.getValue))).toList
79+
val response = RestResponse(httpResp.getStatus, IMapping(headers), body)
80+
callback(Success(response))
81+
} else {
82+
callback(Failure(result.getFailure))
83+
}
84+
})
85+
}
86+
.doOnCancel(Task(httpReq.abort(new CancellationException("Request cancelled"))))
8487
}
8588
}

rest/jetty/src/test/scala/io/udash/rest/jetty/JettyRestCallTest.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,14 @@ import io.udash.rest.{RestApiTestScenarios, ServletBasedRestApiTest}
66
import org.eclipse.jetty.client.HttpClient
77

88
final class JettyRestCallTest extends ServletBasedRestApiTest with RestApiTestScenarios {
9-
val client: HttpClient = new HttpClient
9+
/**
10+
* Similar to the default HttpClient, but with a connection timeout
11+
* significantly exceeding the value of the CallTimeout
12+
*/
13+
val client: HttpClient = new HttpClient() {
14+
setMaxConnectionsPerDestination(MaxConnections)
15+
setIdleTimeout(IdleTimout.toMillis)
16+
}
1017

1118
def clientHandle: HandleRequest =
1219
JettyRestClient.asHandleRequest(client, s"$baseUrl/api", maxPayloadSize)

rest/src/test/scala/io/udash/rest/RestApiTest.scala

Lines changed: 62 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,47 @@
11
package io.udash
22
package rest
33

4-
import com.avsystem.commons._
4+
import cats.implicits.catsSyntaxTuple2Semigroupal
5+
import com.avsystem.commons.*
6+
import com.avsystem.commons.misc.ScalaDurationExtensions.durationIntOps
57
import io.udash.rest.raw.RawRest
68
import io.udash.rest.raw.RawRest.HandleRequest
9+
import io.udash.testing.AsyncUdashSharedTest
10+
import monix.eval.Task
711
import monix.execution.Scheduler
812
import org.scalactic.source.Position
9-
import org.scalatest.concurrent.ScalaFutures
10-
import org.scalatest.funsuite.AnyFunSuite
13+
import org.scalatest.time.{Millis, Seconds, Span}
14+
import org.scalatest.{Assertion, BeforeAndAfterEach}
1115

12-
abstract class RestApiTest extends AnyFunSuite with ScalaFutures {
16+
import scala.concurrent.duration.FiniteDuration
17+
18+
abstract class RestApiTest extends AsyncUdashSharedTest with BeforeAndAfterEach {
1319
implicit def scheduler: Scheduler = Scheduler.global
1420

21+
protected final val MaxConnections: Int = 1 // to timeout quickly
22+
protected final val Connections: Int = 10 // > MaxConnections
23+
protected final val CallTimeout: FiniteDuration = 300.millis // << idle timeout
24+
protected final val IdleTimout: FiniteDuration = CallTimeout * 100
25+
26+
protected val impl: RestTestApi.Impl = new RestTestApi.Impl
27+
28+
override protected def beforeEach(): Unit = {
29+
super.beforeEach()
30+
impl.resetCounter()
31+
}
32+
1533
final val serverHandle: RawRest.HandleRequest =
16-
RawRest.asHandleRequest[RestTestApi](RestTestApi.Impl)
34+
RawRest.asHandleRequest[RestTestApi](impl)
1735

1836
def clientHandle: RawRest.HandleRequest
1937

2038
lazy val proxy: RestTestApi =
2139
RawRest.fromHandleRequest[RestTestApi](clientHandle)
2240

23-
def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Unit =
24-
assert(
25-
call(proxy).wrapToTry.futureValue.map(mkDeep) ==
26-
call(RestTestApi.Impl).catchFailures.wrapToTry.futureValue.map(mkDeep)
27-
)
41+
def testCall[T](call: RestTestApi => Future[T])(implicit pos: Position): Future[Assertion] =
42+
(call(proxy).wrapToTry, call(impl).catchFailures.wrapToTry).mapN { (proxyResult, implResult) =>
43+
assert(proxyResult.map(mkDeep) == implResult.map(mkDeep))
44+
}
2845

2946
def mkDeep(value: Any): Any = value match {
3047
case arr: Array[_] => IArraySeq.empty[AnyRef] ++ arr.iterator.map(mkDeep)
@@ -33,62 +50,83 @@ abstract class RestApiTest extends AnyFunSuite with ScalaFutures {
3350
}
3451

3552
trait RestApiTestScenarios extends RestApiTest {
36-
test("trivial GET") {
53+
override implicit val patienceConfig: PatienceConfig = PatienceConfig(scaled(Span(10, Seconds)), scaled(Span(50, Millis)))
54+
55+
"trivial GET" in {
3756
testCall(_.trivialGet)
3857
}
3958

40-
test("failing GET") {
59+
"failing GET" in {
4160
testCall(_.failingGet)
4261
}
4362

44-
test("JSON failing GET") {
63+
"JSON failing GET" in {
4564
testCall(_.jsonFailingGet)
4665
}
4766

48-
test("more failing GET") {
67+
"more failing GET" in {
4968
testCall(_.moreFailingGet)
5069
}
5170

52-
test("complex GET") {
71+
"complex GET" in {
5372
testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt(3), 4, "ó /&f"))
5473
testCall(_.complexGet(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", Opt.Empty, 3, "ó /&f"))
5574
}
5675

57-
test("multi-param body POST") {
76+
"multi-param body POST" in {
5877
testCall(_.multiParamPost(0, "a/ +&", 1, "b/ +&", 2, "ć/ +&", 3, "l\"l"))
5978
}
6079

61-
test("single body PUT") {
80+
"single body PUT" in {
6281
testCall(_.singleBodyPut(RestEntity(RestEntityId("id"), "señor")))
6382
}
6483

65-
test("form POST") {
84+
"form POST" in {
6685
testCall(_.formPost("ó", "ą=ę", 42))
6786
}
6887

69-
test("prefixed GET") {
88+
"prefixed GET" in {
7089
testCall(_.prefix("p0", "h0", "q0").subget(0, 1, 2))
7190
}
7291

73-
test("transparent prefix GET") {
92+
"transparent prefix GET" in {
7493
testCall(_.transparentPrefix.subget(0, 1, 2))
7594
}
7695

77-
test("custom response with headers") {
96+
"custom response with headers" in {
7897
testCall(_.customResponse("walue"))
7998
}
8099

81-
test("binary request and response") {
100+
"binary request and response" in {
82101
testCall(_.binaryEcho(Array.fill[Byte](5)(5)))
83102
}
84103

85-
test("large binary request and response") {
104+
"large binary request and response" in {
86105
testCall(_.binaryEcho(Array.fill[Byte](1024 * 1024)(5)))
87106
}
88107

89-
test("body using third party type") {
108+
"body using third party type" in {
90109
testCall(_.thirdPartyBody(HasThirdParty(ThirdParty(5))))
91110
}
111+
112+
"close connection on monix task timeout" in {
113+
Task
114+
.traverse(List.range(0, Connections))(_ => Task.deferFuture(proxy.neverGet).timeout(CallTimeout).failed)
115+
.map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times
116+
.runToFuture
117+
}
118+
119+
"close connection on monix task cancellation" in {
120+
Task
121+
.traverse(List.range(0, Connections)) { i =>
122+
val cancelable = Task.deferFuture(proxy.neverGet).runAsync(_ => ())
123+
Task.sleep(100.millis)
124+
.restartUntil(_ => impl.counterValue() >= i)
125+
.map(_ => cancelable.cancel())
126+
}
127+
.map(_ => assertResult(expected = Connections)(actual = impl.counterValue())) // neverGet should be called Connections times
128+
.runToFuture
129+
}
92130
}
93131

94132
class DirectRestApiTest extends RestApiTestScenarios {

0 commit comments

Comments
 (0)