11package com .dwolla .util .async
22
3+ import cats .*
34import cats .effect .*
45import cats .effect .std .*
56import cats .syntax .all .*
67import com .dwolla .util .async .twitter .{CancelledViaCatsEffect , liftFuture }
78import com .twitter .util .{Duration as _ , * }
8- import munit .{CatsEffectSuite , ScalaCheckEffectSuite }
9- import org .scalacheck .{ Prop , Test }
9+ import munit .{AnyFixture , CatsEffectSuite , ScalaCheckEffectSuite }
10+ import org .scalacheck .Arbitrary . arbitrary
1011import org .scalacheck .effect .PropF
12+ import org .scalacheck .{Arbitrary , Gen , Prop , Test }
1113
1214import java .util .concurrent .CancellationException
1315import scala .concurrent .duration .*
16+ import scala .util .control .NoStackTrace
1417
1518class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectSuite {
1619 override def munitIOTimeout : Duration = 1 .minute
@@ -43,39 +46,44 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
4346 }
4447 }
4548
46- test(" a running Twitter Future lifted into IO can be completed or cancelled" ) {
47- PropF .forAllF { (i : Option [Int ]) =>
48- (Supervisor [IO ](await = true ), Dispatcher .parallel[IO ](await = true ))
49- .tupled
50- .use { case (supervisor, dispatcher) =>
51- for {
52- capturedInterruptionThrowable <- Deferred [IO , Throwable ]
53- twitterPromise <- IO (new Promise [Option [Int ]]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
54- startedLatch <- CountDownLatch [IO ](1 )
55- promiseFiber <- IO .uncancelable { poll => // we want only the Future to be cancellable
56- supervisor.supervise(poll(liftFuture[IO ](startedLatch.release.as(twitterPromise))))
57- }
58- _ <- startedLatch.await
59- _ <- completeOrCancel(i, twitterPromise, promiseFiber)
60- cancelledRef <- Ref [IO ].of(false )
61- outcome <- promiseFiber.joinWith(cancelledRef.set(true ).as(None ))
62- wasCancelled <- cancelledRef.get
63-
64- expectCancellation = i.isEmpty
65- _ <- interceptMessageIO[CancellationException ](" Cancelled via cats-effect" ) {
66- capturedInterruptionThrowable
67- .get
68- .timeout(10 .millis)
69- .map(_.asLeft)
70- .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message
71- }
72- .whenA(expectCancellation)
73- } yield {
74- assertEquals(outcome, i)
75- assertEquals(wasCancelled, i.as(false ).getOrElse(true ))
76- assertEquals(Option (CancelledViaCatsEffect ).filter(_ => expectCancellation), twitterPromise.isInterrupted)
77- }
49+ private val supervisorAndDispatcher = ResourceTestLocalFixture (" supervisorAndDispatcher" ,
50+ Supervisor [IO ](await = true ).product(Dispatcher .sequential[IO ](await = true ))
51+ )
52+
53+ override def munitFixtures : Seq [AnyFixture [? ]] = super .munitFixtures ++ Seq (supervisorAndDispatcher)
54+
55+ test(" a running Twitter Future lifted into IO can be completed (as success or failure) or cancelled" ) {
56+ PropF .forAllF { (i : Outcome [IO , Throwable , Int ]) =>
57+ val (supervisor, dispatcher) = supervisorAndDispatcher()
58+
59+ for {
60+ expectedResult <- i.embed(CancelledViaCatsEffect .raiseError[IO , Int ]).attempt
61+ capturedInterruptionThrowable <- Deferred [IO , Throwable ]
62+ twitterPromise <- IO (new Promise [Int ]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
63+ startedLatch <- CountDownLatch [IO ](1 )
64+ promiseFiber <- IO .uncancelable { poll => // we want only the Future to be cancellable
65+ supervisor.supervise(poll(liftFuture[IO ](startedLatch.release.as(twitterPromise))))
66+ }
67+ _ <- startedLatch.await
68+ _ <- completeOrCancel(i, twitterPromise, promiseFiber)
69+ outcome <- promiseFiber.join
70+
71+ outcomeEmittedValue <- outcome.embed(CancelledViaCatsEffect .raiseError[IO , Int ]).attempt
72+
73+ expectCancellation = i.isCanceled
74+ _ <- interceptMessageIO[CancellationException ](" Cancelled via cats-effect" ) {
75+ capturedInterruptionThrowable
76+ .get
77+ .timeout(10 .millis)
78+ .map(_.asLeft)
79+ .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message
7880 }
81+ .whenA(expectCancellation)
82+ } yield {
83+ assertEquals(outcomeEmittedValue, expectedResult)
84+ assertEquals(outcome.isCanceled, i.isCanceled)
85+ assertEquals(Option (CancelledViaCatsEffect ).filter(_ => expectCancellation), twitterPromise.isInterrupted)
86+ }
7987 }
8088 }
8189
@@ -90,6 +98,14 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
9098 }
9199 }
92100
101+ private def genOutcome [F [_] : Applicative , A : Arbitrary ]: Gen [Outcome [F , Throwable , A ]] =
102+ Gen .oneOf(
103+ arbitrary[A ].map(_.pure[F ]).map(Outcome .succeeded[F , Throwable , A ]),
104+ Gen .const(new RuntimeException (" arbitrary exception" ) with NoStackTrace ).map(Outcome .errored[F , Throwable , A ]),
105+ Gen .const(Outcome .canceled[F , Throwable , A ]),
106+ )
107+ private implicit def arbOutcome [F [_] : Applicative , A : Arbitrary ]: Arbitrary [Outcome [F , Throwable , A ]] = Arbitrary (genOutcome)
108+
93109 private def captureThrowableOnInterruption [F [_] : Sync , A ](dispatcher : Dispatcher [F ],
94110 capture : Deferred [F , Throwable ])
95111 (p : Promise [A ]): F [Unit ] =
@@ -99,14 +115,17 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
99115 }
100116 }
101117
102- private def completeOrCancel [F [_] : Sync , A ](maybeA : Option [ A ],
103- promise : Promise [Option [ A ] ],
104- fiber : Fiber [F , Throwable , Option [ A ] ]): F [Unit ] =
118+ private def completeOrCancel [F [_] : Sync , A ](maybeA : Outcome [ F , Throwable , A ],
119+ promise : Promise [A ],
120+ fiber : Fiber [F , Throwable , A ]): F [Unit ] =
105121 maybeA match {
106- case Some (a) => Sync [F ].delay {
107- promise.setValue(a.some)
108- }
109- case None =>
122+ case Outcome .Succeeded (fa) =>
123+ fa.flatMap(a => Sync [F ].delay(promise.setValue(a))).void
124+
125+ case Outcome .Errored (ex) =>
126+ Sync [F ].delay(promise.setException(ex))
127+
128+ case Outcome .Canceled () =>
110129 fiber.cancel
111130 }
112131
0 commit comments