Skip to content

Commit ce90d24

Browse files
Enable speculative workflow tasks (#2390)
* Enabled speculative workflow task with command events * add check for real server * even -> event
1 parent 8b17a52 commit ce90d24

File tree

7 files changed

+153
-8
lines changed

7 files changed

+153
-8
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,8 @@ public QueryResult handleDirectQueryWorkflowTask(
218218
}
219219

220220
@Override
221-
public void resetStartedEvenId(Long eventId) {
222-
workflowStateMachines.resetStartedEvenId(eventId);
221+
public void resetStartedEventId(Long eventId) {
222+
workflowStateMachines.resetStartedEventId(eventId);
223223
}
224224

225225
private void handleWorkflowTaskImpl(

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private Result handleWorkflowTaskWithQuery(
139139
workflowTask.getWorkflowType().getName(),
140140
workflowTask,
141141
wftResult,
142-
workflowRunTaskHandler::resetStartedEvenId);
142+
workflowRunTaskHandler::resetStartedEventId);
143143
}
144144

145145
if (useCache) {
@@ -230,7 +230,11 @@ private Result createCompletedWFTRequest(
230230
.setNonfirstLocalActivityExecutionAttempts(
231231
result.getNonfirstLocalActivityAttempts())
232232
.build())
233-
.setReturnNewWorkflowTask(result.isForceWorkflowTask());
233+
.setReturnNewWorkflowTask(result.isForceWorkflowTask())
234+
.setCapabilities(
235+
RespondWorkflowTaskCompletedRequest.Capabilities.newBuilder()
236+
.setDiscardSpeculativeWorkflowTaskWithEvents(true)
237+
.build());
234238

235239
if (stickyTaskQueue != null
236240
&& (stickyTaskQueueScheduleToStartTimeout == null

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ QueryResult handleDirectQueryWorkflowTask(
6161
*
6262
* @param eventId the event ID to reset the cached state to.
6363
*/
64-
void resetStartedEvenId(Long eventId);
64+
void resetStartedEventId(Long eventId);
6565

6666
void close();
6767
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public void setWorkflowStartedEventId(long workflowTaskStartedEventId) {
222222
this.workflowTaskStartedEventId = workflowTaskStartedEventId;
223223
}
224224

225-
public void resetStartedEvenId(long eventId) {
225+
public void resetStartedEventId(long eventId) {
226226
// We must reset the last event we handled to be after the last WFT we really completed
227227
// + any command events (since the SDK "processed" those when it emitted the commands). This
228228
// is also equal to what we just processed in the speculative task, minus two, since we

temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -547,7 +547,7 @@ protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder<Void> build
547547
assertNotNull(rejection);
548548
assertEquals(request, rejection.getRejectedRequest());
549549
// Simulate the server request to reset the workflow event ID
550-
stateMachines.resetStartedEvenId(3);
550+
stateMachines.resetStartedEventId(3);
551551
// Create a new history after the reset event ID
552552
/*
553553
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED

temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ public QueryResult handleDirectQueryWorkflowTask(
809809
}
810810

811811
@Override
812-
public void resetStartedEvenId(Long event) {}
812+
public void resetStartedEventId(Long event) {}
813813

814814
@Override
815815
public void close() {
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.workflow.updateTest;
22+
23+
import static org.junit.Assert.assertEquals;
24+
import static org.junit.Assert.assertThrows;
25+
import static org.junit.Assume.assumeTrue;
26+
27+
import io.temporal.activity.ActivityOptions;
28+
import io.temporal.api.common.v1.WorkflowExecution;
29+
import io.temporal.client.*;
30+
import io.temporal.testing.internal.SDKTestOptions;
31+
import io.temporal.testing.internal.SDKTestWorkflowRule;
32+
import io.temporal.workflow.Async;
33+
import io.temporal.workflow.CompletablePromise;
34+
import io.temporal.workflow.Workflow;
35+
import io.temporal.workflow.shared.TestActivities;
36+
import io.temporal.workflow.shared.TestWorkflows.WorkflowWithUpdate;
37+
import java.time.Duration;
38+
import java.util.Optional;
39+
import java.util.Random;
40+
import java.util.UUID;
41+
import org.junit.Rule;
42+
import org.junit.Test;
43+
44+
public class SpeculativeUpdateTest {
45+
@Rule
46+
public SDKTestWorkflowRule testWorkflowRule =
47+
SDKTestWorkflowRule.newBuilder()
48+
.setWorkflowTypes(TestUpdateWorkflowImpl.class)
49+
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
50+
.build();
51+
52+
@Test(timeout = 60000)
53+
public void speculativeUpdateRejected() {
54+
assumeTrue(
55+
"Test Server doesn't support speculative update yet",
56+
SDKTestWorkflowRule.useExternalService);
57+
58+
String workflowId = UUID.randomUUID().toString();
59+
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
60+
WorkflowOptions options =
61+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
62+
.setWorkflowId(workflowId)
63+
.build();
64+
WorkflowWithUpdate workflow = workflowClient.newWorkflowStub(WorkflowWithUpdate.class, options);
65+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
66+
67+
workflow.update(3, "test value");
68+
// This update is going to be rejected, the resulting workflow task will not appear in history
69+
assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject"));
70+
71+
assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject"));
72+
// Create more events to make sure the server persists the workflow tasks
73+
workflow.update(12, "test value");
74+
// This update is going to be rejected, the resulting workflow task will appear in history
75+
assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject"));
76+
77+
assertThrows(WorkflowUpdateException.class, () -> workflow.update(0, "reject"));
78+
79+
workflow.complete();
80+
String result =
81+
testWorkflowRule
82+
.getWorkflowClient()
83+
.newUntypedWorkflowStub(execution, Optional.empty())
84+
.getResult(String.class);
85+
assertEquals("", result);
86+
}
87+
88+
public static class TestUpdateWorkflowImpl implements WorkflowWithUpdate {
89+
String state = "initial";
90+
CompletablePromise<Void> promise = Workflow.newPromise();
91+
92+
private final TestActivities.VariousTestActivities activities =
93+
Workflow.newActivityStub(
94+
TestActivities.VariousTestActivities.class,
95+
ActivityOptions.newBuilder()
96+
.setScheduleToCloseTimeout(Duration.ofSeconds(200))
97+
.build());
98+
99+
@Override
100+
public String execute() {
101+
promise.get();
102+
return "";
103+
}
104+
105+
@Override
106+
public String getState() {
107+
return state;
108+
}
109+
110+
@Override
111+
public String update(Integer index, String value) {
112+
Random random = Workflow.newRandom();
113+
for (int i = 0; i <= index; i++) {
114+
int choice = random.nextInt(3);
115+
if (choice == 0) {
116+
Async.function(activities::sleepActivity, new Long(10000), 0);
117+
} else if (choice == 1) {
118+
Workflow.getVersion("test version " + i, Workflow.DEFAULT_VERSION, 1);
119+
} else {
120+
Workflow.newTimer(Duration.ofMillis(10));
121+
}
122+
}
123+
return value;
124+
}
125+
126+
@Override
127+
public void updateValidator(Integer index, String value) {
128+
if (value.equals("reject")) {
129+
throw new RuntimeException("Rejecting update");
130+
}
131+
}
132+
133+
@Override
134+
public void complete() {
135+
promise.complete(null);
136+
}
137+
138+
@Override
139+
public void completeValidator() {}
140+
}
141+
}

0 commit comments

Comments
 (0)