Skip to content

Commit 4e820ba

Browse files
authored
Reset Replay Decider State (#211)
1 parent fee3c65 commit 4e820ba

File tree

9 files changed, with 449 additions (+449) and 87 deletions (-87).

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -66,7 +66,7 @@ public void accept(Exception reason) {
6666
// key is startedEventId
6767
private final Map<Long, OpenRequestInfo<?, Long>> scheduledTimers = new HashMap<>();
6868

69-
private long replayCurrentTimeMilliseconds;
69+
private long replayCurrentTimeMilliseconds = -1;
7070

7171
private boolean replaying = true;
7272

src/main/java/com/uber/cadence/internal/replay/DeciderCache.java

Lines changed: 5 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,12 +28,15 @@
2828
import com.uber.m3.tally.Scope;
2929
import java.util.Objects;
3030
import java.util.UUID;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
public final class DeciderCache {
3335
private final String evictionEntryId = UUID.randomUUID().toString();
3436
private final int maxCacheSize;
3537
private final Scope metricsScope;
3638
private LoadingCache<String, WeightedCacheEntry<Decider>> cache;
39+
private static final Logger log = LoggerFactory.getLogger(DeciderCache.class);
3740

3841
public DeciderCache(int maxCacheSize, Scope scope) {
3942
Preconditions.checkArgument(maxCacheSize > 0, "Max cache size must be greater than 0");
@@ -91,8 +94,7 @@ public void evictNext() {
9194
int remainingSpace = (int) (maxCacheSize - cache.size());
9295
// Force eviction to happen
9396
cache.put(evictionEntryId, new WeightedCacheEntry<>(null, remainingSpace + 1));
94-
cache.invalidate(evictionEntryId);
95-
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
97+
invalidate(evictionEntryId);
9698
}
9799

98100
public void invalidate(PollForDecisionTaskResponse decisionTask) {
@@ -101,6 +103,7 @@ public void invalidate(PollForDecisionTaskResponse decisionTask) {
101103
}
102104

103105
public void invalidate(String runId) {
106+
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
104107
cache.invalidate(runId);
105108
}
106109

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -192,6 +192,10 @@ void setReplayCurrentTimeMilliseconds(long replayCurrentTimeMilliseconds) {
192192
workflowClock.setReplayCurrentTimeMilliseconds(replayCurrentTimeMilliseconds);
193193
}
194194

195+
long getReplayCurrentTimeMilliseconds() {
196+
return workflowClock.currentTimeMillis();
197+
}
198+
195199
@Override
196200
public boolean isReplaying() {
197201
return workflowClock.isReplaying();

src/main/java/com/uber/cadence/internal/replay/DecisionStateMachineBase.java

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,9 +20,13 @@
2020
import com.uber.cadence.HistoryEvent;
2121
import java.util.ArrayList;
2222
import java.util.List;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
2325

2426
abstract class DecisionStateMachineBase implements DecisionStateMachine {
2527

28+
private static final Logger log = LoggerFactory.getLogger(DecisionStateMachineBase.class);
29+
2630
protected DecisionState state = DecisionState.CREATED;
2731

2832
protected List<String> stateHistory = new ArrayList<String>();

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 20 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -181,9 +181,13 @@ public void remove() {
181181
static class DecisionEventsIterator implements Iterator<DecisionEvents> {
182182

183183
private EventsIterator events;
184+
private long replayCurrentTimeMilliseconds;
184185

185-
DecisionEventsIterator(DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator) {
186+
DecisionEventsIterator(
187+
DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator,
188+
long replayCurrentTimeMilliseconds) {
186189
this.events = new EventsIterator(decisionTaskWithHistoryIterator.getHistory());
190+
this.replayCurrentTimeMilliseconds = replayCurrentTimeMilliseconds;
187191
}
188192

189193
@Override
@@ -196,12 +200,17 @@ public DecisionEvents next() {
196200
List<HistoryEvent> decisionEvents = new ArrayList<>();
197201
List<HistoryEvent> newEvents = new ArrayList<>();
198202
boolean replay = true;
199-
long replayCurrentTimeMilliseconds = -1;
200203
long nextDecisionEventId = -1;
201204
while (events.hasNext()) {
202205
HistoryEvent event = events.next();
203206
EventType eventType = event.getEventType();
204-
// hasNext is for queries that do not have DecisionTaskStarted at the end of the history.
207+
208+
// Sticky workers receive an event history that starts with DecisionTaskCompleted
209+
if (eventType == EventType.DecisionTaskCompleted && nextDecisionEventId == -1) {
210+
nextDecisionEventId = event.getEventId() + 1;
211+
break;
212+
}
213+
205214
if (eventType == EventType.DecisionTaskStarted || !events.hasNext()) {
206215
replayCurrentTimeMilliseconds = TimeUnit.NANOSECONDS.toMillis(event.getTimestamp());
207216
if (!events.hasNext()) {
@@ -216,10 +225,14 @@ public DecisionEvents next() {
216225
continue;
217226
} else if (peekedType == EventType.DecisionTaskCompleted) {
218227
events.next(); // consume DecisionTaskCompleted
219-
nextDecisionEventId = peeked.getEventId() + 1; // +1 for next
228+
nextDecisionEventId = peeked.getEventId() + 1; // +1 for next and skip over completed
220229
break;
221230
} else {
222-
throw new Error("Unexpected event after DecisionTaskStarted: " + peeked);
231+
throw new Error(
232+
"Unexpected event after DecisionTaskStarted: "
233+
+ peeked
234+
+ " DecisionTaskStarted Event: "
235+
+ event);
223236
}
224237
}
225238
newEvents.add(event);
@@ -245,9 +258,9 @@ public DecisionEvents next() {
245258
private final DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator;
246259
private final DecisionEventsIterator iterator;
247260

248-
HistoryHelper(DecisionTaskWithHistoryIterator decisionTasks) {
261+
HistoryHelper(DecisionTaskWithHistoryIterator decisionTasks, long replayCurrentTimeMilliseconds) {
249262
this.decisionTaskWithHistoryIterator = decisionTasks;
250-
this.iterator = new DecisionEventsIterator(decisionTasks);
263+
this.iterator = new DecisionEventsIterator(decisionTasks, replayCurrentTimeMilliseconds);
251264
}
252265

253266
public DecisionEventsIterator getIterator() {

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -374,7 +374,9 @@ private void decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc
374374
DecisionTaskWithHistoryIterator decisionTaskWithHistoryIterator =
375375
new DecisionTaskWithHistoryIteratorImpl(
376376
decisionTask, Duration.ofSeconds(startedEvent.getTaskStartToCloseTimeoutSeconds()));
377-
HistoryHelper historyHelper = new HistoryHelper(decisionTaskWithHistoryIterator);
377+
HistoryHelper historyHelper =
378+
new HistoryHelper(
379+
decisionTaskWithHistoryIterator, context.getReplayCurrentTimeMilliseconds());
378380
DecisionEventsIterator iterator = historyHelper.getIterator();
379381
if ((decisionsHelper.getNextDecisionEventId()
380382
!= historyHelper.getPreviousStartedEventId()
@@ -412,6 +414,8 @@ private void decideImpl(PollForDecisionTaskResponse decisionTask, Functions.Proc
412414
for (HistoryEvent event : decision.getDecisionEvents()) {
413415
processEvent(event);
414416
}
417+
// Reset state to before running the event loop
418+
decisionsHelper.handleDecisionTaskStartedEvent(decision);
415419
}
416420
} catch (Error e) {
417421
metricsScope.counter(MetricsType.DECISION_TASK_ERROR_COUNTER).inc(1);

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -54,27 +54,27 @@ public SyncWorkflowWorker(
5454
Duration stickyDecisionScheduleToStartTimeout,
5555
ThreadPoolExecutor workflowThreadPool) {
5656
Objects.requireNonNull(workflowThreadPool);
57+
this.options = options;
5758

5859
factory =
5960
new POJOWorkflowImplementationFactory(
6061
options.getDataConverter(),
6162
workflowThreadPool,
6263
interceptorFactory,
63-
options.getMetricsScope(),
64+
this.options.getMetricsScope(),
6465
cache);
6566

6667
DecisionTaskHandler taskHandler =
6768
new ReplayDecisionTaskHandler(
6869
domain,
6970
factory,
7071
cache,
71-
options,
72+
this.options,
7273
stickyTaskListName,
7374
stickyDecisionScheduleToStartTimeout,
7475
service);
7576

76-
worker = new WorkflowWorker(service, domain, taskList, options, taskHandler);
77-
this.options = options;
77+
worker = new WorkflowWorker(service, domain, taskList, this.options, taskHandler);
7878
}
7979

8080
public void setWorkflowImplementationTypes(Class<?>[] workflowImplementationTypes) {

src/main/java/com/uber/cadence/worker/Worker.java

Lines changed: 11 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -402,28 +402,25 @@ public Factory(IWorkflowService workflowService, String domain, FactoryOptions f
402402
return;
403403
}
404404

405-
factoryOptions.metricsScope.tagged(
406-
new ImmutableMap.Builder<String, String>(2)
407-
.put(MetricsTag.DOMAIN, domain)
408-
.put(MetricsTag.TASK_LIST, getStickyTaskListName())
409-
.build());
405+
Scope metricsScope =
406+
factoryOptions.metricsScope.tagged(
407+
new ImmutableMap.Builder<String, String>(2)
408+
.put(MetricsTag.DOMAIN, domain)
409+
.put(MetricsTag.TASK_LIST, getHostName())
410+
.build());
410411

411-
this.cache = new DeciderCache(factoryOptions.cacheMaximumSize, factoryOptions.metricsScope);
412+
this.cache = new DeciderCache(factoryOptions.cacheMaximumSize, metricsScope);
412413

413414
dispatcher = new PollDecisionTaskDispatcherFactory(workflowService).create();
414415
stickyPoller =
415416
new Poller<>(
416417
id.toString(),
417418
new WorkflowPollTaskFactory(
418-
workflowService,
419-
domain,
420-
getStickyTaskListName(),
421-
factoryOptions.metricsScope,
422-
id.toString())
419+
workflowService, domain, getStickyTaskListName(), metricsScope, id.toString())
423420
.get(),
424421
dispatcher,
425422
factoryOptions.stickyWorkflowPollerOptions,
426-
factoryOptions.metricsScope);
423+
metricsScope);
427424
}
428425

429426
public Worker newWorker(String taskList) {
@@ -500,7 +497,8 @@ DeciderCache getCache() {
500497
return this.cache;
501498
}
502499

503-
private String getHostName() {
500+
@VisibleForTesting
501+
String getHostName() {
504502
try {
505503
return InetAddress.getLocalHost().getHostName();
506504
} catch (UnknownHostException e) {
@@ -514,18 +512,6 @@ private String getStickyTaskListName() {
514512
: null;
515513
}
516514

517-
private SingleWorkerOptions getDefaultSingleWorkerOptions() {
518-
return Worker.toWorkflowOptions(new Builder().build(), domain, getStickyTaskListName());
519-
}
520-
521-
private PollerOptions getDefaultPollerOptions(SingleWorkerOptions options) {
522-
PollerOptions pollerOptions = options.getPollerOptions();
523-
if (pollerOptions.getPollThreadNamePrefix() == null) {
524-
pollerOptions = new PollerOptions.Builder(pollerOptions).build();
525-
}
526-
return pollerOptions;
527-
}
528-
529515
enum State {
530516
Initial,
531517
Started,

0 commit comments

Comments
 (0)