Skip to content

Commit cb780da

Browse files
authored
Fixed test server previous started eventId (#136)
1 parent f9a6265 commit cb780da

File tree

4 files changed

+24
-11
lines changed

4 files changed

+24
-11
lines changed

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public PollForDecisionTaskResponse getDecisionTask() {
112112
return events.getDecisionTask();
113113
}
114114

115-
public long getLastNonReplayEventId() {
115+
public long getPreviousStartedEventId() {
116116
return getDecisionTask().getPreviousStartedEventId();
117117
}
118118
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,8 @@ public void decide() throws Throwable {
343343

344344
private void decideImpl(Functions.Proc query) throws Throwable {
345345
try {
346-
long lastNonReplayedEventId = historyHelper.getLastNonReplayEventId();
346+
long previousDecisionStartedEventId = historyHelper.getPreviousStartedEventId();
347+
long lastNonFailedStartEventId = -1;
347348
// Buffer events until the next DecisionTaskStarted and then process them
348349
// setting current time to the time of DecisionTaskStarted event
349350
HistoryHelper.EventsIterator eventsIterator = historyHelper.getEvents();
@@ -367,6 +368,12 @@ private void decideImpl(Functions.Proc query) throws Throwable {
367368

368369
decisionsHelper.handleDecisionTaskStartedEvent();
369370
if (!eventsIterator.isNextDecisionFailed()) {
371+
if (query == null
372+
&& lastNonFailedStartEventId > 0
373+
&& previousDecisionStartedEventId == 0) {
374+
throw new Error("Unexpected 0 previousDecisionStartedEventId: ");
375+
}
376+
lastNonFailedStartEventId = event.getEventId();
370377
// Cadence timestamp is in nanoseconds
371378
long replayCurrentTimeMilliseconds = event.getTimestamp() / MILLION;
372379
context.setReplayCurrentTimeMilliseconds(replayCurrentTimeMilliseconds);
@@ -379,7 +386,7 @@ private void decideImpl(Functions.Proc query) throws Throwable {
379386
}
380387
}
381388
for (HistoryEvent event : decisionCompletionToStartEvents) {
382-
if (event.getEventId() >= lastNonReplayedEventId) {
389+
if (event.getEventId() >= previousDecisionStartedEventId) {
383390
context.setReplaying(false);
384391
}
385392
EventType eventType = event.getEventType();

src/main/java/com/uber/cadence/internal/testservice/StateMachines.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -143,17 +143,20 @@ static final class WorkflowData {}
143143

144144
static final class DecisionTaskData {
145145

146+
final long previousStartedEventId;
147+
146148
final TestWorkflowStore store;
147149

148-
long previousStartedEventId = -1;
150+
long startedEventId = -1;
149151

150152
PollForDecisionTaskResponse decisionTask;
151153

152154
long scheduledEventId = -1;
153155

154156
int attempt;
155157

156-
DecisionTaskData(TestWorkflowStore store) {
158+
DecisionTaskData(long previousStartedEventId, TestWorkflowStore store) {
159+
this.previousStartedEventId = previousStartedEventId;
157160
this.store = store;
158161
}
159162
}
@@ -212,8 +215,9 @@ static StateMachine<WorkflowData> newWorkflowStateMachine() {
212215
.add(CANCELLATION_REQUESTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutWorkflow);
213216
}
214217

215-
static StateMachine<DecisionTaskData> newDecisionStateMachine(TestWorkflowStore store) {
216-
return new StateMachine<>(new DecisionTaskData(store))
218+
static StateMachine<DecisionTaskData> newDecisionStateMachine(
219+
long previousStartedEventId, TestWorkflowStore store) {
220+
return new StateMachine<>(new DecisionTaskData(previousStartedEventId, store))
217221
.add(NONE, INITIATE, INITIATED, StateMachines::scheduleDecisionTask)
218222
.add(INITIATED, START, STARTED, StateMachines::startDecisionTask)
219223
.add(STARTED, COMPLETE, COMPLETED, StateMachines::completeDecisionTask)
@@ -694,7 +698,7 @@ private static void startDecisionTask(
694698
throw new InternalServiceError(entityNotExistsError.toString());
695699
}
696700
data.decisionTask.setHistory(new History().setEvents(events));
697-
data.previousStartedEventId = startedEventId;
701+
data.startedEventId = startedEventId;
698702
data.attempt++;
699703
});
700704
}
@@ -746,7 +750,7 @@ private static void failDecisionTask(
746750
.setIdentity(request.getIdentity())
747751
.setCause(request.getCause())
748752
.setDetails(request.getDetails())
749-
.setStartedEventId(data.previousStartedEventId)
753+
.setStartedEventId(data.startedEventId)
750754
.setScheduledEventId(data.scheduledEventId);
751755
HistoryEvent event =
752756
new HistoryEvent()
@@ -759,7 +763,7 @@ private static void timeoutDecisionTask(
759763
RequestContext ctx, DecisionTaskData data, Object ignored, long notUsed) {
760764
DecisionTaskTimedOutEventAttributes a =
761765
new DecisionTaskTimedOutEventAttributes()
762-
.setStartedEventId(data.previousStartedEventId)
766+
.setStartedEventId(data.startedEventId)
763767
.setTimeoutType(TimeoutType.START_TO_CLOSE)
764768
.setScheduledEventId(data.scheduledEventId);
765769
HistoryEvent event =

src/main/java/com/uber/cadence/internal/testservice/TestWorkflowMutableStateImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ void apply(RequestContext ctx)
129129
private final Map<String, StateMachine<SignalExternalData>> externalSignals = new HashMap<>();
130130
private StateMachine<WorkflowData> workflow;
131131
private StateMachine<DecisionTaskData> decision;
132+
private long lastNonFailedDecisionStartEventId;
132133
private final Map<String, CompletableFuture<QueryWorkflowResponse>> queries =
133134
new ConcurrentHashMap<>();
134135

@@ -252,6 +253,7 @@ public void completeDecisionTask(int historySize, RespondDecisionTaskCompletedRe
252253
for (RequestContext deferredCtx : this.concurrentToDecision) {
253254
ctx.add(deferredCtx);
254255
}
256+
lastNonFailedDecisionStartEventId = this.decision.getData().startedEventId;
255257
this.decision = null;
256258
boolean completed =
257259
workflow.getState() == StateMachines.State.COMPLETED
@@ -823,7 +825,7 @@ private void scheduleDecision(RequestContext ctx) throws InternalServiceError {
823825
}
824826
throw new InternalServiceError("unexpected decision state: " + decision.getState());
825827
}
826-
this.decision = StateMachines.newDecisionStateMachine(store);
828+
this.decision = StateMachines.newDecisionStateMachine(lastNonFailedDecisionStartEventId, store);
827829
decision.action(StateMachines.Action.INITIATE, ctx, startRequest, 0);
828830
ctx.lockTimer();
829831
}

0 commit comments

Comments
 (0)