Skip to content

Commit a0147d5

Browse files
authored
Nexus cancellation types (#981)
* Cancellation types for Nexus operations invoked by workflows * update core to include cancellation types PR
1 parent 55d5d7c commit a0147d5

File tree

8 files changed

+1099
-38
lines changed

8 files changed

+1099
-38
lines changed

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
298298
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
299299
input: InputT
300300
schedule_to_close_timeout: Optional[timedelta]
301+
cancellation_type: temporalio.workflow.NexusOperationCancellationType
301302
headers: Optional[Mapping[str, str]]
302303
output_type: Optional[Type[OutputT]] = None
303304

temporalio/worker/_workflow_instance.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import temporalio.bridge.proto.activity_result
5555
import temporalio.bridge.proto.child_workflow
5656
import temporalio.bridge.proto.common
57+
import temporalio.bridge.proto.nexus
5758
import temporalio.bridge.proto.workflow_activation
5859
import temporalio.bridge.proto.workflow_commands
5960
import temporalio.bridge.proto.workflow_completion
6061
import temporalio.common
6162
import temporalio.converter
6263
import temporalio.exceptions
63-
import temporalio.nexus
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

@@ -881,9 +881,17 @@ def _apply_resolve_nexus_operation(
881881
) -> None:
882882
handle = self._pending_nexus_operations.pop(job.seq, None)
883883
if not handle:
884-
raise RuntimeError(
885-
f"Failed to find nexus operation handle for job sequence number {job.seq}"
886-
)
884+
# One way this can occur is:
885+
# 1. Cancel request issued with cancellation_type=WaitRequested.
886+
# 2. Server receives nexus cancel handler task completion and writes
887+
# NexusOperationCancelRequestCompleted / NexusOperationCancelRequestFailed. On
888+
# consuming this event, core sends an activation resolving the handle future as
889+
# completed / failed.
890+
# 4. Subsequently, the nexus operation completes as completed/failed, causing the server
891+
# to write NexusOperationCompleted / NexusOperationFailed. On consuming this event,
892+
# core sends an activation which would attempt to resolve the handle future as
893+
# completed / failed, but it has already been resolved.
894+
return
887895

888896
# Handle the four oneof variants of NexusOperationResult
889897
result = job.result
@@ -1500,9 +1508,10 @@ async def workflow_start_nexus_operation(
15001508
service: str,
15011509
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
15021510
input: Any,
1503-
output_type: Optional[Type[OutputT]] = None,
1504-
schedule_to_close_timeout: Optional[timedelta] = None,
1505-
headers: Optional[Mapping[str, str]] = None,
1511+
output_type: Optional[Type[OutputT]],
1512+
schedule_to_close_timeout: Optional[timedelta],
1513+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
1514+
headers: Optional[Mapping[str, str]],
15061515
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
15071516
# start_nexus_operation
15081517
return await self._outbound.start_nexus_operation(
@@ -1513,6 +1522,7 @@ async def workflow_start_nexus_operation(
15131522
input=input,
15141523
output_type=output_type,
15151524
schedule_to_close_timeout=schedule_to_close_timeout,
1525+
cancellation_type=cancellation_type,
15161526
headers=headers,
15171527
)
15181528
)
@@ -1824,20 +1834,19 @@ async def run_child() -> Any:
18241834
async def _outbound_start_nexus_operation(
18251835
self, input: StartNexusOperationInput[Any, OutputT]
18261836
) -> _NexusOperationHandle[OutputT]:
1827-
# A Nexus operation handle contains two futures: self._start_fut is resolved as a
1828-
# result of the Nexus operation starting (activation job:
1829-
# resolve_nexus_operation_start), and self._result_fut is resolved as a result of
1830-
# the Nexus operation completing (activation job: resolve_nexus_operation). The
1831-
# handle itself corresponds to an asyncio.Task which waits on self.result_fut,
1832-
# handling CancelledError by emitting a RequestCancelNexusOperation command. We do
1833-
# not return the handle until we receive resolve_nexus_operation_start, like
1834-
# ChildWorkflowHandle and unlike ActivityHandle. Note that a Nexus operation may
1835-
# complete synchronously (in which case both jobs will be sent in the same
1836-
# activation, and start will be resolved without an operation token), or
1837-
# asynchronously (in which case start they may be sent in separate activations,
1838-
# and start will be resolved with an operation token). See comments in
1839-
# tests/worker/test_nexus.py for worked examples of the evolution of the resulting
1840-
# handle state machine in the sync and async Nexus response cases.
1837+
# A Nexus operation handle contains two futures: self._start_fut is resolved as a result of
1838+
# the Nexus operation starting (activation job: resolve_nexus_operation_start), and
1839+
# self._result_fut is resolved as a result of the Nexus operation completing (activation
1840+
# job: resolve_nexus_operation). The handle itself corresponds to an asyncio.Task which
1841+
# waits on self.result_fut, handling CancelledError by emitting a
1842+
# RequestCancelNexusOperation command. We do not return the handle until we receive
1843+
# resolve_nexus_operation_start, like ChildWorkflowHandle and unlike ActivityHandle. Note
1844+
# that a Nexus operation may complete synchronously (in which case both jobs will be sent in
1845+
# the same activation, and start will be resolved without an operation token), or
1846+
# asynchronously (in which case they may be sent in separate activations, and start will be
1847+
# resolved with an operation token). See comments in tests/worker/test_nexus.py for worked
1848+
# examples of the evolution of the resulting handle state machine in the sync and async
1849+
# Nexus response cases.
18411850
handle: _NexusOperationHandle[OutputT]
18421851

18431852
async def operation_handle_fn() -> OutputT:
@@ -2758,7 +2767,7 @@ def _apply_schedule_command(
27582767
if self._input.retry_policy:
27592768
self._input.retry_policy.apply_to_proto(v.retry_policy)
27602769
v.cancellation_type = cast(
2761-
"temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType",
2770+
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27622771
int(self._input.cancellation_type),
27632772
)
27642773

@@ -2894,7 +2903,7 @@ def _apply_start_command(self) -> None:
28942903
if self._input.task_timeout:
28952904
v.workflow_task_timeout.FromTimedelta(self._input.task_timeout)
28962905
v.parent_close_policy = cast(
2897-
"temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType",
2906+
temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType,
28982907
int(self._input.parent_close_policy),
28992908
)
29002909
v.workflow_id_reuse_policy = cast(
@@ -2916,7 +2925,7 @@ def _apply_start_command(self) -> None:
29162925
self._input.search_attributes, v.search_attributes
29172926
)
29182927
v.cancellation_type = cast(
2919-
"temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType",
2928+
temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType,
29202929
int(self._input.cancellation_type),
29212930
)
29222931
if self._input.versioning_intent:
@@ -3012,11 +3021,6 @@ def __init__(
30123021

30133022
@property
30143023
def operation_token(self) -> Optional[str]:
3015-
# TODO(nexus-preview): How should this behave?
3016-
# Java has a separate class that only exists if the operation token exists:
3017-
# https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java#L26
3018-
# And Go similar:
3019-
# https://github.com/temporalio/sdk-go/blob/master/internal/workflow.go#L2770-L2771
30203024
try:
30213025
return self._start_fut.result()
30223026
except BaseException:
@@ -3065,6 +3069,11 @@ def _apply_schedule_command(self) -> None:
30653069
v.schedule_to_close_timeout.FromTimedelta(
30663070
self._input.schedule_to_close_timeout
30673071
)
3072+
v.cancellation_type = cast(
3073+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType,
3074+
int(self._input.cancellation_type),
3075+
)
3076+
30683077
if self._input.headers:
30693078
for key, val in self._input.headers.items():
30703079
v.nexus_header[key] = val

temporalio/workflow.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -859,9 +859,10 @@ async def workflow_start_nexus_operation(
859859
service: str,
860860
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
861861
input: Any,
862-
output_type: Optional[Type[OutputT]] = None,
863-
schedule_to_close_timeout: Optional[timedelta] = None,
864-
headers: Optional[Mapping[str, str]] = None,
862+
output_type: Optional[Type[OutputT]],
863+
schedule_to_close_timeout: Optional[timedelta],
864+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
865+
headers: Optional[Mapping[str, str]],
865866
) -> NexusOperationHandle[OutputT]: ...
866867

867868
@abstractmethod
@@ -1322,9 +1323,9 @@ async def sleep(
13221323
This can be in single-line Temporal markdown format.
13231324
"""
13241325
await _Runtime.current().workflow_sleep(
1325-
duration=duration.total_seconds()
1326-
if isinstance(duration, timedelta)
1327-
else duration,
1326+
duration=(
1327+
duration.total_seconds() if isinstance(duration, timedelta) else duration
1328+
),
13281329
summary=summary,
13291330
)
13301331

@@ -4413,6 +4414,8 @@ class NexusOperationHandle(Generic[OutputT]):
44134414
This API is experimental and unstable.
44144415
"""
44154416

4417+
# TODO(nexus-preview): should attempts to instantiate directly throw?
4418+
44164419
def cancel(self) -> bool:
44174420
"""Request cancellation of the operation."""
44184421
raise NotImplementedError
@@ -5138,6 +5141,43 @@ def _to_proto(self) -> temporalio.bridge.proto.common.VersioningIntent.ValueType
51385141
ServiceT = TypeVar("ServiceT")
51395142

51405143

5144+
class NexusOperationCancellationType(IntEnum):
5145+
"""Defines behavior of a Nexus operation when the caller workflow initiates cancellation.
5146+
5147+
Pass one of these values to :py:meth:`NexusClient.start_operation` to define cancellation
5148+
behavior.
5149+
5150+
To initiate cancellation, use :py:meth:`NexusOperationHandle.cancel` and then `await` the
5151+
operation handle. This will result in a :py:class:`exceptions.NexusOperationError`. The values
5152+
of this enum define what is guaranteed to have happened by that point.
5153+
"""
5154+
5155+
ABANDON = int(temporalio.bridge.proto.nexus.NexusOperationCancellationType.ABANDON)
5156+
"""Do not send any cancellation request to the operation handler; just report cancellation to the caller"""
5157+
5158+
TRY_CANCEL = int(
5159+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.TRY_CANCEL
5160+
)
5161+
"""Send a cancellation request but immediately report cancellation to the caller. Note that this
5162+
does not guarantee that cancellation is delivered to the operation handler if the caller exits
5163+
before the delivery is done.
5164+
"""
5165+
5166+
WAIT_REQUESTED = int(
5167+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.WAIT_CANCELLATION_REQUESTED
5168+
)
5169+
"""Send a cancellation request and wait for confirmation that the request was received.
5170+
Does not wait for the operation to complete.
5171+
"""
5172+
5173+
WAIT_COMPLETED = int(
5174+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED
5175+
)
5176+
"""Send a cancellation request and wait for the operation to complete.
5177+
Note that the operation may not complete as cancelled (for example, if it catches the
5178+
:py:exc:`asyncio.CancelledError` resulting from the cancellation request)."""
5179+
5180+
51415181
class NexusClient(ABC, Generic[ServiceT]):
51425182
"""A client for invoking Nexus operations.
51435183
@@ -5168,6 +5208,7 @@ async def start_operation(
51685208
*,
51695209
output_type: Optional[Type[OutputT]] = None,
51705210
schedule_to_close_timeout: Optional[timedelta] = None,
5211+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
51715212
headers: Optional[Mapping[str, str]] = None,
51725213
) -> NexusOperationHandle[OutputT]: ...
51735214

@@ -5181,6 +5222,7 @@ async def start_operation(
51815222
*,
51825223
output_type: Optional[Type[OutputT]] = None,
51835224
schedule_to_close_timeout: Optional[timedelta] = None,
5225+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
51845226
headers: Optional[Mapping[str, str]] = None,
51855227
) -> NexusOperationHandle[OutputT]: ...
51865228

@@ -5197,6 +5239,7 @@ async def start_operation(
51975239
*,
51985240
output_type: Optional[Type[OutputT]] = None,
51995241
schedule_to_close_timeout: Optional[timedelta] = None,
5242+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52005243
headers: Optional[Mapping[str, str]] = None,
52015244
) -> NexusOperationHandle[OutputT]: ...
52025245

@@ -5213,6 +5256,7 @@ async def start_operation(
52135256
*,
52145257
output_type: Optional[Type[OutputT]] = None,
52155258
schedule_to_close_timeout: Optional[timedelta] = None,
5259+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52165260
headers: Optional[Mapping[str, str]] = None,
52175261
) -> NexusOperationHandle[OutputT]: ...
52185262

@@ -5229,6 +5273,7 @@ async def start_operation(
52295273
*,
52305274
output_type: Optional[Type[OutputT]] = None,
52315275
schedule_to_close_timeout: Optional[timedelta] = None,
5276+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52325277
headers: Optional[Mapping[str, str]] = None,
52335278
) -> NexusOperationHandle[OutputT]: ...
52345279

@@ -5240,6 +5285,7 @@ async def start_operation(
52405285
*,
52415286
output_type: Optional[Type[OutputT]] = None,
52425287
schedule_to_close_timeout: Optional[timedelta] = None,
5288+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52435289
headers: Optional[Mapping[str, str]] = None,
52445290
) -> Any:
52455291
"""Start a Nexus operation and return its handle.
@@ -5269,6 +5315,7 @@ async def execute_operation(
52695315
*,
52705316
output_type: Optional[Type[OutputT]] = None,
52715317
schedule_to_close_timeout: Optional[timedelta] = None,
5318+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52725319
headers: Optional[Mapping[str, str]] = None,
52735320
) -> OutputT: ...
52745321

@@ -5282,6 +5329,7 @@ async def execute_operation(
52825329
*,
52835330
output_type: Optional[Type[OutputT]] = None,
52845331
schedule_to_close_timeout: Optional[timedelta] = None,
5332+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52855333
headers: Optional[Mapping[str, str]] = None,
52865334
) -> OutputT: ...
52875335

@@ -5298,6 +5346,7 @@ async def execute_operation(
52985346
*,
52995347
output_type: Optional[Type[OutputT]] = None,
53005348
schedule_to_close_timeout: Optional[timedelta] = None,
5349+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53015350
headers: Optional[Mapping[str, str]] = None,
53025351
) -> OutputT: ...
53035352

@@ -5317,6 +5366,7 @@ async def execute_operation(
53175366
*,
53185367
output_type: Optional[Type[OutputT]] = None,
53195368
schedule_to_close_timeout: Optional[timedelta] = None,
5369+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53205370
headers: Optional[Mapping[str, str]] = None,
53215371
) -> OutputT: ...
53225372

@@ -5333,6 +5383,7 @@ async def execute_operation(
53335383
*,
53345384
output_type: Optional[Type[OutputT]] = None,
53355385
schedule_to_close_timeout: Optional[timedelta] = None,
5386+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53365387
headers: Optional[Mapping[str, str]] = None,
53375388
) -> OutputT: ...
53385389

@@ -5344,6 +5395,7 @@ async def execute_operation(
53445395
*,
53455396
output_type: Optional[Type[OutputT]] = None,
53465397
schedule_to_close_timeout: Optional[timedelta] = None,
5398+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53475399
headers: Optional[Mapping[str, str]] = None,
53485400
) -> Any:
53495401
"""Execute a Nexus operation and return its result.
@@ -5395,6 +5447,7 @@ async def start_operation(
53955447
*,
53965448
output_type: Optional[Type] = None,
53975449
schedule_to_close_timeout: Optional[timedelta] = None,
5450+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53985451
headers: Optional[Mapping[str, str]] = None,
53995452
) -> Any:
54005453
return (
@@ -5405,6 +5458,7 @@ async def start_operation(
54055458
input=input,
54065459
output_type=output_type,
54075460
schedule_to_close_timeout=schedule_to_close_timeout,
5461+
cancellation_type=cancellation_type,
54085462
headers=headers,
54095463
)
54105464
)
@@ -5416,13 +5470,15 @@ async def execute_operation(
54165470
*,
54175471
output_type: Optional[Type] = None,
54185472
schedule_to_close_timeout: Optional[timedelta] = None,
5473+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54195474
headers: Optional[Mapping[str, str]] = None,
54205475
) -> Any:
54215476
handle = await self.start_operation(
54225477
operation,
54235478
input,
54245479
output_type=output_type,
54255480
schedule_to_close_timeout=schedule_to_close_timeout,
5481+
cancellation_type=cancellation_type,
54265482
headers=headers,
54275483
)
54285484
return await handle

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
118118
"system.enableDeploymentVersions=true",
119119
"--dynamic-config-value",
120120
"frontend.activityAPIsEnabled=true",
121+
"--dynamic-config-value",
122+
"component.nexusoperations.recordCancelRequestCompletionEvents=true",
121123
"--http-port",
122124
str(http_port),
123125
],

0 commit comments

Comments
 (0)