Skip to content

Commit ee2f5d0

Browse files
Add update protocol commands (#1780)
Add update protocol commands
1 parent a73e9d9 commit ee2f5d0

File tree

5 files changed

+315
-40
lines changed

5 files changed

+315
-40
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/UpdateProtocolStateMachine.java

Lines changed: 86 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
import com.google.protobuf.Any;
2424
import com.google.protobuf.InvalidProtocolBufferException;
25+
import io.temporal.api.command.v1.Command;
26+
import io.temporal.api.command.v1.ProtocolMessageCommandAttributes;
2527
import io.temporal.api.common.v1.Payloads;
28+
import io.temporal.api.enums.v1.CommandType;
29+
import io.temporal.api.enums.v1.EventType;
2630
import io.temporal.api.failure.v1.Failure;
2731
import io.temporal.api.protocol.v1.Message;
2832
import io.temporal.api.update.v1.Acceptance;
@@ -53,7 +57,14 @@ enum State {
5357
NEW,
5458
REQUEST_INITIATED,
5559
ACCEPTED,
60+
ACCEPTED_COMMAND_CREATED,
61+
ACCEPTED_COMMAND_RECORDED,
5662
COMPLETED,
63+
COMPLETED_COMMAND_CREATED,
64+
COMPLETED_COMMAND_RECORDED,
65+
COMPLETED_IMMEDIATELY,
66+
COMPLETED_IMMEDIATELY_COMMAND_CREATED,
67+
COMPLETED_IMMEDIATELY_COMMAND_RECORDED
5768
}
5869

5970
private static final Logger log = LoggerFactory.getLogger(UpdateProtocolStateMachine.class);
@@ -67,19 +78,74 @@ enum State {
6778
private String requestMsgId;
6879
private long requestSeqID;
6980
private Request initialRequest;
81+
private String messageId;
7082

7183
public static final StateMachineDefinition<State, ExplicitEvent, UpdateProtocolStateMachine>
7284
STATE_MACHINE_DEFINITION =
7385
StateMachineDefinition.<State, ExplicitEvent, UpdateProtocolStateMachine>newInstance(
74-
"Update", State.NEW, State.COMPLETED)
86+
"Update", State.NEW, State.COMPLETED_COMMAND_RECORDED)
7587
.add(
7688
State.NEW,
7789
ProtocolType.UPDATE_V1,
7890
State.REQUEST_INITIATED,
7991
UpdateProtocolStateMachine::triggerUpdate)
80-
.add(State.REQUEST_INITIATED, ExplicitEvent.ACCEPT, State.ACCEPTED)
81-
.add(State.REQUEST_INITIATED, ExplicitEvent.REJECT, State.COMPLETED)
82-
.add(State.ACCEPTED, ExplicitEvent.COMPLETE, State.COMPLETED);
92+
.add(
93+
State.REQUEST_INITIATED,
94+
ExplicitEvent.ACCEPT,
95+
State.ACCEPTED,
96+
UpdateProtocolStateMachine::sendCommandMessage)
97+
.add(
98+
State.ACCEPTED,
99+
CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
100+
State.ACCEPTED_COMMAND_CREATED)
101+
.add(
102+
State.ACCEPTED_COMMAND_CREATED,
103+
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
104+
State.ACCEPTED_COMMAND_RECORDED)
105+
.add(
106+
State.ACCEPTED_COMMAND_RECORDED,
107+
ExplicitEvent.COMPLETE,
108+
State.COMPLETED,
109+
UpdateProtocolStateMachine::sendCommandMessage)
110+
.add(
111+
State.COMPLETED,
112+
CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
113+
State.COMPLETED_COMMAND_CREATED)
114+
.add(
115+
State.COMPLETED_COMMAND_CREATED,
116+
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED,
117+
State.COMPLETED_COMMAND_RECORDED)
118+
// Handle the validation failure case
119+
.add(State.REQUEST_INITIATED, ExplicitEvent.REJECT, State.COMPLETED_COMMAND_RECORDED)
120+
// Handle an edge case when the update handle completes immediately. The state machine
121+
// should then expect
122+
// to see two protocol command messages back to back then two update events.
123+
.add(
124+
State.ACCEPTED,
125+
ExplicitEvent.COMPLETE,
126+
State.COMPLETED_IMMEDIATELY,
127+
UpdateProtocolStateMachine::sendCommandMessage)
128+
.add(
129+
State.COMPLETED_IMMEDIATELY,
130+
CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
131+
State.COMPLETED_IMMEDIATELY_COMMAND_CREATED)
132+
.add(
133+
State.COMPLETED_IMMEDIATELY_COMMAND_CREATED,
134+
CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE,
135+
State.COMPLETED_IMMEDIATELY_COMMAND_RECORDED)
136+
.add(
137+
State.COMPLETED_IMMEDIATELY_COMMAND_RECORDED,
138+
EventType.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
139+
State.COMPLETED_COMMAND_CREATED)
140+
// Handle an edge case when an update handle completes after it has sent the protocol
141+
// message command
142+
// but has not seen the corresponding event. This can happen if the update handle runs
143+
// a local activity
144+
.add(
145+
State.ACCEPTED_COMMAND_CREATED,
146+
ExplicitEvent.COMPLETE,
147+
State.COMPLETED_IMMEDIATELY_COMMAND_CREATED,
148+
UpdateProtocolStateMachine::sendCommandMessage);
83149

84150
public static UpdateProtocolStateMachine newInstance(
85151
Functions.Func<Boolean> replaying,
@@ -115,10 +181,18 @@ void triggerUpdate() {
115181
UpdateMessage updateMessage =
116182
new UpdateMessage(this.currentMessage, new UpdateProtocolCallbackImpl());
117183

118-
// TODO send ProtocolMessage command when server supports
119184
updateHandle.apply(updateMessage);
120185
}
121186

187+
void sendCommandMessage() {
188+
addCommand(
189+
Command.newBuilder()
190+
.setCommandType(CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE)
191+
.setProtocolMessageCommandAttributes(
192+
ProtocolMessageCommandAttributes.newBuilder().setMessageId(messageId))
193+
.build());
194+
}
195+
122196
public void accept() {
123197
Acceptance acceptResponse =
124198
Acceptance.newBuilder()
@@ -127,13 +201,13 @@ public void accept() {
127201
.setAcceptedRequest(initialRequest)
128202
.build();
129203

204+
messageId = requestMsgId + "/accept";
130205
sendHandle.apply(
131206
Message.newBuilder()
132-
.setId(requestMsgId + "/accept")
207+
.setId(messageId)
133208
.setProtocolInstanceId(protoInstanceID)
134209
.setBody(Any.pack(acceptResponse))
135210
.build());
136-
// TODO send ProtocolMessage command when server supports
137211
explicitEvent(ExplicitEvent.ACCEPT);
138212
}
139213

@@ -146,9 +220,10 @@ public void reject(Failure failure) {
146220
.setFailure(failure)
147221
.build();
148222

223+
String messageId = requestMsgId + "/reject";
149224
sendHandle.apply(
150225
Message.newBuilder()
151-
.setId(requestMsgId + "/reject")
226+
.setId(messageId)
152227
.setProtocolInstanceId(protoInstanceID)
153228
.setBody(Any.pack(rejectResponse))
154229
.build());
@@ -165,10 +240,11 @@ public void complete(Optional<Payloads> payload, Failure failure) {
165240

166241
Response outcomeResponse =
167242
Response.newBuilder().setOutcome(outcome).setMeta(initialRequest.getMeta()).build();
168-
// TODO send ProtocolMessage command when server supports
243+
244+
messageId = requestMsgId + "/complete";
169245
sendHandle.apply(
170246
Message.newBuilder()
171-
.setId(requestMsgId + "/complete")
247+
.setId(messageId)
172248
.setProtocolInstanceId(protoInstanceID)
173249
.setBody(Any.pack(outcomeResponse))
174250
.build());

temporal-sdk/src/main/java/io/temporal/internal/statemachines/UpdateProtocolStateMachine.puml

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,19 @@
2525
title Update State Transitions
2626

2727
[*] --> NEW
28-
ACCEPTED --> COMPLETED: COMPLETE
28+
ACCEPTED --> ACCEPTED_COMMAND_CREATED: PROTOCOL_MESSAGE
29+
ACCEPTED --> COMPLETED_IMMEDIATELY: COMPLETE
30+
ACCEPTED_COMMAND_CREATED --> ACCEPTED_COMMAND_RECORDED: WORKFLOW_EXECUTION_UPDATE_ACCEPTED
31+
ACCEPTED_COMMAND_CREATED --> COMPLETED_IMMEDIATELY_COMMAND_CREATED: COMPLETE
32+
ACCEPTED_COMMAND_RECORDED --> COMPLETED: COMPLETE
33+
COMPLETED --> COMPLETED_COMMAND_CREATED: PROTOCOL_MESSAGE
34+
COMPLETED_COMMAND_CREATED --> COMPLETED_COMMAND_RECORDED: WORKFLOW_EXECUTION_UPDATE_COMPLETED
35+
COMPLETED_IMMEDIATELY --> COMPLETED_IMMEDIATELY_COMMAND_CREATED: PROTOCOL_MESSAGE
36+
COMPLETED_IMMEDIATELY_COMMAND_CREATED --> COMPLETED_IMMEDIATELY_COMMAND_RECORDED: PROTOCOL_MESSAGE
37+
COMPLETED_IMMEDIATELY_COMMAND_RECORDED --> COMPLETED_COMMAND_CREATED: WORKFLOW_EXECUTION_UPDATE_ACCEPTED
2938
NEW --> REQUEST_INITIATED: UPDATE_V1
3039
REQUEST_INITIATED --> ACCEPTED: ACCEPT
31-
REQUEST_INITIATED --> COMPLETED: REJECT
32-
COMPLETED --> [*]
40+
REQUEST_INITIATED --> COMPLETED_COMMAND_RECORDED: REJECT
41+
COMPLETED_COMMAND_RECORDED --> [*]
3342
center footer Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3443
@enduml

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
package io.temporal.internal.statemachines;
2222

23+
import static io.temporal.api.enums.v1.CommandType.COMMAND_TYPE_PROTOCOL_MESSAGE;
2324
import static io.temporal.internal.common.WorkflowExecutionUtils.getEventTypeForCommand;
2425
import static io.temporal.internal.common.WorkflowExecutionUtils.isCommandEvent;
2526
import static io.temporal.serviceclient.CheckedExceptionWrapper.unwrap;
@@ -28,14 +29,7 @@
2829
import com.google.common.base.Preconditions;
2930
import com.google.common.base.Strings;
3031
import com.google.protobuf.Any;
31-
import io.temporal.api.command.v1.CancelWorkflowExecutionCommandAttributes;
32-
import io.temporal.api.command.v1.Command;
33-
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
34-
import io.temporal.api.command.v1.RequestCancelExternalWorkflowExecutionCommandAttributes;
35-
import io.temporal.api.command.v1.ScheduleActivityTaskCommandAttributes;
36-
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
37-
import io.temporal.api.command.v1.StartChildWorkflowExecutionCommandAttributes;
38-
import io.temporal.api.command.v1.StartTimerCommandAttributes;
32+
import io.temporal.api.command.v1.*;
3933
import io.temporal.api.common.v1.Payloads;
4034
import io.temporal.api.common.v1.SearchAttributes;
4135
import io.temporal.api.common.v1.WorkflowExecution;
@@ -387,14 +381,7 @@ private void handleCommandEvent(HistoryEvent event) {
387381
if (handleLocalActivityMarker(event)) {
388382
return;
389383
}
390-
// Currently Update events are command events that have no
391-
// associated command so there is nothing to handle currently.
392-
// Once the server supports ProtocolMessageCommand we can handle them here.
393-
// TODO(https://github.com/temporalio/sdk-java/issues/1744)
394-
if (event.hasWorkflowExecutionUpdateAcceptedEventAttributes()
395-
|| event.hasWorkflowExecutionUpdateCompletedEventAttributes()) {
396-
return;
397-
}
384+
398385
// Match event to the next command in the stateMachine queue.
399386
// After matching the command is notified about the event and is removed from the
400387
// queue.
@@ -952,6 +939,43 @@ public Functions.Proc scheduleLocalActivityTask(
952939
private void validateCommand(Command command, HistoryEvent event) {
953940
// TODO(maxim): Add more thorough validation logic. For example check if activity IDs are
954941
// matching.
942+
943+
// ProtocolMessageCommand is different from other commands because it can be associated with
944+
// multiple types of events
945+
// TODO(#1781) Validate protocol message is expected type.
946+
if (command.getCommandType() == COMMAND_TYPE_PROTOCOL_MESSAGE) {
947+
ProtocolMessageCommandAttributes commandAttributes =
948+
command.getProtocolMessageCommandAttributes();
949+
switch (event.getEventType()) {
950+
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
951+
assertMatch(
952+
command,
953+
event,
954+
"messageType",
955+
true,
956+
commandAttributes.getMessageId().endsWith("accept"));
957+
break;
958+
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_REJECTED:
959+
assertMatch(
960+
command,
961+
event,
962+
"messageType",
963+
true,
964+
commandAttributes.getMessageId().endsWith("reject"));
965+
break;
966+
case EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_COMPLETED:
967+
assertMatch(
968+
command,
969+
event,
970+
"messageType",
971+
true,
972+
commandAttributes.getMessageId().endsWith("complete"));
973+
break;
974+
default:
975+
throw new IllegalArgumentException("Unexpected event type: " + event.getEventType());
976+
}
977+
return;
978+
}
955979
assertMatch(
956980
command,
957981
event,
@@ -1021,6 +1045,7 @@ private void validateCommand(Command command, HistoryEvent event) {
10211045
case COMMAND_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES:
10221046
case COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION:
10231047
case COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
1048+
case COMMAND_TYPE_PROTOCOL_MESSAGE:
10241049
break;
10251050
case UNRECOGNIZED:
10261051
case COMMAND_TYPE_UNSPECIFIED:

0 commit comments

Comments
 (0)