Skip to content

Commit eff4f13

Browse files
authored
Refactored decider to rely on initiatedEventId instead of string ids (#160)
* Refactored decider to rely on initiatedEventId instead of string ids * Added Workflow.sideEffect * Added Marker decision and event to TestService * Switched TestWorkflowService to use initiatedEventId for child workflows to deal with duplicated scheduling gracefully
1 parent 1c03f30 commit eff4f13

37 files changed

+1202
-520
lines changed

src/main/java/com/uber/cadence/internal/common/WorkflowExecutionUtils.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -910,4 +910,56 @@ private static void fixStackTrace(JsonElement json, String stackIndentation) {
910910
fixStackTrace(entry.getValue(), stackIndentation + INDENTATION);
911911
}
912912
}
913+
914+
/** Is this an event that was created to mirror a decision? */
915+
public static boolean isDecisionEvent(HistoryEvent event) {
916+
EventType eventType = event.getEventType();
917+
boolean result =
918+
((event != null)
919+
&& (eventType == EventType.ActivityTaskScheduled
920+
|| eventType == EventType.StartChildWorkflowExecutionInitiated
921+
|| eventType == EventType.TimerStarted
922+
|| eventType == EventType.WorkflowExecutionCompleted
923+
|| eventType == EventType.WorkflowExecutionFailed
924+
|| eventType == EventType.WorkflowExecutionCanceled
925+
|| eventType == EventType.WorkflowExecutionContinuedAsNew
926+
|| eventType == EventType.ActivityTaskCancelRequested
927+
|| eventType == EventType.RequestCancelActivityTaskFailed
928+
|| eventType == EventType.TimerCanceled
929+
|| eventType == EventType.CancelTimerFailed
930+
|| eventType == EventType.RequestCancelExternalWorkflowExecutionInitiated
931+
|| eventType == EventType.MarkerRecorded
932+
|| eventType == EventType.SignalExternalWorkflowExecutionInitiated));
933+
return result;
934+
}
935+
936+
public static EventType getEventTypeForDecision(DecisionType decisionType) {
937+
switch (decisionType) {
938+
case ScheduleActivityTask:
939+
return EventType.ActivityTaskScheduled;
940+
case RequestCancelActivityTask:
941+
return EventType.ActivityTaskCancelRequested;
942+
case StartTimer:
943+
return EventType.TimerStarted;
944+
case CompleteWorkflowExecution:
945+
return EventType.WorkflowExecutionCompleted;
946+
case FailWorkflowExecution:
947+
return EventType.WorkflowExecutionFailed;
948+
case CancelTimer:
949+
return EventType.TimerCanceled;
950+
case CancelWorkflowExecution:
951+
return EventType.WorkflowExecutionCanceled;
952+
case RequestCancelExternalWorkflowExecution:
953+
return EventType.ExternalWorkflowExecutionCancelRequested;
954+
case RecordMarker:
955+
return EventType.MarkerRecorded;
956+
case ContinueAsNewWorkflowExecution:
957+
return EventType.WorkflowExecutionContinuedAsNew;
958+
case StartChildWorkflowExecution:
959+
return EventType.StartChildWorkflowExecutionInitiated;
960+
case SignalExternalWorkflowExecution:
961+
return EventType.SignalExternalWorkflowExecutionInitiated;
962+
}
963+
throw new IllegalArgumentException("Unknown decisionType");
964+
}
913965
}

src/main/java/com/uber/cadence/internal/replay/ActivityDecisionContext.java

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import com.uber.cadence.ActivityTaskCanceledEventAttributes;
2121
import com.uber.cadence.ActivityTaskCompletedEventAttributes;
2222
import com.uber.cadence.ActivityTaskFailedEventAttributes;
23-
import com.uber.cadence.ActivityTaskStartedEventAttributes;
2423
import com.uber.cadence.ActivityTaskTimedOutEventAttributes;
2524
import com.uber.cadence.ActivityType;
2625
import com.uber.cadence.HistoryEvent;
@@ -37,30 +36,35 @@ final class ActivityDecisionContext {
3736

3837
private final class ActivityCancellationHandler implements Consumer<Exception> {
3938

39+
private final long scheduledEventId;
40+
4041
private final String activityId;
4142

4243
private final BiConsumer<byte[], Exception> callback;
4344

4445
private ActivityCancellationHandler(
45-
String activityId, BiConsumer<byte[], Exception> callaback) {
46+
long scheduledEventId, String activityId, BiConsumer<byte[], Exception> callaback) {
47+
this.scheduledEventId = scheduledEventId;
4648
this.activityId = activityId;
4749
this.callback = callaback;
4850
}
4951

5052
@Override
5153
public void accept(Exception cause) {
52-
if (!scheduledActivities.containsKey(activityId)) {
54+
if (!scheduledActivities.containsKey(scheduledEventId)) {
5355
// Cancellation handlers are not deregistered. So they fire after an activity completion.
5456
return;
5557
}
5658
decisions.requestCancelActivityTask(
57-
activityId,
59+
scheduledEventId,
5860
() -> {
5961
OpenRequestInfo<byte[], ActivityType> scheduled =
60-
scheduledActivities.remove(activityId);
62+
scheduledActivities.remove(scheduledEventId);
6163
if (scheduled == null) {
6264
throw new IllegalArgumentException(
63-
"Activity \"" + activityId + "\" wasn't scheduled");
65+
String.format(
66+
"Activity with activityId=%s and scheduledEventId=%d wasn't found",
67+
activityId, scheduledEventId));
6468
}
6569
callback.accept(null, new CancellationException("Cancelled by request"));
6670
});
@@ -69,7 +73,8 @@ public void accept(Exception cause) {
6973

7074
private final DecisionsHelper decisions;
7175

72-
private final Map<String, OpenRequestInfo<byte[], ActivityType>> scheduledActivities =
76+
// key is scheduledEventId
77+
private final Map<Long, OpenRequestInfo<byte[], ActivityType>> scheduledActivities =
7378
new HashMap<>();
7479

7580
ActivityDecisionContext(DecisionsHelper decisions) {
@@ -106,21 +111,19 @@ Consumer<Exception> scheduleActivityTask(
106111
tl.setName(taskList);
107112
attributes.setTaskList(tl);
108113
}
109-
decisions.scheduleActivityTask(attributes);
114+
long scheduledEventId = decisions.scheduleActivityTask(attributes);
110115
context.setCompletionHandle(callback);
111-
scheduledActivities.put(attributes.getActivityId(), context);
116+
scheduledActivities.put(scheduledEventId, context);
112117
return new ActivityDecisionContext.ActivityCancellationHandler(
113-
attributes.getActivityId(), callback);
118+
scheduledEventId, attributes.getActivityId(), callback);
114119
}
115120

116-
void handleActivityTaskStarted(ActivityTaskStartedEventAttributes attributes) {}
117-
118121
void handleActivityTaskCanceled(HistoryEvent event) {
119122
ActivityTaskCanceledEventAttributes attributes = event.getActivityTaskCanceledEventAttributes();
120-
String activityId = decisions.getActivityId(attributes);
121123
if (decisions.handleActivityTaskCanceled(event)) {
122124
CancellationException e = new CancellationException();
123-
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
125+
OpenRequestInfo<byte[], ActivityType> scheduled =
126+
scheduledActivities.remove(attributes.getScheduledEventId());
124127
if (scheduled != null) {
125128
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
126129
// It is OK to fail with subclass of CancellationException when cancellation requested.
@@ -134,9 +137,9 @@ void handleActivityTaskCanceled(HistoryEvent event) {
134137
void handleActivityTaskCompleted(HistoryEvent event) {
135138
ActivityTaskCompletedEventAttributes attributes =
136139
event.getActivityTaskCompletedEventAttributes();
137-
String activityId = decisions.getActivityId(attributes);
138-
if (decisions.handleActivityTaskClosed(activityId)) {
139-
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
140+
if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
141+
OpenRequestInfo<byte[], ActivityType> scheduled =
142+
scheduledActivities.remove(attributes.getScheduledEventId());
140143
if (scheduled != null) {
141144
byte[] result = attributes.getResult();
142145
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
@@ -147,15 +150,15 @@ void handleActivityTaskCompleted(HistoryEvent event) {
147150

148151
void handleActivityTaskFailed(HistoryEvent event) {
149152
ActivityTaskFailedEventAttributes attributes = event.getActivityTaskFailedEventAttributes();
150-
String activityId = decisions.getActivityId(attributes);
151-
if (decisions.handleActivityTaskClosed(activityId)) {
152-
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
153+
if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
154+
OpenRequestInfo<byte[], ActivityType> scheduled =
155+
scheduledActivities.remove(attributes.getScheduledEventId());
153156
if (scheduled != null) {
154157
String reason = attributes.getReason();
155158
byte[] details = attributes.getDetails();
156159
ActivityTaskFailedException failure =
157160
new ActivityTaskFailedException(
158-
event.getEventId(), scheduled.getUserContext(), activityId, reason, details);
161+
event.getEventId(), scheduled.getUserContext(), null, reason, details);
159162
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
160163
completionHandle.accept(null, failure);
161164
}
@@ -164,15 +167,15 @@ void handleActivityTaskFailed(HistoryEvent event) {
164167

165168
void handleActivityTaskTimedOut(HistoryEvent event) {
166169
ActivityTaskTimedOutEventAttributes attributes = event.getActivityTaskTimedOutEventAttributes();
167-
String activityId = decisions.getActivityId(attributes);
168-
if (decisions.handleActivityTaskClosed(activityId)) {
169-
OpenRequestInfo<byte[], ActivityType> scheduled = scheduledActivities.remove(activityId);
170+
if (decisions.handleActivityTaskClosed(attributes.getScheduledEventId())) {
171+
OpenRequestInfo<byte[], ActivityType> scheduled =
172+
scheduledActivities.remove(attributes.getScheduledEventId());
170173
if (scheduled != null) {
171174
TimeoutType timeoutType = attributes.getTimeoutType();
172175
byte[] details = attributes.getDetails();
173176
ActivityTaskTimeoutException failure =
174177
new ActivityTaskTimeoutException(
175-
event.getEventId(), scheduled.getUserContext(), activityId, timeoutType, details);
178+
event.getEventId(), scheduled.getUserContext(), null, timeoutType, details);
176179
BiConsumer<byte[], Exception> completionHandle = scheduled.getCompletionCallback();
177180
completionHandle.accept(null, failure);
178181
}

src/main/java/com/uber/cadence/internal/replay/ChildWorkflowDecisionStateMachine.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,15 @@ public void handleCancellationFailureEvent(HistoryEvent event) {
9696
}
9797

9898
@Override
99-
public void cancel(Runnable immediateCancellationCallback) {
99+
public boolean cancel(Runnable immediateCancellationCallback) {
100100
switch (state) {
101101
case STARTED:
102102
stateHistory.add("cancel");
103103
state = DecisionState.CANCELED_AFTER_STARTED;
104104
stateHistory.add(state.toString());
105-
break;
105+
return true;
106106
default:
107-
super.cancel(immediateCancellationCallback);
107+
return super.cancel(immediateCancellationCallback);
108108
}
109109
}
110110

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 57 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,41 +18,52 @@
1818
package com.uber.cadence.internal.replay;
1919

2020
import com.uber.cadence.HistoryEvent;
21+
import com.uber.cadence.MarkerRecordedEventAttributes;
2122
import com.uber.cadence.StartTimerDecisionAttributes;
2223
import com.uber.cadence.TimerCanceledEventAttributes;
2324
import com.uber.cadence.TimerFiredEventAttributes;
25+
import com.uber.cadence.workflow.Functions.Func;
2426
import java.util.HashMap;
2527
import java.util.Map;
2628
import java.util.concurrent.CancellationException;
2729
import java.util.concurrent.TimeUnit;
2830
import java.util.function.BiConsumer;
2931
import java.util.function.Consumer;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
3034

3135
/** Clock that must be used inside workflow definition code to ensure replay determinism. */
3236
final class ClockDecisionContext {
3337

38+
private static final String SIDE_EFFECT_MARKER_NAME = "SideEffect";
39+
40+
// Logger is named after this class so that its output is attributed to ClockDecisionContext.
private static final Logger log = LoggerFactory.getLogger(ClockDecisionContext.class);
41+
3442
private final class TimerCancellationHandler implements Consumer<Exception> {
3543

36-
private final String timerId;
44+
private final long startEventId;
3745

38-
TimerCancellationHandler(String timerId) {
39-
this.timerId = timerId;
46+
TimerCancellationHandler(long timerId) {
47+
this.startEventId = timerId;
4048
}
4149

4250
@Override
4351
public void accept(Exception reason) {
44-
decisions.cancelTimer(timerId, () -> timerCancelled(timerId, reason));
52+
decisions.cancelTimer(startEventId, () -> timerCancelled(startEventId, reason));
4553
}
4654
}
4755

4856
private final DecisionsHelper decisions;
4957

50-
private final Map<String, OpenRequestInfo<?, Long>> scheduledTimers = new HashMap<>();
58+
// key is startedEventId
59+
private final Map<Long, OpenRequestInfo<?, Long>> scheduledTimers = new HashMap<>();
5160

5261
private long replayCurrentTimeMilliseconds;
5362

5463
private boolean replaying = true;
5564

65+
private final Map<Long, byte[]> sideEffectResults = new HashMap<>();
66+
5667
ClockDecisionContext(DecisionsHelper decisions) {
5768
this.decisions = decisions;
5869
}
@@ -81,22 +92,21 @@ Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback)
8192
final OpenRequestInfo<?, Long> context = new OpenRequestInfo<>(firingTime);
8293
final StartTimerDecisionAttributes timer = new StartTimerDecisionAttributes();
8394
timer.setStartToFireTimeoutSeconds(delaySeconds);
84-
final String timerId = decisions.getNextId();
85-
timer.setTimerId(timerId);
86-
decisions.startTimer(timer, null);
95+
timer.setTimerId(String.valueOf(decisions.getNextId()));
96+
long startEventId = decisions.startTimer(timer, null);
8797
context.setCompletionHandle((ctx, e) -> callback.accept(e));
88-
scheduledTimers.put(timerId, context);
89-
return new ClockDecisionContext.TimerCancellationHandler(timerId);
98+
scheduledTimers.put(startEventId, context);
99+
return new ClockDecisionContext.TimerCancellationHandler(startEventId);
90100
}
91101

92102
void setReplaying(boolean replaying) {
93103
this.replaying = replaying;
94104
}
95105

96106
void handleTimerFired(TimerFiredEventAttributes attributes) {
97-
String timerId = attributes.getTimerId();
98-
if (decisions.handleTimerClosed(timerId)) {
99-
OpenRequestInfo<?, Long> scheduled = scheduledTimers.remove(timerId);
107+
long startedEventId = attributes.getStartedEventId();
108+
if (decisions.handleTimerClosed(attributes)) {
109+
OpenRequestInfo<?, Long> scheduled = scheduledTimers.remove(startedEventId);
100110
if (scheduled != null) {
101111
BiConsumer<?, Exception> completionCallback = scheduled.getCompletionCallback();
102112
completionCallback.accept(null, null);
@@ -106,14 +116,14 @@ void handleTimerFired(TimerFiredEventAttributes attributes) {
106116

107117
void handleTimerCanceled(HistoryEvent event) {
108118
TimerCanceledEventAttributes attributes = event.getTimerCanceledEventAttributes();
109-
String timerId = attributes.getTimerId();
119+
long startedEventId = attributes.getStartedEventId();
110120
if (decisions.handleTimerCanceled(event)) {
111-
timerCancelled(timerId, null);
121+
timerCancelled(startedEventId, null);
112122
}
113123
}
114124

115-
private void timerCancelled(String timerId, Exception reason) {
116-
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
125+
private void timerCancelled(long startEventId, Exception reason) {
126+
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(startEventId);
117127
if (scheduled == null) {
118128
return;
119129
}
@@ -122,4 +132,34 @@ private void timerCancelled(String timerId, Exception reason) {
122132
exception.initCause(reason);
123133
context.accept(null, exception);
124134
}
135+
136+
byte[] sideEffect(Func<byte[]> func) {
137+
long sideEffectEventId = decisions.getNextDecisionEventId();
138+
byte[] result;
139+
if (replaying) {
140+
result = sideEffectResults.get(sideEffectEventId);
141+
if (result == null) {
142+
throw new Error("No cached result found for SideEffect EventID=" + sideEffectEventId);
143+
}
144+
} else {
145+
try {
146+
result = func.apply();
147+
} catch (Error e) {
148+
throw e;
149+
} catch (Exception e) {
150+
throw new Error("sideEffect function failed", e);
151+
}
152+
}
153+
decisions.recordMarker(SIDE_EFFECT_MARKER_NAME, result);
154+
return result;
155+
}
156+
157+
void handleMarkerRecorded(HistoryEvent event) {
158+
MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes();
159+
if (SIDE_EFFECT_MARKER_NAME.equals(attributes.getMarkerName())) {
160+
sideEffectResults.put(event.getEventId(), attributes.getDetails());
161+
} else {
162+
log.warn("Unexpected marker: " + event);
163+
}
164+
}
125165
}

src/main/java/com/uber/cadence/internal/replay/CompleteWorkflowStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void handleInitiationFailedEvent(HistoryEvent event) {
4646
}
4747

4848
@Override
49-
public void cancel(Runnable immediateCancellationCallback) {
49+
public boolean cancel(Runnable immediateCancellationCallback) {
5050
throw new UnsupportedOperationException();
5151
}
5252

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.WorkflowType;
22+
import com.uber.cadence.workflow.Functions.Func;
2223
import com.uber.cadence.workflow.Promise;
2324
import com.uber.m3.tally.Scope;
2425
import java.time.Duration;
@@ -112,6 +113,20 @@ Consumer<Exception> signalWorkflowExecution(
112113
*/
113114
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback);
114115

116+
/**
117+
* Executes the provided function once, records its result into the workflow history. The recorded
118+
* result on history will be returned without executing the provided function during replay. This
119+
* guarantees the deterministic requirement for workflow as the exact same result will be returned
120+
* in replay. Common use case is to run some short non-deterministic code in workflow, like
121+
* getting random number or new UUID. The only way to fail SideEffect is to throw {@link Error}
122+
* which causes decision task failure. The decision task after timeout is rescheduled and
123+
* re-executed giving SideEffect another chance to succeed.
124+
*
125+
* @param func function that is called once to return a value.
126+
* @return value of the side effect.
127+
*/
128+
byte[] sideEffect(Func<byte[]> func);
129+
115130
/** @return scope to be used for metrics reporting. */
116131
Scope getMetricsScope();
117132

0 commit comments

Comments
 (0)