Skip to content

Commit f3c96d7

Browse files
authored
Fixed workflowId options and annotation (#119)
1 parent e4345b5 commit f3c96d7

File tree

4 files changed

+21
-13
lines changed

4 files changed

+21
-13
lines changed

src/main/java/com/uber/cadence/client/UntypedWorkflowStub.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public interface UntypedWorkflowStub {
2828

2929
WorkflowExecution start(Object... args);
3030

31+
WorkflowExecution getExecution();
32+
3133
/**
3234
* Returns workflow result potentially waiting for workflow to complete.
3335
* Behind the scene this call performs long poll on Cadence service waiting for workflow completion notification.

src/main/java/com/uber/cadence/internal/sync/UntypedWorkflowStubImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ WorkflowExecution startWithOptions(WorkflowOptions o, Object... args) {
9898
try {
9999
execution.set(genericClient.startWorkflow(p));
100100
} catch (WorkflowExecutionAlreadyStartedError e) {
101+
execution.set(new WorkflowExecution().setWorkflowId(p.getWorkflowId()).setRunId(e.getRunId()));
101102
WorkflowExecution execution = new WorkflowExecution()
102103
.setWorkflowId(p.getWorkflowId())
103104
.setRunId(e.getRunId());
@@ -111,6 +112,11 @@ public WorkflowExecution start(Object... args) {
111112
return startWithOptions(WorkflowOptions.merge(null, options), args);
112113
}
113114

115+
@Override
116+
public WorkflowExecution getExecution() {
117+
return execution.get();
118+
}
119+
114120
@Override
115121
public <R> R getResult(Class<R> returnType) {
116122
try {

src/main/java/com/uber/cadence/internal/sync/WorkflowExternalInvocationHandler.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ class WorkflowExternalInvocationHandler implements InvocationHandler {
3838
private final AtomicReference<UntypedWorkflowStubImpl> untyped = new AtomicReference<>();
3939
private final WorkflowOptions options;
4040
private final DataConverter dataConverter;
41-
private final AtomicReference<WorkflowExecution> execution = new AtomicReference<>();
4241
private final GenericWorkflowClientExternal genericClient;
4342
private String workflowType;
4443

@@ -79,7 +78,6 @@ public static void closeAsyncInvocation() {
7978
}
8079
this.genericClient = genericClient;
8180
this.untyped.set(new UntypedWorkflowStubImpl(genericClient, dataConverter, execution));
82-
this.execution.set(execution);
8381
this.options = null;
8482
this.dataConverter = dataConverter;
8583
}
@@ -104,18 +102,16 @@ public Object invoke(Object proxy, Method method, Object[] args) {
104102
if (workflowMethod != null) {
105103
WorkflowOptions mergedOptions = WorkflowOptions.merge(workflowMethod, options);
106104
// We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for result.
107-
if (execution.get() != null) { // stub is reused
105+
UntypedWorkflowStubImpl workflowStub = untyped.get();
106+
if (workflowStub != null) { // stub is reused
108107
if (mergedOptions.getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
109108
throw new DuplicateWorkflowException(
110-
execution.get(), workflowType, "Cannot call @WorkflowMethod more than once per stub instance");
109+
workflowStub.getExecution(), workflowType, "Cannot call @WorkflowMethod more than once per stub instance");
111110
}
112-
return getUntyped().getResult(method.getReturnType());
111+
return workflowStub.getResult(method.getReturnType());
113112
}
114113
return startWorkflow(method, workflowMethod, mergedOptions, args);
115114
}
116-
if (execution.get() == null) {
117-
throw new IllegalStateException("Workflow not started yet");
118-
}
119115
if (queryMethod != null) {
120116
return queryWorkflow(method, queryMethod, args);
121117
}
@@ -172,20 +168,20 @@ private Object startWorkflow(Method method, WorkflowMethod workflowMethod,
172168
if (untyped.get() == null) {
173169
untyped.set(new UntypedWorkflowStubImpl(genericClient, dataConverter, workflowType, mergedOptions));
174170
}
171+
UntypedWorkflowStubImpl workflowStub = getUntyped();
175172
try {
176-
execution.set(getUntyped().startWithOptions(mergedOptions, args));
173+
workflowStub.startWithOptions(mergedOptions, args);
177174
} catch (DuplicateWorkflowException e) {
178-
execution.set(e.getExecution());
179175
// We do allow duplicated calls if policy is not AllowDuplicate. Semantic is to wait for result.
180176
if (mergedOptions.getWorkflowIdReusePolicy() == WorkflowIdReusePolicy.AllowDuplicate) {
181177
throw e;
182178
}
183179
}
184180
AtomicReference<WorkflowExecution> async = asyncResult.get();
185181
if (async != null) {
186-
async.set(execution.get());
182+
async.set(workflowStub.getExecution());
187183
return null;
188184
}
189-
return getUntyped().getResult(method.getReturnType());
185+
return workflowStub.getResult(method.getReturnType());
190186
}
191187
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,9 +759,13 @@ public void mySignal(String value) {
759759
@Test
760760
public void testSignal() throws Exception {
761761
startWorkerFor(TestSignalWorkflowImpl.class);
762-
QueryableWorkflow client = workflowClient.newWorkflowStub(QueryableWorkflow.class, newWorkflowOptionsBuilder().build());
762+
WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder();
763+
String workflowId = UUID.randomUUID().toString();
764+
optionsBuilder.setWorkflowId(workflowId);
765+
QueryableWorkflow client = workflowClient.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build());
763766
// To execute workflow client.execute() would do. But we want to start workflow and immediately return.
764767
WorkflowExecution execution = WorkflowClient.asyncStart(client::execute);
768+
assertEquals(workflowId, execution.getWorkflowId());
765769
assertEquals("initial", client.getState());
766770
client.mySignal("Hello ");
767771
Thread.sleep(200);

0 commit comments

Comments
 (0)