Skip to content

Commit 9dd9c4a

Browse files
authored
Refactored InvocationHandlers to use Stubs internally. (#134)
* Added ExternalWorkflowInvocationHandler * Migrated Workflow InvocationHandlers to use untyped stub * Renamed Stubs * Added ChildWorkflowStub and ExternalWorkflowStub to isAsync
1 parent 05956a8 commit 9dd9c4a

19 files changed

+242
-202
lines changed

src/main/java/com/uber/cadence/client/WorkflowClient.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ static WorkflowClient newInstance(
201201
* @param options options used to start a workflow through returned stub
202202
* @return Stub that can be used to start workflow and later to signal or query it.
203203
*/
204-
UntypedWorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions options);
204+
WorkflowStub newUntypedWorkflowStub(String workflowType, WorkflowOptions options);
205205

206206
/**
207207
* Creates workflow untyped client stub for a known execution. Use it to send signals or queries
@@ -213,7 +213,7 @@ static WorkflowClient newInstance(
213213
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
214214
* @return Stub that can be used to start workflow and later to signal or query it.
215215
*/
216-
UntypedWorkflowStub newUntypedWorkflowStub(
216+
WorkflowStub newUntypedWorkflowStub(
217217
String workflowId, Optional<String> runId, Optional<String> workflowType);
218218

219219
/**
@@ -224,8 +224,7 @@ UntypedWorkflowStub newUntypedWorkflowStub(
224224
* @param workflowType type of the workflow. Optional as it is used for error reporting only.
225225
* @return Stub that can be used to start workflow and later to signal or query it.
226226
*/
227-
UntypedWorkflowStub newUntypedWorkflowStub(
228-
WorkflowExecution execution, Optional<String> workflowType);
227+
WorkflowStub newUntypedWorkflowStub(WorkflowExecution execution, Optional<String> workflowType);
229228

230229
/**
231230
* Creates new {@link ActivityCompletionClient} that can be used to complete activities

src/main/java/com/uber/cadence/client/WorkflowClientInterceptor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222

2323
public interface WorkflowClientInterceptor {
2424

25-
UntypedWorkflowStub newUntypedWorkflowStub(
26-
String workflowType, WorkflowOptions options, UntypedWorkflowStub next);
25+
WorkflowStub newUntypedWorkflowStub(
26+
String workflowType, WorkflowOptions options, WorkflowStub next);
2727

28-
UntypedWorkflowStub newUntypedWorkflowStub(
29-
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub next);
28+
WorkflowStub newUntypedWorkflowStub(
29+
WorkflowExecution execution, Optional<String> workflowType, WorkflowStub next);
3030

3131
ActivityCompletionClient newActivityCompletionClient(ActivityCompletionClient next);
3232
}

src/main/java/com/uber/cadence/client/WorkflowClientInterceptorBase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@
2323
public class WorkflowClientInterceptorBase implements WorkflowClientInterceptor {
2424

2525
@Override
26-
public UntypedWorkflowStub newUntypedWorkflowStub(
27-
String workflowType, WorkflowOptions options, UntypedWorkflowStub next) {
26+
public WorkflowStub newUntypedWorkflowStub(
27+
String workflowType, WorkflowOptions options, WorkflowStub next) {
2828
return next;
2929
}
3030

3131
@Override
32-
public UntypedWorkflowStub newUntypedWorkflowStub(
33-
WorkflowExecution execution, Optional<String> workflowType, UntypedWorkflowStub next) {
32+
public WorkflowStub newUntypedWorkflowStub(
33+
WorkflowExecution execution, Optional<String> workflowType, WorkflowStub next) {
3434
return next;
3535
}
3636

src/main/java/com/uber/cadence/client/UntypedWorkflowStub.java renamed to src/main/java/com/uber/cadence/client/WorkflowStub.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,13 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.TimeoutException;
2525

26-
public interface UntypedWorkflowStub {
26+
/**
27+
* WorkflowStub is a client side stub to a single workflow instance. It can be used to start,
28+
* signal, query, wait for completion and cancel a workflow execution. Created through {@link
29+
* WorkflowClient#newUntypedWorkflowStub(String, WorkflowOptions)} or {@link
30+
* WorkflowClient#newUntypedWorkflowStub(WorkflowExecution, Optional)}.
31+
*/
32+
public interface WorkflowStub {
2733

2834
void signal(String signalName, Object... args);
2935

src/main/java/com/uber/cadence/internal/common/InternalUtils.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package com.uber.cadence.internal.common;
1919

20+
import com.uber.cadence.workflow.WorkflowMethod;
2021
import java.lang.reflect.Method;
2122
import java.time.Duration;
2223

@@ -34,6 +35,38 @@ public static String getSimpleName(Method method) {
3435
return method.getDeclaringClass().getSimpleName() + "::" + method.getName();
3536
}
3637

38+
public static String getWorkflowType(Method method, WorkflowMethod workflowMethod) {
39+
String workflowName = workflowMethod.name();
40+
if (workflowName.isEmpty()) {
41+
return InternalUtils.getSimpleName(method);
42+
} else {
43+
return workflowName;
44+
}
45+
}
46+
47+
public static Method getWorkflowMethod(Class<?> workflowInterface) {
48+
Method result = null;
49+
for (Method m : workflowInterface.getMethods()) {
50+
if (m.getAnnotation(WorkflowMethod.class) != null) {
51+
if (result != null) {
52+
throw new IllegalArgumentException(
53+
"Workflow interface must have exactly one method "
54+
+ "annotated with @WorkflowMethod. Found \""
55+
+ result
56+
+ "\" and \""
57+
+ m
58+
+ "\"");
59+
}
60+
result = m;
61+
}
62+
}
63+
if (result == null) {
64+
throw new IllegalArgumentException(
65+
"Method annotated with @WorkflowMethod is not " + "found at " + workflowInterface);
66+
}
67+
return result;
68+
}
69+
3770
/**
3871
* Convert milliseconds to seconds rounding up. Used by timers to ensure that they never fire
3972
* earlier than requested.

src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.uber.cadence.activity.MethodRetry;
2323
import com.uber.cadence.internal.common.InternalUtils;
2424
import com.uber.cadence.internal.sync.AsyncInternal.AsyncMarker;
25-
import com.uber.cadence.workflow.UntypedActivityStub;
25+
import com.uber.cadence.workflow.ActivityStub;
2626
import com.uber.cadence.workflow.Workflow;
2727
import java.lang.reflect.InvocationHandler;
2828
import java.lang.reflect.Method;
@@ -79,8 +79,7 @@ public Object invoke(Object proxy, Method method, Object[] args) {
7979
}
8080

8181
ActivityOptions mergedOptions = ActivityOptions.merge(activityMethod, methodRetry, options);
82-
UntypedActivityStub stub =
83-
UntypedActivityStubImpl.newInstance(mergedOptions, activityExecutor);
82+
ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor);
8483
function = (a) -> stub.execute(activityName, method.getReturnType(), a);
8584
methodFunctions.put(method, function);
8685
} catch (NoSuchMethodException e) {

src/main/java/com/uber/cadence/internal/sync/UntypedActivityStubImpl.java renamed to src/main/java/com/uber/cadence/internal/sync/ActivityStubImpl.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,22 @@
2020
import com.google.common.base.Defaults;
2121
import com.uber.cadence.activity.ActivityOptions;
2222
import com.uber.cadence.workflow.ActivityException;
23+
import com.uber.cadence.workflow.ActivityStub;
2324
import com.uber.cadence.workflow.Promise;
24-
import com.uber.cadence.workflow.UntypedActivityStub;
2525

2626
/** Supports calling activity by name and arguments without its strongly typed interface. */
27-
class UntypedActivityStubImpl implements UntypedActivityStub {
27+
class ActivityStubImpl implements ActivityStub {
2828

2929
private final ActivityOptions options;
3030
private final ActivityExecutor activityExecutor;
3131

32-
static UntypedActivityStub newInstance(
33-
ActivityOptions options, ActivityExecutor activityExecutor) {
32+
static ActivityStub newInstance(ActivityOptions options, ActivityExecutor activityExecutor) {
3433
ActivityOptions validatedOptions =
3534
new ActivityOptions.Builder(options).validateAndBuildWithDefaults();
36-
return new UntypedActivityStubImpl(validatedOptions, activityExecutor);
35+
return new ActivityStubImpl(validatedOptions, activityExecutor);
3736
}
3837

39-
private UntypedActivityStubImpl(ActivityOptions options, ActivityExecutor activityExecutor) {
38+
private ActivityStubImpl(ActivityOptions options, ActivityExecutor activityExecutor) {
4039
this.options = options;
4140
this.activityExecutor = activityExecutor;
4241
}

src/main/java/com/uber/cadence/internal/sync/AsyncInternal.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121

2222
import com.uber.cadence.common.RetryOptions;
2323
import com.uber.cadence.internal.common.LambdaUtils;
24+
import com.uber.cadence.workflow.ActivityStub;
25+
import com.uber.cadence.workflow.ChildWorkflowStub;
2426
import com.uber.cadence.workflow.CompletablePromise;
27+
import com.uber.cadence.workflow.ExternalWorkflowStub;
2528
import com.uber.cadence.workflow.Functions;
2629
import com.uber.cadence.workflow.Promise;
27-
import com.uber.cadence.workflow.UntypedActivityStub;
2830
import com.uber.cadence.workflow.Workflow;
2931
import java.lang.invoke.MethodHandleInfo;
3032
import java.lang.invoke.SerializedLambda;
@@ -303,7 +305,9 @@ private static <R> Promise<R> execute(boolean async, Functions.Func<R> func) {
303305
public static boolean isAsync(Object func) {
304306
SerializedLambda lambda = LambdaUtils.toSerializedLambda(func);
305307
Object target = getTarget(lambda);
306-
return target instanceof UntypedActivityStub
308+
return target instanceof ActivityStub
309+
|| target instanceof ChildWorkflowStub
310+
|| target instanceof ExternalWorkflowStub
307311
|| (target instanceof AsyncMarker
308312
&& lambda.getImplMethodKind() == MethodHandleInfo.REF_invokeInterface);
309313
}

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowInvocationHandler.java

Lines changed: 21 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -17,52 +17,43 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20-
import com.google.common.base.Defaults;
21-
import com.uber.cadence.WorkflowExecution;
20+
import static com.uber.cadence.internal.common.InternalUtils.getWorkflowMethod;
21+
import static com.uber.cadence.internal.common.InternalUtils.getWorkflowType;
22+
2223
import com.uber.cadence.activity.MethodRetry;
23-
import com.uber.cadence.converter.DataConverter;
2424
import com.uber.cadence.internal.common.InternalUtils;
25-
import com.uber.cadence.workflow.ChildWorkflowException;
2625
import com.uber.cadence.workflow.ChildWorkflowOptions;
27-
import com.uber.cadence.workflow.CompletablePromise;
28-
import com.uber.cadence.workflow.Promise;
26+
import com.uber.cadence.workflow.ChildWorkflowStub;
2927
import com.uber.cadence.workflow.QueryMethod;
30-
import com.uber.cadence.workflow.SignalExternalWorkflowException;
3128
import com.uber.cadence.workflow.SignalMethod;
32-
import com.uber.cadence.workflow.Workflow;
3329
import com.uber.cadence.workflow.WorkflowMethod;
3430
import java.lang.reflect.InvocationHandler;
3531
import java.lang.reflect.Method;
3632

3733
/** Dynamic implementation of a strongly typed child workflow interface. */
3834
class ChildWorkflowInvocationHandler implements InvocationHandler {
3935

40-
private final ChildWorkflowOptions options;
41-
private final SyncDecisionContext decisionContext;
42-
private final DataConverter dataConverter;
43-
private CompletablePromise<WorkflowExecution> execution = Workflow.newPromise();
44-
private boolean startRequested;
36+
private final ChildWorkflowStub stub;
4537

4638
ChildWorkflowInvocationHandler(
47-
ChildWorkflowOptions options, SyncDecisionContext decisionContext) {
48-
this.options = options;
49-
this.decisionContext = decisionContext;
50-
dataConverter = decisionContext.getDataConverter();
51-
}
39+
Class<?> workflowInterface,
40+
ChildWorkflowOptions options,
41+
SyncDecisionContext decisionContext) {
42+
Method workflowMethod = getWorkflowMethod(workflowInterface);
43+
WorkflowMethod workflowAnnotation = workflowMethod.getAnnotation(WorkflowMethod.class);
44+
String workflowType = getWorkflowType(workflowMethod, workflowAnnotation);
45+
MethodRetry retryAnnotation = workflowMethod.getAnnotation(MethodRetry.class);
5246

53-
public ChildWorkflowInvocationHandler(
54-
WorkflowExecution execution, SyncDecisionContext decisionContext) {
55-
this.options = null;
56-
this.decisionContext = decisionContext;
57-
dataConverter = decisionContext.getDataConverter();
58-
this.execution.complete(execution);
47+
ChildWorkflowOptions merged =
48+
ChildWorkflowOptions.merge(workflowAnnotation, retryAnnotation, options);
49+
this.stub = new ChildWorkflowStubImpl(workflowType, merged, decisionContext);
5950
}
6051

6152
@Override
6253
public Object invoke(Object proxy, Method method, Object[] args) {
6354
// Implement WorkflowStub
6455
if (method.getName().equals(WorkflowStub.GET_EXECUTION_METHOD_NAME)) {
65-
return execution;
56+
return stub.getExecution();
6657
}
6758
WorkflowMethod workflowMethod = method.getAnnotation(WorkflowMethod.class);
6859
QueryMethod queryMethod = method.getAnnotation(QueryMethod.class);
@@ -78,17 +69,12 @@ public Object invoke(Object proxy, Method method, Object[] args) {
7869
+ "from @WorkflowMethod, @QueryMethod or @SignalMethod");
7970
}
8071
if (workflowMethod != null) {
81-
if (startRequested) {
82-
throw new IllegalStateException("Already started: " + execution);
83-
}
84-
startRequested = true;
85-
return executeChildWorkflow(method, workflowMethod, args);
72+
return stub.execute(method.getReturnType(), args);
8673
}
8774
if (queryMethod != null) {
88-
if (execution == null) {
89-
throw new IllegalStateException("Workflow not started yet");
90-
}
91-
return queryWorkflow(method, queryMethod, args);
75+
throw new UnsupportedOperationException(
76+
"Query is not supported from workflow to workflow. "
77+
+ "Use activity that perform the query instead.");
9278
}
9379
if (signalMethod != null) {
9480
signalWorkflow(method, signalMethod, args);
@@ -103,51 +89,6 @@ private void signalWorkflow(Method method, SignalMethod signalMethod, Object[] a
10389
if (signalName.isEmpty()) {
10490
signalName = InternalUtils.getSimpleName(method);
10591
}
106-
Promise<Void> signalled = decisionContext.signalWorkflow(execution.get(), signalName, args);
107-
if (AsyncInternal.isAsync()) {
108-
AsyncInternal.setAsyncResult(signalled);
109-
return;
110-
}
111-
try {
112-
signalled.get();
113-
} catch (SignalExternalWorkflowException e) {
114-
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
115-
// an event handling method.
116-
e.setStackTrace(Thread.currentThread().getStackTrace());
117-
throw e;
118-
}
119-
}
120-
121-
private Object queryWorkflow(Method method, QueryMethod queryMethod, Object[] args) {
122-
throw new UnsupportedOperationException(
123-
"Query is not supported from workflow to workflow. "
124-
+ "Use activity that perform the query instead.");
125-
}
126-
127-
private Object executeChildWorkflow(Method method, WorkflowMethod workflowMethod, Object[] args) {
128-
String workflowType = workflowMethod.name();
129-
if (workflowType.isEmpty()) {
130-
workflowType = InternalUtils.getSimpleName(method);
131-
}
132-
byte[] input = dataConverter.toData(args);
133-
MethodRetry retry = method.getAnnotation(MethodRetry.class);
134-
ChildWorkflowOptions merged = ChildWorkflowOptions.merge(workflowMethod, retry, options);
135-
Promise<byte[]> encodedResult =
136-
decisionContext.executeChildWorkflow(workflowType, merged, input, execution);
137-
Promise<?> result =
138-
encodedResult.thenApply(
139-
(encoded) -> dataConverter.fromData(encoded, method.getReturnType()));
140-
if (AsyncInternal.isAsync()) {
141-
AsyncInternal.setAsyncResult(result);
142-
return Defaults.defaultValue(method.getReturnType());
143-
}
144-
try {
145-
return result.get();
146-
} catch (ChildWorkflowException e) {
147-
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
148-
// an event handling method.
149-
e.setStackTrace(Thread.currentThread().getStackTrace());
150-
throw e;
151-
}
92+
stub.signal(signalName, args);
15293
}
15394
}

0 commit comments

Comments
 (0)