Skip to content

Commit 8ea6954

Browse files
noctellajenkins
authored andcommitted
util/util-core: Introduce ImmediateValueFuture (continuations execute immediately)
Problem The transforming continuations (ie not side-effects) of `Future.const` go through the scheduler. While this is safe in all use cases, it can have a negative impact on execution times under certain scenarios due to reordering. Solution Introduce `ImmediateValueFuture` that bypasses the scheduler in all cases and executes continuations immediately. This Future type is not safe to use in all scenarios and we outline this in the doc. Differential Revision: https://phabricator.twitter.biz/D1285047
1 parent f8735d2 commit 8ea6954

File tree

2 files changed

+293
-0
lines changed

2 files changed

+293
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.twitter.util
2+
3+
import scala.util.control.NonFatal
4+
5+
/**
6+
* Successful Future that contains a value. Transformations are executed immediately (don't go
7+
* through the scheduler). Unlike `Future.const`, Future recursion *will* grow the stack (see
8+
* `ImmediateValueFutureTest` for an example of this) -- it is therefore extremely important that
9+
* you understand the full context of how this Future will be used in order to avoid this.
10+
*
11+
* DO NOT USE THIS without thoroughly understanding the risks!
12+
*/
13+
private[twitter] class ImmediateValueFuture[A](result: A) extends Future[A] {
14+
15+
private[this] val ReturnResult = Return(result)
16+
17+
def respond(f: Try[A] => Unit): Future[A] = {
18+
val saved = Local.save()
19+
try {
20+
f(ReturnResult)
21+
} catch Monitor.catcher
22+
finally {
23+
Local.restore(saved)
24+
}
25+
this
26+
}
27+
28+
override def proxyTo[B >: A](other: Promise[B]): Unit = {
29+
other.update(ReturnResult)
30+
}
31+
32+
def raise(interrupt: Throwable): Unit = ()
33+
34+
override def rescue[B >: A](rescueException: PartialFunction[Throwable, Future[B]]): Future[B] = {
35+
this
36+
}
37+
38+
protected def transformTry[B](f: Try[A] => Try[B]): Future[B] = {
39+
val saved = Local.save()
40+
try {
41+
f(ReturnResult) match {
42+
case Return(result) => new ImmediateValueFuture(result)
43+
case t @ Throw(_) => Future.const(t)
44+
}
45+
} catch {
46+
case NonFatal(e) => Future.const(Throw(e))
47+
} finally {
48+
Local.restore(saved)
49+
}
50+
}
51+
52+
def transform[B](f: Try[A] => Future[B]): Future[B] = {
53+
val saved = Local.save()
54+
try {
55+
f(ReturnResult)
56+
} catch {
57+
case NonFatal(e) => Future.const(Throw(e))
58+
} finally {
59+
Local.restore(saved)
60+
}
61+
}
62+
63+
def poll: Option[Try[A]] = Some(ReturnResult)
64+
65+
override def toString: String = s"ImmediateValueFuture($result)"
66+
67+
def ready(timeout: Duration)(implicit permit: Awaitable.CanAwait): this.type = this
68+
69+
def result(timeout: Duration)(implicit permit: Awaitable.CanAwait): A = result
70+
71+
def isReady(implicit permit: Awaitable.CanAwait): Boolean = true
72+
}
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
package com.twitter.util
2+
3+
import com.twitter.concurrent.Scheduler
4+
import java.util.concurrent.atomic.AtomicBoolean
5+
import org.scalatest.funsuite.AnyFunSuite
6+
7+
class ImmediateValueFutureTest extends AnyFunSuite {
8+
9+
// The following is intended to demonstrate the differing recursive behaviour of `ConstFuture`
10+
// and `ImmediateValueFuture`. `ConstFuture` essentially does tail-call elimination which means
11+
// the stack will not grow during a recursive call. `ImmediateValueFuture` does not, so the stack
12+
// will grow with each call!
13+
def recurseAndGetStackSizes(f: Future[Unit]): Seq[Int] = {
14+
val stop = new AtomicBoolean(false)
15+
16+
@volatile var loopStackSizes: Seq[Int] = Seq.empty
17+
18+
def loop(): Future[Unit] = {
19+
if (stop.get) {
20+
Future.Done
21+
} else {
22+
f.flatMap { _ =>
23+
loopStackSizes = loopStackSizes :+ new Throwable().getStackTrace.length
24+
loop()
25+
}
26+
}
27+
}
28+
29+
FuturePool.unboundedPool {
30+
loop()
31+
}
32+
33+
while (loopStackSizes.size < 10) {}
34+
stop.set(true)
35+
loopStackSizes
36+
}
37+
38+
test("ConstFuture recursion does not grow the stack") {
39+
val loopStackSizes = recurseAndGetStackSizes(Future.const(Return[Unit](())))
40+
assert(loopStackSizes.forall(_ == loopStackSizes.head))
41+
}
42+
43+
test("ImmediateValueFuture recursion does grow the stack") {
44+
val loopStackSizes = recurseAndGetStackSizes(new ImmediateValueFuture(()))
45+
assert(loopStackSizes.zip(loopStackSizes.tail).forall { case (a, b) => a < b })
46+
}
47+
48+
test(s"ImmediateValueFuture.interruptible should do nothing") {
49+
val f = new ImmediateValueFuture(())
50+
val i = f.interruptible()
51+
i.raise(new Exception())
52+
assert(f.poll.contains(Return[Unit](())))
53+
assert(i.poll.contains(Return[Unit](())))
54+
}
55+
56+
test(s"ImmediateValueFuture should propagate locals and restore original context in `respond`") {
57+
val local = new Local[Int]
58+
val f = new ImmediateValueFuture(111)
59+
60+
var ran = 0
61+
local() = 1010
62+
63+
f.ensure {
64+
assert(local().contains(1010))
65+
local() = 1212
66+
f.ensure {
67+
assert(local().contains(1212))
68+
local() = 1313
69+
ran += 1
70+
}
71+
assert(local().contains(1212))
72+
ran += 1
73+
}
74+
75+
assert(local().contains(1010))
76+
assert(ran == 2)
77+
}
78+
79+
test(
80+
s"ImmediateValueFuture should propagate locals and restore original context in `transform`") {
81+
val local = new Local[Int]
82+
val f = new ImmediateValueFuture(111)
83+
84+
var ran = 0
85+
local() = 1010
86+
87+
f.transform { tryRes =>
88+
assert(local().contains(1010))
89+
local() = 1212
90+
f.transform { tryRes =>
91+
assert(local().contains(1212))
92+
local() = 1313
93+
ran += 1
94+
Future.const(tryRes)
95+
}
96+
assert(local().contains(1212))
97+
ran += 1
98+
Future.const(tryRes)
99+
}
100+
101+
assert(local().contains(1010))
102+
assert(ran == 2)
103+
}
104+
105+
test(s"ImmediateValueFuture should propagate locals and restore original context in `map`") {
106+
val local = new Local[Int]
107+
val f = new ImmediateValueFuture(111)
108+
109+
var ran = 0
110+
local() = 1010
111+
112+
f.map { i =>
113+
assert(local().contains(1010))
114+
local() = 1212
115+
f.map { i =>
116+
assert(local().contains(1212))
117+
local() = 1313
118+
ran += 1
119+
i
120+
}
121+
assert(local().contains(1212))
122+
ran += 1
123+
i
124+
}
125+
126+
assert(local().contains(1010))
127+
assert(ran == 2)
128+
}
129+
130+
test(s"ImmediateValueFuture should not delay execution") {
131+
val numDispatchesBefore = Scheduler().numDispatches
132+
val f = new ImmediateValueFuture(111)
133+
134+
var count = 0
135+
f.onSuccess { _ =>
136+
assert(count == 0)
137+
f.ensure {
138+
assert(count == 0)
139+
count += 1
140+
}
141+
142+
assert(count == 1)
143+
count += 1
144+
}
145+
146+
assert(count == 2)
147+
assert(Scheduler().numDispatches == numDispatchesBefore)
148+
}
149+
150+
test(s"ImmediateValueFuture side effects should be monitored") {
151+
val inner = new ImmediateValueFuture(111)
152+
val exc = new Exception("a raw exception")
153+
154+
var monitored = false
155+
156+
val monitor = new Monitor {
157+
override def handle(exc: Throwable): Boolean = {
158+
monitored = true
159+
true
160+
}
161+
}
162+
163+
Monitor.using(monitor) {
164+
val f = inner.respond { _ =>
165+
throw exc
166+
}
167+
168+
assert(f.poll.contains(Return(111)))
169+
assert(monitored == true)
170+
}
171+
}
172+
173+
test("ImmediateValueFuture.rescue returns self") {
174+
val f = new ImmediateValueFuture(111)
175+
val r = f.rescue {
176+
case e: Exception => Future.value(1)
177+
}
178+
179+
assert(f eq r)
180+
}
181+
182+
test("ImmediateValueFuture.map returns ImmediateValueFuture if f does not throw") {
183+
val f1 = new ImmediateValueFuture(111)
184+
185+
val f2 = f1.map { _ =>
186+
"hello"
187+
}
188+
189+
assert(f2.isInstanceOf[ImmediateValueFuture[String]])
190+
}
191+
192+
test("ImmediateValueFuture.map returns Future.exception if f throws") {
193+
val f1 = new ImmediateValueFuture(111)
194+
195+
val f2 = f1.map { _ =>
196+
throw new Exception("boom!")
197+
}
198+
199+
intercept[Exception](Await.result(f2))
200+
}
201+
202+
test("ImmediateValueFuture.flatMap returns Future.exception if f returns exceptional Future") {
203+
val f1 = new ImmediateValueFuture(111)
204+
205+
val f2 = f1.flatMap { _ =>
206+
Future.exception(new Exception("boom!"))
207+
}
208+
209+
intercept[Exception](Await.result(f2))
210+
}
211+
212+
test("ImmediateValueFuture.transform returns Future.exception if f throws") {
213+
val f1 = new ImmediateValueFuture(111)
214+
215+
val f2 = f1.transform { _ =>
216+
throw new Exception("boom!")
217+
}
218+
219+
intercept[Exception](Await.result(f2))
220+
}
221+
}

0 commit comments

Comments
 (0)