Skip to content

Commit 1f18a97

Browse files
authored
Added simulated timeout support for a child workflow (#139)
* Fxed support for mocked activity implementation registration * Added a check when a child workflow stub is used to signal only * Added simulated timout to child workflow
1 parent 088a36c commit 1f18a97

File tree

10 files changed

+108
-31
lines changed

10 files changed

+108
-31
lines changed

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowStubImpl.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub {
3535
private final String workflowType;
3636
private final ChildWorkflowOptions options;
3737
private final WorkflowInterceptor decisionContext;
38-
private CompletablePromise<WorkflowExecution> execution = Workflow.newPromise();
38+
private CompletablePromise<WorkflowExecution> execution;
3939

4040
ChildWorkflowStubImpl(
4141
String workflowType, ChildWorkflowOptions options, WorkflowInterceptor decisionContext) {
@@ -80,20 +80,28 @@ public <R> R execute(Class<R> returnType, Object... args) {
8080
public <R> Promise<R> executeAsync(Class<R> returnType, Object... args) {
8181
WorkflowResult<R> result =
8282
decisionContext.executeChildWorkflow(workflowType, returnType, args, options);
83+
execution = Workflow.newPromise();
8384
execution.completeFrom(result.getWorkflowExecution());
8485
return result.getResult();
8586
}
8687

8788
@Override
8889
public void signal(String signalName, Object... args) {
89-
Promise<Void> signalled =
90+
if (execution == null) {
91+
throw new IllegalStateException(
92+
"This stub cannot be used to signal a workflow"
93+
+ " without starting it first. "
94+
+ "To signal a workflow execution that was started elsewhere "
95+
+ "use a stub created through Workflow.newExternalWorkflowStub");
96+
}
97+
Promise<Void> signaled =
9098
decisionContext.signalExternalWorkflow(execution.get(), signalName, args);
9199
if (AsyncInternal.isAsync()) {
92-
AsyncInternal.setAsyncResult(signalled);
100+
AsyncInternal.setAsyncResult(signaled);
93101
return;
94102
}
95103
try {
96-
signalled.get();
104+
signaled.get();
97105
} catch (SignalExternalWorkflowException e) {
98106
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
99107
// an event handling method.

src/main/java/com/uber/cadence/internal/sync/ExternalWorkflowStubImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ public WorkflowExecution getExecution() {
4444

4545
@Override
4646
public void signal(String signalName, Object... args) {
47-
Promise<Void> signalled = decisionContext.signalExternalWorkflow(execution, signalName, args);
47+
Promise<Void> signaled = decisionContext.signalExternalWorkflow(execution, signalName, args);
4848
if (AsyncInternal.isAsync()) {
49-
AsyncInternal.setAsyncResult(signalled);
49+
AsyncInternal.setAsyncResult(signaled);
5050
return;
5151
}
5252
try {
53-
signalled.get();
53+
signaled.get();
5454
} catch (SignalExternalWorkflowException e) {
5555
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
5656
// an event handling method.

src/main/java/com/uber/cadence/internal/sync/POJOActivityTaskHandler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.uber.cadence.internal.common.InternalUtils;
3232
import com.uber.cadence.internal.worker.ActivityTaskHandler;
3333
import com.uber.cadence.serviceclient.IWorkflowService;
34-
import com.uber.cadence.testing.TestActivityTimeoutException;
34+
import com.uber.cadence.testing.SimulatedTimeoutException;
3535
import java.lang.reflect.InvocationTargetException;
3636
import java.lang.reflect.Method;
3737
import java.util.Collections;
@@ -78,6 +78,9 @@ public void addActivityImplementation(Object activity) {
7878
throw new IllegalArgumentException("Activity must implement at least one interface");
7979
}
8080
for (TypeToken<?> i : interfaces) {
81+
if (i.getType().getTypeName().startsWith("org.mockito")) {
82+
continue;
83+
}
8184
for (Method method : i.getRawType().getMethods()) {
8285
POJOActivityImplementation implementation =
8386
new POJOActivityImplementation(method, activity);
@@ -105,10 +108,10 @@ private ActivityTaskHandler.Result mapToActivityFailure(ActivityTask task, Throw
105108
throw new CancellationException(failure.getMessage());
106109
}
107110
// Only expected during unit tests.
108-
if (failure instanceof TestActivityTimeoutException) {
109-
TestActivityTimeoutException timeoutException = (TestActivityTimeoutException) failure;
111+
if (failure instanceof SimulatedTimeoutException) {
112+
SimulatedTimeoutException timeoutException = (SimulatedTimeoutException) failure;
110113
failure =
111-
new TestActivityTimeoutExceptionInternal(
114+
new SimulatedTimeoutExceptionInternal(
112115
timeoutException.getTimeoutType(),
113116
dataConverter.toData(timeoutException.getDetails()));
114117
}

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.uber.cadence.internal.replay.ReplayWorkflow;
2727
import com.uber.cadence.internal.replay.ReplayWorkflowFactory;
2828
import com.uber.cadence.internal.worker.WorkflowExecutionException;
29+
import com.uber.cadence.testing.SimulatedTimeoutException;
2930
import com.uber.cadence.workflow.Functions;
3031
import com.uber.cadence.workflow.QueryMethod;
3132
import com.uber.cadence.workflow.SignalMethod;
@@ -291,6 +292,15 @@ public void processSignal(String signalName, byte[] input, long eventId) {
291292
public static WorkflowExecutionException mapToWorkflowExecutionException(
292293
Exception failure, DataConverter dataConverter) {
293294
failure = CheckedExceptionWrapper.unwrap(failure);
295+
// Only expected during unit tests.
296+
if (failure instanceof SimulatedTimeoutException) {
297+
SimulatedTimeoutException timeoutException = (SimulatedTimeoutException) failure;
298+
failure =
299+
new SimulatedTimeoutExceptionInternal(
300+
timeoutException.getTimeoutType(),
301+
dataConverter.toData(timeoutException.getDetails()));
302+
}
303+
294304
return new WorkflowExecutionException(
295305
failure.getClass().getName(), dataConverter.toData(failure));
296306
}

src/main/java/com/uber/cadence/internal/sync/TestActivityTimeoutExceptionInternal.java renamed to src/main/java/com/uber/cadence/internal/sync/SimulatedTimeoutExceptionInternal.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@
2020
import com.uber.cadence.TimeoutType;
2121

2222
/**
23-
* TestActivityTimeoutException is created from an TestActivityTimeoutException. The main difference
24-
* is that details are in a serialized form.
23+
* SimulatedTimeoutExceptionInternal is created from a SimulatedTimeoutException. The main
24+
* difference is that the details are in a serialized form.
2525
*/
26-
final class TestActivityTimeoutExceptionInternal extends RuntimeException {
26+
final class SimulatedTimeoutExceptionInternal extends RuntimeException {
2727

2828
private final TimeoutType timeoutType;
2929

3030
private final byte[] details;
3131

32-
TestActivityTimeoutExceptionInternal(TimeoutType timeoutType, byte[] details) {
32+
SimulatedTimeoutExceptionInternal(TimeoutType timeoutType, byte[] details) {
3333
this.timeoutType = timeoutType;
3434
this.details = details;
3535
}
3636

37-
TestActivityTimeoutExceptionInternal(TimeoutType timeoutType) {
37+
SimulatedTimeoutExceptionInternal(TimeoutType timeoutType) {
3838
this.timeoutType = timeoutType;
3939
this.details = null;
4040
}

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import com.uber.cadence.workflow.ChildWorkflowException;
4040
import com.uber.cadence.workflow.ChildWorkflowFailureException;
4141
import com.uber.cadence.workflow.ChildWorkflowOptions;
42+
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
4243
import com.uber.cadence.workflow.CompletablePromise;
4344
import com.uber.cadence.workflow.ContinueAsNewOptions;
4445
import com.uber.cadence.workflow.Functions;
@@ -180,10 +181,9 @@ private RuntimeException mapActivityException(Exception failure) {
180181
} catch (Exception e) {
181182
cause = e;
182183
}
183-
if (cause instanceof TestActivityTimeoutExceptionInternal) {
184+
if (cause instanceof SimulatedTimeoutExceptionInternal) {
184185
// This exception is thrown only in unit tests to mock the activity timeouts
185-
TestActivityTimeoutExceptionInternal testTimeout =
186-
(TestActivityTimeoutExceptionInternal) cause;
186+
SimulatedTimeoutExceptionInternal testTimeout = (SimulatedTimeoutExceptionInternal) cause;
187187
return new ActivityTimeoutException(
188188
taskFailed.getEventId(),
189189
taskFailed.getActivityType(),
@@ -303,6 +303,11 @@ private RuntimeException mapChildWorkflowException(Exception failure) {
303303
} catch (Exception e) {
304304
cause = e;
305305
}
306+
if (cause instanceof SimulatedTimeoutExceptionInternal) {
307+
// This exception is thrown only in unit tests to mock the child workflow timeouts
308+
return new ChildWorkflowTimedOutException(
309+
taskFailed.getEventId(), taskFailed.getWorkflowExecution(), taskFailed.getWorkflowType());
310+
}
306311
return new ChildWorkflowFailureException(
307312
taskFailed.getEventId(),
308313
taskFailed.getWorkflowExecution(),

src/main/java/com/uber/cadence/testing/TestActivityTimeoutException.java renamed to src/main/java/com/uber/cadence/testing/SimulatedTimeoutException.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,29 @@
2020
import com.uber.cadence.TimeoutType;
2121

2222
/**
23-
* TestActivityTimeoutException can be thrown from an activity implementation to simulate an
24-
* activity timeout. To be used only in unit tests. The workflow code is going to receive it as
25-
* {@link com.uber.cadence.workflow.ActivityTimeoutException}.
23+
* SimulatedTimeoutException can be thrown from an activity or child workflow implementation to
24+
* simulate a timeout. To be used only in unit tests. If thrown from an activity the workflow code
25+
* is going to receive it as {@link com.uber.cadence.workflow.ActivityTimeoutException}. If thrown
26+
* from a child workflow the workflow code is going to receive it as {@link
27+
* com.uber.cadence.workflow.ChildWorkflowTimedOutException}.
2628
*/
27-
public final class TestActivityTimeoutException extends RuntimeException {
29+
public final class SimulatedTimeoutException extends RuntimeException {
2830

2931
private final TimeoutType timeoutType;
3032

3133
private final Object details;
3234

33-
public TestActivityTimeoutException(TimeoutType timeoutType, Object details) {
35+
public SimulatedTimeoutException(TimeoutType timeoutType, Object details) {
3436
this.timeoutType = timeoutType;
3537
this.details = details;
3638
}
3739

38-
public TestActivityTimeoutException(TimeoutType timeoutType) {
40+
public SimulatedTimeoutException() {
41+
this.timeoutType = TimeoutType.START_TO_CLOSE;
42+
this.details = null;
43+
}
44+
45+
public SimulatedTimeoutException(TimeoutType timeoutType) {
3946
this.timeoutType = timeoutType;
4047
this.details = null;
4148
}

src/main/java/com/uber/cadence/workflow/Workflow.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,9 @@ public static ActivityStub newUntypedActivityStub(ActivityOptions options) {
5858
}
5959

6060
/**
61-
* Creates client stub that can be used to start a child workflow that implements given interface
62-
* using parent options.
61+
* Creates client stub that can be used to start a child workflow that implements the given
62+
* interface using parent options. Use {@link #newExternalWorkflowStub(Class, String)} to get a
63+
* stub to signal a workflow without starting it.
6364
*
6465
* @param workflowInterface interface type implemented by activities
6566
*/
@@ -69,6 +70,8 @@ public static <T> T newChildWorkflowStub(Class<T> workflowInterface) {
6970

7071
/**
7172
* Creates client stub that can be used to start a child workflow that implements given interface.
73+
* Use {@link #newExternalWorkflowStub(Class, String)} to get a stub to signal a workflow without
74+
* starting it.
7275
*
7376
* @param workflowInterface interface type implemented by activities
7477
* @param options options passed to the child workflow.

src/main/java/com/uber/cadence/workflow/WorkflowLocal.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
/**
2525
* A value that is local to a single workflow execution. So it can act as a <i>global</i> variable
26-
* for the workflow code. For example the {@code Workflow.isSignalled()} static method returns the
26+
* for the workflow code. For example the {@code Workflow.isSignaled()} static method returns the
2727
* correct value even if there are multiple workflows executing on the same machine simultaneously.
2828
* It would be invalid if the {@code signaled} was a {@code static boolean} variable.
2929
*
@@ -32,7 +32,7 @@
3232
*
3333
* private static final WorkflowLocal<Boolean> signaled = WorkflowLocal.withInitial(() -> false);
3434
*
35-
* public static boolean isSignalled() {
35+
* public static boolean isSignaled() {
3636
* return signaled.get();
3737
* }
3838
*

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,12 @@
3636
import com.uber.cadence.client.WorkflowStub;
3737
import com.uber.cadence.client.WorkflowTimedOutException;
3838
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
39-
import com.uber.cadence.testing.TestActivityTimeoutException;
39+
import com.uber.cadence.testing.SimulatedTimeoutException;
4040
import com.uber.cadence.testing.TestWorkflowEnvironment;
4141
import com.uber.cadence.worker.Worker;
4242
import com.uber.cadence.workflow.ActivityTimeoutException;
4343
import com.uber.cadence.workflow.Async;
44+
import com.uber.cadence.workflow.ChildWorkflowTimedOutException;
4445
import com.uber.cadence.workflow.Promise;
4546
import com.uber.cadence.workflow.SignalMethod;
4647
import com.uber.cadence.workflow.Workflow;
@@ -205,7 +206,7 @@ private static class SimulatedTimeoutActivityImpl implements TestActivity {
205206

206207
@Override
207208
public String activity1(String input) {
208-
throw new TestActivityTimeoutException(TimeoutType.HEARTBEAT, "progress1");
209+
throw new SimulatedTimeoutException(TimeoutType.HEARTBEAT, "progress1");
209210
}
210211
}
211212

@@ -619,4 +620,44 @@ public void testChild() {
619620
String result = workflow.workflow("input1");
620621
assertEquals("child input1", result);
621622
}
623+
624+
public static class SimulatedTimeoutParentWorkflow implements ParentWorkflow {
625+
626+
@Override
627+
public String workflow(String input) {
628+
ChildWorkflow child = Workflow.newChildWorkflowStub(ChildWorkflow.class);
629+
Promise<String> result =
630+
Async.function(child::workflow, input, Workflow.getWorkflowInfo().getWorkflowId());
631+
return result.get();
632+
}
633+
634+
@Override
635+
public void signal(String value) {}
636+
}
637+
638+
public static class SimulatedTimeoutChildWorklfow implements ChildWorkflow {
639+
640+
@Override
641+
public String workflow(String input, String parentId) {
642+
Workflow.sleep(Duration.ofHours(2));
643+
throw new SimulatedTimeoutException();
644+
}
645+
}
646+
647+
@Test
648+
public void testChildSimulatedTimeout() {
649+
Worker worker = testEnvironment.newWorker(TASK_LIST);
650+
worker.registerWorkflowImplementationTypes(
651+
SimulatedTimeoutParentWorkflow.class, SimulatedTimeoutChildWorklfow.class);
652+
worker.start();
653+
WorkflowClient client = testEnvironment.newWorkflowClient();
654+
WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId("parent1").build();
655+
ParentWorkflow workflow = client.newWorkflowStub(ParentWorkflow.class, options);
656+
try {
657+
workflow.workflow("input1");
658+
fail("unreacheable");
659+
} catch (WorkflowException e) {
660+
assertTrue(e.getCause() instanceof ChildWorkflowTimedOutException);
661+
}
662+
}
622663
}

0 commit comments

Comments
 (0)