Skip to content

Commit 854d161

Browse files
mfateevmeiliang86
andcommitted
Switched the WorkflowExecution Promise callback to a workflow thread (#431)
Co-authored-by: Liang Mei <[email protected]>
1 parent 15e5b5b commit 854d161

File tree

3 files changed

+48
-3
lines changed

3 files changed

+48
-3
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ target
77
.gradle
88
/build
99
/out
10-
dummy
10+
dummy
11+
$buildDir

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,9 @@ private Promise<byte[]> executeChildWorkflowOnce(
389389
Consumer<Exception> cancellationCallback =
390390
context.startChildWorkflow(
391391
parameters,
392-
executionResult::complete,
392+
(we) ->
393+
runner.executeInWorkflowThread(
394+
"child workflow completion callback", () -> executionResult.complete(we)),
393395
(output, failure) -> {
394396
if (failure != null) {
395397
runner.executeInWorkflowThread(

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1662,7 +1662,7 @@ public String execute(String taskList) {
16621662

16631663
WaitOnSignalWorkflow child =
16641664
Workflow.newChildWorkflowStub(WaitOnSignalWorkflow.class, workflowOptions);
1665-
Promise<Void> promise = Async.procedure(() -> child.execute());
1665+
Promise<Void> promise = Async.procedure(child::execute);
16661666
Promise<WorkflowExecution> executionPromise = Workflow.getWorkflowExecution(child);
16671667
assertNotNull(executionPromise);
16681668
WorkflowExecution execution = executionPromise.get();
@@ -2854,6 +2854,48 @@ public void testChildWorkflowRetryReplay() throws Exception {
28542854
"testChildWorkflowRetryHistory.json", TestChildWorkflowRetryWorkflow.class);
28552855
}
28562856

2857+
public static class TestChildWorkflowExecutionPromiseHandler implements TestWorkflow1 {
2858+
2859+
private ITestNamedChild child;
2860+
2861+
@Override
2862+
public String execute(String taskList) {
2863+
child = Workflow.newChildWorkflowStub(ITestNamedChild.class);
2864+
Promise<String> childResult = Async.function(child::execute, "foo");
2865+
Promise<WorkflowExecution> executionPromise = Workflow.getWorkflowExecution(child);
2866+
CompletablePromise<String> result = Workflow.newPromise();
2867+
// Ensure that the callback can execute Workflow.* functions.
2868+
executionPromise.thenApply(
2869+
(we) -> {
2870+
Workflow.sleep(Duration.ofSeconds(1));
2871+
result.complete(childResult.get());
2872+
return null;
2873+
});
2874+
return result.get();
2875+
}
2876+
}
2877+
2878+
/** Tests that handler of the WorkflowExecution promise is executed in a workflow thread. */
2879+
@Test
2880+
public void testChildWorkflowExecutionPromiseHandler() {
2881+
startWorkerFor(TestChildWorkflowExecutionPromiseHandler.class, TestNamedChild.class);
2882+
2883+
WorkflowOptions.Builder options = new WorkflowOptions.Builder();
2884+
options.setExecutionStartToCloseTimeout(Duration.ofSeconds(20));
2885+
options.setTaskStartToCloseTimeout(Duration.ofSeconds(2));
2886+
options.setTaskList(taskList);
2887+
WorkflowClient wc;
2888+
if (useExternalService) {
2889+
wc = WorkflowClient.newInstance(service, DOMAIN);
2890+
} else {
2891+
wc = testEnvironment.newWorkflowClient();
2892+
}
2893+
2894+
TestWorkflow1 client = wc.newWorkflowStub(TestWorkflow1.class, options.build());
2895+
String result = client.execute(taskList);
2896+
assertEquals("FOO", result);
2897+
}
2898+
28572899
public static class TestSignalExternalWorkflow implements TestWorkflowSignaled {
28582900

28592901
private final SignalingChild child = Workflow.newChildWorkflowStub(SignalingChild.class);

0 commit comments

Comments
 (0)