Skip to content

Commit 089ae4a

Browse files
authored
Added Workflow.mutableSideEffect (#161)
1 parent b29f54f commit 089ae4a

17 files changed

+370
-11
lines changed

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 112 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20+
import com.uber.cadence.EventType;
2021
import com.uber.cadence.HistoryEvent;
2122
import com.uber.cadence.MarkerRecordedEventAttributes;
2223
import com.uber.cadence.StartTimerDecisionAttributes;
2324
import com.uber.cadence.TimerCanceledEventAttributes;
2425
import com.uber.cadence.TimerFiredEventAttributes;
26+
import com.uber.cadence.internal.replay.DecisionContext.MutableSideEffectData;
2527
import com.uber.cadence.workflow.Functions.Func;
28+
import com.uber.cadence.workflow.Functions.Func1;
2629
import java.util.HashMap;
2730
import java.util.Map;
31+
import java.util.Optional;
2832
import java.util.concurrent.CancellationException;
2933
import java.util.concurrent.TimeUnit;
3034
import java.util.function.BiConsumer;
@@ -36,6 +40,7 @@
3640
final class ClockDecisionContext {
3741

3842
private static final String SIDE_EFFECT_MARKER_NAME = "SideEffect";
43+
private static final String MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect";
3944

4045
private static final Logger log = LoggerFactory.getLogger(ReplayDecider.class);
4146

@@ -53,6 +58,31 @@ public void accept(Exception reason) {
5358
}
5459
}
5560

61+
private static final class MutableSideEffectResult {
62+
63+
private final byte[] data;
64+
65+
/**
66+
* Count of how many times the mutableSideEffect was called since the last marker recorded. It
67+
* is used to ensure that an updated value is returned after the same exact number of times
68+
* during a replay.
69+
*/
70+
private int accessCount;
71+
72+
private MutableSideEffectResult(byte[] data) {
73+
this.data = data;
74+
}
75+
76+
public byte[] getData() {
77+
accessCount++;
78+
return data;
79+
}
80+
81+
public int getAccessCount() {
82+
return accessCount;
83+
}
84+
}
85+
5686
private final DecisionsHelper decisions;
5787

5888
// key is startedEventId
@@ -62,7 +92,10 @@ public void accept(Exception reason) {
6292

6393
private boolean replaying = true;
6494

95+
// Key is side effect marker eventId
6596
private final Map<Long, byte[]> sideEffectResults = new HashMap<>();
97+
// Key is mutableSideEffect id
98+
private final Map<String, MutableSideEffectResult> mutableSideEffectResults = new HashMap<>();
6699

67100
ClockDecisionContext(DecisionsHelper decisions) {
68101
this.decisions = decisions;
@@ -96,7 +129,7 @@ Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback)
96129
long startEventId = decisions.startTimer(timer, null);
97130
context.setCompletionHandle((ctx, e) -> callback.accept(e));
98131
scheduledTimers.put(startEventId, context);
99-
return new ClockDecisionContext.TimerCancellationHandler(startEventId);
132+
return new TimerCancellationHandler(startEventId);
100133
}
101134

102135
void setReplaying(boolean replaying) {
@@ -154,11 +187,87 @@ byte[] sideEffect(Func<byte[]> func) {
154187
return result;
155188
}
156189

190+
/**
191+
* @param id mutable side effect id
192+
* @param func given the value from the last marker returns value to store. If result is empty
193+
* nothing is recorded into the history.
194+
* @return the latest value returned by func
195+
*/
196+
Optional<byte[]> mutableSideEffect(
197+
String id,
198+
Func1<MutableSideEffectData, byte[]> markerDataSerializer,
199+
Func1<byte[], MutableSideEffectData> markerDataDeserializer,
200+
Func1<Optional<byte[]>, Optional<byte[]>> func) {
201+
MutableSideEffectResult result = mutableSideEffectResults.get(id);
202+
Optional<byte[]> stored;
203+
if (result == null) {
204+
stored = Optional.empty();
205+
} else {
206+
stored = Optional.of(result.getData());
207+
}
208+
long eventId = decisions.getNextDecisionEventId();
209+
int accessCount = result == null ? 0 : result.getAccessCount();
210+
if (replaying) {
211+
212+
Optional<byte[]> data =
213+
getSideEffectDataFromHistory(eventId, id, accessCount, markerDataDeserializer);
214+
if (data.isPresent()) {
215+
// Need to insert marker to ensure that eventId is incremented
216+
recordMutableSideEffectMarker(id, eventId, data.get(), accessCount, markerDataSerializer);
217+
return data;
218+
}
219+
return stored;
220+
}
221+
Optional<byte[]> toStore = func.apply(stored);
222+
if (toStore.isPresent()) {
223+
byte[] data = toStore.get();
224+
recordMutableSideEffectMarker(id, eventId, data, accessCount, markerDataSerializer);
225+
return toStore;
226+
}
227+
return stored;
228+
}
229+
230+
private Optional<byte[]> getSideEffectDataFromHistory(
231+
long eventId,
232+
String mutableSideEffectId,
233+
int expectedAcccessCount,
234+
Func1<byte[], MutableSideEffectData> markerDataDeserializer) {
235+
HistoryEvent event = decisions.getDecisionEvent(eventId);
236+
if (event.getEventType() != EventType.MarkerRecorded) {
237+
return Optional.empty();
238+
}
239+
MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes();
240+
String name = attributes.getMarkerName();
241+
if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name)) {
242+
return Optional.empty();
243+
}
244+
MutableSideEffectData markerData = markerDataDeserializer.apply(attributes.getDetails());
245+
// access count is used to not return data from the marker before the recorded number of calls
246+
if (!mutableSideEffectId.equals(markerData.getId())
247+
|| markerData.getAccessCount() > expectedAcccessCount) {
248+
return Optional.empty();
249+
}
250+
return Optional.of(markerData.getData());
251+
}
252+
253+
private void recordMutableSideEffectMarker(
254+
String id,
255+
long eventId,
256+
byte[] data,
257+
int accessCount,
258+
Func1<MutableSideEffectData, byte[]> markerDataSerializer) {
259+
MutableSideEffectData dataObject = new MutableSideEffectData(id, eventId, data, accessCount);
260+
byte[] details = markerDataSerializer.apply(dataObject);
261+
mutableSideEffectResults.put(id, new MutableSideEffectResult(data));
262+
decisions.recordMarker(MUTABLE_SIDE_EFFECT_MARKER_NAME, details);
263+
}
264+
157265
void handleMarkerRecorded(HistoryEvent event) {
158266
MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes();
159-
if (SIDE_EFFECT_MARKER_NAME.equals(attributes.getMarkerName())) {
267+
String name = attributes.getMarkerName();
268+
if (SIDE_EFFECT_MARKER_NAME.equals(name)) {
160269
sideEffectResults.put(event.getEventId(), attributes.getDetails());
161-
} else {
270+
} else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME.equals(name)) {
162271
log.warn("Unexpected marker: " + event);
163272
}
164273
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import com.uber.cadence.WorkflowExecution;
2121
import com.uber.cadence.WorkflowType;
2222
import com.uber.cadence.workflow.Functions.Func;
23+
import com.uber.cadence.workflow.Functions.Func1;
2324
import com.uber.cadence.workflow.Promise;
2425
import com.uber.m3.tally.Scope;
2526
import java.time.Duration;
27+
import java.util.Optional;
2628
import java.util.function.BiConsumer;
2729
import java.util.function.Consumer;
2830

@@ -32,6 +34,37 @@
3234
*/
3335
public interface DecisionContext extends ReplayAware {
3436

37+
final class MutableSideEffectData {
38+
39+
private final String id;
40+
private final long eventId;
41+
private final byte[] data;
42+
private final int accessCount;
43+
44+
public MutableSideEffectData(String id, long eventId, byte[] data, int accessCount) {
45+
this.id = id;
46+
this.eventId = eventId;
47+
this.data = data;
48+
this.accessCount = accessCount;
49+
}
50+
51+
public String getId() {
52+
return id;
53+
}
54+
55+
public long getEventId() {
56+
return eventId;
57+
}
58+
59+
public byte[] getData() {
60+
return data;
61+
}
62+
63+
public int getAccessCount() {
64+
return accessCount;
65+
}
66+
}
67+
3568
WorkflowExecution getWorkflowExecution();
3669

3770
// TODO: Add to Cadence
@@ -97,6 +130,12 @@ Consumer<Exception> signalWorkflowExecution(
97130
/** Deterministic unique Id generator */
98131
String generateUniqueId();
99132

133+
Optional<byte[]> mutableSideEffect(
134+
String id,
135+
Func1<MutableSideEffectData, byte[]> markerDataSerializer,
136+
Func1<byte[], MutableSideEffectData> markerDataDeserializer,
137+
Func1<Optional<byte[]>, Optional<byte[]>> func);
138+
100139
/**
101140
* @return time of the {@link com.uber.cadence.PollForDecisionTaskResponse} start event of the
102141
* decision being processed or replayed.

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
import com.uber.cadence.WorkflowType;
2626
import com.uber.cadence.internal.metrics.ReplayAwareScope;
2727
import com.uber.cadence.workflow.Functions.Func;
28+
import com.uber.cadence.workflow.Functions.Func1;
2829
import com.uber.cadence.workflow.Promise;
2930
import com.uber.cadence.workflow.Workflow;
3031
import com.uber.m3.tally.Scope;
3132
import java.time.Duration;
33+
import java.util.Optional;
3234
import java.util.function.BiConsumer;
3335
import java.util.function.Consumer;
3436

@@ -197,6 +199,15 @@ public byte[] sideEffect(Func<byte[]> func) {
197199
return workflowClock.sideEffect(func);
198200
}
199201

202+
@Override
203+
public Optional<byte[]> mutableSideEffect(
204+
String id,
205+
Func1<MutableSideEffectData, byte[]> markerDataSerializer,
206+
Func1<byte[], MutableSideEffectData> markerDataDeserializer,
207+
Func1<Optional<byte[]>, Optional<byte[]>> func) {
208+
return workflowClock.mutableSideEffect(id, markerDataSerializer, markerDataDeserializer, func);
209+
}
210+
200211
@Override
201212
public long currentTimeMillis() {
202213
return workflowClock.currentTimeMillis();

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
5252
import com.uber.cadence.WorkflowType;
5353
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
54+
import com.uber.cadence.internal.replay.HistoryHelper.DecisionEvents;
5455
import com.uber.cadence.internal.worker.WorkflowExecutionException;
5556
import java.util.ArrayList;
5657
import java.util.Arrays;
@@ -86,6 +87,8 @@ class DecisionsHelper {
8687
*/
8788
private long nextDecisionEventId;
8889

90+
private DecisionEvents decisionEvents;
91+
8992
/** Use access-order to ensure that decisions are emitted in order of their creation */
9093
private final Map<DecisionId, DecisionStateMachine> decisions =
9194
new LinkedHashMap<>(100, 0.75f, true);
@@ -514,8 +517,9 @@ private boolean isCompletionEvent(Decision decision) {
514517
}
515518
}
516519

517-
public void handleDecisionTaskStartedEvent(long nextDecisionEventId) {
518-
this.nextDecisionEventId = nextDecisionEventId;
520+
public void handleDecisionTaskStartedEvent(DecisionEvents decision) {
521+
this.decisionEvents = decision;
522+
this.nextDecisionEventId = decision.getNextDecisionEventId();
519523
}
520524

521525
void notifyDecisionSent() {
@@ -599,4 +603,8 @@ String getNextId() {
599603
}
600604
return String.valueOf(nextDecisionEventId);
601605
}
606+
607+
public HistoryEvent getDecisionEvent(long eventId) {
608+
return decisionEvents.getDecisionEvent(eventId);
609+
}
602610
}

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,14 @@ public List<HistoryEvent> getDecisionEvents() {
8282
return decisionEvents;
8383
}
8484

85+
public HistoryEvent getDecisionEvent(long eventId) {
86+
int index = (int) (eventId - nextDecisionEventId);
87+
if (index < 0 || index > decisionEvents.size()) {
88+
throw new IllegalArgumentException("No decision event found at eventId=" + eventId);
89+
}
90+
return decisionEvents.get(index);
91+
}
92+
8593
public List<HistoryEvent> getMarkers() {
8694
return markers;
8795
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ private void decideImpl(Functions.Proc query) throws Throwable {
363363
context.setReplaying(decision.isReplay());
364364
context.setReplayCurrentTimeMilliseconds(decision.getReplayCurrentTimeMilliseconds());
365365

366-
decisionsHelper.handleDecisionTaskStartedEvent(decision.getNextDecisionEventId());
366+
decisionsHelper.handleDecisionTaskStartedEvent(decision);
367367
// Markers must be cached first as their data is needed when processing events.
368368
for (HistoryEvent event : decision.getMarkers()) {
369369
processEvent(event);

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.uber.cadence.internal.replay.SignalExternalWorkflowParameters;
2828
import com.uber.cadence.internal.replay.StartChildWorkflowExecutionParameters;
2929
import com.uber.cadence.workflow.Functions.Func;
30+
import com.uber.cadence.workflow.Functions.Func1;
3031
import com.uber.cadence.workflow.Promise;
3132
import com.uber.m3.tally.Scope;
3233
import java.time.Duration;
@@ -504,6 +505,15 @@ public String generateUniqueId() {
504505
throw new UnsupportedOperationException("not implemented");
505506
}
506507

508+
@Override
509+
public Optional<byte[]> mutableSideEffect(
510+
String id,
511+
Func1<MutableSideEffectData, byte[]> markerDataSerializer,
512+
Func1<byte[], MutableSideEffectData> markerDataDeserializer,
513+
Func1<Optional<byte[]>, Optional<byte[]>> func) {
514+
throw new UnsupportedOperationException("not implemented");
515+
}
516+
507517
@Override
508518
public long currentTimeMillis() {
509519
throw new UnsupportedOperationException("not implemented");

0 commit comments

Comments
 (0)