Skip to content

Commit 049c29c

Browse files
authored
Add executeWorkflow method for WorkflowInterceptor to match go client library (#779)
* executeWorkflowPath -- added executeWorkflow method -- added ExecutionInput class for getting input -- fixed tracer and tests
1 parent 868c392 commit 049c29c

File tree

8 files changed

+123
-27
lines changed

8 files changed

+123
-27
lines changed

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,12 @@ public WorkflowInterceptor getWorkflowInterceptor() {
127127
return headInterceptor;
128128
}
129129

130+
@Override
131+
public byte[] executeWorkflow(
132+
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
133+
return workflowDefinition.execute(input.getInput());
134+
}
135+
130136
@Override
131137
public <T> Promise<T> executeActivity(
132138
String activityName,

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowDefinition.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20-
interface SyncWorkflowDefinition {
20+
public interface SyncWorkflowDefinition {
2121

2222
byte[] execute(byte[] input);
2323

src/main/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternal.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,9 @@
2929
import com.uber.cadence.serviceclient.IWorkflowService;
3030
import com.uber.cadence.testing.TestActivityEnvironment;
3131
import com.uber.cadence.testing.TestEnvironmentOptions;
32-
import com.uber.cadence.workflow.ActivityFailureException;
33-
import com.uber.cadence.workflow.ChildWorkflowOptions;
34-
import com.uber.cadence.workflow.ContinueAsNewOptions;
32+
import com.uber.cadence.workflow.*;
3533
import com.uber.cadence.workflow.Functions.Func;
3634
import com.uber.cadence.workflow.Functions.Func1;
37-
import com.uber.cadence.workflow.Promise;
38-
import com.uber.cadence.workflow.Workflow;
39-
import com.uber.cadence.workflow.WorkflowInterceptor;
4035
import java.lang.reflect.InvocationHandler;
4136
import java.lang.reflect.Proxy;
4237
import java.lang.reflect.Type;
@@ -107,7 +102,8 @@ public <T> T newActivityStub(Class<T> activityInterface) {
107102
ActivityOptions options =
108103
new ActivityOptions.Builder().setScheduleToCloseTimeout(Duration.ofDays(1)).build();
109104
InvocationHandler invocationHandler =
110-
ActivityInvocationHandler.newInstance(options, new TestActivityExecutor(workflowService));
105+
ActivityInvocationHandler.newInstance(
106+
options, new TestActivityExecutor(workflowService, null));
111107
invocationHandler = new DeterministicRunnerWrapper(invocationHandler);
112108
return ActivityInvocationHandlerBase.newProxy(activityInterface, invocationHandler);
113109
}
@@ -131,12 +127,13 @@ public void setWorkflowService(IWorkflowService workflowService) {
131127
this.activityTaskHandler.setWorkflowService(service);
132128
}
133129

134-
private class TestActivityExecutor implements WorkflowInterceptor {
130+
private class TestActivityExecutor extends WorkflowInterceptorBase {
135131

136132
@SuppressWarnings("UnusedVariable")
137133
private final IWorkflowService workflowService;
138134

139-
TestActivityExecutor(IWorkflowService workflowService) {
135+
TestActivityExecutor(IWorkflowService workflowService, WorkflowInterceptorBase next) {
136+
super(next);
140137
this.workflowService = workflowService;
141138
}
142139

src/main/java/com/uber/cadence/internal/sync/WorkflowRunnable.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.uber.cadence.internal.sync;
1919

2020
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
21+
import com.uber.cadence.workflow.WorkflowInterceptor;
2122
import java.util.Objects;
2223

2324
class WorkflowRunnable implements Runnable {
@@ -43,8 +44,10 @@ public WorkflowRunnable(
4344
@Override
4445
public void run() {
4546
try {
46-
47-
output = workflow.execute(attributes.getInput());
47+
WorkflowInterceptor interceptor = context.getWorkflowInterceptor();
48+
output =
49+
interceptor.executeWorkflow(
50+
workflow, new WorkflowInterceptor.WorkflowExecuteInput(attributes));
4851
} finally {
4952
done = true;
5053
}

src/main/java/com/uber/cadence/workflow/WorkflowInterceptor.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,42 @@
1818
package com.uber.cadence.workflow;
1919

2020
import com.uber.cadence.WorkflowExecution;
21+
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
22+
import com.uber.cadence.WorkflowType;
2123
import com.uber.cadence.activity.ActivityOptions;
2224
import com.uber.cadence.activity.LocalActivityOptions;
25+
import com.uber.cadence.internal.sync.SyncWorkflowDefinition;
26+
import com.uber.cadence.internal.worker.WorkflowExecutionException;
2327
import com.uber.cadence.workflow.Functions.Func;
2428
import java.lang.reflect.Type;
2529
import java.time.Duration;
26-
import java.util.Map;
27-
import java.util.Optional;
28-
import java.util.Random;
29-
import java.util.UUID;
30+
import java.util.*;
31+
import java.util.concurrent.CancellationException;
3032
import java.util.function.BiPredicate;
3133
import java.util.function.Supplier;
3234

3335
public interface WorkflowInterceptor {
3436

37+
public final class WorkflowExecuteInput {
38+
private final WorkflowExecutionStartedEventAttributes workflowEventStart;
39+
private final WorkflowType workflowType;
40+
private final byte[] input;
41+
42+
public WorkflowExecuteInput(WorkflowExecutionStartedEventAttributes workflowEventStart) {
43+
this.workflowEventStart = workflowEventStart;
44+
this.workflowType = workflowEventStart.workflowType;
45+
this.input = workflowEventStart.getInput();
46+
}
47+
48+
public WorkflowType getWorkflowType() {
49+
return workflowType;
50+
}
51+
52+
public byte[] getInput() {
53+
return input;
54+
}
55+
}
56+
3557
final class WorkflowResult<R> {
3658

3759
private final Promise<R> result;
@@ -51,6 +73,10 @@ public Promise<WorkflowExecution> getWorkflowExecution() {
5173
}
5274
}
5375

76+
// to match behavior in go client: interceptor executeWorkflow method
77+
byte[] executeWorkflow(SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input)
78+
throws CancellationException, WorkflowExecutionException;
79+
5480
<R> Promise<R> executeActivity(
5581
String activityName,
5682
Class<R> resultClass,

src/main/java/com/uber/cadence/workflow/WorkflowInterceptorBase.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,12 @@
2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.activity.ActivityOptions;
2222
import com.uber.cadence.activity.LocalActivityOptions;
23+
import com.uber.cadence.internal.sync.SyncWorkflowDefinition;
2324
import com.uber.cadence.workflow.Functions.Func;
2425
import com.uber.cadence.workflow.Functions.Func1;
2526
import java.lang.reflect.Type;
2627
import java.time.Duration;
27-
import java.util.Map;
28-
import java.util.Optional;
29-
import java.util.Random;
30-
import java.util.UUID;
28+
import java.util.*;
3129
import java.util.function.BiPredicate;
3230
import java.util.function.Supplier;
3331

@@ -40,6 +38,14 @@ public WorkflowInterceptorBase(WorkflowInterceptor next) {
4038
this.next = next;
4139
}
4240

41+
// base interceptor executeWorkflow method: no-op interceptors delegate calls to next interceptors
42+
// used for users to create new interceptors easily
43+
@Override
44+
public byte[] executeWorkflow(
45+
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input) {
46+
return next.executeWorkflow(workflowDefinition, input);
47+
}
48+
4349
@Override
4450
public <R> Promise<R> executeActivity(
4551
String activityName,

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@
6262
import com.uber.cadence.converter.JsonDataConverter;
6363
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
6464
import com.uber.cadence.internal.sync.DeterministicRunnerTest;
65+
import com.uber.cadence.internal.sync.SyncWorkflowDefinition;
6566
import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal;
6667
import com.uber.cadence.internal.worker.PollerOptions;
68+
import com.uber.cadence.internal.worker.WorkflowExecutionException;
6769
import com.uber.cadence.serviceclient.ClientOptions;
6870
import com.uber.cadence.serviceclient.IWorkflowService;
6971
import com.uber.cadence.serviceclient.WorkflowServiceTChannel;
@@ -503,6 +505,7 @@ public void testSync() {
503505
String result = workflowStub.execute(taskList);
504506
assertEquals("activity10", result);
505507
tracer.setExpected(
508+
"executeWorkflow: TestWorkflow1::execute",
506509
"sleep PT2S",
507510
"executeActivity TestActivities::activityWithDelay",
508511
"executeActivity TestActivities::activity2");
@@ -1276,7 +1279,16 @@ public void testContinueAsNew() {
12761279
TestContinueAsNew.class, newWorkflowOptionsBuilder(this.taskList).build());
12771280
int result = client.execute(4, continuedTaskList);
12781281
assertEquals(111, result);
1279-
tracer.setExpected("continueAsNew", "continueAsNew", "continueAsNew", "continueAsNew");
1282+
tracer.setExpected(
1283+
"executeWorkflow: TestContinueAsNew::execute",
1284+
"continueAsNew",
1285+
"executeWorkflow: TestContinueAsNew::execute",
1286+
"continueAsNew",
1287+
"executeWorkflow: TestContinueAsNew::execute",
1288+
"continueAsNew",
1289+
"executeWorkflow: TestContinueAsNew::execute",
1290+
"continueAsNew",
1291+
"executeWorkflow: TestContinueAsNew::execute");
12801292
}
12811293

12821294
public static class TestAsyncActivityWorkflowImpl implements TestWorkflow1 {
@@ -2156,10 +2168,18 @@ public void testTimer() {
21562168
assertEquals("testTimer", result);
21572169
if (useExternalService) {
21582170
tracer.setExpected(
2159-
"registerQuery getTrace", "newTimer PT0.7S", "newTimer PT1.3S", "newTimer PT10S");
2171+
"executeWorkflow: testActivity",
2172+
"registerQuery getTrace",
2173+
"newTimer PT0.7S",
2174+
"newTimer PT1.3S",
2175+
"newTimer PT10S");
21602176
} else {
21612177
tracer.setExpected(
2162-
"registerQuery getTrace", "newTimer PT11M40S", "newTimer PT21M40S", "newTimer PT10H");
2178+
"executeWorkflow: testActivity",
2179+
"registerQuery getTrace",
2180+
"newTimer PT11M40S",
2181+
"newTimer PT21M40S",
2182+
"newTimer PT10H");
21632183
}
21642184
}
21652185

@@ -3325,7 +3345,9 @@ public void testSignalExternalWorkflow() {
33253345
workflowClient.newWorkflowStub(TestWorkflowSignaled.class, options.build());
33263346
assertEquals("Hello World!", client.execute());
33273347
tracer.setExpected(
3348+
"executeWorkflow: TestWorkflowSignaled::execute",
33283349
"executeChildWorkflow SignalingChild::execute",
3350+
"executeWorkflow: SignalingChild::execute",
33293351
"signalExternalWorkflow " + UUID_REGEXP + " testSignal");
33303352
}
33313353

@@ -4499,7 +4521,11 @@ public void testSideEffect() {
44994521
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
45004522
String result = workflowStub.execute(taskList);
45014523
assertEquals("activity1", result);
4502-
tracer.setExpected("sideEffect", "sleep PT1S", "executeActivity customActivity1");
4524+
tracer.setExpected(
4525+
"executeWorkflow: TestWorkflow1::execute",
4526+
"sideEffect",
4527+
"sleep PT1S",
4528+
"executeActivity customActivity1");
45034529
}
45044530

45054531
private static final Map<String, Queue<Long>> mutableSideEffectValue =
@@ -4595,6 +4621,7 @@ public void testGetVersion() {
45954621
String result = workflowStub.execute(taskList);
45964622
assertEquals("activity22activity1activity1activity1", result);
45974623
tracer.setExpected(
4624+
"executeWorkflow: TestWorkflow1::execute",
45984625
"getVersion",
45994626
"executeActivity TestActivities::activity2",
46004627
"getVersion",
@@ -4765,6 +4792,7 @@ public void testGetVersionRemovedInReplay() {
47654792
String result = workflowStub.execute(taskList);
47664793
assertEquals("activity22activity", result);
47674794
tracer.setExpected(
4795+
"executeWorkflow: TestWorkflowQuery::execute",
47684796
"registerQuery TestWorkflowQuery::query",
47694797
"getVersion",
47704798
"executeActivity TestActivities::activity2",
@@ -4809,6 +4837,7 @@ public void testGetVersionRemovedInReplay2() {
48094837
String result = workflowStub.execute(taskList);
48104838
assertEquals("activity", result);
48114839
tracer.setExpected(
4840+
"executeWorkflow: TestWorkflowQuery::execute",
48124841
"registerQuery TestWorkflowQuery::query",
48134842
"getVersion",
48144843
"getVersion",
@@ -4844,7 +4873,8 @@ public void testGetVersionRemovedInReplay3() {
48444873
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
48454874
String result = workflowStub.execute(taskList);
48464875
assertEquals("done", result);
4847-
tracer.setExpected("getVersion", "upsertSearchAttributes");
4876+
tracer.setExpected(
4877+
"executeWorkflow: TestWorkflow1::execute", "getVersion", "upsertSearchAttributes");
48484878
}
48494879

48504880
public static class TestVersionNotSupportedWorkflowImpl implements TestWorkflow1 {
@@ -5113,7 +5143,11 @@ public void testUUIDAndRandom() {
51135143
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
51145144
String result = workflowStub.execute(taskList);
51155145
assertEquals("foo10", result);
5116-
tracer.setExpected("sideEffect", "sideEffect", "executeActivity TestActivities::activity2");
5146+
tracer.setExpected(
5147+
"executeWorkflow: TestWorkflow1::execute",
5148+
"sideEffect",
5149+
"sideEffect",
5150+
"executeActivity TestActivities::activity2");
51175151
}
51185152

51195153
public interface GenericParametersActivity {
@@ -5941,10 +5975,13 @@ public void testSaga() {
59415975
TestSagaWorkflow.class, newWorkflowOptionsBuilder(taskList).build());
59425976
sagaWorkflow.execute(taskList, false);
59435977
tracer.setExpected(
5978+
"executeWorkflow: TestSagaWorkflow::execute",
59445979
"executeActivity customActivity1",
59455980
"executeChildWorkflow TestMultiargsWorkflowsFunc::func",
5981+
"executeWorkflow: TestMultiargsWorkflowsFunc::func",
59465982
"executeActivity TestActivities::throwIO",
59475983
"executeChildWorkflow TestCompensationWorkflow::compensate",
5984+
"executeWorkflow: TestCompensationWorkflow::compensate",
59485985
"executeActivity TestActivities::activity2");
59495986
}
59505987

@@ -6062,7 +6099,10 @@ public void testUpsertSearchAttributes() {
60626099
TestUpsertSearchAttributes.class, newWorkflowOptionsBuilder(taskList).build());
60636100
String result = testWorkflow.execute(taskList, "testKey");
60646101
assertEquals("done", result);
6065-
tracer.setExpected("upsertSearchAttributes", "executeActivity TestActivities::activity");
6102+
tracer.setExpected(
6103+
"executeWorkflow: TestUpsertSearchAttributes::execute",
6104+
"upsertSearchAttributes",
6105+
"executeActivity TestActivities::activity");
60666106
}
60676107

60686108
public static class TestMultiargsWorkflowsFuncChild implements TestMultiargsWorkflowsFunc2 {
@@ -6118,6 +6158,14 @@ private TracingWorkflowInterceptor(FilteredTrace trace, WorkflowInterceptor next
61186158
this.next = Objects.requireNonNull(next);
61196159
}
61206160

6161+
@Override
6162+
public byte[] executeWorkflow(
6163+
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input)
6164+
throws CancellationException, WorkflowExecutionException {
6165+
trace.add("executeWorkflow: " + input.getWorkflowType().getName());
6166+
return next.executeWorkflow(workflowDefinition, input);
6167+
}
6168+
61216169
@Override
61226170
public <R> Promise<R> executeActivity(
61236171
String activityName,

src/test/java/com/uber/cadence/workflow/interceptors/SignalWorkflowInterceptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.activity.ActivityOptions;
2222
import com.uber.cadence.activity.LocalActivityOptions;
23+
import com.uber.cadence.internal.sync.SyncWorkflowDefinition;
24+
import com.uber.cadence.internal.worker.WorkflowExecutionException;
2325
import com.uber.cadence.workflow.*;
2426
import java.lang.reflect.Type;
2527
import java.time.Duration;
@@ -28,6 +30,7 @@
2830
import java.util.Optional;
2931
import java.util.Random;
3032
import java.util.UUID;
33+
import java.util.concurrent.CancellationException;
3134
import java.util.function.BiPredicate;
3235
import java.util.function.Function;
3336
import java.util.function.Supplier;
@@ -47,6 +50,13 @@ public SignalWorkflowInterceptor(
4750
this.next = Objects.requireNonNull(next);
4851
}
4952

53+
@Override
54+
public byte[] executeWorkflow(
55+
SyncWorkflowDefinition workflowDefinition, WorkflowExecuteInput input)
56+
throws CancellationException, WorkflowExecutionException {
57+
return next.executeWorkflow(workflowDefinition, input);
58+
}
59+
5060
@Override
5161
public <R> Promise<R> executeActivity(
5262
String activityName,

0 commit comments

Comments
 (0)