Skip to content

Commit f47812c

Browse files
Liang Meimfateev
authored andcommitted
Timer firing fix (#329)
Service doesn't guarantee that timestamps assigned to events are always increasing and that if timer for 10 seconds fired the timestamp of its DecisionTaskStarted is 10 seconds after the timestamp of DecisionTaskStarted when timer was started. This change ensures that Workflow.currentTimeMillis fixes these problems.
1 parent 0d12425 commit f47812c

File tree

4 files changed

+316
-4
lines changed

4 files changed

+316
-4
lines changed

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,10 @@ private long replayTimeUpdatedAtMillis() {
114114
}
115115

116116
void setReplayCurrentTimeMilliseconds(long replayCurrentTimeMilliseconds) {
117-
this.replayCurrentTimeMilliseconds = replayCurrentTimeMilliseconds;
118-
this.replayTimeUpdatedAtMillis = System.currentTimeMillis();
117+
if (this.replayCurrentTimeMilliseconds < replayCurrentTimeMilliseconds) {
118+
this.replayCurrentTimeMilliseconds = replayCurrentTimeMilliseconds;
119+
this.replayTimeUpdatedAtMillis = System.currentTimeMillis();
120+
}
119121
}
120122

121123
boolean isReplaying() {
@@ -150,6 +152,12 @@ void handleTimerFired(TimerFiredEventAttributes attributes) {
150152
if (decisions.handleTimerClosed(attributes)) {
151153
OpenRequestInfo<?, Long> scheduled = scheduledTimers.remove(startedEventId);
152154
if (scheduled != null) {
155+
// Server doesn't guarantee that the timer fire timestamp is larger or equal of the
156+
// expected fire time. So fix the time or timer firing will be ignored.
157+
long firingTime = scheduled.getUserContext();
158+
if (replayCurrentTimeMilliseconds < firingTime) {
159+
setReplayCurrentTimeMilliseconds(firingTime);
160+
}
153161
BiConsumer<?, Exception> completionCallback = scheduled.getCompletionCallback();
154162
completionCallback.accept(null, null);
155163
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,15 @@
1717

1818
package com.uber.cadence.internal.replay;
1919

20-
import com.uber.cadence.*;
20+
import com.uber.cadence.ChildPolicy;
21+
import com.uber.cadence.DecisionTaskFailedCause;
22+
import com.uber.cadence.DecisionTaskFailedEventAttributes;
23+
import com.uber.cadence.HistoryEvent;
24+
import com.uber.cadence.PollForDecisionTaskResponse;
25+
import com.uber.cadence.TimerFiredEventAttributes;
26+
import com.uber.cadence.WorkflowExecution;
27+
import com.uber.cadence.WorkflowExecutionStartedEventAttributes;
28+
import com.uber.cadence.WorkflowType;
2129
import com.uber.cadence.converter.DataConverter;
2230
import com.uber.cadence.internal.metrics.ReplayAwareScope;
2331
import com.uber.cadence.internal.worker.LocalActivityWorker;
@@ -34,9 +42,13 @@
3442
import java.util.function.BiConsumer;
3543
import java.util.function.BiFunction;
3644
import java.util.function.Consumer;
45+
import org.slf4j.Logger;
46+
import org.slf4j.LoggerFactory;
3747

3848
final class DecisionContextImpl implements DecisionContext, HistoryEventHandler {
3949

50+
private static final Logger log = LoggerFactory.getLogger(DecisionContextImpl.class);
51+
4052
private final ActivityDecisionContext activityClient;
4153
private final WorkflowDecisionContext workflowClient;
4254
private final ClockDecisionContext workflowClock;
@@ -203,7 +215,15 @@ public void continueAsNewOnCompletion(ContinueAsNewWorkflowExecutionParameters p
203215

204216
void setReplayCurrentTimeMilliseconds(long replayCurrentTimeMilliseconds) {
205217
if (replayCurrentTimeMilliseconds < workflowClock.currentTimeMillis()) {
206-
throw new IllegalArgumentException("workflow clock moved back");
218+
if (log.isWarnEnabled()) {
219+
log.warn(
220+
"Trying to set workflow clock back from "
221+
+ workflowClock.currentTimeMillis()
222+
+ " to "
223+
+ replayCurrentTimeMilliseconds
224+
+ ". This will be a no-op.");
225+
}
226+
return;
207227
}
208228
workflowClock.setReplayCurrentTimeMilliseconds(replayCurrentTimeMilliseconds);
209229
}

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4703,6 +4703,65 @@ public void testWorkflowReset() throws Exception {
47034703
"resetWorkflowHistory.json", TestWorkflowResetReplayWorkflow.class);
47044704
}
47054705

4706+
public interface GreetingWorkflow {
4707+
4708+
@WorkflowMethod
4709+
void createGreeting(String name);
4710+
}
4711+
4712+
public interface GreetingActivities {
4713+
@ActivityMethod(scheduleToCloseTimeoutSeconds = 60)
4714+
String composeGreeting(String string);
4715+
}
4716+
4717+
static class GreetingActivitiesImpl implements GreetingActivities {
4718+
@Override
4719+
public String composeGreeting(String string) {
4720+
try {
4721+
Thread.sleep(10000);
4722+
} catch (Exception e) {
4723+
System.out.println("Exception");
4724+
}
4725+
return "greetings: " + string;
4726+
}
4727+
}
4728+
4729+
/** GreetingWorkflow implementation that updates greeting after sleeping for 5 seconds. */
4730+
public static class TimerFiringWorkflowImpl implements GreetingWorkflow {
4731+
4732+
private final GreetingActivities activities =
4733+
Workflow.newActivityStub(GreetingActivities.class);
4734+
4735+
@Override
4736+
public void createGreeting(String name) {
4737+
Promise<String> promiseString1 = Async.function(() -> activities.composeGreeting("1"));
4738+
Promise<String> promiseString2 = Async.function(() -> "aString2");
4739+
4740+
Set<Promise<String>> promiseSet = new HashSet<>();
4741+
promiseSet.add(promiseString1);
4742+
promiseSet.add(promiseString2);
4743+
Workflow.await(
4744+
Duration.ofSeconds(30), () -> promiseSet.stream().anyMatch(Promise::isCompleted));
4745+
4746+
promiseString1.get();
4747+
Workflow.sleep(Duration.ofSeconds(20));
4748+
promiseString2.get();
4749+
}
4750+
}
4751+
4752+
// Server doesn't guarantee that the timer fire timestamp is larger or equal of the
4753+
// expected fire time. This test ensures that client still fires timer in this case.
4754+
@Test
4755+
public void testTimerFiringTimestampEarlierThanExpected() throws Exception {
4756+
4757+
// Avoid executing 4 times
4758+
Assume.assumeFalse("skipping for docker tests", useExternalService);
4759+
Assume.assumeFalse("skipping for sticky off", stickyOff);
4760+
4761+
WorkflowReplayer.replayWorkflowExecutionFromResource(
4762+
"timerfiring.json", TimerFiringWorkflowImpl.class);
4763+
}
4764+
47064765
private static class FilteredTrace {
47074766

47084767
private final List<String> impl = Collections.synchronizedList(new ArrayList<>());

src/test/resources/timerfiring.json

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
[
2+
{
3+
"eventId": 1,
4+
"timestamp": 1561674301723220900,
5+
"eventType": "WorkflowExecutionStarted",
6+
"version": -24,
7+
"workflowExecutionStartedEventAttributes": {
8+
"workflowType": {
9+
"name": "GreetingWorkflow::createGreeting"
10+
},
11+
"taskList": {
12+
"name": "HelloQuery"
13+
},
14+
"input": "IldvcmxkIg==",
15+
"executionStartToCloseTimeoutSeconds": 60,
16+
"taskStartToCloseTimeoutSeconds": 10,
17+
"identity": "",
18+
"attempt": 0,
19+
"firstDecisionTaskBackoffSeconds": 0
20+
}
21+
},
22+
{
23+
"eventId": 2,
24+
"timestamp": 1561674301723316900,
25+
"eventType": "DecisionTaskScheduled",
26+
"version": -24,
27+
"decisionTaskScheduledEventAttributes": {
28+
"taskList": {
29+
"name": "HelloQuery"
30+
},
31+
"startToCloseTimeoutSeconds": 10,
32+
"attempt": 0
33+
}
34+
},
35+
{
36+
"eventId": 3,
37+
"timestamp": 1561674301738571000,
38+
"eventType": "DecisionTaskStarted",
39+
"version": -24,
40+
"decisionTaskStartedEventAttributes": {
41+
"scheduledEventId": 2,
42+
"identity": "16885@meiliang-C02XQ1JRJGH6",
43+
"requestId": "e19f40c0-cfd9-4854-a76a-635f80281a0f"
44+
}
45+
},
46+
{
47+
"eventId": 4,
48+
"timestamp": 1561674301879285400,
49+
"eventType": "DecisionTaskCompleted",
50+
"version": -24,
51+
"decisionTaskCompletedEventAttributes": {
52+
"scheduledEventId": 2,
53+
"startedEventId": 3,
54+
"identity": "16885@meiliang-C02XQ1JRJGH6"
55+
}
56+
},
57+
{
58+
"eventId": 5,
59+
"timestamp": 1561674301879306600,
60+
"eventType": "ActivityTaskScheduled",
61+
"version": -24,
62+
"activityTaskScheduledEventAttributes": {
63+
"activityId": "0",
64+
"activityType": {
65+
"name": "GreetingActivities::composeGreeting"
66+
},
67+
"taskList": {
68+
"name": "HelloQuery"
69+
},
70+
"input": "IjEi",
71+
"scheduleToCloseTimeoutSeconds": 60,
72+
"scheduleToStartTimeoutSeconds": 60,
73+
"startToCloseTimeoutSeconds": 60,
74+
"heartbeatTimeoutSeconds": 60,
75+
"decisionTaskCompletedEventId": 4
76+
}
77+
},
78+
{
79+
"eventId": 6,
80+
"timestamp": 1561674301879365300,
81+
"eventType": "TimerStarted",
82+
"version": -24,
83+
"timerStartedEventAttributes": {
84+
"timerId": "1",
85+
"startToFireTimeoutSeconds": 30,
86+
"decisionTaskCompletedEventId": 4
87+
}
88+
},
89+
{
90+
"eventId": 7,
91+
"timestamp": 1561674301887961900,
92+
"eventType": "ActivityTaskStarted",
93+
"version": -24,
94+
"activityTaskStartedEventAttributes": {
95+
"scheduledEventId": 5,
96+
"identity": "16885@meiliang-C02XQ1JRJGH6",
97+
"requestId": "5118c58e-86f8-42bc-8e59-f84974491dc5",
98+
"attempt": 0
99+
}
100+
},
101+
{
102+
"eventId": 8,
103+
"timestamp": 1561674311891785800,
104+
"eventType": "ActivityTaskCompleted",
105+
"version": -24,
106+
"activityTaskCompletedEventAttributes": {
107+
"result": "ImdyZWV0aW5nczogMSI=",
108+
"scheduledEventId": 5,
109+
"startedEventId": 7,
110+
"identity": "16885@meiliang-C02XQ1JRJGH6"
111+
}
112+
},
113+
{
114+
"eventId": 9,
115+
"timestamp": 1561674311891815400,
116+
"eventType": "DecisionTaskScheduled",
117+
"version": -24,
118+
"decisionTaskScheduledEventAttributes": {
119+
"taskList": {
120+
"name": "meiliang-C02XQ1JRJGH6:79cdd2f2-b1c9-45b3-ad4e-0d1829641d57"
121+
},
122+
"startToCloseTimeoutSeconds": 10,
123+
"attempt": 0
124+
}
125+
},
126+
{
127+
"eventId": 10,
128+
"timestamp": 1561674311904648000,
129+
"eventType": "DecisionTaskStarted",
130+
"version": -24,
131+
"decisionTaskStartedEventAttributes": {
132+
"scheduledEventId": 9,
133+
"identity": "79cdd2f2-b1c9-45b3-ad4e-0d1829641d57",
134+
"requestId": "5f419fbf-bba3-4f19-818f-498fd4fd45a1"
135+
}
136+
},
137+
{
138+
"eventId": 11,
139+
"timestamp": 1561674311932559600,
140+
"eventType": "DecisionTaskCompleted",
141+
"version": -24,
142+
"decisionTaskCompletedEventAttributes": {
143+
"scheduledEventId": 9,
144+
"startedEventId": 10,
145+
"identity": "16885@meiliang-C02XQ1JRJGH6"
146+
}
147+
},
148+
{
149+
"eventId": 12,
150+
"timestamp": 1561674311932572000,
151+
"eventType": "TimerCanceled",
152+
"version": -24,
153+
"timerCanceledEventAttributes": {
154+
"timerId": "1",
155+
"startedEventId": 6,
156+
"decisionTaskCompletedEventId": 11,
157+
"identity": "16885@meiliang-C02XQ1JRJGH6"
158+
}
159+
},
160+
{
161+
"eventId": 13,
162+
"timestamp": 1561674311932666300,
163+
"eventType": "TimerStarted",
164+
"version": -24,
165+
"timerStartedEventAttributes": {
166+
"timerId": "2",
167+
"startToFireTimeoutSeconds": 20,
168+
"decisionTaskCompletedEventId": 11
169+
}
170+
},
171+
{
172+
"eventId": 14,
173+
"timestamp": 1561674331881914700,
174+
"eventType": "TimerFired",
175+
"version": -24,
176+
"timerFiredEventAttributes": {
177+
"timerId": "2",
178+
"startedEventId": 13
179+
}
180+
},
181+
{
182+
"eventId": 15,
183+
"timestamp": 1561674331881939900,
184+
"eventType": "DecisionTaskScheduled",
185+
"version": -24,
186+
"decisionTaskScheduledEventAttributes": {
187+
"taskList": {
188+
"name": "meiliang-C02XQ1JRJGH6:79cdd2f2-b1c9-45b3-ad4e-0d1829641d57"
189+
},
190+
"startToCloseTimeoutSeconds": 10,
191+
"attempt": 0
192+
}
193+
},
194+
{
195+
"eventId": 16,
196+
"timestamp": 1561674331891620700,
197+
"eventType": "DecisionTaskStarted",
198+
"version": -24,
199+
"decisionTaskStartedEventAttributes": {
200+
"scheduledEventId": 15,
201+
"identity": "79cdd2f2-b1c9-45b3-ad4e-0d1829641d57",
202+
"requestId": "f2094ebe-cfe3-4cdd-acd5-d96b39f34812"
203+
}
204+
},
205+
{
206+
"eventId": 17,
207+
"timestamp": 1561674331909732900,
208+
"eventType": "DecisionTaskCompleted",
209+
"version": -24,
210+
"decisionTaskCompletedEventAttributes": {
211+
"scheduledEventId": 15,
212+
"startedEventId": 16,
213+
"identity": "16885@meiliang-C02XQ1JRJGH6"
214+
}
215+
},
216+
{
217+
"eventId": 18,
218+
"timestamp": 1561674331909758000,
219+
"eventType": "WorkflowExecutionCompleted",
220+
"version": -24,
221+
"workflowExecutionCompletedEventAttributes": {
222+
"decisionTaskCompletedEventId": 17
223+
}
224+
}
225+
]

0 commit comments

Comments
 (0)