Skip to content

Commit 64d3cfb

Browse files
authored
Merge pull request #152 from meiliang86/child-wf-stub
fix issues in Async promise execution
2 parents 9917088 + b0e439f commit 64d3cfb

File tree

7 files changed

+79
-14
lines changed

7 files changed

+79
-14
lines changed

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowStubImpl.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,14 @@ class ChildWorkflowStubImpl implements ChildWorkflowStub {
3535
private final String workflowType;
3636
private final ChildWorkflowOptions options;
3737
private final WorkflowInterceptor decisionContext;
38-
private CompletablePromise<WorkflowExecution> execution;
38+
private final CompletablePromise<WorkflowExecution> execution;
3939

4040
ChildWorkflowStubImpl(
4141
String workflowType, ChildWorkflowOptions options, WorkflowInterceptor decisionContext) {
4242
this.workflowType = Objects.requireNonNull(workflowType);
4343
this.options = new ChildWorkflowOptions.Builder(options).validateAndBuildWithDefaults();
4444
this.decisionContext = Objects.requireNonNull(decisionContext);
45+
this.execution = Workflow.newPromise();
4546
}
4647

4748
@Override
@@ -80,20 +81,12 @@ public <R> R execute(Class<R> returnType, Object... args) {
8081
public <R> Promise<R> executeAsync(Class<R> returnType, Object... args) {
8182
WorkflowResult<R> result =
8283
decisionContext.executeChildWorkflow(workflowType, returnType, args, options);
83-
execution = Workflow.newPromise();
8484
execution.completeFrom(result.getWorkflowExecution());
8585
return result.getResult();
8686
}
8787

8888
@Override
8989
public void signal(String signalName, Object... args) {
90-
if (execution == null) {
91-
throw new IllegalStateException(
92-
"This stub cannot be used to signal a workflow"
93-
+ " without starting it first. "
94-
+ "To signal a workflow execution that was started elsewhere "
95-
+ "use a stub created through Workflow.newExternalWorkflowStub");
96-
}
9790
Promise<Void> signaled =
9891
decisionContext.signalExternalWorkflow(execution.get(), signalName, args);
9992
if (AsyncInternal.isAsync()) {

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public static <T> T newExternalWorkflowStub(
167167
new ExternalWorkflowInvocationHandler(execution, getWorkflowInterceptor()));
168168
}
169169

170-
public static Promise<WorkflowExecution> getChildWorkflowExecution(Object workflowStub) {
170+
public static Promise<WorkflowExecution> getWorkflowExecution(Object workflowStub) {
171171
if (workflowStub instanceof WorkflowStub) {
172172
return ((WorkflowStub) workflowStub).__getWorkflowExecution();
173173
}

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,7 @@ public void childWorkflowStarted(ChildWorkflowExecutionStartedEventAttributes a)
545545
String childId = a.getWorkflowExecution().getWorkflowId();
546546
StateMachine<ChildWorkflowData> child = getChildWorkflow(childId);
547547
child.action(StateMachines.Action.START, ctx, a, 0);
548+
scheduleDecision(ctx);
548549
// No need to lock until completion as child workflow might skip
549550
// time as well
550551
ctx.unlockTimer();

src/main/java/com/uber/cadence/workflow/Async.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public static Promise<Void> procedure(Functions.Proc procedure) {
148148
* @return Promise that contains procedure result or failure
149149
*/
150150
public static <A1> Promise<Void> procedure(Functions.Proc1<A1> procedure, A1 arg1) {
151-
return procedure(() -> procedure.apply(arg1));
151+
return AsyncInternal.procedure(procedure, arg1);
152152
}
153153

154154
/**

src/main/java/com/uber/cadence/workflow/Functions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public interface Func6<T1, T2, T3, T4, T5, T6, R> extends Serializable {
5757
}
5858

5959
@FunctionalInterface
60-
public interface Proc {
60+
public interface Proc extends Serializable {
6161
void apply();
6262
}
6363

src/main/java/com/uber/cadence/workflow/Workflow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ public static <R> R newExternalWorkflowStub(
443443
* as child workflow start is asynchronous.
444444
*/
445445
public static Promise<WorkflowExecution> getWorkflowExecution(Object childWorkflowStub) {
446-
return WorkflowInternal.getChildWorkflowExecution(childWorkflowStub);
446+
return WorkflowInternal.getWorkflowExecution(childWorkflowStub);
447447
}
448448

449449
/**

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import static org.junit.Assert.assertEquals;
2121
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertNotEquals;
23+
import static org.junit.Assert.assertNotNull;
2224
import static org.junit.Assert.assertNull;
2325
import static org.junit.Assert.assertTrue;
2426
import static org.junit.Assert.fail;
@@ -46,6 +48,7 @@
4648
import com.uber.cadence.common.RetryOptions;
4749
import com.uber.cadence.converter.JsonDataConverter;
4850
import com.uber.cadence.internal.sync.DeterministicRunnerTest;
51+
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
4952
import com.uber.cadence.testing.TestEnvironmentOptions;
5053
import com.uber.cadence.testing.TestEnvironmentOptions.Builder;
5154
import com.uber.cadence.testing.TestWorkflowEnvironment;
@@ -140,6 +143,7 @@ protected void failed(Throwable e, Description description) {
140143
private TestWorkflowEnvironment testEnvironment;
141144
private ScheduledExecutorService scheduledExecutor;
142145
private List<ScheduledFuture<?>> delayedCallbacks = new ArrayList<>();
146+
private static WorkflowServiceTChannel service = new WorkflowServiceTChannel();
143147

144148
private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList) {
145149
return new WorkflowOptions.Builder()
@@ -173,7 +177,7 @@ public void setUp() {
173177
if (useExternalService) {
174178
WorkerOptions workerOptions =
175179
new WorkerOptions.Builder().setInterceptorFactory(tracer).build();
176-
worker = new Worker(DOMAIN, taskList, workerOptions);
180+
worker = new Worker(service, DOMAIN, taskList, workerOptions);
177181
workflowClient = WorkflowClient.newInstance(DOMAIN);
178182
WorkflowClientOptions clientOptions =
179183
new WorkflowClientOptions.Builder()
@@ -1011,6 +1015,73 @@ public void testChildAsyncWorkflow() {
10111015
assertEquals(null, client.execute(taskList));
10121016
}
10131017

1018+
// This workflow is designed specifically for testing some internal logic in Async.procedure
1019+
// and ChildWorkflowStubImpl. See comments on testChildAsyncLambdaWorkflow for more details.
1020+
public interface WaitOnSignalWorkflow {
1021+
1022+
@WorkflowMethod()
1023+
void execute();
1024+
1025+
@SignalMethod
1026+
void signal(String value);
1027+
}
1028+
1029+
public static class TestWaitOnSignalWorkflowImpl implements WaitOnSignalWorkflow {
1030+
private final CompletablePromise<String> signal = Workflow.newPromise();
1031+
1032+
@Override
1033+
public void execute() {
1034+
signal.get();
1035+
}
1036+
1037+
@Override
1038+
public void signal(String value) {
1039+
signal.complete(value);
1040+
}
1041+
}
1042+
1043+
public static class TestChildAsyncLambdaWorkflow implements TestWorkflow1 {
1044+
1045+
@Override
1046+
public String execute(String taskList) {
1047+
ChildWorkflowOptions workflowOptions =
1048+
new ChildWorkflowOptions.Builder()
1049+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(100))
1050+
.setTaskStartToCloseTimeout(Duration.ofSeconds(60))
1051+
.setTaskList(taskList)
1052+
.build();
1053+
1054+
WaitOnSignalWorkflow child =
1055+
Workflow.newChildWorkflowStub(WaitOnSignalWorkflow.class, workflowOptions);
1056+
Promise<Void> promise = Async.procedure(() -> child.execute());
1057+
Promise<WorkflowExecution> executionPromise = Workflow.getWorkflowExecution(child);
1058+
assertNotNull(executionPromise);
1059+
WorkflowExecution execution = executionPromise.get();
1060+
assertNotEquals("", execution.getWorkflowId());
1061+
assertNotEquals("", execution.getRunId());
1062+
child.signal("test");
1063+
1064+
promise.get();
1065+
return null;
1066+
}
1067+
}
1068+
1069+
// The purpose of this test is to exercise the lambda execution logic inside Async.procedure(),
1070+
// which executes on a different thread than workflow-main. This is different than executing
1071+
// classes that implements the workflow method interface, which executes on the workflow main
1072+
// thread.
1073+
@Test
1074+
public void testChildAsyncLambdaWorkflow() {
1075+
startWorkerFor(TestChildAsyncLambdaWorkflow.class, TestWaitOnSignalWorkflowImpl.class);
1076+
1077+
WorkflowOptions.Builder options = new WorkflowOptions.Builder();
1078+
options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200));
1079+
options.setTaskStartToCloseTimeout(Duration.ofSeconds(60));
1080+
options.setTaskList(taskList);
1081+
TestWorkflow1 client = workflowClient.newWorkflowStub(TestWorkflow1.class, options.build());
1082+
assertEquals(null, client.execute(taskList));
1083+
}
1084+
10141085
public static class TestUntypedChildStubWorkflow implements TestWorkflow1 {
10151086

10161087
@Override

0 commit comments

Comments
 (0)