Skip to content

Commit 4a84205

Browse files
committed
implement the Twitter Future cancellation protocol
1 parent 24c73e1 commit 4a84205

File tree

4 files changed

+375
-15
lines changed

4 files changed

+375
-15
lines changed

full_diff.txt

Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
diff --git a/project/AsyncUtilsBuildPlugin.scala b/project/AsyncUtilsBuildPlugin.scala
2+
index 03cfccc..48d7e89 100644
3+
--- a/project/AsyncUtilsBuildPlugin.scala
4+
+++ b/project/AsyncUtilsBuildPlugin.scala
5+
@@ -158,6 +158,9 @@ object AsyncUtilsBuildPlugin extends AutoPlugin {
6+
Seq(
7+
"org.typelevel" %% "cats-effect" % CatsEffect3V,
8+
"com.twitter" %% "util-core" % v,
9+
+ "org.scalameta" %% "munit" % "1.2.1" % Test,
10+
+ "org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test,
11+
+ "org.typelevel" %% "scalacheck-effect-munit" % "2.1.0-RC1" % Test,
12+
) ++ (if (scalaVersion.value.startsWith("2")) scala2CompilerPlugins else Nil)
13+
},
14+
mimaPreviousArtifacts += organizationName.value %% name.value % "0.3.0",
15+
diff --git a/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala b/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala
16+
index 01d3070..32751c9 100644
17+
--- a/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala
18+
+++ b/twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala
19+
@@ -1,13 +1,16 @@
20+
package com.dwolla.util.async
21+
22+
-import cats._
23+
-import cats.data._
24+
-import cats.effect._
25+
-import cats.syntax.all._
26+
-import cats.tagless._
27+
-import cats.tagless.syntax.all._
28+
+import cats.*
29+
+import cats.data.*
30+
+import cats.effect.*
31+
+import cats.syntax.all.*
32+
+import cats.tagless.*
33+
+import cats.tagless.syntax.all.*
34+
import com.twitter.util
35+
36+
+import java.util.concurrent.CancellationException
37+
+import scala.util.control.NoStackTrace
38+
+
39+
object twitter extends ToAsyncFunctorKOps {
40+
implicit def twitterFutureAsyncFunctorK[F[_]]: util.Future ~~> F = new (util.Future ~~> F) {
41+
override def asyncMapK[Alg[_[_]] : FunctorK](alg: Alg[util.Future])
42+
@@ -34,15 +37,43 @@ class PartiallyAppliedProvide[F[_]](private val dummy: Boolean = true) extends A
43+
}
44+
45+
class PartiallyAppliedLiftFuture[F[_]] {
46+
- def apply[A](fa: F[util.Future[A]])
47+
- (implicit
48+
- F: Async[F]): F[A] =
49+
- Async[F].async[A] { cb =>
50+
- fa.map {
51+
- _.respond {
52+
- case util.Return(a) => cb(Right(a))
53+
- case util.Throw(ex) => cb(Left(ex))
54+
+ def apply[A](ffa: F[util.Future[A]])
55+
+ (implicit F: Async[F]): F[A] =
56+
+ MonadCancelThrow[F].uncancelable { (poll: Poll[F]) =>
57+
+ poll {
58+
+ Async[F].async[A] { cb: (Either[Throwable, A] => Unit) =>
59+
+ ffa
60+
+ .flatMap { fa =>
61+
+ Sync[F].delay {
62+
+ fa.respond {
63+
+ case util.Return(a) => cb(Right(a))
64+
+ case util.Throw(ex) => cb(Left(ex))
65+
+ }
66+
+ }
67+
+ }
68+
+ .map { fa =>
69+
+ Sync[F].delay {
70+
+ fa.raise(CancelledViaCatsEffect)
71+
+ }.some
72+
+ }
73+
}
74+
- }.as(None)
75+
+ }
76+
+ .recoverWith(recoverFromCancelledViaCatsEffect)
77+
}
78+
+
79+
+ /**
80+
+ * According to CE maintainer Daniel Spiewak in Discord, there's
81+
+ * a race condition in the CE runtime that means sometimes it will
82+
+ * see the future as completed (with the `CancelledViaCatsEffect`
83+
+ * exception) before it transitions into the canceled state. This
84+
+ * `recoverWith` should prevent that from happening.
85+
+ */
86+
+ private final def recoverFromCancelledViaCatsEffect[A](implicit F: Async[F]): PartialFunction[Throwable, F[A]] = {
87+
+ case CancelledViaCatsEffect =>
88+
+ Async[F].canceled >> Async[F].never
89+
+ }
90+
}
91+
+
92+
+object CancelledViaCatsEffect
93+
+ extends CancellationException("Cancelled via cats-effect")
94+
+ with NoStackTrace
95+
diff --git a/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala b/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala
96+
new file mode 100644
97+
index 0000000..5abd193
98+
--- /dev/null
99+
+++ b/twitter-futures/src/test/scala/com/dwolla/util/async/twitter/TwitterFutureAsyncMapKTests.scala
100+
@@ -0,0 +1,113 @@
101+
+package com.dwolla.util.async
102+
+
103+
+import cats.effect.*
104+
+import cats.effect.std.*
105+
+import cats.syntax.all.*
106+
+import com.dwolla.util.async.twitter.liftFuture
107+
+import com.twitter.util.{Duration as _, *}
108+
+import munit.{CatsEffectSuite, ScalaCheckEffectSuite}
109+
+import org.scalacheck.{Prop, Test}
110+
+import org.scalacheck.effect.PropF
111+
+
112+
+import java.util.concurrent.CancellationException
113+
+import scala.concurrent.duration.*
114+
+
115+
+class TwitterFutureAsyncMapKTests extends CatsEffectSuite with ScalaCheckEffectSuite {
116+
+ override def munitIOTimeout: Duration = 1.minute
117+
+
118+
+ override protected def scalaCheckTestParameters: Test.Parameters =
119+
+ super.scalaCheckTestParameters.withMinSuccessfulTests(100000)
120+
+
121+
+ test("lift a Twitter Future into IO") {
122+
+ PropF.forAllF { (i: Int) =>
123+
+ for {
124+
+ promise <- IO(Promise[Int]())
125+
+ (x, _) <- liftFuture[IO](IO(promise)).both(IO(promise.setValue(i)))
126+
+ } yield {
127+
+ assertEquals(x, i)
128+
+ }
129+
+ }
130+
+ }
131+
+
132+
+ test("cancelling a running Twitter Future lifted into IO should interrupt the underlying Twitter Future") {
133+
+ for {
134+
+ promise <- IO(Promise[Int]())
135+
+ startedLatch <- CountDownLatch[IO](1)
136+
+ fiber <- IO.uncancelable { poll => // we only want the Future to be cancellable
137+
+ poll(liftFuture[IO](startedLatch.release.as(promise))).start
138+
+ }
139+
+ _ <- startedLatch.await
140+
+ _ <- fiber.cancel
141+
+ } yield {
142+
+ assert(promise.isInterrupted.isDefined)
143+
+ }
144+
+ }
145+
+
146+
+ test("a running Twitter Future lifted into IO can be completed or cancelled") {
147+
+ PropF.forAllF { (i: Option[Int]) =>
148+
+ (Supervisor[IO](await = true), Dispatcher.parallel[IO](await = true))
149+
+ .tupled
150+
+ .use { case (supervisor, dispatcher) =>
151+
+ for {
152+
+ capturedInterruptionThrowable <- Deferred[IO, Throwable]
153+
+ twitterPromise <- IO(new Promise[Option[Int]]()).flatTap(captureThrowableOnInterruption(dispatcher, capturedInterruptionThrowable))
154+
+ startedLatch <- CountDownLatch[IO](1)
155+
+ promiseFiber <- IO.uncancelable { poll => // we only want the Future to be cancellable
156+
+ supervisor.supervise(poll(liftFuture[IO](startedLatch.release.as(twitterPromise))))
157+
+ }
158+
+ _ <- startedLatch.await
159+
+ _ <- completeOrCancel(i, twitterPromise, promiseFiber)
160+
+ cancelledRef <- Ref[IO].of(false)
161+
+ outcome <- promiseFiber.joinWith(cancelledRef.set(true).as(None))
162+
+ wasCancelled <- cancelledRef.get
163+
+
164+
+ expectCancellation = i.isEmpty
165+
+ _ <- interceptMessageIO[CancellationException]("Cancelled via cats-effect") {
166+
+ capturedInterruptionThrowable
167+
+ .get
168+
+ .timeout(10.millis)
169+
+ .map(_.asLeft)
170+
+ .rethrow // interceptMessageIO works by throwing an exception, so we need to rethrow it to get the message
171+
+ }
172+
+ .whenA(expectCancellation)
173+
+ } yield {
174+
+ assertEquals(outcome, i)
175+
+ assertEquals(wasCancelled, i.as(false).getOrElse(true))
176+
+ assertEquals(Option(CancelledViaCatsEffect).filter(_ => expectCancellation), twitterPromise.isInterrupted)
177+
+ }
178+
+ }
179+
+ }
180+
+ }
181+
+
182+
+ // just here to make sure we understand how Twitter Future / Promise handles interruption
183+
+ test("the Twitter Future cancellation protocol") {
184+
+ Prop.forAll { (throwable: Throwable) =>
185+
+ val promise = Promise[Int]()
186+
+
187+
+ promise.raise(throwable)
188+
+
189+
+ assertEquals(promise.isInterrupted, throwable.some)
190+
+ }
191+
+ }
192+
+
193+
+ private def captureThrowableOnInterruption[F[_] : Sync, A](dispatcher: Dispatcher[F],
194+
+ capture: Deferred[F, Throwable])
195+
+ (p: Promise[A]): F[Unit] =
196+
+ Sync[F].delay {
197+
+ p.setInterruptHandler { case ex =>
198+
+ dispatcher.unsafeRunSync(capture.complete(ex).void)
199+
+ }
200+
+ }
201+
+
202+
+ private def completeOrCancel[F[_] : Sync, A](maybeA: Option[A],
203+
+ promise: Promise[Option[A]],
204+
+ fiber: Fiber[F, Throwable, Option[A]]): F[Unit] =
205+
+ maybeA match {
206+
+ case Some(a) => Sync[F].delay {
207+
+ promise.setValue(a.some)
208+
+ }
209+
+ case None =>
210+
+ fiber.cancel
211+
+ }
212+
+
213+
+}

project/AsyncUtilsBuildPlugin.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ object AsyncUtilsBuildPlugin extends AutoPlugin {
158158
Seq(
159159
"org.typelevel" %% "cats-effect" % CatsEffect3V,
160160
"com.twitter" %% "util-core" % v,
161+
"org.scalameta" %% "munit" % "1.2.1" % Test,
162+
"org.typelevel" %% "munit-cats-effect" % "2.1.0" % Test,
163+
"org.typelevel" %% "scalacheck-effect-munit" % "2.1.0-RC1" % Test,
161164
) ++ (if (scalaVersion.value.startsWith("2")) scala2CompilerPlugins else Nil)
162165
},
163166
mimaPreviousArtifacts += organizationName.value %% name.value % "0.3.0",

twitter-futures/src/main/scala/com/dwolla/util/async/twitter.scala

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package com.dwolla.util.async
22

3-
import cats._
4-
import cats.data._
5-
import cats.effect._
6-
import cats.syntax.all._
7-
import cats.tagless._
8-
import cats.tagless.syntax.all._
3+
import cats.*
4+
import cats.data.*
5+
import cats.effect.*
6+
import cats.syntax.all.*
7+
import cats.tagless.*
8+
import cats.tagless.syntax.all.*
99
import com.twitter.util
1010

11+
import java.util.concurrent.CancellationException
12+
import scala.util.control.NoStackTrace
13+
1114
object twitter extends ToAsyncFunctorKOps {
1215
implicit def twitterFutureAsyncFunctorK[F[_]]: util.Future ~~> F = new (util.Future ~~> F) {
1316
override def asyncMapK[Alg[_[_]] : FunctorK](alg: Alg[util.Future])
@@ -34,15 +37,43 @@ class PartiallyAppliedProvide[F[_]](private val dummy: Boolean = true) extends A
3437
}
3538

3639
class PartiallyAppliedLiftFuture[F[_]] {
37-
def apply[A](fa: F[util.Future[A]])
38-
(implicit
39-
F: Async[F]): F[A] =
40-
Async[F].async[A] { cb =>
41-
fa.map {
42-
_.respond {
43-
case util.Return(a) => cb(Right(a))
44-
case util.Throw(ex) => cb(Left(ex))
40+
def apply[A](ffa: F[util.Future[A]])
41+
(implicit F: Async[F]): F[A] =
42+
MonadCancelThrow[F].uncancelable { (poll: Poll[F]) =>
43+
poll {
44+
Async[F].async[A] { cb: (Either[Throwable, A] => Unit) =>
45+
ffa
46+
.flatMap { fa =>
47+
Sync[F].delay {
48+
fa.respond {
49+
case util.Return(a) => cb(Right(a))
50+
case util.Throw(ex) => cb(Left(ex))
51+
}
52+
}
53+
}
54+
.map { fa =>
55+
Sync[F].delay {
56+
fa.raise(CancelledViaCatsEffect)
57+
}.some
58+
}
4559
}
46-
}.as(None)
60+
}
61+
.recoverWith(recoverFromCancelledViaCatsEffect)
4762
}
63+
64+
/**
65+
* According to CE maintainer Daniel Spiewak in Discord, there's
66+
* a race condition in the CE runtime that means sometimes it will
67+
* see the future as completed (with the `CancelledViaCatsEffect`
68+
* exception) before it transitions into the canceled state. This
69+
* `recoverWith` should prevent that from happening.
70+
*/
71+
private final def recoverFromCancelledViaCatsEffect[A](implicit F: Async[F]): PartialFunction[Throwable, F[A]] = {
72+
case CancelledViaCatsEffect =>
73+
Async[F].canceled >> Async[F].never
74+
}
4875
}
76+
77+
case object CancelledViaCatsEffect
78+
extends CancellationException("Cancelled via cats-effect")
79+
with NoStackTrace

0 commit comments

Comments
 (0)