Skip to content

Commit ce02fea

Browse files
author
Liang Mei
authored
Fix getVersion without decision event (#309)
* Fix getVersion without decision event * Add sleep before signal * Use completable future instead of sleep
1 parent 27da7db commit ce02fea

File tree

4 files changed

+58
-17
lines changed

4 files changed

+58
-17
lines changed

src/main/java/com/uber/cadence/internal/replay/DecisionsHelper.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -701,10 +701,6 @@ String getAndIncrementNextId() {
701701
return String.valueOf(idCounter++);
702702
}
703703

704-
HistoryEvent getDecisionEvent(long eventId) {
705-
return decisionEvents.getDecisionEvent(eventId);
706-
}
707-
708704
Optional<HistoryEvent> getOptionalDecisionEvent(long eventId) {
709705
return decisionEvents.getOptionalDecisionEvent(eventId);
710706
}

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,10 @@ public List<HistoryEvent> getEvents() {
7878
return events;
7979
}
8080

81-
public List<HistoryEvent> getDecisionEvents() {
81+
List<HistoryEvent> getDecisionEvents() {
8282
return decisionEvents;
8383
}
8484

85-
HistoryEvent getDecisionEvent(long eventId) {
86-
int index = (int) (eventId - nextDecisionEventId);
87-
if (index < 0 || index >= decisionEvents.size()) {
88-
throw new IllegalArgumentException("No decision event found at eventId=" + eventId);
89-
}
90-
return decisionEvents.get(index);
91-
}
92-
9385
Optional<HistoryEvent> getOptionalDecisionEvent(long eventId) {
9486
int index = (int) (eventId - nextDecisionEventId);
9587
if (index < 0 || index >= decisionEvents.size()) {

src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,12 @@ Optional<byte[]> handle(
134134

135135
private Optional<byte[]> getMarkerDataFromHistory(
136136
long eventId, String markerId, int expectedAcccessCount, DataConverter converter) {
137-
HistoryEvent event = decisions.getDecisionEvent(eventId);
138-
if (event.getEventType() != EventType.MarkerRecorded) {
137+
Optional<HistoryEvent> event = decisions.getOptionalDecisionEvent(eventId);
138+
if (!event.isPresent() || event.get().getEventType() != EventType.MarkerRecorded) {
139139
return Optional.empty();
140140
}
141-
MarkerRecordedEventAttributes attributes = event.getMarkerRecordedEventAttributes();
141+
142+
MarkerRecordedEventAttributes attributes = event.get().getMarkerRecordedEventAttributes();
142143
String name = attributes.getMarkerName();
143144
if (!markerName.equals(name)) {
144145
return Optional.empty();

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public class WorkflowTest {
114114
*/
115115
private static final boolean DEBUGGER_TIMEOUTS = false;
116116

117-
public static final String ANNOTATION_TASK_LIST = "WorkflowTest-testExecute[Docker]";
117+
private static final String ANNOTATION_TASK_LIST = "WorkflowTest-testExecute[Docker]";
118118

119119
private TracingWorkflowInterceptorFactory tracer;
120120
private static final boolean useDockerService =
@@ -3733,6 +3733,58 @@ public void testGetVersion() {
37333733
"executeActivity customActivity1");
37343734
}
37353735

3736+
static CompletableFuture<Boolean> executionStarted = new CompletableFuture<>();
3737+
public static class TestGetVersionWithoutDecisionEventWorkflowImpl implements TestWorkflowSignaled {
3738+
3739+
CompletablePromise<Boolean> signalReceived = Workflow.newPromise();
3740+
3741+
@Override
3742+
public String execute() {
3743+
try {
3744+
if (!getVersionExecuted.contains("getVersionWithoutDecisionEvent")) {
3745+
// Execute getVersion in non-replay mode.
3746+
getVersionExecuted.add("getVersionWithoutDecisionEvent");
3747+
executionStarted.complete(true);
3748+
signalReceived.get();
3749+
} else {
3750+
// Execute getVersion in replay mode. In this case we have no decision event, only a signal.
3751+
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
3752+
if (version == Workflow.DEFAULT_VERSION) {
3753+
signalReceived.get();
3754+
return "result 1";
3755+
} else {
3756+
return "result 2";
3757+
}
3758+
}
3759+
} catch (Exception e) {
3760+
throw new RuntimeException("failed to get from signal");
3761+
}
3762+
3763+
throw new RuntimeException("unreachable");
3764+
}
3765+
3766+
@Override
3767+
public void signal1(String arg) {
3768+
signalReceived.complete(true);
3769+
}
3770+
}
3771+
3772+
@Test
3773+
public void testGetVersionWithoutDecisionEvent() throws Exception {
3774+
Assume.assumeTrue("skipping as there will be no replay", disableStickyExecution);
3775+
executionStarted = new CompletableFuture<>();
3776+
getVersionExecuted.remove("getVersionWithoutDecisionEvent");
3777+
startWorkerFor(TestGetVersionWithoutDecisionEventWorkflowImpl.class);
3778+
TestWorkflowSignaled workflowStub =
3779+
workflowClient.newWorkflowStub(
3780+
TestWorkflowSignaled.class, newWorkflowOptionsBuilder(taskList).build());
3781+
WorkflowClient.start(workflowStub::execute);
3782+
executionStarted.get();
3783+
workflowStub.signal1("test signal");
3784+
String result = workflowStub.execute();
3785+
assertEquals("result 1", result);
3786+
}
3787+
37363788
// The following test covers the scenario where getVersion call is removed before a
37373789
// non-version-marker decision.
37383790
public static class TestGetVersionRemovedInReplayWorkflowImpl implements TestWorkflow1 {

0 commit comments

Comments
 (0)