Skip to content

Commit 1945a1a

Browse files
Adding ability to override activity options. (#772)
* Adding ability to override activity options.
1 parent cab0120 commit 1945a1a

File tree

9 files changed

+150
-10
lines changed

9 files changed

+150
-10
lines changed

src/main/java/com/uber/cadence/internal/sync/ActivityInvocationHandler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.uber.cadence.workflow.WorkflowInterceptor;
2525
import java.lang.reflect.InvocationHandler;
2626
import java.lang.reflect.Method;
27+
import java.util.Map;
2728
import java.util.function.Function;
2829

2930
class ActivityInvocationHandler extends ActivityInvocationHandlerBase {
@@ -43,9 +44,31 @@ private ActivityInvocationHandler(ActivityOptions options, WorkflowInterceptor a
4344
@Override
4445
protected Function<Object[], Object> getActivityFunc(
4546
Method method, MethodRetry methodRetry, ActivityMethod activityMethod, String activityName) {
47+
48+
ActivityOptions activityOptionsOverride = null;
49+
WorkflowThread workflowThread = WorkflowInternal.getRootWorkflowContext();
50+
if (workflowThread.getDecisionContext() != null
51+
&& workflowThread.getDecisionContext().getWorkflowImplementationOptions() != null) {
52+
Map<String, ActivityOptions> activityOptionsMap =
53+
workflowThread
54+
.getDecisionContext()
55+
.getWorkflowImplementationOptions()
56+
.getActivityOptions();
57+
58+
if (activityOptionsMap.containsKey(activityName)) {
59+
activityOptionsOverride = activityOptionsMap.get(activityName);
60+
}
61+
}
62+
4663
Function<Object[], Object> function;
4764
ActivityOptions mergedOptions = ActivityOptions.merge(activityMethod, methodRetry, options);
48-
ActivityStub stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor);
65+
ActivityStub stub;
66+
if (activityOptionsOverride == null) {
67+
stub = ActivityStubImpl.newInstance(mergedOptions, activityExecutor);
68+
} else {
69+
ActivityOptions mergedOverrideOptions = ActivityOptions.merge(activityMethod, methodRetry, activityOptionsOverride);
70+
stub = ActivityStubImpl.newInstance(mergedOverrideOptions, activityExecutor);
71+
}
4972

5073
function =
5174
(a) -> stub.execute(activityName, method.getReturnType(), method.getGenericReturnType(), a);

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,12 @@ public Thread newThread(Runnable r) {
187187

188188
private static SyncDecisionContext newDummySyncDecisionContext() {
189189
return new SyncDecisionContext(
190-
new DummyDecisionContext(), JsonDataConverter.getInstance(), null, (next) -> next, null);
190+
new DummyDecisionContext(),
191+
JsonDataConverter.getInstance(),
192+
null,
193+
(next) -> next,
194+
null,
195+
null);
191196
}
192197

193198
SyncDecisionContext getDecisionContext() {

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import com.uber.cadence.internal.replay.ExecuteLocalActivityParameters;
4141
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
4242
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
43+
import com.uber.cadence.worker.WorkflowImplementationOptions;
4344
import com.uber.cadence.workflow.ActivityException;
4445
import com.uber.cadence.workflow.ActivityFailureException;
4546
import com.uber.cadence.workflow.ActivityTimeoutException;
@@ -88,13 +89,15 @@ final class SyncDecisionContext implements WorkflowInterceptor {
8889
private final WorkflowTimers timers = new WorkflowTimers();
8990
private final Map<String, Functions.Func1<byte[], byte[]>> queryCallbacks = new HashMap<>();
9091
private final byte[] lastCompletionResult;
92+
private final WorkflowImplementationOptions workflowImplementationOptions;
9193

9294
public SyncDecisionContext(
9395
DecisionContext context,
9496
DataConverter converter,
9597
List<ContextPropagator> contextPropagators,
9698
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
97-
byte[] lastCompletionResult) {
99+
byte[] lastCompletionResult,
100+
WorkflowImplementationOptions workflowImplementationOptions) {
98101
this.context = context;
99102
this.converter = converter;
100103
this.contextPropagators = contextPropagators;
@@ -105,6 +108,7 @@ public SyncDecisionContext(
105108
}
106109
this.headInterceptor = interceptor;
107110
this.lastCompletionResult = lastCompletionResult;
111+
this.workflowImplementationOptions = workflowImplementationOptions;
108112
}
109113

110114
/**
@@ -764,4 +768,8 @@ public void upsertSearchAttributes(Map<String, Object> searchAttributes) {
764768
SearchAttributes attr = InternalUtils.convertMapToSearchAttributes(searchAttributes);
765769
context.upsertSearchAttributes(attr);
766770
}
771+
772+
public WorkflowImplementationOptions getWorkflowImplementationOptions() {
773+
return workflowImplementationOptions;
774+
}
767775
}

src/main/java/com/uber/cadence/internal/sync/SyncWorkflow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ public void start(HistoryEvent event, DecisionContext context) {
9696
dataConverter,
9797
contextPropagators,
9898
interceptorFactory,
99-
event.getWorkflowExecutionStartedEventAttributes().getLastCompletionResult());
99+
event.getWorkflowExecutionStartedEventAttributes().getLastCompletionResult(),
100+
workflowImplementationOptions);
100101

101102
workflowProc =
102103
new WorkflowRunnable(

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,4 +390,8 @@ public static <R> R getLastCompletionResult(Class<R> resultClass, Type resultTyp
390390
public static void upsertSearchAttributes(Map<String, Object> searchAttributes) {
391391
getWorkflowInterceptor().upsertSearchAttributes(searchAttributes);
392392
}
393+
394+
public static WorkflowThread getRootWorkflowContext() {
395+
return DeterministicRunnerImpl.currentThreadInternal();
396+
}
393397
}

src/main/java/com/uber/cadence/worker/WorkflowImplementationOptions.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,17 @@
1919

2020
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.BlockWorkflow;
2121

22+
import com.uber.cadence.activity.ActivityOptions;
23+
import java.util.HashMap;
24+
import java.util.Map;
2225
import java.util.Objects;
2326

2427
public final class WorkflowImplementationOptions {
2528

2629
public static final class Builder {
2730

2831
private NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy = BlockWorkflow;
32+
private Map<String, ActivityOptions> activityOptionOverrides = new HashMap<>();
2933

3034
/**
3135
* Optional: Sets how decision worker deals with non-deterministic history events (presumably
@@ -39,27 +43,51 @@ public Builder setNonDeterministicWorkflowPolicy(
3943
return this;
4044
}
4145

46+
/**
47+
* Set overrides for a specific workflow implementation for activity options.
48+
*
49+
* @param activityOptionOverrides a map where the key is the activity name and the value is the activity
50+
* options that should override existing activity configuration that comes from @ActivityMethod annotation.
51+
*/
52+
public Builder setActivityOptionOverrides(Map<String, ActivityOptions> activityOptionOverrides) {
53+
this.activityOptionOverrides = activityOptionOverrides;
54+
return this;
55+
}
56+
4257
public WorkflowImplementationOptions build() {
43-
return new WorkflowImplementationOptions(nonDeterministicWorkflowPolicy);
58+
return new WorkflowImplementationOptions(nonDeterministicWorkflowPolicy, activityOptionOverrides);
4459
}
4560
}
4661

4762
private final NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy;
63+
private Map<String, ActivityOptions> activityOptions;
4864

4965
public WorkflowImplementationOptions(
5066
NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy) {
5167
this.nonDeterministicWorkflowPolicy = nonDeterministicWorkflowPolicy;
5268
}
5369

70+
public WorkflowImplementationOptions(
71+
NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy,
72+
Map<String, ActivityOptions> activityOptions) {
73+
this.nonDeterministicWorkflowPolicy = nonDeterministicWorkflowPolicy;
74+
this.activityOptions = activityOptions;
75+
}
76+
5477
public NonDeterministicWorkflowPolicy getNonDeterministicWorkflowPolicy() {
5578
return nonDeterministicWorkflowPolicy;
5679
}
5780

81+
public Map<String, ActivityOptions> getActivityOptions() {
82+
return activityOptions;
83+
}
84+
5885
@Override
5986
public String toString() {
6087
return "WorkflowImplementationOptions{"
6188
+ "nonDeterministicWorkflowPolicy="
6289
+ nonDeterministicWorkflowPolicy
90+
+ activityOptions
6391
+ '}';
6492
}
6593

@@ -68,11 +96,12 @@ public boolean equals(Object o) {
6896
if (this == o) return true;
6997
if (o == null || getClass() != o.getClass()) return false;
7098
WorkflowImplementationOptions that = (WorkflowImplementationOptions) o;
71-
return nonDeterministicWorkflowPolicy == that.nonDeterministicWorkflowPolicy;
99+
return nonDeterministicWorkflowPolicy == that.nonDeterministicWorkflowPolicy
100+
&& Objects.equals(activityOptions, that.activityOptions);
72101
}
73102

74103
@Override
75104
public int hashCode() {
76-
return Objects.hash(nonDeterministicWorkflowPolicy);
105+
return Objects.hash(nonDeterministicWorkflowPolicy, activityOptions);
77106
}
78107
}

src/test/java/com/uber/cadence/internal/sync/DeterministicRunnerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -727,7 +727,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
727727
new DeterministicRunnerImpl(
728728
threadPool,
729729
new SyncDecisionContext(
730-
decisionContext, JsonDataConverter.getInstance(), null, next -> next, null),
730+
decisionContext, JsonDataConverter.getInstance(), null, next -> next, null, null),
731731
() -> 0L, // clock override
732732
() -> {
733733
Promise<Void> thread =
@@ -752,7 +752,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
752752
new DeterministicRunnerImpl(
753753
threadPool,
754754
new SyncDecisionContext(
755-
decisionContext, JsonDataConverter.getInstance(), null, next -> next, null),
755+
decisionContext, JsonDataConverter.getInstance(), null, next -> next, null, null),
756756
() -> 0L, // clock override
757757
() -> {
758758
Promise<Void> thread =

src/test/java/com/uber/cadence/internal/sync/SyncDecisionContextTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class SyncDecisionContextTest {
3838
public void setUp() {
3939
this.context =
4040
new SyncDecisionContext(
41-
mockDecisionContext, JsonDataConverter.getInstance(), null, (next) -> next, null);
41+
mockDecisionContext, JsonDataConverter.getInstance(), null, (next) -> next, null, null);
4242
}
4343

4444
@Test

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,15 @@ private void startWorkerFor(Class<?>... workflowTypes) {
358358
}
359359
}
360360

361+
private void startWorkerFor(WorkflowImplementationOptions options, Class<?>... workflowTypes) {
362+
worker.registerWorkflowImplementationTypes(options, workflowTypes);
363+
if (useExternalService) {
364+
workerFactory.start();
365+
} else {
366+
testEnvironment.start();
367+
}
368+
}
369+
361370
// TODO: Refactor testEnvironment to support testing through real service to avoid this
362371
// conditional switches
363372
void registerDelayedCallback(Duration delay, Runnable r) {
@@ -460,6 +469,16 @@ public void signal1(String arg) {
460469
}
461470
}
462471

472+
public static class TestWorkflowActivityOptionOverride implements TestWorkflow1 {
473+
474+
@Override
475+
public String execute(String taskList) {
476+
TestActivities activities = Workflow.newActivityStub(TestActivities.class);
477+
478+
return activities.timeOutActivity();
479+
}
480+
}
481+
463482
public static class TestSyncWorkflowImpl implements TestWorkflow1 {
464483

465484
@Override
@@ -3971,6 +3990,9 @@ public interface TestActivities {
39713990

39723991
void neverComplete();
39733992

3993+
@ActivityMethod(startToCloseTimeoutSeconds = 1, scheduleToCloseTimeoutSeconds = 1)
3994+
String timeOutActivity();
3995+
39743996
@ActivityMethod(
39753997
scheduleToStartTimeoutSeconds = 5,
39763998
scheduleToCloseTimeoutSeconds = 5,
@@ -4051,6 +4073,7 @@ public String sleepActivity(long milliseconds, int input) {
40514073

40524074
@Override
40534075
public String activity() {
4076+
40544077
invocations.add("activity");
40554078
return "activity";
40564079
}
@@ -4184,6 +4207,17 @@ public void neverComplete() {
41844207
Activity.doNotCompleteOnReturn(); // Simulate activity timeout
41854208
}
41864209

4210+
@Override
4211+
public String timeOutActivity() {
4212+
try {
4213+
Thread.sleep(2000);
4214+
} catch (InterruptedException e) {
4215+
return "activityWasInterrupted";
4216+
}
4217+
4218+
return "activityCompletedSuccessfully";
4219+
}
4220+
41874221
@Override
41884222
public void throwIOAnnotated() {
41894223
invocations.add("throwIOAnnotated");
@@ -6280,4 +6314,40 @@ public void testGetVersionWithRetryReplay() throws Exception {
62806314
WorkflowReplayer.replayWorkflowExecutionFromResource(
62816315
"testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class);
62826316
}
6317+
6318+
@Test(expected = ExecutionException.class)
6319+
public void testWorkflowTimesOutWhenNoOverridesProvided() throws Exception {
6320+
startWorkerFor(TestWorkflowActivityOptionOverride.class);
6321+
6322+
TestWorkflow1 workflowStub =
6323+
workflowClient.newWorkflowStub(
6324+
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
6325+
6326+
CompletableFuture<String> result = WorkflowClient.execute(workflowStub::execute, taskList);
6327+
// Activity doesn't timeout because we overrode the timeouts to be longer.
6328+
assertEquals("activityCompletedSuccessfully", result.get());
6329+
}
6330+
6331+
@Test
6332+
public void testActivityDoesntTimeoutWhenTimeoutsAreLonger() throws Exception {
6333+
Map<String, ActivityOptions> activityOptionsMap = new HashMap<>();
6334+
activityOptionsMap.put(
6335+
"TestActivities::timeOutActivity",
6336+
new ActivityOptions.Builder()
6337+
.setScheduleToCloseTimeout(Duration.ofSeconds(3))
6338+
.setStartToCloseTimeout(Duration.ofSeconds(3))
6339+
.build());
6340+
6341+
startWorkerFor(
6342+
new WorkflowImplementationOptions.Builder().setActivityOptionOverrides(activityOptionsMap).build(),
6343+
TestWorkflowActivityOptionOverride.class);
6344+
6345+
TestWorkflow1 workflowStub =
6346+
workflowClient.newWorkflowStub(
6347+
TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build());
6348+
6349+
CompletableFuture<String> result = WorkflowClient.execute(workflowStub::execute, taskList);
6350+
// Activity doesn't timeout because we overrode the timeouts to be longer.
6351+
assertEquals("activityCompletedSuccessfully", result.get());
6352+
}
62836353
}

0 commit comments

Comments
 (0)