Skip to content

Commit e7e3fa6

Browse files
authored
Implement Nexus operation cancellation types (#2520)
* Implement Nexus operation cancellation types
1 parent bdc94f7 commit e7e3fa6

File tree

20 files changed

+1019
-115
lines changed

20 files changed

+1019
-115
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ jobs:
105105
--dynamic-config-value matching.useNewMatcher=true \
106106
--dynamic-config-value system.refreshNexusEndpointsMinWait=1000 \
107107
--dynamic-config-value component.callbacks.allowedAddresses='[{"Pattern":"*","AllowInsecure":true}]' \
108+
--dynamic-config-value component.nexusoperations.recordCancelRequestCompletionEvents=true \
108109
--dynamic-config-value frontend.workerVersioningWorkflowAPIs=true \
109110
--dynamic-config-value frontend.activityAPIsEnabled=true \
110111
--dynamic-config-value system.enableDeploymentVersions=true \

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContext.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,14 @@
22

33
import com.uber.m3.tally.Scope;
44
import io.temporal.api.command.v1.ContinueAsNewWorkflowExecutionCommandAttributes;
5-
import io.temporal.api.command.v1.ScheduleNexusOperationCommandAttributes;
65
import io.temporal.api.command.v1.SignalExternalWorkflowExecutionCommandAttributes;
76
import io.temporal.api.common.v1.*;
87
import io.temporal.api.failure.v1.Failure;
98
import io.temporal.api.sdk.v1.UserMetadata;
109
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
1110
import io.temporal.common.RetryOptions;
1211
import io.temporal.internal.common.SdkFlag;
13-
import io.temporal.internal.statemachines.ExecuteActivityParameters;
14-
import io.temporal.internal.statemachines.ExecuteLocalActivityParameters;
15-
import io.temporal.internal.statemachines.LocalActivityCallback;
16-
import io.temporal.internal.statemachines.StartChildWorkflowExecutionParameters;
12+
import io.temporal.internal.statemachines.*;
1713
import io.temporal.workflow.Functions;
1814
import io.temporal.workflow.Functions.Func;
1915
import io.temporal.workflow.Functions.Func1;
@@ -162,17 +158,15 @@ Functions.Proc1<Exception> startChildWorkflow(
162158
/**
163159
* Start a Nexus operation.
164160
*
165-
* @param attributes nexus operation attributes
166-
* @param metadata user metadata to be associated with the operation.
161+
* @param parameters encapsulates all the information required to schedule a Nexus operation
167162
* @param startedCallback callback that is called when the operation is start if async, or
168163
* completes if it is sync.
169164
* @param completionCallback callback that is called upon child workflow completion or failure
170165
* @return cancellation handle. Invoke {@link io.temporal.workflow.Functions.Proc1#apply(Object)}
171166
* to cancel activity task.
172167
*/
173168
Functions.Proc1<Exception> startNexusOperation(
174-
ScheduleNexusOperationCommandAttributes attributes,
175-
@Nullable UserMetadata metadata,
169+
StartNexusOperationParameters parameters,
176170
Functions.Proc2<Optional<String>, Failure> startedCallback,
177171
Functions.Proc2<Optional<Payload>, Failure> completionCallback);
178172

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowContextImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,11 @@ public Functions.Proc1<Exception> startChildWorkflow(
212212

213213
@Override
214214
public Functions.Proc1<Exception> startNexusOperation(
215-
ScheduleNexusOperationCommandAttributes attributes,
216-
@Nullable UserMetadata metadata,
215+
StartNexusOperationParameters parameters,
217216
Functions.Proc2<Optional<String>, Failure> startedCallback,
218217
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
219218
Functions.Proc cancellationHandler =
220-
workflowStateMachines.startNexusOperation(
221-
attributes, metadata, startedCallback, completionCallback);
219+
workflowStateMachines.startNexusOperation(parameters, startedCallback, completionCallback);
222220
return (exception) -> cancellationHandler.apply();
223221
}
224222

temporal-sdk/src/main/java/io/temporal/internal/statemachines/CancelNexusOperationStateMachine.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.temporal.api.command.v1.RequestCancelNexusOperationCommandAttributes;
55
import io.temporal.api.enums.v1.CommandType;
66
import io.temporal.api.enums.v1.EventType;
7+
import io.temporal.api.failure.v1.Failure;
78
import io.temporal.workflow.Functions;
89

910
/** CancelNexusOperationStateMachine manges a request to cancel a nexus operation. */
@@ -15,23 +16,29 @@ final class CancelNexusOperationStateMachine
1516

1617
private final RequestCancelNexusOperationCommandAttributes requestCancelNexusAttributes;
1718

19+
private final Functions.Proc2<Void, Failure> completionCallback;
20+
1821
/**
1922
* @param attributes attributes to use to cancel a nexus operation
2023
* @param commandSink sink to send commands
2124
*/
2225
public static void newInstance(
2326
RequestCancelNexusOperationCommandAttributes attributes,
27+
Functions.Proc2<Void, Failure> completionCallback,
2428
Functions.Proc1<CancellableCommand> commandSink,
2529
Functions.Proc1<StateMachine> stateMachineSink) {
26-
new CancelNexusOperationStateMachine(attributes, commandSink, stateMachineSink);
30+
new CancelNexusOperationStateMachine(
31+
attributes, completionCallback, commandSink, stateMachineSink);
2732
}
2833

2934
private CancelNexusOperationStateMachine(
3035
RequestCancelNexusOperationCommandAttributes attributes,
36+
Functions.Proc2<Void, Failure> completionCallback,
3137
Functions.Proc1<CancellableCommand> commandSink,
3238
Functions.Proc1<StateMachine> stateMachineSink) {
3339
super(STATE_MACHINE_DEFINITION, commandSink, stateMachineSink);
3440
this.requestCancelNexusAttributes = attributes;
41+
this.completionCallback = completionCallback;
3542
explicitEvent(ExplicitEvent.SCHEDULE);
3643
}
3744

@@ -42,14 +49,19 @@ enum ExplicitEvent {
4249
enum State {
4350
CREATED,
4451
REQUEST_CANCEL_NEXUS_OPERATION_COMMAND_CREATED,
52+
REQUEST_CANCEL_NEXUS_OPERATION_COMMAND_RECORDED,
4553
CANCEL_REQUESTED,
54+
REQUEST_CANCEL_FAILED,
4655
}
4756

4857
public static final StateMachineDefinition<State, ExplicitEvent, CancelNexusOperationStateMachine>
4958
STATE_MACHINE_DEFINITION =
5059
StateMachineDefinition
5160
.<State, ExplicitEvent, CancelNexusOperationStateMachine>newInstance(
52-
"CancelNexusOperation", State.CREATED, State.CANCEL_REQUESTED)
61+
"CancelNexusOperation",
62+
State.CREATED,
63+
State.CANCEL_REQUESTED,
64+
State.REQUEST_CANCEL_FAILED)
5365
.add(
5466
State.CREATED,
5567
ExplicitEvent.SCHEDULE,
@@ -62,8 +74,18 @@ enum State {
6274
.add(
6375
State.REQUEST_CANCEL_NEXUS_OPERATION_COMMAND_CREATED,
6476
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUESTED,
77+
State.REQUEST_CANCEL_NEXUS_OPERATION_COMMAND_RECORDED,
78+
EntityStateMachineInitialCommand::setInitialCommandEventId)
79+
.add(
80+
State.REQUEST_CANCEL_NEXUS_OPERATION_COMMAND_RECORDED,
81+
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED,
6582
State.CANCEL_REQUESTED,
66-
CancelNexusOperationStateMachine::notifyCompleted);
83+
CancelNexusOperationStateMachine::notifyCompleted)
84+
.add(
85+
State.REQUEST_CANCEL_NEXUS_OPERATION_COMMAND_RECORDED,
86+
EventType.EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED,
87+
State.REQUEST_CANCEL_FAILED,
88+
CancelNexusOperationStateMachine::notifyFailed);
6789

6890
private void createCancelNexusCommand() {
6991
addCommand(
@@ -74,6 +96,12 @@ private void createCancelNexusCommand() {
7496
}
7597

7698
private void notifyCompleted() {
77-
setInitialCommandEventId();
99+
completionCallback.apply(null, null);
100+
}
101+
102+
private void notifyFailed() {
103+
Failure failure =
104+
currentEvent.getNexusOperationCancelRequestFailedEventAttributes().getFailure();
105+
completionCallback.apply(null, failure);
78106
}
79107
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/NexusOperationStateMachine.java

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ final class NexusOperationStateMachine
3737
private final String service;
3838
private final String operation;
3939

40+
public boolean isAsync() {
41+
return async;
42+
}
43+
4044
public boolean isCancellable() {
4145
return State.SCHEDULE_COMMAND_CREATED == getState();
4246
}
@@ -145,25 +149,13 @@ enum State {
145149

146150
private void cancelNexusOperationCommand() {
147151
cancelCommand();
148-
Failure canceledFailure =
152+
Failure cause =
149153
Failure.newBuilder()
150154
.setSource(JAVA_SDK)
151155
.setMessage("operation canceled before it was started")
152156
.setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance())
153157
.build();
154-
NexusOperationFailureInfo nexusFailureInfo =
155-
NexusOperationFailureInfo.newBuilder()
156-
.setEndpoint(endpoint)
157-
.setService(service)
158-
.setOperation(operation)
159-
.setScheduledEventId(getInitialCommandEventId())
160-
.build();
161-
Failure failure =
162-
Failure.newBuilder()
163-
.setNexusOperationExecutionFailureInfo(nexusFailureInfo)
164-
.setCause(canceledFailure)
165-
.setMessage(NEXUS_OPERATION_CANCELED_MESSAGE)
166-
.build();
158+
Failure failure = createCancelNexusOperationFailure(cause);
167159
startedCallback.apply(Optional.empty(), failure);
168160
completionCallback.apply(Optional.empty(), failure);
169161
}
@@ -263,4 +255,27 @@ public void createScheduleNexusTaskCommand() {
263255
scheduleAttributes = null;
264256
metadata = null;
265257
}
258+
259+
public Failure createCancelNexusOperationFailure(Failure cause) {
260+
if (cause == null) {
261+
cause =
262+
Failure.newBuilder()
263+
.setSource(JAVA_SDK)
264+
.setMessage("operation canceled")
265+
.setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance())
266+
.build();
267+
}
268+
NexusOperationFailureInfo nexusFailureInfo =
269+
NexusOperationFailureInfo.newBuilder()
270+
.setEndpoint(endpoint)
271+
.setService(service)
272+
.setOperation(operation)
273+
.setScheduledEventId(getInitialCommandEventId())
274+
.build();
275+
return Failure.newBuilder()
276+
.setNexusOperationExecutionFailureInfo(nexusFailureInfo)
277+
.setCause(cause)
278+
.setMessage(NEXUS_OPERATION_CANCELED_MESSAGE)
279+
.build();
280+
}
266281
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.temporal.internal.statemachines;
2+
3+
import io.temporal.api.command.v1.ScheduleNexusOperationCommandAttributes;
4+
import io.temporal.api.sdk.v1.UserMetadata;
5+
import io.temporal.workflow.NexusOperationCancellationType;
6+
import javax.annotation.Nullable;
7+
8+
public class StartNexusOperationParameters {
9+
10+
private final ScheduleNexusOperationCommandAttributes.Builder attributes;
11+
private final NexusOperationCancellationType cancellationType;
12+
private final UserMetadata metadata;
13+
14+
public StartNexusOperationParameters(
15+
ScheduleNexusOperationCommandAttributes.Builder attributes,
16+
NexusOperationCancellationType cancellationType,
17+
@Nullable UserMetadata metadata) {
18+
this.attributes = attributes;
19+
this.cancellationType = cancellationType;
20+
this.metadata = metadata;
21+
}
22+
23+
public ScheduleNexusOperationCommandAttributes.Builder getAttributes() {
24+
return attributes;
25+
}
26+
27+
public NexusOperationCancellationType getCancellationType() {
28+
return cancellationType;
29+
}
30+
31+
public @Nullable UserMetadata getMetadata() {
32+
return metadata;
33+
}
34+
}

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WorkflowStateMachines.java

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.temporal.api.command.v1.*;
1414
import io.temporal.api.common.v1.*;
1515
import io.temporal.api.enums.v1.EventType;
16+
import io.temporal.api.failure.v1.CanceledFailureInfo;
1617
import io.temporal.api.failure.v1.Failure;
1718
import io.temporal.api.history.v1.*;
1819
import io.temporal.api.protocol.v1.Message;
@@ -29,6 +30,7 @@
2930
import io.temporal.worker.WorkflowImplementationOptions;
3031
import io.temporal.workflow.ChildWorkflowCancellationType;
3132
import io.temporal.workflow.Functions;
33+
import io.temporal.workflow.NexusOperationCancellationType;
3234
import java.nio.charset.StandardCharsets;
3335
import java.util.*;
3436
import javax.annotation.Nonnull;
@@ -985,32 +987,71 @@ public Functions.Proc startChildWorkflow(
985987
}
986988

987989
public Functions.Proc startNexusOperation(
988-
ScheduleNexusOperationCommandAttributes attributes,
989-
@Nullable UserMetadata metadata,
990+
StartNexusOperationParameters parameters,
990991
Functions.Proc2<Optional<String>, Failure> startedCallback,
991992
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
992993
checkEventLoopExecuting();
994+
NexusOperationCancellationType cancellationType = parameters.getCancellationType();
993995
NexusOperationStateMachine operation =
994996
NexusOperationStateMachine.newInstance(
995-
attributes,
996-
metadata,
997+
parameters.getAttributes().build(),
998+
parameters.getMetadata(),
997999
startedCallback,
9981000
completionCallback,
9991001
commandSink,
10001002
stateMachineSink);
10011003
return () -> {
1004+
if (cancellationType == NexusOperationCancellationType.ABANDON) {
1005+
notifyNexusOperationCanceled(operation, startedCallback, completionCallback);
1006+
eventLoop();
1007+
return;
1008+
}
10021009
if (operation.isCancellable()) {
10031010
operation.cancel();
1011+
return;
10041012
}
10051013
if (!operation.isFinalState()) {
10061014
requestCancelNexusOperation(
10071015
RequestCancelNexusOperationCommandAttributes.newBuilder()
10081016
.setScheduledEventId(operation.getInitialCommandEventId())
1009-
.build());
1017+
.build(),
1018+
(r, f) -> {
1019+
if (cancellationType == NexusOperationCancellationType.WAIT_REQUESTED) {
1020+
notifyNexusOperationCanceled(f, operation, startedCallback, completionCallback);
1021+
}
1022+
});
1023+
if (cancellationType == NexusOperationCancellationType.TRY_CANCEL) {
1024+
notifyNexusOperationCanceled(operation, startedCallback, completionCallback);
1025+
eventLoop();
1026+
}
10101027
}
10111028
};
10121029
}
10131030

1031+
private void notifyNexusOperationCanceled(
1032+
NexusOperationStateMachine operation,
1033+
Functions.Proc2<Optional<String>, Failure> startedCallback,
1034+
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
1035+
Failure cause =
1036+
Failure.newBuilder()
1037+
.setMessage("operation canceled")
1038+
.setCanceledFailureInfo(CanceledFailureInfo.getDefaultInstance())
1039+
.build();
1040+
notifyNexusOperationCanceled(cause, operation, startedCallback, completionCallback);
1041+
}
1042+
1043+
private void notifyNexusOperationCanceled(
1044+
Failure cause,
1045+
NexusOperationStateMachine operation,
1046+
Functions.Proc2<Optional<String>, Failure> startedCallback,
1047+
Functions.Proc2<Optional<Payload>, Failure> completionCallback) {
1048+
Failure failure = operation.createCancelNexusOperationFailure(cause);
1049+
if (!operation.isAsync()) {
1050+
startedCallback.apply(Optional.empty(), failure);
1051+
}
1052+
completionCallback.apply(Optional.empty(), failure);
1053+
}
1054+
10141055
private void notifyChildCanceled(
10151056
Functions.Proc2<Optional<Payloads>, Exception> completionCallback) {
10161057
CanceledFailure failure = new CanceledFailure("Child canceled");
@@ -1044,10 +1085,15 @@ public void requestCancelExternalWorkflowExecution(
10441085

10451086
/**
10461087
* @param attributes attributes to use to cancel a nexus operation
1088+
* @param completionCallback one of NexusOperationCancelRequestCompleted or
1089+
* NexusOperationCancelRequestFailed events
10471090
*/
1048-
public void requestCancelNexusOperation(RequestCancelNexusOperationCommandAttributes attributes) {
1091+
public void requestCancelNexusOperation(
1092+
RequestCancelNexusOperationCommandAttributes attributes,
1093+
Functions.Proc2<Void, Failure> completionCallback) {
10491094
checkEventLoopExecuting();
1050-
CancelNexusOperationStateMachine.newInstance(attributes, commandSink, stateMachineSink);
1095+
CancelNexusOperationStateMachine.newInstance(
1096+
attributes, completionCallback, commandSink, stateMachineSink);
10511097
}
10521098

10531099
public void upsertSearchAttributes(SearchAttributes attributes) {
@@ -1540,6 +1586,12 @@ private OptionalLong getInitialCommandEventId(HistoryEvent event) {
15401586
case EVENT_TYPE_NEXUS_OPERATION_TIMED_OUT:
15411587
return OptionalLong.of(
15421588
event.getNexusOperationTimedOutEventAttributes().getScheduledEventId());
1589+
case EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED:
1590+
return OptionalLong.of(
1591+
event.getNexusOperationCancelRequestCompletedEventAttributes().getRequestedEventId());
1592+
case EVENT_TYPE_NEXUS_OPERATION_CANCEL_REQUEST_FAILED:
1593+
return OptionalLong.of(
1594+
event.getNexusOperationCancelRequestFailedEventAttributes().getRequestedEventId());
15431595
case EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
15441596
case EVENT_TYPE_TIMER_STARTED:
15451597
case EVENT_TYPE_MARKER_RECORDED:

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -804,10 +804,13 @@ public <R> ExecuteNexusOperationOutput<R> executeNexusOperation(
804804
makeUserMetaData(
805805
input.getOptions().getSummary(), null, dataConverterWithCurrentWorkflowContext);
806806

807+
StartNexusOperationParameters parameters =
808+
new StartNexusOperationParameters(
809+
attributes, input.getOptions().getCancellationType(), userMetadata);
810+
807811
Functions.Proc1<Exception> cancellationCallback =
808812
replayContext.startNexusOperation(
809-
attributes.build(),
810-
userMetadata,
813+
parameters,
811814
(operationExec, failure) -> {
812815
if (failure != null) {
813816
runner.executeInWorkflowThread(

0 commit comments

Comments
 (0)