Skip to content

Commit 05956a8

Browse files
authored
Added support for calling activities and child workflows through untyped interfaces. (#133)
1 parent 418ea38 commit 05956a8

21 files changed

+1211
-211
lines changed

src/main/java/com/uber/cadence/activity/ActivityOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ public ActivityOptions build() {
150150
retryOptions);
151151
}
152152

153-
private ActivityOptions validateAndBuildWithDefaults() {
153+
public ActivityOptions validateAndBuildWithDefaults() {
154154
if (scheduleToCloseTimeout == null
155155
&& (scheduleToStartTimeout == null || startToCloseTimeout == null)) {
156156
throw new IllegalStateException(

src/main/java/com/uber/cadence/client/DuplicateWorkflowException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* <ul>
4444
* Method annotated with {@link com.uber.cadence.workflow.WorkflowMethod} is called <i>more than
4545
* once</i> on a stub created through {@link
46-
* com.uber.cadence.workflow.Workflow#newWorkflowStub(Class)} and the {@link
46+
* com.uber.cadence.workflow.Workflow#newChildWorkflowStub(Class)} and the {@link
4747
* WorkflowOptions#getWorkflowIdReusePolicy()} is {@link
4848
* com.uber.cadence.WorkflowIdReusePolicy#AllowDuplicate}
4949
* </ul>

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ private ReplayDecider createDecider(HistoryHelper historyHelper) throws Exceptio
154154
PollForDecisionTaskResponse decisionTask = historyHelper.getDecisionTask();
155155
WorkflowType workflowType = decisionTask.getWorkflowType();
156156
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask);
157-
return new ReplayDecider(
158-
domain, workflowFactory.getWorkflow(workflowType), historyHelper, decisionsHelper);
157+
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
158+
return new ReplayDecider(domain, workflow, historyHelper, decisionsHelper);
159159
}
160160
}

src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java

Lines changed: 33 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,26 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20-
import com.google.common.base.Defaults;
2120
import com.uber.cadence.activity.ActivityMethod;
2221
import com.uber.cadence.activity.ActivityOptions;
2322
import com.uber.cadence.activity.MethodRetry;
2423
import com.uber.cadence.internal.common.InternalUtils;
2524
import com.uber.cadence.internal.sync.AsyncInternal.AsyncMarker;
26-
import com.uber.cadence.workflow.ActivityException;
27-
import com.uber.cadence.workflow.Promise;
25+
import com.uber.cadence.workflow.UntypedActivityStub;
2826
import com.uber.cadence.workflow.Workflow;
2927
import java.lang.reflect.InvocationHandler;
3028
import java.lang.reflect.Method;
3129
import java.lang.reflect.Proxy;
30+
import java.util.HashMap;
31+
import java.util.Map;
32+
import java.util.function.Function;
3233

3334
/** Dynamic implementation of a strongly typed child workflow interface. */
3435
class ActivityInvocationHandler implements InvocationHandler {
3536

3637
private final ActivityOptions options;
3738
private final ActivityExecutor activityExecutor;
39+
private final Map<Method, Function<Object[], Object>> methodFunctions = new HashMap<>();
3840

3941
static InvocationHandler newInstance(ActivityOptions options, ActivityExecutor activityExecutor) {
4042
return new ActivityInvocationHandler(options, activityExecutor);
@@ -56,41 +58,35 @@ private ActivityInvocationHandler(ActivityOptions options, ActivityExecutor acti
5658

5759
@Override
5860
public Object invoke(Object proxy, Method method, Object[] args) {
59-
try {
60-
if (method.equals(Object.class.getMethod("toString"))) {
61-
// TODO: activity info
62-
return "ActivityInvocationHandler";
61+
Function<Object[], Object> function = methodFunctions.get(method);
62+
if (function == null) {
63+
try {
64+
if (method.equals(Object.class.getMethod("toString"))) {
65+
// TODO: activity info
66+
return "ActivityInvocationHandler";
67+
}
68+
if (!method.getDeclaringClass().isInterface()) {
69+
throw new IllegalArgumentException(
70+
"Interface type is expected: " + method.getDeclaringClass());
71+
}
72+
MethodRetry methodRetry = method.getAnnotation(MethodRetry.class);
73+
ActivityMethod activityMethod = method.getAnnotation(ActivityMethod.class);
74+
String activityName;
75+
if (activityMethod == null || activityMethod.name().isEmpty()) {
76+
activityName = InternalUtils.getSimpleName(method);
77+
} else {
78+
activityName = activityMethod.name();
79+
}
80+
81+
ActivityOptions mergedOptions = ActivityOptions.merge(activityMethod, methodRetry, options);
82+
UntypedActivityStub stub =
83+
UntypedActivityStubImpl.newInstance(mergedOptions, activityExecutor);
84+
function = (a) -> stub.execute(activityName, method.getReturnType(), a);
85+
methodFunctions.put(method, function);
86+
} catch (NoSuchMethodException e) {
87+
throw Workflow.wrap(e);
6388
}
64-
} catch (NoSuchMethodException e) {
65-
throw Workflow.wrap(e);
66-
}
67-
if (!method.getDeclaringClass().isInterface()) {
68-
throw new IllegalArgumentException(
69-
"Interface type is expected: " + method.getDeclaringClass());
70-
}
71-
ActivityMethod activityMethod = method.getAnnotation(ActivityMethod.class);
72-
String activityName;
73-
if (activityMethod == null || activityMethod.name().isEmpty()) {
74-
activityName = InternalUtils.getSimpleName(method);
75-
} else {
76-
activityName = activityMethod.name();
77-
}
78-
MethodRetry methodRetry = method.getAnnotation(MethodRetry.class);
79-
ActivityOptions mergedOptions = ActivityOptions.merge(activityMethod, methodRetry, options);
80-
Promise<?> result =
81-
activityExecutor.executeActivity(activityName, mergedOptions, args, method.getReturnType());
82-
if (AsyncInternal.isAsync()) {
83-
AsyncInternal.setAsyncResult(result);
84-
return Defaults.defaultValue(method.getReturnType());
85-
}
86-
try {
87-
return result.get();
88-
} catch (ActivityException e) {
89-
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
90-
// an event handling method.
91-
StackTraceElement[] currentStackTrace = Thread.currentThread().getStackTrace();
92-
e.setStackTrace(currentStackTrace);
93-
throw e;
9489
}
90+
return function.apply(args);
9591
}
9692
}

src/main/java/com/uber/cadence/internal/sync/AsyncInternal.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.uber.cadence.workflow.CompletablePromise;
2525
import com.uber.cadence.workflow.Functions;
2626
import com.uber.cadence.workflow.Promise;
27+
import com.uber.cadence.workflow.UntypedActivityStub;
2728
import com.uber.cadence.workflow.Workflow;
2829
import java.lang.invoke.MethodHandleInfo;
2930
import java.lang.invoke.SerializedLambda;
@@ -302,8 +303,9 @@ private static <R> Promise<R> execute(boolean async, Functions.Func<R> func) {
302303
public static boolean isAsync(Object func) {
303304
SerializedLambda lambda = LambdaUtils.toSerializedLambda(func);
304305
Object target = getTarget(lambda);
305-
return target instanceof AsyncMarker
306-
&& lambda.getImplMethodKind() == MethodHandleInfo.REF_invokeInterface;
306+
return target instanceof UntypedActivityStub
307+
|| (target instanceof AsyncMarker
308+
&& lambda.getImplMethodKind() == MethodHandleInfo.REF_invokeInterface);
307309
}
308310

309311
private static boolean hasAsyncResult() {

src/main/java/com/uber/cadence/internal/sync/ChildWorkflowInvocationHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,15 @@ private Object queryWorkflow(Method method, QueryMethod queryMethod, Object[] ar
125125
}
126126

127127
private Object executeChildWorkflow(Method method, WorkflowMethod workflowMethod, Object[] args) {
128-
String workflowName = workflowMethod.name();
129-
if (workflowName.isEmpty()) {
130-
workflowName = InternalUtils.getSimpleName(method);
128+
String workflowType = workflowMethod.name();
129+
if (workflowType.isEmpty()) {
130+
workflowType = InternalUtils.getSimpleName(method);
131131
}
132132
byte[] input = dataConverter.toData(args);
133133
MethodRetry retry = method.getAnnotation(MethodRetry.class);
134134
ChildWorkflowOptions merged = ChildWorkflowOptions.merge(workflowMethod, retry, options);
135135
Promise<byte[]> encodedResult =
136-
decisionContext.executeChildWorkflow(workflowName, merged, input, execution);
136+
decisionContext.executeChildWorkflow(workflowType, merged, input, execution);
137137
Promise<?> result =
138138
encodedResult.thenApply(
139139
(encoded) -> dataConverter.fromData(encoded, method.getReturnType()));
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.sync;
19+
20+
import com.google.common.base.Defaults;
21+
import com.uber.cadence.WorkflowExecution;
22+
import com.uber.cadence.converter.DataConverter;
23+
import com.uber.cadence.workflow.ChildWorkflowException;
24+
import com.uber.cadence.workflow.ChildWorkflowOptions;
25+
import com.uber.cadence.workflow.ChildWorkflowStub;
26+
import com.uber.cadence.workflow.CompletablePromise;
27+
import com.uber.cadence.workflow.Promise;
28+
import com.uber.cadence.workflow.SignalExternalWorkflowException;
29+
import com.uber.cadence.workflow.Workflow;
30+
import java.util.Objects;
31+
32+
class ChildWorkflowStubImpl implements ChildWorkflowStub {
33+
34+
private final String workflowType;
35+
private final ChildWorkflowOptions options;
36+
private final SyncDecisionContext decisionContext;
37+
private final DataConverter dataConverter;
38+
private CompletablePromise<WorkflowExecution> execution = Workflow.newPromise();
39+
40+
ChildWorkflowStubImpl(
41+
String workflowType, ChildWorkflowOptions options, SyncDecisionContext decisionContext) {
42+
this.workflowType = Objects.requireNonNull(workflowType);
43+
this.options = new ChildWorkflowOptions.Builder(options).validateAndBuildWithDefaults();
44+
this.decisionContext = Objects.requireNonNull(decisionContext);
45+
dataConverter = Objects.requireNonNull(decisionContext.getDataConverter());
46+
}
47+
48+
@Override
49+
public String getWorkflowType() {
50+
return workflowType;
51+
}
52+
53+
@Override
54+
public Promise<WorkflowExecution> getExecution() {
55+
return execution;
56+
}
57+
58+
@Override
59+
public ChildWorkflowOptions getOptions() {
60+
return options;
61+
}
62+
63+
@Override
64+
public <R> R execute(Class<R> returnType, Object... args) {
65+
Promise<R> result = executeAsync(returnType, args);
66+
if (AsyncInternal.isAsync()) {
67+
AsyncInternal.setAsyncResult(result);
68+
return Defaults.defaultValue(returnType);
69+
}
70+
try {
71+
return result.get();
72+
} catch (ChildWorkflowException e) {
73+
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
74+
// an event handling method.
75+
e.setStackTrace(Thread.currentThread().getStackTrace());
76+
throw e;
77+
}
78+
}
79+
80+
@Override
81+
public <R> Promise<R> executeAsync(Class<R> returnType, Object... args) {
82+
byte[] input = dataConverter.toData(args);
83+
Promise<byte[]> encodedResult =
84+
decisionContext.executeChildWorkflow(workflowType, options, input, execution);
85+
return encodedResult.thenApply((encoded) -> dataConverter.fromData(encoded, returnType));
86+
}
87+
88+
@Override
89+
public void signal(String signalName, Object... args) {
90+
Promise<Void> signalled = decisionContext.signalWorkflow(execution.get(), signalName, args);
91+
if (AsyncInternal.isAsync()) {
92+
AsyncInternal.setAsyncResult(signalled);
93+
return;
94+
}
95+
try {
96+
signalled.get();
97+
} catch (SignalExternalWorkflowException e) {
98+
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
99+
// an event handling method.
100+
e.setStackTrace(Thread.currentThread().getStackTrace());
101+
throw e;
102+
}
103+
}
104+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.sync;
19+
20+
import com.uber.cadence.WorkflowExecution;
21+
import com.uber.cadence.converter.DataConverter;
22+
import com.uber.cadence.workflow.ExternalWorkflowStub;
23+
import com.uber.cadence.workflow.Promise;
24+
import com.uber.cadence.workflow.SignalExternalWorkflowException;
25+
import java.util.Objects;
26+
27+
/** Dynamic implementation of a strongly typed child workflow interface. */
28+
class ExternalWorkflowStubImpl implements ExternalWorkflowStub {
29+
30+
private final SyncDecisionContext decisionContext;
31+
private final DataConverter dataConverter;
32+
private final WorkflowExecution execution;
33+
34+
public ExternalWorkflowStubImpl(
35+
WorkflowExecution execution, SyncDecisionContext decisionContext) {
36+
this.decisionContext = Objects.requireNonNull(decisionContext);
37+
dataConverter = Objects.requireNonNull(decisionContext).getDataConverter();
38+
this.execution = Objects.requireNonNull(execution);
39+
}
40+
41+
@Override
42+
public WorkflowExecution getExecution() {
43+
return execution;
44+
}
45+
46+
@Override
47+
public void signal(String signalName, Object... args) {
48+
Promise<Void> signalled = decisionContext.signalWorkflow(execution, signalName, args);
49+
if (AsyncInternal.isAsync()) {
50+
AsyncInternal.setAsyncResult(signalled);
51+
return;
52+
}
53+
try {
54+
signalled.get();
55+
} catch (SignalExternalWorkflowException e) {
56+
// Reset stack to the current one. Otherwise it is very confusing to see a stack of
57+
// an event handling method.
58+
e.setStackTrace(Thread.currentThread().getStackTrace());
59+
throw e;
60+
}
61+
}
62+
63+
@Override
64+
public void cancel() {
65+
decisionContext.requestCancelWorkflowExecution(execution);
66+
}
67+
}

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,12 @@ private SyncWorkflowDefinition getWorkflowDefinition(WorkflowType workflowType)
140140
Functions.Func<SyncWorkflowDefinition> factory =
141141
workflowDefinitions.get(workflowType.getName());
142142
if (factory == null) {
143-
return null;
143+
// throw Error to abort decision, not fail the workflow
144+
throw new Error(
145+
"Unknown workflow type \""
146+
+ workflowType.getName()
147+
+ "\". Known types are "
148+
+ workflowDefinitions.keySet());
144149
}
145150
try {
146151
return factory.apply();
@@ -282,4 +287,12 @@ public static WorkflowExecutionException mapToWorkflowExecutionException(
282287
return new WorkflowExecutionException(
283288
failure.getClass().getName(), dataConverter.toData(failure));
284289
}
290+
291+
@Override
292+
public String toString() {
293+
return "POJOWorkflowImplementationFactory{"
294+
+ "registeredWorkflowTypes="
295+
+ workflowDefinitions.keySet()
296+
+ '}';
297+
}
285298
}

0 commit comments

Comments
 (0)