Skip to content

Commit 25cc83d

Browse files
authored
CancellationScope improvements and documentation (#377)
Added newCancellationScope with function parameter and updated CancellationScope documentation
1 parent dd49525 commit 25cc83d

File tree

5 files changed

+136
-3
lines changed

5 files changed

+136
-3
lines changed

src/main/java/com/uber/cadence/internal/sync/CancellationScopeImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.cadence.workflow.CancellationScope;
2121
import com.uber.cadence.workflow.CompletablePromise;
22+
import com.uber.cadence.workflow.Functions;
2223
import com.uber.cadence.workflow.Workflow;
2324
import java.util.ArrayDeque;
2425
import java.util.Deque;
@@ -73,6 +74,13 @@ private static void popCurrent(CancellationScopeImpl expected) {
7374
setParent(parent);
7475
}
7576

77+
public CancellationScopeImpl(
78+
boolean ignoreParentCancellation, Functions.Proc1<CancellationScope> proc) {
79+
this.detached = ignoreParentCancellation;
80+
this.runnable = () -> proc.apply(this);
81+
setParent(current());
82+
}
83+
7684
private void setParent(CancellationScopeImpl parent) {
7785
if (parent == null) {
7886
detached = true;

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,11 @@ public static CancellationScope newCancellationScope(boolean detached, Runnable
296296
return new CancellationScopeImpl(detached, runnable);
297297
}
298298

299+
public static CancellationScope newCancellationScope(
300+
boolean detached, Functions.Proc1<CancellationScope> proc) {
301+
return new CancellationScopeImpl(detached, proc);
302+
}
303+
299304
public static CancellationScopeImpl currentCancellationScope() {
300305
return CancellationScopeImpl.current();
301306
}

src/main/java/com/uber/cadence/workflow/CancellationScope.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
/**
2424
* Handle to a cancellation scope created through {@link Workflow#newCancellationScope(Runnable)} or
2525
* {@link Workflow#newDetachedCancellationScope(Runnable)}. Supports explicit cancelling of the code
26-
* a cancellation scope wraps.
26+
* a cancellation scope wraps. The code in the CancellationScope has to be executed using {@link
27+
* Runnable#run()} method.
2728
*/
2829
public interface CancellationScope extends Runnable {
2930

src/main/java/com/uber/cadence/workflow/Workflow.java

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@
225225
* activities.deleteLocalFile(processedName);
226226
* }
227227
* }
228-
* });
228+
* }
229+
* ).run();
229230
* }
230231
* }
231232
* }</pre>
@@ -581,6 +582,21 @@ public static WorkflowInfo getWorkflowInfo() {
581582
* blocking after the current scope is cancelled use a scope created through {@link
582583
* #newDetachedCancellationScope(Runnable)}.
583584
*
585+
* <p>Example of running activities in parallel and cancelling them after a specified timeout.
586+
*
587+
* <pre><code>
588+
* List<Promise<String>> results = new ArrayList<>();
589+
* CancellationScope scope = Workflow.newDetachedCancellationScope(() -> {
590+
* Async.function(activities::a1);
591+
* Async.function(activities::a2);
592+
* });
593+
* scope.run(); // returns immediately as the activities are invoked asynchronously
594+
* Workflow.sleep(Duration.ofHours(1));
595+
* // Cancels any activity in the scope that is still running
596+
* scope.cancel("one hour passed");
597+
*
598+
* </code></pre>
599+
*
584600
* @param runnable parameter to wrap in a cancellation scope.
585601
* @return wrapped parameter.
586602
*/
@@ -589,7 +605,52 @@ public static CancellationScope newCancellationScope(Runnable runnable) {
589605
}
590606

591607
/**
592-
* Creates a CancellationScope that is not linked to a parent scope.
608+
* Wraps a procedure in a CancellationScope. The procedure receives the wrapping CancellationScope
609+
* as a parameter. Useful when cancellation is requested from within the wrapped code. The
610+
* following example cancels the sibling activity on any failure.
611+
*
612+
* <pre><code>
613+
* Workflow.newCancellationScope(
614+
* (scope) -> {
615+
* Promise<Void> p1 = Async.proc(activities::a1).exceptionally(ex->
616+
* {
617+
* scope.cancel("a1 failed");
618+
* return null;
619+
* });
620+
*
621+
* Promise<Void> p2 = Async.proc(activities::a2).exceptionally(ex->
622+
* {
623+
* scope.cancel("a2 failed");
624+
* return null;
625+
* });
626+
* Promise.allOf(p1, p2).get();
627+
* })
628+
* .run();
629+
* </code></pre>
630+
*
631+
* @param proc code to wrap in the cancellation scope
632+
* @return wrapped proc
633+
*/
634+
public static CancellationScope newCancellationScope(Functions.Proc1<CancellationScope> proc) {
635+
return WorkflowInternal.newCancellationScope(false, proc);
636+
}
637+
638+
/**
639+
* Creates a CancellationScope that is not linked to a parent scope. {@link
640+
* CancellationScope#run()} must be called to execute the code the scope wraps. The detached scope
641+
* is needed to execute cleanup code after a workflow is cancelled which cancels the root scope
642+
* that wraps the @WorkflowMethod invocation. Here is an example usage:
643+
*
644+
* <pre><code>
645+
* try {
646+
* // workflow logic
647+
* } catch (CancellationException e) {
648+
* CancellationScope detached = Workflow.newDetachedCancellationScope(() -> {
649+
* // cleanup logic
650+
* });
651+
* detached.run();
652+
* }
653+
* </code></pre>
593654
*
594655
* @param runnable parameter to wrap in a cancellation scope.
595656
* @return wrapped parameter.

src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,64 @@ public void testExplicitThreadCancellation() throws Throwable {
468468
trace.setExpected(expected);
469469
}
470470

471+
@Test
472+
public void testExplicitCancellationOnFailure() throws Throwable {
473+
trace.add("init");
474+
DeterministicRunner d =
475+
new DeterministicRunnerImpl(
476+
() -> {
477+
trace.add("root started");
478+
Workflow.newCancellationScope(
479+
(scope) -> {
480+
Promise<Void> p1 =
481+
Async.procedure(
482+
() -> {
483+
trace.add("thread1 started");
484+
try {
485+
Workflow.sleep(Duration.ofHours(1));
486+
} catch (Exception e) {
487+
trace.add("thread1 exception: " + e.getClass().getSimpleName());
488+
} finally {
489+
trace.add("thread1 done");
490+
}
491+
});
492+
Promise<Void> p2 =
493+
Async.procedure(
494+
() -> {
495+
trace.add("thread2 started");
496+
try {
497+
throw new RuntimeException("simulated");
498+
} finally {
499+
trace.add("thread2 done");
500+
}
501+
})
502+
.exceptionally(
503+
ex -> {
504+
scope.cancel();
505+
return null;
506+
});
507+
Promise.allOf(p1, p2).get();
508+
})
509+
.run();
510+
trace.add("root done");
511+
});
512+
513+
d.runUntilAllBlocked();
514+
assertTrue(d.stackTrace(), d.isDone());
515+
String[] expected =
516+
new String[] {
517+
"init",
518+
"root started",
519+
"thread1 started",
520+
"thread2 started",
521+
"thread2 done",
522+
"thread1 exception: CancellationException",
523+
"thread1 done",
524+
"root done"
525+
};
526+
trace.setExpected(expected);
527+
}
528+
471529
@Test
472530
public void testDetachedCancellation() throws Throwable {
473531
trace.add("init");

0 commit comments

Comments
 (0)