17
17
18
18
package com .uber .cadence .internal .replay ;
19
19
20
- import com .uber .cadence .EventType ;
21
20
import com .uber .cadence .HistoryEvent ;
22
21
import com .uber .cadence .MarkerRecordedEventAttributes ;
23
22
import com .uber .cadence .StartTimerDecisionAttributes ;
24
23
import com .uber .cadence .TimerCanceledEventAttributes ;
25
24
import com .uber .cadence .TimerFiredEventAttributes ;
26
- import com .uber .cadence .internal .replay .DecisionContext .MutableSideEffectData ;
25
+ import com .uber .cadence .converter .DataConverter ;
26
+ import com .uber .cadence .internal .replay .MarkerHandler .MarkerData ;
27
+ import com .uber .cadence .internal .sync .WorkflowInternal ;
27
28
import com .uber .cadence .workflow .Functions .Func ;
28
29
import com .uber .cadence .workflow .Functions .Func1 ;
29
30
import java .util .HashMap ;
33
34
import java .util .concurrent .TimeUnit ;
34
35
import java .util .function .BiConsumer ;
35
36
import java .util .function .Consumer ;
37
+ import java .util .function .Predicate ;
36
38
import org .slf4j .Logger ;
37
39
import org .slf4j .LoggerFactory ;
38
40
@@ -41,6 +43,7 @@ final class ClockDecisionContext {
41
43
42
44
private static final String SIDE_EFFECT_MARKER_NAME = "SideEffect" ;
43
45
private static final String MUTABLE_SIDE_EFFECT_MARKER_NAME = "MutableSideEffect" ;
46
+ public static final String VERSION_MARKER_NAME = "Version" ;
44
47
45
48
private static final Logger log = LoggerFactory .getLogger (ReplayDecider .class );
46
49
@@ -58,31 +61,6 @@ public void accept(Exception reason) {
58
61
}
59
62
}
60
63
61
- private static final class MutableSideEffectResult {
62
-
63
- private final byte [] data ;
64
-
65
- /**
66
- * Count of how many times the mutableSideEffect was called since the last marker recorded. It
67
- * is used to ensure that an updated value is returned after the same exact number of times
68
- * during a replay.
69
- */
70
- private int accessCount ;
71
-
72
- private MutableSideEffectResult (byte [] data ) {
73
- this .data = data ;
74
- }
75
-
76
- public byte [] getData () {
77
- accessCount ++;
78
- return data ;
79
- }
80
-
81
- public int getAccessCount () {
82
- return accessCount ;
83
- }
84
- }
85
-
86
64
private final DecisionsHelper decisions ;
87
65
88
66
// key is startedEventId
@@ -94,11 +72,15 @@ public int getAccessCount() {
94
72
95
73
// Key is side effect marker eventId
96
74
private final Map <Long , byte []> sideEffectResults = new HashMap <>();
97
- // Key is mutableSideEffect id
98
- private final Map <String , MutableSideEffectResult > mutableSideEffectResults = new HashMap <>();
75
+
76
+ private final MarkerHandler mutableSideEffectHandler ;
77
+ private final MarkerHandler versionHandler ;
99
78
100
79
ClockDecisionContext (DecisionsHelper decisions ) {
101
80
this .decisions = decisions ;
81
+ mutableSideEffectHandler =
82
+ new MarkerHandler (decisions , MUTABLE_SIDE_EFFECT_MARKER_NAME , () -> replaying );
83
+ versionHandler = new MarkerHandler (decisions , VERSION_MARKER_NAME , () -> replaying );
102
84
}
103
85
104
86
long currentTimeMillis () {
@@ -125,8 +107,7 @@ Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback)
125
107
final OpenRequestInfo <?, Long > context = new OpenRequestInfo <>(firingTime );
126
108
final StartTimerDecisionAttributes timer = new StartTimerDecisionAttributes ();
127
109
timer .setStartToFireTimeoutSeconds (delaySeconds );
128
- timer .setTimerId (String .valueOf (decisions .getNextId ()));
129
- long startEventId = decisions .startTimer (timer , null );
110
+ long startEventId = decisions .startTimer (timer );
130
111
context .setCompletionHandle ((ctx , e ) -> callback .accept (e ));
131
112
scheduledTimers .put (startEventId , context );
132
113
return new TimerCancellationHandler (startEventId );
@@ -167,6 +148,7 @@ private void timerCancelled(long startEventId, Exception reason) {
167
148
}
168
149
169
150
byte [] sideEffect (Func <byte []> func ) {
151
+ decisions .addAllMissingVersionMarker (false , Optional .empty ());
170
152
long sideEffectEventId = decisions .getNextDecisionEventId ();
171
153
byte [] result ;
172
154
if (replaying ) {
@@ -194,81 +176,54 @@ byte[] sideEffect(Func<byte[]> func) {
194
176
* @return the latest value returned by func
195
177
*/
196
178
Optional <byte []> mutableSideEffect (
197
- String id ,
198
- Func1 <MutableSideEffectData , byte []> markerDataSerializer ,
199
- Func1 <byte [], MutableSideEffectData > markerDataDeserializer ,
200
- Func1 <Optional <byte []>, Optional <byte []>> func ) {
201
- MutableSideEffectResult result = mutableSideEffectResults .get (id );
202
- Optional <byte []> stored ;
203
- if (result == null ) {
204
- stored = Optional .empty ();
205
- } else {
206
- stored = Optional .of (result .getData ());
207
- }
208
- long eventId = decisions .getNextDecisionEventId ();
209
- int accessCount = result == null ? 0 : result .getAccessCount ();
210
- if (replaying ) {
211
-
212
- Optional <byte []> data =
213
- getSideEffectDataFromHistory (eventId , id , accessCount , markerDataDeserializer );
214
- if (data .isPresent ()) {
215
- // Need to insert marker to ensure that eventId is incremented
216
- recordMutableSideEffectMarker (id , eventId , data .get (), accessCount , markerDataSerializer );
217
- return data ;
218
- }
219
- return stored ;
220
- }
221
- Optional <byte []> toStore = func .apply (stored );
222
- if (toStore .isPresent ()) {
223
- byte [] data = toStore .get ();
224
- recordMutableSideEffectMarker (id , eventId , data , accessCount , markerDataSerializer );
225
- return toStore ;
226
- }
227
- return stored ;
228
- }
229
-
230
- private Optional <byte []> getSideEffectDataFromHistory (
231
- long eventId ,
232
- String mutableSideEffectId ,
233
- int expectedAcccessCount ,
234
- Func1 <byte [], MutableSideEffectData > markerDataDeserializer ) {
235
- HistoryEvent event = decisions .getDecisionEvent (eventId );
236
- if (event .getEventType () != EventType .MarkerRecorded ) {
237
- return Optional .empty ();
238
- }
239
- MarkerRecordedEventAttributes attributes = event .getMarkerRecordedEventAttributes ();
240
- String name = attributes .getMarkerName ();
241
- if (!MUTABLE_SIDE_EFFECT_MARKER_NAME .equals (name )) {
242
- return Optional .empty ();
243
- }
244
- MutableSideEffectData markerData = markerDataDeserializer .apply (attributes .getDetails ());
245
- // access count is used to not return data from the marker before the recorded number of calls
246
- if (!mutableSideEffectId .equals (markerData .getId ())
247
- || markerData .getAccessCount () > expectedAcccessCount ) {
248
- return Optional .empty ();
249
- }
250
- return Optional .of (markerData .getData ());
251
- }
252
-
253
- private void recordMutableSideEffectMarker (
254
- String id ,
255
- long eventId ,
256
- byte [] data ,
257
- int accessCount ,
258
- Func1 <MutableSideEffectData , byte []> markerDataSerializer ) {
259
- MutableSideEffectData dataObject = new MutableSideEffectData (id , eventId , data , accessCount );
260
- byte [] details = markerDataSerializer .apply (dataObject );
261
- mutableSideEffectResults .put (id , new MutableSideEffectResult (data ));
262
- decisions .recordMarker (MUTABLE_SIDE_EFFECT_MARKER_NAME , details );
179
+ String id , DataConverter converter , Func1 <Optional <byte []>, Optional <byte []>> func ) {
180
+ decisions .addAllMissingVersionMarker (false , Optional .empty ());
181
+ return mutableSideEffectHandler .handle (id , converter , func );
263
182
}
264
183
265
184
void handleMarkerRecorded (HistoryEvent event ) {
266
185
MarkerRecordedEventAttributes attributes = event .getMarkerRecordedEventAttributes ();
267
186
String name = attributes .getMarkerName ();
268
187
if (SIDE_EFFECT_MARKER_NAME .equals (name )) {
269
188
sideEffectResults .put (event .getEventId (), attributes .getDetails ());
270
- } else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME .equals (name )) {
189
+ } else if (!MUTABLE_SIDE_EFFECT_MARKER_NAME .equals (name ) && ! VERSION_MARKER_NAME . equals ( name ) ) {
271
190
log .warn ("Unexpected marker: " + event );
272
191
}
273
192
}
193
+
194
+ int getVersion (String changeId , DataConverter converter , int minSupported , int maxSupported ) {
195
+ Predicate <byte []> changeIdEquals =
196
+ (bytesInEvent ) -> {
197
+ MarkerData markerData = converter .fromData (bytesInEvent , MarkerData .class );
198
+ return markerData .getId ().equals (changeId );
199
+ };
200
+ decisions .addAllMissingVersionMarker (true , Optional .of (changeIdEquals ));
201
+
202
+ Optional <byte []> result =
203
+ versionHandler .handle (
204
+ changeId ,
205
+ converter ,
206
+ (stored ) -> {
207
+ if (stored .isPresent ()) {
208
+ return Optional .empty ();
209
+ }
210
+ return Optional .of (converter .toData (maxSupported ));
211
+ });
212
+
213
+ if (!result .isPresent ()) {
214
+ return WorkflowInternal .DEFAULT_VERSION ;
215
+ }
216
+ int version = converter .fromData (result .get (), Integer .class );
217
+ validateVersion (changeId , version , minSupported , maxSupported );
218
+ return version ;
219
+ }
220
+
221
+ private void validateVersion (String changeID , int version , int minSupported , int maxSupported ) {
222
+ if (version < minSupported || version > maxSupported ) {
223
+ throw new Error (
224
+ String .format (
225
+ "Version %d of changeID %s is not supported. Supported version is between %d and %d." ,
226
+ version , changeID , minSupported , maxSupported ));
227
+ }
228
+ }
274
229
}
0 commit comments