Skip to content

Commit 4f781b3

Browse files
Allow SDK to handle speculative workflow task with command events (#2099)
Allow SDK to handle speculative workflow task with command events
1 parent 42b9803 commit 4f781b3

File tree

11 files changed

+271
-23
lines changed

11 files changed

+271
-23
lines changed

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,14 +218,14 @@ public QueryResult handleDirectQueryWorkflowTask(
218218
}
219219

220220
@Override
221-
public void setCurrentStartedEvenId(Long eventId) {
222-
workflowStateMachines.setLastWFTStartedEventId(eventId);
221+
public void resetStartedEvenId(Long eventId) {
222+
workflowStateMachines.resetStartedEvenId(eventId);
223223
}
224224

225225
private void handleWorkflowTaskImpl(
226226
PollWorkflowTaskQueueResponseOrBuilder workflowTask,
227227
WorkflowHistoryIterator historyIterator) {
228-
workflowStateMachines.setWorklfowStartedEventId(workflowTask.getStartedEventId());
228+
workflowStateMachines.setWorkflowStartedEventId(workflowTask.getStartedEventId());
229229
workflowStateMachines.setReplaying(workflowTask.getPreviousStartedEventId() > 0);
230230
workflowStateMachines.setMessages(workflowTask.getMessagesList());
231231
applyServerHistory(workflowTask.getStartedEventId(), historyIterator);

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ private Result handleWorkflowTaskWithQuery(
139139
workflowTask.getWorkflowType().getName(),
140140
workflowTask,
141141
wftResult,
142-
workflowRunTaskHandler::setCurrentStartedEvenId);
142+
workflowRunTaskHandler::resetStartedEvenId);
143143
}
144144

145145
if (useCache) {

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowRunTaskHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,11 @@ QueryResult handleDirectQueryWorkflowTask(
5757
throws Throwable;
5858

5959
/**
60-
* Reset the workflow event Id.
60+
* Reset the workflow event ID.
6161
*
62-
* @param eventId the event Id to reset the cached state to.
62+
* @param eventId the event ID to reset the cached state to.
6363
*/
64-
void setCurrentStartedEvenId(Long eventId);
64+
void resetStartedEvenId(Long eventId);
6565

6666
void close();
6767
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,21 +217,24 @@ public WorkflowStateMachines(
217217
* and triggered an execution. Used in {@link WorkflowTaskStateMachine} only to understand
218218
* that this workflow task will not have a matching closing event and needs to be executed.
219219
*/
220-
public void setWorklfowStartedEventId(long workflowTaskStartedEventId) {
220+
public void setWorkflowStartedEventId(long workflowTaskStartedEventId) {
221221
this.workflowTaskStartedEventId = workflowTaskStartedEventId;
222222
}
223223

224-
public void setLastWFTStartedEventId(long eventId) {
224+
public void resetStartedEvenId(long eventId) {
225+
// We must reset the last event we handled to be after the last WFT we really completed
226+
// + any command events (since the SDK "processed" those when it emitted the commands). This
227+
// is also equal to what we just processed in the speculative task, minus two, since we
228+
// would've just handled the most recent WFT started event, and we need to drop that & the
229+
// schedule event just before it.
230+
long resetLastHandledEventId = this.lastHandledEventId - 2;
225231
// We have to drop any state machines (which should only be one workflow task machine)
226232
// created when handling the speculative workflow task
227-
for (long i = this.lastHandledEventId; i > eventId; i--) {
233+
for (long i = this.lastHandledEventId; i > resetLastHandledEventId; i--) {
228234
stateMachines.remove(i);
229235
}
230236
this.lastWFTStartedEventId = eventId;
231-
// When we reset the event ID on a speculative WFT we need to move this counter back
232-
// to the last WFT completed to allow new tasks to be processed. Assume the WFT complete
233-
// always follows the WFT started.
234-
this.lastHandledEventId = eventId + 1;
237+
this.lastHandledEventId = resetLastHandledEventId;
235238
}
236239

237240
public long getLastWFTStartedEventId() {

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowTaskHandler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ final class Result {
4141
private final RespondQueryTaskCompletedRequest queryCompleted;
4242
private final RpcRetryOptions requestRetryOptions;
4343
private final boolean completionCommand;
44-
private final Functions.Proc1<Long> eventIdSetHandle;
44+
private final Functions.Proc1<Long> resetEventIdHandle;
4545

4646
public Result(
4747
String workflowType,
@@ -50,14 +50,14 @@ public Result(
5050
RespondQueryTaskCompletedRequest queryCompleted,
5151
RpcRetryOptions requestRetryOptions,
5252
boolean completionCommand,
53-
Functions.Proc1<Long> eventIdSetHandle) {
53+
Functions.Proc1<Long> resetEventIdHandle) {
5454
this.workflowType = workflowType;
5555
this.taskCompleted = taskCompleted;
5656
this.taskFailed = taskFailed;
5757
this.queryCompleted = queryCompleted;
5858
this.requestRetryOptions = requestRetryOptions;
5959
this.completionCommand = completionCommand;
60-
this.eventIdSetHandle = eventIdSetHandle;
60+
this.resetEventIdHandle = resetEventIdHandle;
6161
}
6262

6363
public RespondWorkflowTaskCompletedRequest getTaskCompleted() {
@@ -80,9 +80,9 @@ public boolean isCompletionCommand() {
8080
return completionCommand;
8181
}
8282

83-
public Functions.Proc1<Long> getEventIdSetHandle() {
84-
if (eventIdSetHandle != null) {
85-
return eventIdSetHandle;
83+
public Functions.Proc1<Long> getResetEventIdHandle() {
84+
if (resetEventIdHandle != null) {
85+
return resetEventIdHandle;
8686
}
8787
return (arg) -> {};
8888
}

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public void handle(WorkflowTask task) throws Exception {
363363
// was dropped by resting out event ID.
364364
long resetEventId = response.getResetHistoryEventId();
365365
if (resetEventId != 0) {
366-
result.getEventIdSetHandle().apply(resetEventId);
366+
result.getResetEventIdHandle().apply(resetEventId);
367367
}
368368
nextWFTResponse =
369369
response.hasWorkflowTask()

temporal-sdk/src/test/java/io/temporal/internal/statemachines/TestHistoryBuilder.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ TestHistoryBuilder add(EventType type) {
5151
return this;
5252
}
5353

54+
History getHistory() {
55+
return History.newBuilder().addAllEvents(events).build();
56+
}
57+
5458
long addGetEventId(EventType type) {
5559
return addGetEventId(type, null);
5660
}
@@ -293,7 +297,7 @@ public void handleWorkflowTask(
293297
this.events.subList((int) stateMachines.getLastStartedEventId(), this.events.size());
294298
PeekingIterator<HistoryEvent> history = Iterators.peekingIterator(events.iterator());
295299
HistoryInfo info = getHistoryInfo(replayToTaskIndex);
296-
stateMachines.setWorklfowStartedEventId(info.getWorkflowTaskStartedEventId());
300+
stateMachines.setWorkflowStartedEventId(info.getWorkflowTaskStartedEventId());
297301
stateMachines.setReplaying(info.getPreviousStartedEventId() > 0);
298302

299303
long wftStartedEventId = -1;

temporal-sdk/src/test/java/io/temporal/internal/statemachines/UpdateProtocolStateMachineTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,130 @@ protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder)
451451
}
452452
}
453453

454+
@Test
455+
public void testUpdateRejectedAndReset() throws InvalidProtocolBufferException {
456+
class TestUpdateListener extends TestEntityManagerListenerBase {
457+
458+
@Override
459+
public void buildWorkflow(AsyncWorkflowBuilder<Void> builder) {
460+
builder.<HistoryEvent>add1(
461+
(v, c) ->
462+
stateMachines.newTimer(
463+
StartTimerCommandAttributes.newBuilder()
464+
.setTimerId("timer1")
465+
.setStartToFireTimeout(
466+
ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1)))
467+
.build(),
468+
c));
469+
}
470+
471+
@Override
472+
protected void update(UpdateMessage message, AsyncWorkflowBuilder<Void> builder) {
473+
builder.add(
474+
(r) -> {
475+
message
476+
.getCallbacks()
477+
.reject(converter.exceptionToFailure(new RuntimeException("test failure")));
478+
});
479+
}
480+
481+
@Override
482+
protected void signal(HistoryEvent signalEvent, AsyncWorkflowBuilder<Void> builder) {
483+
builder.<HistoryEvent>add1(
484+
(v, c) ->
485+
stateMachines.newTimer(
486+
StartTimerCommandAttributes.newBuilder()
487+
.setTimerId("timer2")
488+
.setStartToFireTimeout(
489+
ProtobufTimeUtils.toProtoDuration(Duration.ofHours(1)))
490+
.build(),
491+
c));
492+
}
493+
}
494+
495+
/*
496+
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
497+
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
498+
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
499+
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
500+
5: EVENT_TYPE_TIMER_STARTED
501+
6: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
502+
7: EVENT_TYPE_WORKFLOW_TASK_STARTED
503+
*/
504+
505+
TestHistoryBuilder h = new TestHistoryBuilder();
506+
{
507+
TestEntityManagerListenerBase listener = new TestUpdateListener();
508+
stateMachines = newStateMachines(listener);
509+
h.add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED);
510+
h.addWorkflowTask();
511+
h.add(
512+
EventType.EVENT_TYPE_TIMER_STARTED,
513+
TimerStartedEventAttributes.newBuilder().setTimerId("timer1"));
514+
h.addWorkflowTaskScheduled();
515+
h.addWorkflowTaskStarted();
516+
}
517+
{
518+
// Full replay
519+
TestEntityManagerListenerBase listener = new TestUpdateListener();
520+
stateMachines = newStateMachines(listener);
521+
Request request =
522+
Request.newBuilder()
523+
.setInput(
524+
Input.newBuilder()
525+
.setName("updateName")
526+
.setArgs(converter.toPayloads("arg").get()))
527+
.build();
528+
stateMachines.setMessages(
529+
Collections.unmodifiableList(
530+
Arrays.asList(
531+
new Message[] {
532+
Message.newBuilder()
533+
.setProtocolInstanceId("protocol_id")
534+
.setId("id")
535+
.setEventId(6)
536+
.setBody(Any.pack(request))
537+
.build(),
538+
})));
539+
List<Command> commands = h.handleWorkflowTaskTakeCommands(stateMachines);
540+
assertEquals(0, commands.size());
541+
List<Message> messages = stateMachines.takeMessages();
542+
assertEquals(1, messages.size());
543+
Rejection rejection = messages.get(0).getBody().unpack(Rejection.class);
544+
assertNotNull(rejection);
545+
assertEquals(request, rejection.getRejectedRequest());
546+
// Simulate the server request to reset the workflow event ID
547+
stateMachines.resetStartedEvenId(3);
548+
// Create a new history after the reset event ID
549+
/*
550+
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
551+
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
552+
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
553+
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
554+
5: EVENT_TYPE_TIMER_STARTED
555+
6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
556+
7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
557+
8: EVENT_TYPE_WORKFLOW_TASK_STARTED
558+
*/
559+
TestHistoryBuilder historyAfterReset = new TestHistoryBuilder();
560+
historyAfterReset.add(EventType.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED);
561+
historyAfterReset.addWorkflowTask();
562+
historyAfterReset.add(
563+
EventType.EVENT_TYPE_TIMER_STARTED,
564+
TimerStartedEventAttributes.newBuilder().setTimerId("timer1"));
565+
historyAfterReset.add(
566+
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED,
567+
WorkflowExecutionSignaledEventAttributes.newBuilder().setSignalName("signal1"));
568+
historyAfterReset.addWorkflowTaskScheduled();
569+
historyAfterReset.addWorkflowTaskStarted();
570+
// Test new history with the old workflow state machines
571+
commands = historyAfterReset.handleWorkflowTaskTakeCommands(stateMachines, 1, 2);
572+
assertEquals(1, commands.size());
573+
messages = stateMachines.takeMessages();
574+
assertEquals(0, messages.size());
575+
}
576+
}
577+
454578
@Test
455579
public void testUpdateAdmittedAndCompletedImmediately() {
456580
class TestUpdateListener extends TestEntityManagerListenerBase {

temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -809,7 +809,7 @@ public QueryResult handleDirectQueryWorkflowTask(
809809
}
810810

811811
@Override
812-
public void setCurrentStartedEvenId(Long event) {}
812+
public void resetStartedEvenId(Long event) {}
813813

814814
@Override
815815
public void close() {

0 commit comments

Comments
 (0)