Skip to content

Commit 6dce3aa

Browse files
authored
Refactored timer handling in ClockDecisionContext (#131)
1 parent d713971 commit 6dce3aa

File tree

7 files changed

+83
-112
lines changed

7 files changed

+83
-112
lines changed

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 24 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,12 @@
2323
import com.uber.cadence.TimerFiredEventAttributes;
2424
import java.util.HashMap;
2525
import java.util.Map;
26-
import java.util.SortedMap;
27-
import java.util.TreeMap;
2826
import java.util.concurrent.CancellationException;
2927
import java.util.concurrent.TimeUnit;
3028
import java.util.function.BiConsumer;
3129
import java.util.function.Consumer;
3230

33-
/**
34-
* Clock that must be used inside workflow definition code to ensure replay determinism. TODO:
35-
* Refactor to become a helper for managing timers instead of the generic clock class.
36-
*/
31+
/** Clock that must be used inside workflow definition code to ensure replay determinism. */
3732
final class ClockDecisionContext {
3833

3934
private final class TimerCancellationHandler implements Consumer<Exception> {
@@ -46,24 +41,14 @@ private final class TimerCancellationHandler implements Consumer<Exception> {
4641

4742
@Override
4843
public void accept(Exception reason) {
49-
decisions.cancelTimer(
50-
timerId,
51-
() -> {
52-
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
53-
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
54-
CancellationException exception = new CancellationException("Cancelled by request");
55-
exception.initCause(reason);
56-
context.accept(null, exception);
57-
});
44+
decisions.cancelTimer(timerId, () -> timerCancelled(timerId, reason));
5845
}
5946
}
6047

6148
private final DecisionsHelper decisions;
6249

6350
private final Map<String, OpenRequestInfo<?, Long>> scheduledTimers = new HashMap<>();
6451

65-
private final SortedMap<Long, String> timersByFiringTime = new TreeMap<>();
66-
6752
private long replayCurrentTimeMilliseconds;
6853

6954
private boolean replaying = true;
@@ -90,60 +75,18 @@ Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback)
9075
}
9176
if (delaySeconds == 0) {
9277
callback.accept(null);
93-
return Exception -> {};
78+
return null;
9479
}
9580
long firingTime = currentTimeMillis() + TimeUnit.SECONDS.toMillis(delaySeconds);
96-
// As the timer resolution is 1 second it doesn't really make sense to update a timer
97-
// that is less than one second before the already existing.
98-
if (timersByFiringTime.size() > 0) {
99-
long nextTimerFiringTime = timersByFiringTime.firstKey();
100-
if (firingTime > nextTimerFiringTime
101-
|| nextTimerFiringTime - firingTime < TimeUnit.SECONDS.toMillis(1)) {
102-
return null;
103-
}
104-
}
105-
Consumer<Exception> result = null;
106-
if (!timersByFiringTime.containsKey(firingTime)) {
107-
final OpenRequestInfo<?, Long> context = new OpenRequestInfo<>(firingTime);
108-
final StartTimerDecisionAttributes timer = new StartTimerDecisionAttributes();
109-
timer.setStartToFireTimeoutSeconds(delaySeconds);
110-
final String timerId = decisions.getNextId();
111-
timer.setTimerId(timerId);
112-
decisions.startTimer(timer, null);
113-
context.setCompletionHandle((ctx, Exception) -> callback.accept(null));
114-
scheduledTimers.put(timerId, context);
115-
timersByFiringTime.put(firingTime, timerId);
116-
result = new ClockDecisionContext.TimerCancellationHandler(timerId);
117-
}
118-
SortedMap<Long, String> toCancel = timersByFiringTime.subMap(0L, firingTime);
119-
for (String timerId : toCancel.values()) {
120-
decisions.cancelTimer(
121-
timerId,
122-
() -> {
123-
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
124-
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
125-
CancellationException exception =
126-
new CancellationException("Cancelled as next unblock time changed");
127-
context.accept(null, exception);
128-
});
129-
}
130-
toCancel.clear();
131-
return result;
132-
}
133-
134-
void cancelAllTimers() {
135-
for (String timerId : timersByFiringTime.values()) {
136-
decisions.cancelTimer(
137-
timerId,
138-
() -> {
139-
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
140-
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
141-
CancellationException exception =
142-
new CancellationException("Cancelled as next unblock time changed");
143-
context.accept(null, exception);
144-
});
145-
}
146-
timersByFiringTime.clear();
81+
final OpenRequestInfo<?, Long> context = new OpenRequestInfo<>(firingTime);
82+
final StartTimerDecisionAttributes timer = new StartTimerDecisionAttributes();
83+
timer.setStartToFireTimeoutSeconds(delaySeconds);
84+
final String timerId = decisions.getNextId();
85+
timer.setTimerId(timerId);
86+
decisions.startTimer(timer, null);
87+
context.setCompletionHandle((ctx, e) -> callback.accept(e));
88+
scheduledTimers.put(timerId, context);
89+
return new ClockDecisionContext.TimerCancellationHandler(timerId);
14790
}
14891

14992
void setReplaying(boolean replaying) {
@@ -157,8 +100,6 @@ void handleTimerFired(TimerFiredEventAttributes attributes) {
157100
if (scheduled != null) {
158101
BiConsumer<?, Exception> completionCallback = scheduled.getCompletionCallback();
159102
completionCallback.accept(null, null);
160-
long firingTime = scheduled.getUserContext();
161-
timersByFiringTime.remove(firingTime);
162103
}
163104
}
164105
}
@@ -167,12 +108,18 @@ void handleTimerCanceled(HistoryEvent event) {
167108
TimerCanceledEventAttributes attributes = event.getTimerCanceledEventAttributes();
168109
String timerId = attributes.getTimerId();
169110
if (decisions.handleTimerCanceled(event)) {
170-
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
171-
if (scheduled != null) {
172-
BiConsumer<?, Exception> completionCallback = scheduled.getCompletionCallback();
173-
CancellationException exception = new CancellationException("Cancelled by request");
174-
completionCallback.accept(null, exception);
175-
}
111+
timerCancelled(timerId, null);
112+
}
113+
}
114+
115+
private void timerCancelled(String timerId, Exception reason) {
116+
OpenRequestInfo<?, ?> scheduled = scheduledTimers.remove(timerId);
117+
if (scheduled == null) {
118+
return;
176119
}
120+
BiConsumer<?, Exception> context = scheduled.getCompletionCallback();
121+
CancellationException exception = new CancellationException("Cancelled by request");
122+
exception.initCause(reason);
123+
context.accept(null, exception);
177124
}
178125
}

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,4 @@ Consumer<Exception> signalWorkflowExecution(
116116
* @return cancellation handle. Invoke {@link Consumer#accept(Object)} to cancel timer.
117117
*/
118118
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback);
119-
120-
void cancelAllTimers();
121119
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,11 +163,6 @@ public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> ca
163163
return workflowClock.createTimer(delaySeconds, callback);
164164
}
165165

166-
@Override
167-
public void cancelAllTimers() {
168-
workflowClock.cancelAllTimers();
169-
}
170-
171166
@Override
172167
public long currentTimeMillis() {
173168
return workflowClock.currentTimeMillis();

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,10 @@ void startTimer(StartTimerDecisionAttributes request, Object createTimerUserCont
256256

257257
boolean cancelTimer(String timerId, Runnable immediateCancellationCallback) {
258258
DecisionStateMachine decision = getDecision(new DecisionId(DecisionTarget.TIMER, timerId));
259+
if (decision.isDone()) {
260+
// Cancellation callbacks are not deregistered and might be invoked after timer firing
261+
return true;
262+
}
259263
decision.cancel(immediateCancellationCallback);
260264
return decision.isDone();
261265
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.List;
3333
import java.util.concurrent.CancellationException;
3434
import java.util.concurrent.atomic.AtomicReference;
35+
import java.util.function.Consumer;
3536
import org.slf4j.Logger;
3637
import org.slf4j.LoggerFactory;
3738

@@ -59,6 +60,9 @@ class ReplayDecider {
5960

6061
private WorkflowExecutionException failure;
6162

63+
private long wakeUpTime;
64+
private Consumer<Exception> timerCancellationHandler;
65+
6266
ReplayDecider(
6367
String domain,
6468
ReplayWorkflow workflow,
@@ -229,33 +233,58 @@ private void eventLoop() {
229233
}
230234
}
231235

232-
private void completeWorkflow() {
236+
private void mayBeCompleteWorkflow() {
233237
if (completed) {
234-
if (failure != null) {
235-
decisionsHelper.failWorkflowExecution(failure);
236-
} else if (cancelRequested) {
237-
decisionsHelper.cancelWorkflowExecution();
238+
completeWorkflow();
239+
} else {
240+
updateTimers();
241+
}
242+
}
243+
244+
private void completeWorkflow() {
245+
if (failure != null) {
246+
decisionsHelper.failWorkflowExecution(failure);
247+
} else if (cancelRequested) {
248+
decisionsHelper.cancelWorkflowExecution();
249+
} else {
250+
ContinueAsNewWorkflowExecutionParameters continueAsNewOnCompletion =
251+
context.getContinueAsNewOnCompletion();
252+
if (continueAsNewOnCompletion != null) {
253+
decisionsHelper.continueAsNewWorkflowExecution(continueAsNewOnCompletion);
238254
} else {
239-
ContinueAsNewWorkflowExecutionParameters continueAsNewOnCompletion =
240-
context.getContinueAsNewOnCompletion();
241-
if (continueAsNewOnCompletion != null) {
242-
decisionsHelper.continueAsNewWorkflowExecution(continueAsNewOnCompletion);
243-
} else {
244-
byte[] workflowOutput = workflow.getOutput();
245-
decisionsHelper.completeWorkflowExecution(workflowOutput);
246-
}
255+
byte[] workflowOutput = workflow.getOutput();
256+
decisionsHelper.completeWorkflowExecution(workflowOutput);
247257
}
248-
} else {
249-
long nextWakeUpTime = workflow.getNextWakeUpTime();
250-
if (nextWakeUpTime == 0) { // No time based waiting
251-
context.cancelAllTimers();
258+
}
259+
}
260+
261+
private void updateTimers() {
262+
long nextWakeUpTime = workflow.getNextWakeUpTime();
263+
if (nextWakeUpTime == 0) {
264+
if (timerCancellationHandler != null) {
265+
timerCancellationHandler.accept(null);
266+
timerCancellationHandler = null;
252267
}
253-
long delayMilliseconds = nextWakeUpTime - context.currentTimeMillis();
254-
if (nextWakeUpTime > context.currentTimeMillis()) {
255-
// Round up to the nearest second as we don't want to deliver a timer
256-
// earlier than requested.
257-
long delaySeconds =
258-
InternalUtils.roundUpToSeconds(Duration.ofMillis(delayMilliseconds)).getSeconds();
268+
wakeUpTime = nextWakeUpTime;
269+
return;
270+
}
271+
if (wakeUpTime == nextWakeUpTime && timerCancellationHandler != null) {
272+
return; // existing timer
273+
}
274+
long delayMilliseconds = nextWakeUpTime - context.currentTimeMillis();
275+
if (delayMilliseconds < 0) {
276+
throw new IllegalStateException("Negative delayMilliseconds=" + delayMilliseconds);
277+
}
278+
// Round up to the nearest second as we don't want to deliver a timer
279+
// earlier than requested.
280+
long delaySeconds =
281+
InternalUtils.roundUpToSeconds(Duration.ofMillis(delayMilliseconds)).getSeconds();
282+
if (timerCancellationHandler != null) {
283+
timerCancellationHandler.accept(null);
284+
timerCancellationHandler = null;
285+
}
286+
wakeUpTime = nextWakeUpTime;
287+
timerCancellationHandler =
259288
context.createTimer(
260289
delaySeconds,
261290
(t) -> {
@@ -264,8 +293,6 @@ private void completeWorkflow() {
264293
// But no specific timer related action is necessary as Workflow.sleep is just a
265294
// Workflow.await with a time based condition.
266295
});
267-
}
268-
}
269296
}
270297

271298
private void handleDecisionTaskStarted(HistoryEvent event) {}
@@ -359,7 +386,7 @@ private void decideImpl(Functions.Proc query) throws Throwable {
359386
processEvent(event, eventType);
360387
}
361388
eventLoop();
362-
completeWorkflow();
389+
mayBeCompleteWorkflow();
363390
} while (eventsIterator.hasNext());
364391
} finally {
365392
if (query != null) {

src/main/java/com/uber/cadence/internal/testservice/RequestContext.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import java.util.List;
2929
import java.util.Objects;
3030
import java.util.function.LongSupplier;
31-
import org.slf4j.Logger;
32-
import org.slf4j.LoggerFactory;
3331

3432
final class RequestContext {
3533

src/test/java/com/uber/cadence/internal/testing/WorkflowTestingTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,8 @@ public String workflow1(String input) {
485485
try {
486486
activity.activity1("input");
487487
Workflow.sleep(Duration.ofDays(3));
488+
} catch (CancellationException e) {
489+
return "cancelled";
488490
} finally {
489491
s.get();
490492
}

0 commit comments

Comments
 (0)