Skip to content

Commit 981a6e5

Browse files
authored
Also call the listener backend when a request is cancelled (#2679)
Closes #2669
1 parent 7eb2e7e commit 981a6e5

File tree

3 files changed

+104
-1
lines changed

3 files changed

+104
-1
lines changed

core/src/main/scala/sttp/client4/listener/ListenerBackend.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import sttp.capabilities.Effect
66
import sttp.client4.wrappers.DelegateBackend
77
import sttp.shared.Identity
88
import java.util.concurrent.atomic.AtomicBoolean
9+
import scala.util.control.NoStackTrace
910

1011
/** A backend wrapper which notifies the given [[RequestListener]] when a request starts and completes. */
1112
abstract class ListenerBackend[F[_], P, L](
@@ -15,20 +16,39 @@ abstract class ListenerBackend[F[_], P, L](
1516
override def send[T](request: GenericRequest[T, P with Effect[F]]): F[Response[T]] =
1617
listener.before(request).flatMap { case tag =>
1718
val onBodyReceivedCalled = new AtomicBoolean
19+
// #2669. It would be best to either:
20+
// * have a dedicated .handleCancelled method on MonadError, implemented for monads which support it
21+
// * and/or, have a dedicated callback on the RequestListener, which is called when the request is cancelled
22+
// But we cannot modify either of the interfaces (due to bincompat), so we resort to using an atomic flag
23+
val responseHandledOrExceptionCalled = new AtomicBoolean
1824
val requestToSend = request.onBodyReceived { meta =>
1925
onBodyReceivedCalled.set(true)
2026
listener.responseBodyReceived(request, meta, tag)
2127
}
2228
monad
2329
.handleError(delegate.send(requestToSend)) { case e: Exception =>
30+
responseHandledOrExceptionCalled.set(true)
2431
monad.flatMap {
2532
ResponseException.find(e) match {
2633
case Some(re) => listener.responseHandled(requestToSend, re.response, tag, Some(re))
2734
case None => listener.exception(requestToSend, tag, e, onBodyReceivedCalled.get())
2835
}
2936
} { _ => monad.error(e) }
3037
}
31-
.flatMap(response => listener.responseHandled(requestToSend, response, tag, None).map(_ => response))
38+
.flatMap { response =>
39+
responseHandledOrExceptionCalled.set(true)
40+
listener.responseHandled(requestToSend, response, tag, None).map(_ => response)
41+
}
42+
.ensure {
43+
if (!responseHandledOrExceptionCalled.get()) {
44+
listener.exception(
45+
requestToSend,
46+
tag,
47+
new InterruptedException with NoStackTrace,
48+
onBodyReceivedCalled.get()
49+
)
50+
} else monad.unit(())
51+
}
3252
}
3353
}
3454

core/src/main/scala/sttp/client4/listener/RequestListener.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ trait RequestListener[F[_], L] {
4949
*
5050
* The [[responseBodyReceived]] might have been called before this method, but will not be called after.
5151
*
52+
* If handling of the request is cancelled, and the effect system used doesn't rely on exceptions for cancellation,
53+
* this method will still be called, with an [[InterruptedException]] exception.
54+
*
5255
* @param responseBodyReceivedCalled
5356
* Indicates if [[responseBodyReceivedCalled]] has been called before this method.
5457
*/
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package sttp.client4.impl.cats
2+
3+
import cats.effect.IO
4+
import cats.effect.kernel.Ref
5+
import cats.effect.std.Semaphore
6+
import cats.effect.unsafe.implicits.global
7+
import org.scalatest.flatspec.AnyFlatSpec
8+
import org.scalatest.matchers.should.Matchers
9+
import sttp.client4._
10+
import sttp.client4.httpclient.cats.HttpClientCatsBackend
11+
import sttp.client4.listener.ListenerBackend
12+
import sttp.client4.listener.RequestListener
13+
import sttp.model.ResponseMetadata
14+
15+
class CatsListenerBackendTest extends AnyFlatSpec with Matchers {
16+
// #2669
17+
it should "notify the listener when a request is interrupted" in {
18+
for {
19+
inResponse <- Semaphore[IO](0)
20+
trail <- Ref.of[IO, List[String]](Nil)
21+
baseBackend = HttpClientCatsBackend.stub[IO].whenAnyRequest.thenRespondF {
22+
inResponse.release *> IO.never
23+
}
24+
backend = ListenerBackend(baseBackend, spyingListener(trail))
25+
// send the request in the background ...
26+
sendingFiber <- basicRequest.get(uri"http://example.org").send(backend).void.start
27+
// wait until the request is being processed ...
28+
_ <- inResponse.acquire
29+
// the counter should be 1, as it should be incremented in the listener's "before"
30+
trailBefore <- trail.get
31+
_ = trailBefore shouldBe List("before")
32+
// cancel the request, interrupting the IO.never ...
33+
_ <- sendingFiber.cancel
34+
// and now the counter should be decreased again
35+
trailAfter <- trail.get
36+
_ = trailAfter shouldBe List("before", "exception")
37+
} yield ()
38+
}.unsafeRunSync()
39+
40+
it should "properly notify listener in case of normal completion" in {
41+
for {
42+
inResponse <- Semaphore[IO](0)
43+
trail <- Ref.of[IO, List[String]](Nil)
44+
baseBackend = HttpClientCatsBackend.stub[IO].whenAnyRequest.thenRespondOk()
45+
backend = ListenerBackend(baseBackend, spyingListener(trail))
46+
_ <- basicRequest.get(uri"http://example.org").send(backend)
47+
trailAfter <- trail.get
48+
_ = trailAfter shouldBe List("before", "response handled")
49+
} yield ()
50+
}.unsafeRunSync()
51+
52+
it should "properly notify listener in case of an exception" in {
53+
for {
54+
inResponse <- Semaphore[IO](0)
55+
trail <- Ref.of[IO, List[String]](Nil)
56+
baseBackend = HttpClientCatsBackend.stub[IO].whenAnyRequest.thenThrow(new RuntimeException("test exception"))
57+
backend = ListenerBackend(baseBackend, spyingListener(trail))
58+
_ <- basicRequest.get(uri"http://example.org").send(backend).handleErrorWith { case _ => IO.unit }
59+
trailAfter <- trail.get
60+
_ = trailAfter shouldBe List("before", "exception")
61+
} yield ()
62+
}.unsafeRunSync()
63+
64+
def spyingListener(trail: Ref[IO, List[String]]) = new RequestListener[IO, Unit] {
65+
override def before(request: GenericRequest[_, _]): IO[Unit] = trail.update(_ :+ "before")
66+
override def responseBodyReceived(request: GenericRequest[_, _], response: ResponseMetadata, tag: Unit): Unit = ()
67+
override def responseHandled(
68+
request: GenericRequest[_, _],
69+
response: ResponseMetadata,
70+
tag: Unit,
71+
exception: Option[ResponseException[_]]
72+
): IO[Unit] = trail.update(_ :+ "response handled")
73+
override def exception(
74+
request: GenericRequest[_, _],
75+
tag: Unit,
76+
exception: Throwable,
77+
responseBodyReceivedCalled: Boolean
78+
): IO[Unit] = trail.update(_ :+ s"exception")
79+
}
80+
}

0 commit comments

Comments
 (0)