Skip to content

Commit 73ce124

Browse files
stephbianjohnynek
authored andcommitted
Cancellable Executions (#1920)
* add failing tests * add another failing test to test writes * wip try to thread CancellationHandler through * finish threading cancellation handler through * try to fix test * wip cfuture/cpromise * test passes, everything compiles * reenable tests * remove sys error * fix indenting * don't recover on flow stop * move failFastSequence to CFuture; add CFuture helper methods * uncancellable * move CFuture/CPromise * CFuture.fromFuture * add typeclass * remove cec * execution changes * future cache backwards compatible * AsyncFlowDef fixes * change jdk * add failing test * delete failing test * add flatmap test * add flow step listener * tryFailure * handle null throwable * respond to review * stop flow in promise
1 parent 59d9327 commit 73ce124

File tree

13 files changed

+555
-148
lines changed

13 files changed

+555
-148
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
language: scala
2-
jdk: oraclejdk8
2+
jdk: openjdk8
33
sudo: false
44

55
before_install:

scalding-core/src/main/java/com/twitter/scalding/cascading_interop/FlowListenerPromise.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
package com.twitter.scalding.cascading_interop;
1717

1818
import cascading.flow.FlowListener;
19+
import cascading.flow.FlowException;
20+
import cascading.flow.FlowStepListener;
21+
import cascading.flow.FlowStep;
1922
import cascading.flow.Flow;
2023
import cascading.stats.CascadingStats;
2124

@@ -28,6 +31,12 @@
2831
* deal with in scala
2932
*/
3033
public class FlowListenerPromise {
34+
public static class FlowStopException extends Exception {
35+
public FlowStopException(String message) {
36+
super(message);
37+
}
38+
}
39+
3140
/*
3241
* This starts the flow and applies a mapping function fn in
3342
* the same thread that completion happens
@@ -37,7 +46,7 @@ public static <Config, T> Future<T> start(Flow<Config> flow, final scala.Functio
3746
flow.addListener(new FlowListener() {
3847
public void onStarting(Flow f) { } // ignore
3948
public void onStopping(Flow f) { // in case of runtime exception cascading call onStopping
40-
result.tryFailure(new Exception("Flow was stopped"));
49+
result.tryFailure(new FlowStopException("Flow was stopped"));
4150
}
4251
public void onCompleted(Flow f) {
4352
// This is always called, but onThrowable is called first
@@ -48,15 +57,30 @@ public void onCompleted(Flow f) {
4857
T toPut = (T) fn.apply(f);
4958
result.success(toPut);
5059
} catch (Throwable t) {
51-
result.failure(t);
60+
result.tryFailure(t);
5261
}
5362
} else {
54-
result.failure(new Exception("Flow was not successfully finished"));
63+
result.tryFailure(new Exception("Flow was not successfully finished"));
5564
}
5665
}
5766
}
5867
public boolean onThrowable(Flow f, Throwable t) {
59-
result.failure(t);
68+
result.tryFailure(t);
69+
// The exception is handled by the owner of the promise and should not be rethrown
70+
return true;
71+
}
72+
});
73+
flow.addStepListener(new FlowStepListener() {
74+
public void onStepStarting(FlowStep flowStep) { } // ignore
75+
public void onStepRunning(FlowStep flowStep) { } // ignore
76+
public void onStepCompleted(FlowStep flowStep) { } // ignore
77+
public void onStepStopping(FlowStep f) { result.tryFailure(new FlowStopException("Flow step was stopped")); }
78+
public boolean onStepThrowable(FlowStep f, Throwable t) {
79+
if (t != null) {
80+
result.tryFailure(t);
81+
} else {
82+
result.tryFailure(new FlowException("Flow step failed: " + f.getName()));
83+
}
6084
// The exception is handled by the owner of the promise and should not be rethrown
6185
return true;
6286
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package com.twitter.scalding
2+
3+
import scala.concurrent.{ Future, ExecutionContext => ConcurrentExecutionContext }
4+
5+
/**
6+
* Represents a cancellable future.
7+
*/
8+
case class CFuture[+T](future: Future[T], cancellationHandler: CancellationHandler) {
9+
def map[S](fn: T => S)(implicit cec: ConcurrentExecutionContext): CFuture[S] = {
10+
val mapped = future.map(fn)
11+
CFuture(mapped, cancellationHandler)
12+
}
13+
14+
def mapFuture[S](fn: Future[T] => Future[S]): CFuture[S] = {
15+
val transformed = fn(future)
16+
CFuture(transformed, cancellationHandler)
17+
}
18+
19+
def zip[U](other: CFuture[U])(implicit cec: ConcurrentExecutionContext): CFuture[(T, U)] = {
20+
val zippedFut: Future[(T, U)] = Execution.failFastZip(future, other.future)
21+
val cancelHandler = cancellationHandler.compose(other.cancellationHandler)
22+
23+
CFuture(zippedFut, cancelHandler)
24+
}
25+
}
26+
27+
object CFuture {
28+
def successful[T](result: T): CFuture[T] = {
29+
CFuture(Future.successful(result), CancellationHandler.empty)
30+
}
31+
32+
def failed(t: Throwable): CFuture[Nothing] = {
33+
val f = Future.failed(t)
34+
CFuture(f, CancellationHandler.empty)
35+
}
36+
37+
def uncancellable[T](fut: Future[T]): CFuture[T] = {
38+
CFuture(fut, CancellationHandler.empty)
39+
}
40+
41+
def fromFuture[T](fut: Future[CFuture[T]])(implicit cec: ConcurrentExecutionContext): CFuture[T] = {
42+
CFuture(fut.flatMap(_.future), CancellationHandler.fromFuture(fut.map(_.cancellationHandler)))
43+
}
44+
45+
/**
46+
* Use our internal faster failing zip function rather than the standard one due to waiting
47+
*/
48+
def failFastSequence[T](t: Iterable[CFuture[T]])(implicit cec: ConcurrentExecutionContext): CFuture[List[T]] = {
49+
t.foldLeft(CFuture.successful(Nil: List[T])) { (f, i) =>
50+
f.zip(i).map { case (tail, h) => h :: tail }
51+
}.map(_.reverse)
52+
}
53+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.twitter.scalding
2+
3+
import scala.concurrent.{ Future, Promise, ExecutionContext => ConcurrentExecutionContext }
4+
5+
/**
6+
* Represents a cancellable promise.
7+
*/
8+
case class CPromise[T](promise: Promise[T], cancellationHandler: Promise[CancellationHandler]) {
9+
/**
10+
* Creates a CFuture using the given promises.
11+
*/
12+
def cfuture: CFuture[T] = {
13+
CFuture(promise.future, CancellationHandler.fromFuture(cancellationHandler.future))
14+
}
15+
16+
def completeWith(other: CFuture[T]): this.type = {
17+
// fullfill the main and cancellation handler promises
18+
promise.completeWith(other.future)
19+
cancellationHandler.completeWith(Future.successful(other.cancellationHandler))
20+
this
21+
}
22+
}
23+
object CPromise {
24+
def apply[T](): CPromise[T] = CPromise(Promise[T](), Promise[CancellationHandler]())
25+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.twitter.scalding
2+
3+
import scala.concurrent.{ Future, ExecutionContext => ConcurrentExecutionContext }
4+
5+
sealed trait CancellationHandler { outer =>
6+
def stop()(implicit ec: ConcurrentExecutionContext): Future[Unit]
7+
def compose(other: CancellationHandler): CancellationHandler = new CancellationHandler {
8+
override def stop()(implicit ec: ConcurrentExecutionContext): Future[Unit] = {
9+
other.stop().zip(outer.stop()).map(_ => ())
10+
}
11+
}
12+
}
13+
14+
object CancellationHandler {
15+
val empty: CancellationHandler = new CancellationHandler {
16+
def stop()(implicit ec: ConcurrentExecutionContext): Future[Unit] = Future.successful(())
17+
}
18+
19+
def fromFn(fn: ConcurrentExecutionContext => Future[Unit]): CancellationHandler = new CancellationHandler {
20+
override def stop()(implicit ec: ConcurrentExecutionContext): Future[Unit] = fn(ec)
21+
}
22+
23+
def fromFuture(f: Future[CancellationHandler]): CancellationHandler = new CancellationHandler {
24+
override def stop()(implicit ec: ConcurrentExecutionContext): Future[Unit] = {
25+
f.flatMap(_.stop())
26+
}
27+
}
28+
}

0 commit comments

Comments
 (0)