11package com .dwolla .util .async
22
3+ import cats .*
34import cats .effect .*
45import cats .effect .std .*
6+ import cats .effect .testkit .TestControl
57import cats .syntax .all .*
8+ import cats .effect .syntax .all .*
69import com .dwolla .util .async .twitter .{CancelledViaCatsEffect , liftFuture }
710import com .twitter .util .{Duration as _ , * }
8- import munit .{CatsEffectSuite , ScalaCheckEffectSuite }
9- import org .scalacheck .{ Prop , Test }
11+ import munit .{AnyFixture , CatsEffectSuite , ScalaCheckEffectSuite }
12+ import org .scalacheck .Arbitrary . arbitrary
1013import org .scalacheck .effect .PropF
14+ import org .scalacheck .{Arbitrary , Gen , Prop , Test }
1115
1216import java .util .concurrent .CancellationException
1317import scala .concurrent .duration .*
18+ import scala .util .control .NoStackTrace
1419
1520class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectSuite {
1621 override def munitIOTimeout : Duration = 1 .minute
@@ -43,39 +48,46 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
4348 }
4449 }
4550
46- test(" a running Twitter Future lifted into IO can be completed or cancelled" ) {
47- PropF .forAllF { (i : Option [Int ]) =>
48- (Supervisor [IO ](await = true ), Dispatcher .parallel[IO ](await = true ))
49- .tupled
50- .use { case (supervisor, dispatcher) =>
51- for {
52- capturedInterruptionThrowable <- Deferred [IO , Throwable ]
53- twitterPromise <- IO (new Promise [Option [Int ]]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
54- startedLatch <- CountDownLatch [IO ](1 )
55- promiseFiber <- IO .uncancelable { poll => // we want only the Future to be cancellable
56- supervisor.supervise(poll(liftFuture[IO ](startedLatch.release.as(twitterPromise))))
57- }
58- _ <- startedLatch.await
59- _ <- completeOrCancel(i, twitterPromise, promiseFiber)
60- cancelledRef <- Ref [IO ].of(false )
61- outcome <- promiseFiber.joinWith(cancelledRef.set(true ).as(None ))
62- wasCancelled <- cancelledRef.get
63-
64- expectCancellation = i.isEmpty
65- _ <- interceptMessageIO[CancellationException ](" Cancelled via cats-effect" ) {
66- capturedInterruptionThrowable
67- .get
68- .timeout(10 .millis)
69- .map(_.asLeft)
70- .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message
71- }
72- .whenA(expectCancellation)
73- } yield {
74- assertEquals(outcome, i)
75- assertEquals(wasCancelled, i.as(false ).getOrElse(true ))
76- assertEquals(Option (CancelledViaCatsEffect ).filter(_ => expectCancellation), twitterPromise.isInterrupted)
51+ private val supervisorAndDispatcher = ResourceTestLocalFixture (" supervisorAndDispatcher" ,
52+ Supervisor [IO ](await = true ).product(Dispatcher .sequential[IO ](await = true ))
53+ )
54+
55+ override def munitFixtures : Seq [AnyFixture [? ]] = super .munitFixtures ++ Seq (supervisorAndDispatcher)
56+
57+ test(" a running Twitter Future lifted into IO can be completed (as success or failure) or cancelled" ) {
58+ PropF .forAllF { (i : Outcome [IO , Throwable , Int ]) =>
59+ val (supervisor, dispatcher) = supervisorAndDispatcher()
60+
61+ TestControl .executeEmbed {
62+ for {
63+ expectedResult <- i.embed(CancelledViaCatsEffect .raiseError[IO , Int ]).attempt
64+ capturedInterruptionThrowable <- Deferred [IO , Throwable ]
65+ twitterPromise <- IO (new Promise [Int ]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
66+ startedLatch <- CountDownLatch [IO ](1 )
67+ promiseFiber <- IO .uncancelable { poll => // we want only the Future to be cancellable
68+ supervisor.supervise(poll(liftFuture[IO ](startedLatch.release.as(twitterPromise))))
69+ }
70+ _ <- startedLatch.await
71+
72+ (outcome, _) <- promiseFiber.join.both(completeOrCancel(i, twitterPromise, promiseFiber))
73+
74+ outcomeEmittedValue <- outcome.embed(CancelledViaCatsEffect .raiseError[IO , Int ]).attempt
75+
76+ expectCancellation = i.isCanceled
77+ _ <- interceptMessageIO[CancellationException ](" Cancelled via cats-effect" ) {
78+ capturedInterruptionThrowable
79+ .get
80+ .timeout(10 .millis)
81+ .map(_.asLeft)
82+ .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message
7783 }
84+ .whenA(expectCancellation)
85+ } yield {
86+ assertEquals(outcomeEmittedValue, expectedResult)
87+ assertEquals(outcome.isCanceled, i.isCanceled)
88+ assertEquals(Option (CancelledViaCatsEffect ).filter(_ => expectCancellation), twitterPromise.isInterrupted)
7889 }
90+ }
7991 }
8092 }
8193
@@ -90,6 +102,14 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
90102 }
91103 }
92104
105+ private def genOutcome [F [_] : Applicative , A : Arbitrary ]: Gen [Outcome [F , Throwable , A ]] =
106+ Gen .oneOf(
107+ arbitrary[A ].map(_.pure[F ]).map(Outcome .succeeded[F , Throwable , A ]),
108+ Gen .const(new RuntimeException (" arbitrary exception" ) with NoStackTrace ).map(Outcome .errored[F , Throwable , A ]),
109+ Gen .const(Outcome .canceled[F , Throwable , A ]),
110+ )
111+ private implicit def arbOutcome [F [_] : Applicative , A : Arbitrary ]: Arbitrary [Outcome [F , Throwable , A ]] = Arbitrary (genOutcome)
112+
93113 private def captureThrowableOnInterruption [F [_] : Sync , A ](dispatcher : Dispatcher [F ],
94114 capture : Deferred [F , Throwable ])
95115 (p : Promise [A ]): F [Unit ] =
@@ -99,14 +119,23 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
99119 }
100120 }
101121
102- private def completeOrCancel [F [_] : Sync , A ](maybeA : Option [ A ],
103- promise : Promise [Option [ A ] ],
104- fiber : Fiber [F , Throwable , Option [ A ] ]): F [Unit ] =
122+ private def completeOrCancel [F [_] : Async , A ](maybeA : Outcome [ F , Throwable , A ],
123+ promise : Promise [A ],
124+ fiber : Fiber [F , Throwable , A ]): F [Unit ] =
105125 maybeA match {
106- case Some (a) => Sync [F ].delay {
107- promise.setValue(a.some)
108- }
109- case None =>
126+ case Outcome .Succeeded (fa) =>
127+ fa.flatMap(a => Sync [F ].delay(promise.setValue(a))).void
128+
129+ case Outcome .Errored (ex) =>
130+ // If the fiber is in the background (i.e. not joined) when it completes with an exception,
131+ // the IO runtime will print its stacktrace to stderr. We always plan to `join` the fiber
132+ // our tests are complete, so this feels like a false error report.
133+
134+ // To work around the issue, we delay the completion of the promise to make sure the fiber
135+ // is joined before the promise is completed with the exception.
136+ Sync [F ].delay(promise.setException(ex)).delayBy(10 .millis)
137+
138+ case Outcome .Canceled () =>
110139 fiber.cancel
111140 }
112141
0 commit comments