Skip to content

Commit 365724a

Browse files
authored
Fix get version to return the same version during and after replay (#463)
1 parent 83973b3 commit 365724a

File tree

3 files changed

+51
-22
lines changed

3 files changed

+51
-22
lines changed

src/main/java/com/uber/cadence/internal/replay/ClockDecisionContext.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,8 @@ int getVersion(String changeId, DataConverter converter, int minSupported, int m
311311
}
312312

313313
private void validateVersion(String changeID, int version, int minSupported, int maxSupported) {
314-
if (version < minSupported || version > maxSupported) {
314+
if ((version < minSupported || version > maxSupported)
315+
&& version != WorkflowInternal.DEFAULT_VERSION) {
315316
throw new Error(
316317
String.format(
317318
"Version %d of changeID %s is not supported. Supported version is between %d and %d.",

src/main/java/com/uber/cadence/internal/replay/MarkerHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.uber.cadence.HistoryEvent;
2323
import com.uber.cadence.MarkerRecordedEventAttributes;
2424
import com.uber.cadence.converter.DataConverter;
25+
import com.uber.cadence.internal.sync.WorkflowInternal;
2526
import com.uber.cadence.workflow.Functions.Func1;
2627
import com.uber.m3.util.ImmutableMap;
2728
import java.nio.ByteBuffer;
@@ -212,6 +213,12 @@ Optional<byte[]> handle(
212213
recordMutableMarker(id, eventId, data.get(), accessCount, converter);
213214
return data;
214215
}
216+
217+
if (!stored.isPresent()) {
218+
mutableMarkerResults.put(
219+
id, new MarkerResult(converter.toData(WorkflowInternal.DEFAULT_VERSION)));
220+
}
221+
215222
return stored;
216223
}
217224
Optional<byte[]> toStore = func.apply(stored);

src/test/java/com/uber/cadence/workflow/WorkflowTest.java

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4037,42 +4037,29 @@ public String execute(String taskList) {
40374037

40384038
// Test adding a version check in non-replay code.
40394039
int version = Workflow.getVersion("test_change", Workflow.DEFAULT_VERSION, 1);
4040-
String result = "";
4041-
if (version == Workflow.DEFAULT_VERSION) {
4042-
result += "activity" + testActivities.activity1(1);
4043-
} else {
4044-
result += testActivities.activity2("activity2", 2); // This is executed.
4045-
}
4040+
assertEquals(version, 1);
4041+
String result = testActivities.activity2("activity2", 2);
40464042

40474043
// Test version change in non-replay code.
40484044
version = Workflow.getVersion("test_change", 1, 2);
4049-
if (version == 1) {
4050-
result += "activity" + testActivities.activity1(1); // This is executed.
4051-
} else {
4052-
result += testActivities.activity2("activity2", 2);
4053-
}
4045+
assertEquals(version, 1);
4046+
result += "activity" + testActivities.activity1(1);
40544047

40554048
// Test adding a version check in replay code.
40564049
if (!getVersionExecuted.contains(taskList + "-test_change_2")) {
40574050
result += "activity" + testActivities.activity1(1); // This is executed in non-replay mode.
40584051
getVersionExecuted.add(taskList + "-test_change_2");
40594052
} else {
40604053
int version2 = Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 1);
4061-
if (version2 == Workflow.DEFAULT_VERSION) {
4062-
result += "activity" + testActivities.activity1(1); // This is executed in replay mode.
4063-
} else {
4064-
result += testActivities.activity2("activity2", 2);
4065-
}
4054+
assertEquals(version2, Workflow.DEFAULT_VERSION);
4055+
result += "activity" + testActivities.activity1(1);
40664056
}
40674057

40684058
// Test get version in replay mode.
40694059
Workflow.sleep(1000);
40704060
version = Workflow.getVersion("test_change", 1, 2);
4071-
if (version == 1) {
4072-
result += "activity" + testActivities.activity1(1); // This is executed.
4073-
} else {
4074-
result += testActivities.activity2("activity2", 2);
4075-
}
4061+
assertEquals(version, 1);
4062+
result += "activity" + testActivities.activity1(1);
40764063

40774064
return result;
40784065
}
@@ -4097,6 +4084,40 @@ public void testGetVersion() {
40974084
"executeActivity customActivity1");
40984085
}
40994086

4087+
public static class TestGetVersionWorkflow2Impl implements TestWorkflow1 {
4088+
4089+
@Override
4090+
public String execute(String taskList) {
4091+
// Test adding a version check in replay code.
4092+
if (!getVersionExecuted.contains(taskList + "-test_change_2")) {
4093+
getVersionExecuted.add(taskList + "-test_change_2");
4094+
Workflow.sleep(Duration.ofHours(1));
4095+
} else {
4096+
int version2 = Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 1);
4097+
Workflow.sleep(Duration.ofHours(1));
4098+
int version3 = Workflow.getVersion("test_change_2", Workflow.DEFAULT_VERSION, 1);
4099+
4100+
assertEquals(version2, version3);
4101+
}
4102+
4103+
return "test";
4104+
}
4105+
}
4106+
4107+
@Test
4108+
public void testGetVersion2() {
4109+
Assume.assumeFalse("skipping for docker tests", useExternalService);
4110+
4111+
startWorkerFor(TestGetVersionWorkflow2Impl.class);
4112+
TestWorkflow1 workflowStub =
4113+
workflowClient.newWorkflowStub(
4114+
TestWorkflow1.class,
4115+
newWorkflowOptionsBuilder(taskList)
4116+
.setExecutionStartToCloseTimeout(Duration.ofHours(2))
4117+
.build());
4118+
workflowStub.execute(taskList);
4119+
}
4120+
41004121
static CompletableFuture<Boolean> executionStarted = new CompletableFuture<>();
41014122

41024123
public static class TestGetVersionWithoutDecisionEventWorkflowImpl

0 commit comments

Comments
 (0)