11package com .dwolla .util .async
22
3+ import cats .*
34import cats .effect .*
45import cats .effect .std .*
56import cats .syntax .all .*
67import com .dwolla .util .async .twitter .{CancelledViaCatsEffect , liftFuture }
78import com .twitter .util .{Duration as _ , * }
89import munit .{CatsEffectSuite , ScalaCheckEffectSuite }
9- import org .scalacheck .{ Prop , Test }
10+ import org .scalacheck .Arbitrary . arbitrary
1011import org .scalacheck .effect .PropF
12+ import org .scalacheck .{Arbitrary , Gen , Prop , Test }
1113
1214import java .util .concurrent .CancellationException
1315import scala .concurrent .duration .*
16+ import scala .util .control .NoStackTrace
1417
1518class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectSuite {
1619 override def munitIOTimeout : Duration = 1 .minute
@@ -43,25 +46,34 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
4346 }
4447 }
4548
46- test(" a running Twitter Future lifted into IO can be completed or cancelled" ) {
47- PropF .forAllF { (i : Option [Int ]) =>
49+ // private val supervisorAndDispatcher = ResourceSuiteLocalFixture("supervisorAndDispatcher",
50+ // (Supervisor[IO](await = true), Dispatcher.sequential[IO](await = true).onFinalize(IO.println("shutting down Dispatcher"))).tupled
51+ // )
52+
53+ // override def munitFixtures: Seq[AnyFixture[?]] = super.munitFixtures ++ Seq(supervisorAndDispatcher)
54+
55+ test(" a running Twitter Future lifted into IO can be completed (as success or failure) or cancelled" ) {
56+ PropF .forAllF { (i : Outcome [IO , Throwable , Int ]) =>
57+ // val (supervisor, dispatcher) = supervisorAndDispatcher()
58+
4859 (Supervisor [IO ](await = true ), Dispatcher .parallel[IO ](await = true ))
4960 .tupled
5061 .use { case (supervisor, dispatcher) =>
5162 for {
63+ expectedResult <- i.embed(CancelledViaCatsEffect .raiseError[IO , Int ]).attempt
5264 capturedInterruptionThrowable <- Deferred [IO , Throwable ]
53- twitterPromise <- IO (new Promise [Option [ Int ] ]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
65+ twitterPromise <- IO (new Promise [Int ]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
5466 startedLatch <- CountDownLatch [IO ](1 )
5567 promiseFiber <- IO .uncancelable { poll => // we want only the Future to be cancellable
5668 supervisor.supervise(poll(liftFuture[IO ](startedLatch.release.as(twitterPromise))))
5769 }
5870 _ <- startedLatch.await
5971 _ <- completeOrCancel(i, twitterPromise, promiseFiber)
60- cancelledRef <- Ref [ IO ].of( false )
61- outcome <- promiseFiber.joinWith(cancelledRef.set( true ).as( None ))
62- wasCancelled <- cancelledRef.get
72+ outcome <- promiseFiber.join
73+
74+ outcomeEmittedValue <- outcome.embed( CancelledViaCatsEffect .raiseError[ IO , Int ]).attempt
6375
64- expectCancellation = i.isEmpty
76+ expectCancellation = i.isCanceled
6577 _ <- interceptMessageIO[CancellationException ](" Cancelled via cats-effect" ) {
6678 capturedInterruptionThrowable
6779 .get
@@ -71,8 +83,8 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
7183 }
7284 .whenA(expectCancellation)
7385 } yield {
74- assertEquals(outcome, i )
75- assertEquals(wasCancelled , i.as( false ).getOrElse( true ) )
86+ assertEquals(outcomeEmittedValue, expectedResult )
87+ assertEquals(outcome.isCanceled , i.isCanceled )
7688 assertEquals(Option (CancelledViaCatsEffect ).filter(_ => expectCancellation), twitterPromise.isInterrupted)
7789 }
7890 }
@@ -90,6 +102,14 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
90102 }
91103 }
92104
105+ private def genOutcome [F [_] : Applicative , A : Arbitrary ]: Gen [Outcome [F , Throwable , A ]] =
106+ Gen .oneOf(
107+ arbitrary[A ].map(_.pure[F ]).map(Outcome .succeeded[F , Throwable , A ]),
108+ Gen .const(new RuntimeException (" arbitrary exception" ) with NoStackTrace ).map(Outcome .errored[F , Throwable , A ]),
109+ Gen .const(Outcome .canceled[F , Throwable , A ]),
110+ )
111+ private implicit def arbOutcome [F [_] : Applicative , A : Arbitrary ]: Arbitrary [Outcome [F , Throwable , A ]] = Arbitrary (genOutcome)
112+
93113 private def captureThrowableOnInterruption [F [_] : Sync , A ](dispatcher : Dispatcher [F ],
94114 capture : Deferred [F , Throwable ])
95115 (p : Promise [A ]): F [Unit ] =
@@ -99,14 +119,17 @@ class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectS
99119 }
100120 }
101121
102- private def completeOrCancel [F [_] : Sync , A ](maybeA : Option [ A ],
103- promise : Promise [Option [ A ] ],
104- fiber : Fiber [F , Throwable , Option [ A ] ]): F [Unit ] =
122+ private def completeOrCancel [F [_] : Sync , A ](maybeA : Outcome [ F , Throwable , A ],
123+ promise : Promise [A ],
124+ fiber : Fiber [F , Throwable , A ]): F [Unit ] =
105125 maybeA match {
106- case Some (a) => Sync [F ].delay {
107- promise.setValue(a.some)
108- }
109- case None =>
126+ case Outcome .Succeeded (fa) =>
127+ fa.flatMap(a => Sync [F ].delay(promise.setValue(a))).void
128+
129+ case Outcome .Errored (ex) =>
130+ Sync [F ].delay(promise.setException(ex))
131+
132+ case Outcome .Canceled () =>
110133 fiber.cancel
111134 }
112135
0 commit comments