Skip to content

Commit f922c06

Browse files
Fix bug in VersionStateMachine causing wrong version to get returned (#1841)
Fix a bug in VersionStateMachine causing DEFAULT_VERSION to get returned incorrectly if the same change ID was read multiple times in a Workflow.
1 parent 622edc0 commit f922c06

File tree

9 files changed

+1199
-15
lines changed

9 files changed

+1199
-15
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/VersionStateMachine.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -368,13 +368,26 @@ private VersionStateMachine(
368368
this.commandSink = Objects.requireNonNull(commandSink);
369369
this.stateMachineSink = stateMachineSink;
370370
}
371-
372-
public State getVersion(
371+
/**
372+
* Get the version for this state machine.
373+
*
374+
* @param minSupported min version supported for the change
375+
* @param maxSupported max version supported for the change
376+
* @param callback used to return version
377+
* @return True if the identifier is not present in history
378+
*/
379+
public boolean getVersion(
373380
int minSupported, int maxSupported, Functions.Proc2<Integer, RuntimeException> callback) {
374381
InvocationStateMachine ism = new InvocationStateMachine(minSupported, maxSupported, callback);
375382
ism.explicitEvent(ExplicitEvent.CHECK_EXECUTION_STATE);
376383
ism.explicitEvent(ExplicitEvent.SCHEDULE);
377-
return ism.getState();
384+
// If the state is SKIPPED_REPLAYING that means we:
385+
// 1. Are replaying
386+
// 2. We don't have a preloadedVersion
387+
// This means either this version marker did not exist in the original execution or
388+
// the version marker did exist, but was in an earlier WFT. If the version marker was in a
389+
// previous WFT then the version field should have a value.
390+
return !(ism.getState() == VersionStateMachine.State.SKIPPED_REPLAYING && version == null);
378391
}
379392

380393
public void handleNonMatchingEvent(HistoryEvent event) {

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -926,18 +926,16 @@ public boolean getVersion(
926926
return VersionStateMachine.newInstance(
927927
changeId, this::isReplaying, commandSink, stateMachineSink);
928928
});
929-
VersionStateMachine.State state =
930-
stateMachine.getVersion(
931-
minSupported,
932-
maxSupported,
933-
(v, e) -> {
934-
callback.apply(v, e);
935-
// without this getVersion call will trigger the end of WFT,
936-
// instead we want to prepare subsequent commands and unblock the execution one more
937-
// time.
938-
eventLoop();
939-
});
940-
return state != VersionStateMachine.State.SKIPPED_REPLAYING;
929+
return stateMachine.getVersion(
930+
minSupported,
931+
maxSupported,
932+
(v, e) -> {
933+
callback.apply(v, e);
934+
// without this getVersion call will trigger the end of WFT,
935+
// instead we want to prepare subsequent commands and unblock the execution one more
936+
// time.
937+
eventLoop();
938+
});
941939
}
942940

943941
public List<ExecuteLocalActivityParameters> takeLocalActivityRequests() {
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.versionTests;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.client.WorkflowClient;
26+
import io.temporal.client.WorkflowStub;
27+
import io.temporal.testing.internal.SDKTestOptions;
28+
import io.temporal.testing.internal.SDKTestWorkflowRule;
29+
import io.temporal.worker.WorkerOptions;
30+
import io.temporal.workflow.Workflow;
31+
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
32+
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
33+
import io.temporal.workflow.shared.TestWorkflows;
34+
import java.time.Duration;
35+
import org.junit.Rule;
36+
import org.junit.Test;
37+
38+
public class GetVersionDefaultInSignalTest {
39+
40+
@Rule
41+
public SDKTestWorkflowRule testWorkflowRule =
42+
SDKTestWorkflowRule.newBuilder()
43+
.setWorkflowTypes(TestGetVersionWorkflowImpl.class)
44+
.setActivityImplementations(new TestActivitiesImpl())
45+
// Forcing a replay. Full history arrived from a normal queue causing a replay.
46+
.setWorkerOptions(
47+
WorkerOptions.newBuilder()
48+
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
49+
.build())
50+
.build();
51+
52+
@Test
53+
public void testGetVersionDefaultInSignal() throws InterruptedException {
54+
TestWorkflows.TestSignaledWorkflow workflow =
55+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflows.TestSignaledWorkflow.class);
56+
WorkflowClient.start(workflow::execute);
57+
58+
WorkflowStub workflowStub = WorkflowStub.fromTyped(workflow);
59+
SDKTestWorkflowRule.waitForOKQuery(workflowStub);
60+
61+
workflow.signal(testWorkflowRule.getTaskQueue());
62+
workflow.signal(testWorkflowRule.getTaskQueue());
63+
testWorkflowRule.invalidateWorkflowCache();
64+
workflow.signal(testWorkflowRule.getTaskQueue());
65+
66+
String result = workflowStub.getResult(String.class);
67+
assertEquals("1", result);
68+
}
69+
70+
public static class TestGetVersionWorkflowImpl implements TestWorkflows.TestSignaledWorkflow {
71+
int signalCounter = 0;
72+
73+
@Override
74+
public String execute() {
75+
int version =
76+
io.temporal.workflow.Workflow.getVersion(
77+
"testMarker", io.temporal.workflow.Workflow.DEFAULT_VERSION, 1);
78+
Workflow.await(() -> signalCounter >= 3);
79+
return String.valueOf(version);
80+
}
81+
82+
@Override
83+
public void signal(String taskQueue) {
84+
VariousTestActivities testActivities =
85+
Workflow.newActivityStub(
86+
VariousTestActivities.class,
87+
SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue));
88+
89+
int version =
90+
io.temporal.workflow.Workflow.getVersion(
91+
"testMarker", io.temporal.workflow.Workflow.DEFAULT_VERSION, 1);
92+
if (version == 1) {
93+
testActivities.activity1(1);
94+
} else {
95+
testActivities.activity();
96+
}
97+
signalCounter++;
98+
}
99+
}
100+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.versionTests;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.testing.WorkflowReplayer;
26+
import io.temporal.testing.internal.SDKTestOptions;
27+
import io.temporal.testing.internal.SDKTestWorkflowRule;
28+
import io.temporal.worker.WorkerOptions;
29+
import io.temporal.workflow.Workflow;
30+
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
31+
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
32+
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
33+
import io.temporal.workflow.unsafe.WorkflowUnsafe;
34+
import java.time.Duration;
35+
import org.junit.Rule;
36+
import org.junit.Test;
37+
38+
public class GetVersionMultipleCallsDefaultTest {
39+
@Rule
40+
public SDKTestWorkflowRule testWorkflowRule =
41+
SDKTestWorkflowRule.newBuilder()
42+
.setWorkflowTypes(TestGetVersionWorkflowImpl.class)
43+
.setActivityImplementations(new TestActivitiesImpl())
44+
// Forcing a replay. Full history arrived from a normal queue causing a replay.
45+
.setWorkerOptions(
46+
WorkerOptions.newBuilder()
47+
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
48+
.build())
49+
.build();
50+
51+
@Test
52+
public void testGetVersionMultipleCallsDefault() {
53+
TestWorkflow1 workflowStub =
54+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
55+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
56+
assertEquals("activity1", result);
57+
}
58+
59+
@Test
60+
public void testGetVersionMultipleCallsReplay() throws Exception {
61+
WorkflowReplayer.replayWorkflowExecutionFromResource(
62+
"testGetVersionMultipleCallsHistoryDefault.json",
63+
GetVersionMultipleCallsDefaultTest.TestGetVersionWorkflowImpl.class);
64+
}
65+
66+
public static class TestGetVersionWorkflowImpl implements TestWorkflow1 {
67+
@Override
68+
public String execute(String taskQueue) {
69+
VariousTestActivities testActivities =
70+
Workflow.newActivityStub(
71+
VariousTestActivities.class,
72+
SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue));
73+
System.out.println("Calling getVersion for the fist time");
74+
if (WorkflowUnsafe.isReplaying()) {
75+
int version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
76+
assertEquals(version, Workflow.DEFAULT_VERSION);
77+
78+
// Try again in the same WFT
79+
version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
80+
assertEquals(version, Workflow.DEFAULT_VERSION);
81+
}
82+
83+
// Create a new WFT by sleeping
84+
Workflow.sleep(1000);
85+
int version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
86+
assertEquals(version, Workflow.DEFAULT_VERSION);
87+
88+
String result = "activity" + testActivities.activity1(1);
89+
90+
version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
91+
assertEquals(version, Workflow.DEFAULT_VERSION);
92+
return result;
93+
}
94+
}
95+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.versionTests;
22+
23+
import static org.junit.Assert.assertEquals;
24+
25+
import io.temporal.testing.WorkflowReplayer;
26+
import io.temporal.testing.internal.SDKTestOptions;
27+
import io.temporal.testing.internal.SDKTestWorkflowRule;
28+
import io.temporal.worker.WorkerOptions;
29+
import io.temporal.workflow.Workflow;
30+
import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl;
31+
import io.temporal.workflow.shared.TestActivities.VariousTestActivities;
32+
import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1;
33+
import java.time.Duration;
34+
import org.junit.Rule;
35+
import org.junit.Test;
36+
37+
public class GetVersionMultipleCallsTest {
38+
@Rule
39+
public SDKTestWorkflowRule testWorkflowRule =
40+
SDKTestWorkflowRule.newBuilder()
41+
.setWorkflowTypes(TestGetVersionWorkflowImpl.class)
42+
.setActivityImplementations(new TestActivitiesImpl())
43+
// Forcing a replay. Full history arrived from a normal queue causing a replay.
44+
.setWorkerOptions(
45+
WorkerOptions.newBuilder()
46+
.setStickyQueueScheduleToStartTimeout(Duration.ZERO)
47+
.build())
48+
.build();
49+
50+
@Test
51+
public void testGetVersionMultipleCalls() {
52+
TestWorkflow1 workflowStub =
53+
testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class);
54+
String result = workflowStub.execute(testWorkflowRule.getTaskQueue());
55+
assertEquals("activity1", result);
56+
}
57+
58+
@Test
59+
public void testGetVersionMultipleCallsReplay() throws Exception {
60+
WorkflowReplayer.replayWorkflowExecutionFromResource(
61+
"testGetVersionMultipleCallsHistory.json",
62+
GetVersionMultipleCallsTest.TestGetVersionWorkflowImpl.class);
63+
}
64+
65+
public static class TestGetVersionWorkflowImpl implements TestWorkflow1 {
66+
@Override
67+
public String execute(String taskQueue) {
68+
VariousTestActivities testActivities =
69+
Workflow.newActivityStub(
70+
VariousTestActivities.class,
71+
SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue));
72+
int version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
73+
assertEquals(version, 1);
74+
75+
// Try again in the same WFT
76+
version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
77+
assertEquals(version, 1);
78+
79+
// Create a new WFT by sleeping
80+
Workflow.sleep(1000);
81+
version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
82+
assertEquals(version, 1);
83+
String result = "activity" + testActivities.activity1(1);
84+
85+
version = Workflow.getVersion("changeId", Workflow.DEFAULT_VERSION, 1);
86+
assertEquals(version, 1);
87+
88+
return result;
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)