Skip to content

Commit 2475838

Browse files
Reset lastHandledEventId on speculative WFT (#1881)
1 parent 720a9cb commit 2475838

File tree

3 files changed

+61
-0
lines changed

3 files changed

+61
-0
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,16 @@ public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
202202
}
203203

204204
public void setCurrentStartedEventId(long eventId) {
205+
// We have to drop any state machines (which should only be one workflow task machine)
206+
// created when handling the speculative workflow task
207+
for (long i = this.lastHandledEventId; i > eventId; i--) {
208+
stateMachines.remove(i);
209+
}
205210
this.currentStartedEventId = eventId;
211+
// When we reset the event ID on a speculative WFT we need to move this counter back
212+
// to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
213+
// always follows the WFT started.
214+
this.lastHandledEventId = eventId + 1;
206215
}
207216

208217
public long getCurrentStartedEventId() {

temporal-sdk/src/test/java/io/temporal/workflow/shared/TestWorkflows.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ public interface WorkflowWithUpdateAndSignal {
214214
@UpdateMethod()
215215
String update(String value);
216216

217+
@UpdateValidatorMethod(updateName = "update")
218+
void validator(String value);
219+
217220
@UpdateMethod
218221
void complete();
219222
}

temporal-sdk/src/test/java/io/temporal/workflow/updateTest/UpdateWithSignalAndQuery.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import static org.junit.Assert.*;
2424

2525
import io.temporal.api.common.v1.WorkflowExecution;
26+
import io.temporal.api.enums.v1.EventType;
2627
import io.temporal.client.WorkflowClient;
2728
import io.temporal.client.WorkflowOptions;
29+
import io.temporal.client.WorkflowUpdateException;
2830
import io.temporal.testing.internal.SDKTestOptions;
2931
import io.temporal.testing.internal.SDKTestWorkflowRule;
3032
import io.temporal.worker.WorkerOptions;
@@ -89,6 +91,45 @@ public void testUpdateWithSignal() {
8991
workflow.execute());
9092
}
9193

94+
@Test
95+
public void testSpeculativeUpdateWithSignal() {
96+
String workflowId = UUID.randomUUID().toString();
97+
WorkflowClient workflowClient = testWorkflowRule.getWorkflowClient();
98+
WorkflowOptions options =
99+
SDKTestOptions.newWorkflowOptionsWithTimeouts(testWorkflowRule.getTaskQueue()).toBuilder()
100+
.setWorkflowId(workflowId)
101+
.build();
102+
TestWorkflows.WorkflowWithUpdateAndSignal workflow =
103+
workflowClient.newWorkflowStub(TestWorkflows.WorkflowWithUpdateAndSignal.class, options);
104+
// To execute workflow client.execute() would do. But we want to start workflow and immediately
105+
// return.
106+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
107+
108+
assertEquals(workflowId, execution.getWorkflowId());
109+
110+
SDKTestWorkflowRule.waitForOKQuery(workflow);
111+
assertEquals("initial", workflow.getState());
112+
113+
for (int i = 0; i < 5; i++) {
114+
assertThrows(WorkflowUpdateException.class, () -> workflow.update(""));
115+
}
116+
117+
workflow.getState();
118+
119+
workflow.signal("signal 1");
120+
121+
assertEquals("update 1", workflow.update("update 1"));
122+
123+
workflow.signal("signal 2");
124+
125+
workflow.complete();
126+
127+
assertEquals(Arrays.asList("signal 1", "update 1", "signal 2"), workflow.execute());
128+
SDKTestWorkflowRule.assertNoHistoryEvent(
129+
workflowClient.fetchHistory(execution.getWorkflowId()).getHistory(),
130+
EventType.EVENT_TYPE_WORKFLOW_TASK_FAILED);
131+
}
132+
92133
public static class TestUpdateWithSignalWorkflowImpl
93134
implements TestWorkflows.WorkflowWithUpdateAndSignal {
94135
String state = "initial";
@@ -116,10 +157,18 @@ public void signal(String value) {
116157

117158
@Override
118159
public String update(String value) {
160+
Workflow.sleep(100);
119161
updatesAndSignals.add(value);
120162
return value;
121163
}
122164

165+
@Override
166+
public void validator(String value) {
167+
if (value.isEmpty()) {
168+
throw new RuntimeException("Empty value");
169+
}
170+
}
171+
123172
@Override
124173
public void complete() {
125174
promise.complete(null);

0 commit comments

Comments
 (0)