Skip to content

Commit dd22cb1

Browse files
authored
Added support for NonDeterministicWorkflowPolicy.FailWorkflow (#236)
* Added nonDeterministicWorkflowPolicy to WorkflowMethod * Added WorkflowImplementationOptions * Added support for NonDeterministicWorkflowPolicyFailWorkflow
1 parent 946a692 commit dd22cb1

File tree

11 files changed

+288
-12
lines changed

11 files changed

+288
-12
lines changed

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
21+
2022
import com.uber.cadence.Decision;
2123
import com.uber.cadence.EventType;
2224
import com.uber.cadence.GetWorkflowExecutionHistoryRequest;
@@ -246,7 +248,7 @@ private void eventLoop() {
246248
try {
247249
completed = workflow.eventLoop();
248250
} catch (Error e) {
249-
throw e; // errors fail decision, not a workflow
251+
throw e;
250252
} catch (WorkflowExecutionException e) {
251253
failure = e;
252254
completed = true;
@@ -418,8 +420,17 @@ private void decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc
418420
decisionsHelper.handleDecisionTaskStartedEvent(decision);
419421
}
420422
} catch (Error e) {
421-
metricsScope.counter(MetricsType.DECISION_TASK_ERROR_COUNTER).inc(1);
422-
throw e;
423+
if (this.workflow.getWorkflowImplementationOptions().getNonDeterministicWorkflowPolicy()
424+
== FailWorkflow) {
425+
// fail workflow
426+
failure = workflow.mapError(e);
427+
completed = true;
428+
completeWorkflow();
429+
} else {
430+
metricsScope.counter(MetricsType.DECISION_TASK_ERROR_COUNTER).inc(1);
431+
// fail decision, not a workflow
432+
throw e;
433+
}
423434
} finally {
424435
if (query != null) {
425436
query.apply();

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ private Result handleDecisionTaskImpl(PollForDecisionTaskResponse decisionTask)
109109
if (decisionTask.isSetQuery()) {
110110
return processQuery(decisionTask);
111111
} else {
112-
113112
return processDecision(decisionTask);
114113
}
115114
}

src/main/java/com/uber/cadence/internal/replay/ReplayWorkflow.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.uber.cadence.HistoryEvent;
2121
import com.uber.cadence.WorkflowQuery;
2222
import com.uber.cadence.internal.worker.WorkflowExecutionException;
23+
import com.uber.cadence.worker.WorkflowImplementationOptions;
2324

2425
public interface ReplayWorkflow {
2526

@@ -60,4 +61,8 @@ public interface ReplayWorkflow {
6061
* @return Serialized failure
6162
*/
6263
WorkflowExecutionException mapUnexpectedException(Exception failure);
64+
65+
WorkflowExecutionException mapError(Error failure);
66+
67+
WorkflowImplementationOptions getWorkflowImplementationOptions();
6368
}

src/main/java/com/uber/cadence/internal/sync/POJOWorkflowImplementationFactory.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow;
21+
2022
import com.google.common.reflect.TypeToken;
2123
import com.uber.cadence.WorkflowType;
2224
import com.uber.cadence.converter.DataConverter;
@@ -29,6 +31,7 @@
2931
import com.uber.cadence.internal.replay.ReplayWorkflowFactory;
3032
import com.uber.cadence.internal.worker.WorkflowExecutionException;
3133
import com.uber.cadence.testing.SimulatedTimeoutException;
34+
import com.uber.cadence.worker.WorkflowImplementationOptions;
3235
import com.uber.cadence.workflow.Functions;
3336
import com.uber.cadence.workflow.Functions.Func;
3437
import com.uber.cadence.workflow.QueryMethod;
@@ -62,6 +65,9 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
6265
private final Map<String, Functions.Func<SyncWorkflowDefinition>> workflowDefinitions =
6366
Collections.synchronizedMap(new HashMap<>());
6467

68+
private Map<String, WorkflowImplementationOptions> implementationOptions =
69+
Collections.synchronizedMap(new HashMap<>());
70+
6571
private final Map<Class<?>, Functions.Func<?>> workflowImplementationFactories =
6672
Collections.synchronizedMap(new HashMap<>());
6773

@@ -79,19 +85,25 @@ final class POJOWorkflowImplementationFactory implements ReplayWorkflowFactory {
7985
this.cache = cache;
8086
}
8187

82-
void setWorkflowImplementationTypes(Class<?>[] workflowImplementationTypes) {
88+
void setWorkflowImplementationTypes(
89+
WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
8390
workflowDefinitions.clear();
8491
for (Class<?> type : workflowImplementationTypes) {
85-
addWorkflowImplementationType(type);
92+
addWorkflowImplementationType(options, type);
8693
}
8794
}
8895

8996
<R> void addWorkflowImplementationFactory(Class<R> clazz, Functions.Func<R> factory) {
9097
workflowImplementationFactories.put(clazz, factory);
91-
addWorkflowImplementationType(clazz);
98+
WorkflowImplementationOptions unitTestingOptions =
99+
new WorkflowImplementationOptions.Builder()
100+
.setNonDeterministicWorkflowPolicy(FailWorkflow)
101+
.build();
102+
addWorkflowImplementationType(unitTestingOptions, clazz);
92103
}
93104

94-
private void addWorkflowImplementationType(Class<?> workflowImplementationClass) {
105+
private void addWorkflowImplementationType(
106+
WorkflowImplementationOptions options, Class<?> workflowImplementationClass) {
95107
TypeToken<?>.TypeSet interfaces =
96108
TypeToken.of(workflowImplementationClass).getTypes().interfaces();
97109
if (interfaces.isEmpty()) {
@@ -129,6 +141,7 @@ private void addWorkflowImplementationType(Class<?> workflowImplementationClass)
129141
workflowName + " workflow type is already registered with the worker");
130142
}
131143
workflowDefinitions.put(workflowName, factory);
144+
implementationOptions.put(workflowName, options);
132145
hasWorkflowMethod = true;
133146
}
134147
if (signalMethod != null) {
@@ -183,7 +196,9 @@ public void setDataConverter(DataConverter dataConverter) {
183196
@Override
184197
public ReplayWorkflow getWorkflow(WorkflowType workflowType) {
185198
SyncWorkflowDefinition workflow = getWorkflowDefinition(workflowType);
186-
return new SyncWorkflow(workflow, dataConverter, threadPool, interceptorFactory, cache);
199+
WorkflowImplementationOptions options = implementationOptions.get(workflowType.getName());
200+
return new SyncWorkflow(
201+
workflow, options, dataConverter, threadPool, interceptorFactory, cache);
187202
}
188203

189204
@Override
@@ -238,6 +253,7 @@ public byte[] execute(byte[] input) throws CancellationException, WorkflowExecut
238253
+ context.getWorkflowType(),
239254
targetException);
240255
}
256+
// Cast to Exception is safe as Error is handled above.
241257
throw mapToWorkflowExecutionException((Exception) targetException, dataConverter);
242258
}
243259
}
@@ -335,6 +351,11 @@ static WorkflowExecutionException mapToWorkflowExecutionException(
335351
failure.getClass().getName(), dataConverter.toData(failure));
336352
}
337353

354+
static WorkflowExecutionException mapError(Error failure, DataConverter dataConverter) {
355+
return new WorkflowExecutionException(
356+
failure.getClass().getName(), dataConverter.toData(failure));
357+
}
358+
338359
@Override
339360
public String toString() {
340361
return "POJOWorkflowImplementationFactory{"

src/main/java/com/uber/cadence/internal/sync/SyncWorkflow.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.internal.replay.DecisionContext;
2828
import com.uber.cadence.internal.replay.ReplayWorkflow;
2929
import com.uber.cadence.internal.worker.WorkflowExecutionException;
30+
import com.uber.cadence.worker.WorkflowImplementationOptions;
3031
import com.uber.cadence.workflow.WorkflowInterceptor;
3132
import java.util.Objects;
3233
import java.util.concurrent.ExecutorService;
@@ -41,24 +42,35 @@ class SyncWorkflow implements ReplayWorkflow {
4142
private final DataConverter dataConverter;
4243
private final ExecutorService threadPool;
4344
private final SyncWorkflowDefinition workflow;
45+
WorkflowImplementationOptions workflowImplementationOptions;
4446
private final Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory;
4547
private DeciderCache cache;
4648
private WorkflowRunnable workflowProc;
4749
private DeterministicRunner runner;
4850

4951
public SyncWorkflow(
5052
SyncWorkflowDefinition workflow,
53+
WorkflowImplementationOptions workflowImplementationOptions,
5154
DataConverter dataConverter,
5255
ExecutorService threadPool,
5356
Function<WorkflowInterceptor, WorkflowInterceptor> interceptorFactory,
5457
DeciderCache cache) {
5558
this.workflow = Objects.requireNonNull(workflow);
59+
this.workflowImplementationOptions =
60+
workflowImplementationOptions == null
61+
? new WorkflowImplementationOptions.Builder().build()
62+
: workflowImplementationOptions;
5663
this.dataConverter = Objects.requireNonNull(dataConverter);
5764
this.threadPool = Objects.requireNonNull(threadPool);
5865
this.interceptorFactory = Objects.requireNonNull(interceptorFactory);
5966
this.cache = cache;
6067
}
6168

69+
@Override
70+
public WorkflowImplementationOptions getWorkflowImplementationOptions() {
71+
return workflowImplementationOptions;
72+
}
73+
6274
@Override
6375
public void start(HistoryEvent event, DecisionContext context) {
6476
WorkflowType workflowType =
@@ -137,4 +149,9 @@ public WorkflowExecutionException mapUnexpectedException(Exception failure) {
137149
return POJOWorkflowImplementationFactory.mapToWorkflowExecutionException(
138150
failure, dataConverter);
139151
}
152+
153+
@Override
154+
public WorkflowExecutionException mapError(Error failure) {
155+
return POJOWorkflowImplementationFactory.mapError(failure, dataConverter);
156+
}
140157
}

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.internal.worker.SingleWorkerOptions;
2828
import com.uber.cadence.internal.worker.WorkflowWorker;
2929
import com.uber.cadence.serviceclient.IWorkflowService;
30+
import com.uber.cadence.worker.WorkflowImplementationOptions;
3031
import com.uber.cadence.workflow.Functions.Func;
3132
import com.uber.cadence.workflow.WorkflowInterceptor;
3233
import java.lang.reflect.Type;
@@ -74,8 +75,9 @@ public SyncWorkflowWorker(
7475
worker = new WorkflowWorker(service, domain, taskList, this.options, taskHandler);
7576
}
7677

77-
public void setWorkflowImplementationTypes(Class<?>[] workflowImplementationTypes) {
78-
factory.setWorkflowImplementationTypes(workflowImplementationTypes);
78+
public void setWorkflowImplementationTypes(
79+
WorkflowImplementationOptions options, Class<?>[] workflowImplementationTypes) {
80+
factory.setWorkflowImplementationTypes(options, workflowImplementationTypes);
7981
}
8082

8183
public <R> void addWorkflowImplementationFactory(Class<R> clazz, Func<R> factory) {
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.worker;
19+
20+
public enum NonDeterministicWorkflowPolicy {
21+
22+
/**
23+
* Fails decision tasks, blocking workflow progress until problem is fixed usually by rollback.
24+
*/
25+
BlockWorkflow,
26+
27+
/**
28+
* Fails a workflow instance on the first non deterministic error. Useful when workflow doesn't
29+
* have any important state (like cron) and is restarted automatically through {@link
30+
* com.uber.cadence.common.RetryOptions}.
31+
*/
32+
FailWorkflow,
33+
}

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,31 @@ public void registerWorkflowImplementationTypes(Class<?>... workflowImplementati
185185
!started.get(),
186186
"registerWorkflowImplementationTypes is not allowed after worker has started");
187187

188-
workflowWorker.setWorkflowImplementationTypes(workflowImplementationClasses);
188+
workflowWorker.setWorkflowImplementationTypes(
189+
new WorkflowImplementationOptions.Builder().build(), workflowImplementationClasses);
190+
}
191+
192+
/**
193+
* Register workflow implementation classes with a worker. Overwrites previously registered types.
194+
* A workflow implementation class must implement at least one interface with a method annotated
195+
* with {@link WorkflowMethod}. That method becomes a workflow type that this worker supports.
196+
*
197+
* <p>Implementations that share a worker must implement different interfaces as a workflow type
198+
* is identified by the workflow interface, not by the implementation.
199+
*
200+
* <p>The reason for registration accepting workflow class, but not the workflow instance is that
201+
* workflows are stateful and a new instance is created for each workflow execution.
202+
*/
203+
public void registerWorkflowImplementationTypes(
204+
WorkflowImplementationOptions options, Class<?>... workflowImplementationClasses) {
205+
Preconditions.checkState(
206+
workflowWorker != null,
207+
"registerWorkflowImplementationTypes is not allowed when disableWorkflowWorker is set in worker options");
208+
Preconditions.checkState(
209+
!started.get(),
210+
"registerWorkflowImplementationTypes is not allowed after worker has started");
211+
212+
workflowWorker.setWorkflowImplementationTypes(options, workflowImplementationClasses);
189213
}
190214

191215
/**
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.worker;
19+
20+
import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.BlockWorkflow;
21+
22+
import java.util.Objects;
23+
24+
public final class WorkflowImplementationOptions {
25+
26+
public static final class Builder {
27+
28+
private NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy = BlockWorkflow;
29+
30+
/**
31+
* Optional: Sets how decision worker deals with non-deterministic history events (presumably
32+
* arising from non-deterministic workflow definitions or non-backward compatible workflow
33+
* definition changes). default: BlockWorkflow, which just logs error but reply nothing back to
34+
* server.
35+
*/
36+
public Builder setNonDeterministicWorkflowPolicy(
37+
NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy) {
38+
this.nonDeterministicWorkflowPolicy = Objects.requireNonNull(nonDeterministicWorkflowPolicy);
39+
return this;
40+
}
41+
42+
public WorkflowImplementationOptions build() {
43+
return new WorkflowImplementationOptions(nonDeterministicWorkflowPolicy);
44+
}
45+
}
46+
47+
private final NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy;
48+
49+
public WorkflowImplementationOptions(
50+
NonDeterministicWorkflowPolicy nonDeterministicWorkflowPolicy) {
51+
this.nonDeterministicWorkflowPolicy = nonDeterministicWorkflowPolicy;
52+
}
53+
54+
public NonDeterministicWorkflowPolicy getNonDeterministicWorkflowPolicy() {
55+
return nonDeterministicWorkflowPolicy;
56+
}
57+
58+
@Override
59+
public String toString() {
60+
return "WorkflowImplementationOptions{"
61+
+ "nonDeterministicWorkflowPolicy="
62+
+ nonDeterministicWorkflowPolicy
63+
+ '}';
64+
}
65+
66+
@Override
67+
public boolean equals(Object o) {
68+
if (this == o) return true;
69+
if (o == null || getClass() != o.getClass()) return false;
70+
WorkflowImplementationOptions that = (WorkflowImplementationOptions) o;
71+
return nonDeterministicWorkflowPolicy == that.nonDeterministicWorkflowPolicy;
72+
}
73+
74+
@Override
75+
public int hashCode() {
76+
return Objects.hash(nonDeterministicWorkflowPolicy);
77+
}
78+
}

0 commit comments

Comments
 (0)