Skip to content

Commit 5f1f8c9

Browse files
committed
Cancellation types for Nexus operations invoked by workflows
1 parent e1016bc commit 5f1f8c9

File tree

9 files changed

+1162
-37
lines changed

9 files changed

+1162
-37
lines changed

temporalio/bridge/Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/sdk-core

Submodule sdk-core updated 47 files

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
298298
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
299299
input: InputT
300300
schedule_to_close_timeout: Optional[timedelta]
301+
cancellation_type: temporalio.workflow.NexusOperationCancellationType
301302
headers: Optional[Mapping[str, str]]
302303
output_type: Optional[Type[OutputT]] = None
303304

temporalio/worker/_workflow_instance.py

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import temporalio.bridge.proto.activity_result
5555
import temporalio.bridge.proto.child_workflow
5656
import temporalio.bridge.proto.common
57+
import temporalio.bridge.proto.nexus
5758
import temporalio.bridge.proto.workflow_activation
5859
import temporalio.bridge.proto.workflow_commands
5960
import temporalio.bridge.proto.workflow_completion
6061
import temporalio.common
6162
import temporalio.converter
6263
import temporalio.exceptions
63-
import temporalio.nexus
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

@@ -883,9 +883,17 @@ def _apply_resolve_nexus_operation(
883883
) -> None:
884884
handle = self._pending_nexus_operations.pop(job.seq, None)
885885
if not handle:
886-
raise RuntimeError(
887-
f"Failed to find nexus operation handle for job sequence number {job.seq}"
888-
)
886+
# One way this can occur is:
887+
# 1. Cancel request issued with cancellation_type=WaitRequested.
888+
# 2. Server receives nexus cancel handler task completion and writes
889+
# NexusOperationCancelRequestCompleted / NexusOperationCancelRequestFailed. On
890+
# consuming this event, core sends an activation resolving the handle future as
891+
# completed / failed.
892+
# 3. Subsequently, the nexus operation completes as completed/failed, causing the server
893+
# to write NexusOperationCompleted / NexusOperationFailed. On consuming this event,
894+
# core sends an activation which would attempt to resolve the handle future as
895+
# completed / failed, but it has already been resolved.
896+
return
889897

890898
# Handle the four oneof variants of NexusOperationResult
891899
result = job.result
@@ -1502,9 +1510,10 @@ async def workflow_start_nexus_operation(
15021510
service: str,
15031511
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
15041512
input: Any,
1505-
output_type: Optional[Type[OutputT]] = None,
1506-
schedule_to_close_timeout: Optional[timedelta] = None,
1507-
headers: Optional[Mapping[str, str]] = None,
1513+
output_type: Optional[Type[OutputT]],
1514+
schedule_to_close_timeout: Optional[timedelta],
1515+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
1516+
headers: Optional[Mapping[str, str]],
15081517
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
15091518
# start_nexus_operation
15101519
return await self._outbound.start_nexus_operation(
@@ -1515,6 +1524,7 @@ async def workflow_start_nexus_operation(
15151524
input=input,
15161525
output_type=output_type,
15171526
schedule_to_close_timeout=schedule_to_close_timeout,
1527+
cancellation_type=cancellation_type,
15181528
headers=headers,
15191529
)
15201530
)
@@ -1826,20 +1836,19 @@ async def run_child() -> Any:
18261836
async def _outbound_start_nexus_operation(
18271837
self, input: StartNexusOperationInput[Any, OutputT]
18281838
) -> _NexusOperationHandle[OutputT]:
1829-
# A Nexus operation handle contains two futures: self._start_fut is resolved as a
1830-
# result of the Nexus operation starting (activation job:
1831-
# resolve_nexus_operation_start), and self._result_fut is resolved as a result of
1832-
# the Nexus operation completing (activation job: resolve_nexus_operation). The
1833-
# handle itself corresponds to an asyncio.Task which waits on self.result_fut,
1834-
# handling CancelledError by emitting a RequestCancelNexusOperation command. We do
1835-
# not return the handle until we receive resolve_nexus_operation_start, like
1836-
# ChildWorkflowHandle and unlike ActivityHandle. Note that a Nexus operation may
1837-
# complete synchronously (in which case both jobs will be sent in the same
1838-
# activation, and start will be resolved without an operation token), or
1839-
# asynchronously (in which case start they may be sent in separate activations,
1840-
# and start will be resolved with an operation token). See comments in
1841-
# tests/worker/test_nexus.py for worked examples of the evolution of the resulting
1842-
# handle state machine in the sync and async Nexus response cases.
1839+
# A Nexus operation handle contains two futures: self._start_fut is resolved as a result of
1840+
# the Nexus operation starting (activation job: resolve_nexus_operation_start), and
1841+
# self._result_fut is resolved as a result of the Nexus operation completing (activation
1842+
# job: resolve_nexus_operation). The handle itself corresponds to an asyncio.Task which
1843+
# waits on self._result_fut, handling CancelledError by emitting a
1844+
# RequestCancelNexusOperation command. We do not return the handle until we receive
1845+
# resolve_nexus_operation_start, like ChildWorkflowHandle and unlike ActivityHandle. Note
1846+
# that a Nexus operation may complete synchronously (in which case both jobs will be sent in
1847+
# the same activation, and start will be resolved without an operation token), or
1848+
# asynchronously (in which case they may be sent in separate activations, and start will be
1849+
# resolved with an operation token). See comments in tests/worker/test_nexus.py for worked
1850+
# examples of the evolution of the resulting handle state machine in the sync and async
1851+
# Nexus response cases.
18431852
handle: _NexusOperationHandle[OutputT]
18441853

18451854
async def operation_handle_fn() -> OutputT:
@@ -2757,7 +2766,7 @@ def _apply_schedule_command(
27572766
if self._input.retry_policy:
27582767
self._input.retry_policy.apply_to_proto(v.retry_policy)
27592768
v.cancellation_type = cast(
2760-
"temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType",
2769+
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27612770
int(self._input.cancellation_type),
27622771
)
27632772

@@ -2893,7 +2902,7 @@ def _apply_start_command(self) -> None:
28932902
if self._input.task_timeout:
28942903
v.workflow_task_timeout.FromTimedelta(self._input.task_timeout)
28952904
v.parent_close_policy = cast(
2896-
"temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType",
2905+
temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType,
28972906
int(self._input.parent_close_policy),
28982907
)
28992908
v.workflow_id_reuse_policy = cast(
@@ -2915,7 +2924,7 @@ def _apply_start_command(self) -> None:
29152924
self._input.search_attributes, v.search_attributes
29162925
)
29172926
v.cancellation_type = cast(
2918-
"temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType",
2927+
temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType,
29192928
int(self._input.cancellation_type),
29202929
)
29212930
if self._input.versioning_intent:
@@ -3011,11 +3020,6 @@ def __init__(
30113020

30123021
@property
30133022
def operation_token(self) -> Optional[str]:
3014-
# TODO(nexus-preview): How should this behave?
3015-
# Java has a separate class that only exists if the operation token exists:
3016-
# https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java#L26
3017-
# And Go similar:
3018-
# https://github.com/temporalio/sdk-go/blob/master/internal/workflow.go#L2770-L2771
30193023
try:
30203024
return self._start_fut.result()
30213025
except BaseException:
@@ -3064,6 +3068,11 @@ def _apply_schedule_command(self) -> None:
30643068
v.schedule_to_close_timeout.FromTimedelta(
30653069
self._input.schedule_to_close_timeout
30663070
)
3071+
v.cancellation_type = cast(
3072+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType,
3073+
int(self._input.cancellation_type),
3074+
)
3075+
30673076
if self._input.headers:
30683077
for key, val in self._input.headers.items():
30693078
v.nexus_header[key] = val

temporalio/workflow.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -858,9 +858,10 @@ async def workflow_start_nexus_operation(
858858
service: str,
859859
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
860860
input: Any,
861-
output_type: Optional[Type[OutputT]] = None,
862-
schedule_to_close_timeout: Optional[timedelta] = None,
863-
headers: Optional[Mapping[str, str]] = None,
861+
output_type: Optional[Type[OutputT]],
862+
schedule_to_close_timeout: Optional[timedelta],
863+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
864+
headers: Optional[Mapping[str, str]],
864865
) -> NexusOperationHandle[OutputT]: ...
865866

866867
@abstractmethod
@@ -1321,9 +1322,9 @@ async def sleep(
13211322
This can be in single-line Temporal markdown format.
13221323
"""
13231324
await _Runtime.current().workflow_sleep(
1324-
duration=duration.total_seconds()
1325-
if isinstance(duration, timedelta)
1326-
else duration,
1325+
duration=(
1326+
duration.total_seconds() if isinstance(duration, timedelta) else duration
1327+
),
13271328
summary=summary,
13281329
)
13291330

@@ -4412,6 +4413,8 @@ class NexusOperationHandle(Generic[OutputT]):
44124413
This API is experimental and unstable.
44134414
"""
44144415

4416+
# TODO(nexus-preview): should attempts to instantiate directly throw?
4417+
44154418
def cancel(self) -> bool:
44164419
"""Request cancellation of the operation."""
44174420
raise NotImplementedError
@@ -5137,6 +5140,43 @@ def _to_proto(self) -> temporalio.bridge.proto.common.VersioningIntent.ValueType
51375140
ServiceT = TypeVar("ServiceT")
51385141

51395142

5143+
class NexusOperationCancellationType(IntEnum):
5144+
"""Defines behavior of a Nexus operation when the caller workflow initiates cancellation.
5145+
5146+
Pass one of these values to :py:meth:`NexusClient.start_operation` to define cancellation
5147+
behavior.
5148+
5149+
To initiate cancellation, use :py:meth:`NexusOperationHandle.cancel` and then `await` the
5150+
operation handle. This will result in a :py:class:`exceptions.NexusOperationError`. The values
5151+
of this enum define what is guaranteed to have happened by that point.
5152+
"""
5153+
5154+
ABANDON = int(temporalio.bridge.proto.nexus.NexusOperationCancellationType.ABANDON)
5155+
"""Do not send any cancellation request to the operation handler; just report cancellation to the caller."""
5156+
5157+
TRY_CANCEL = int(
5158+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.TRY_CANCEL
5159+
)
5160+
"""Send a cancellation request but immediately report cancellation to the caller. Note that this
5161+
does not guarantee that cancellation is delivered to the operation handler if the caller exits
5162+
before the delivery is done.
5163+
"""
5164+
5165+
WAIT_REQUESTED = int(
5166+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.WAIT_CANCELLATION_REQUESTED
5167+
)
5168+
"""Send a cancellation request and wait for confirmation that the request was received.
5169+
Does not wait for the operation to complete.
5170+
"""
5171+
5172+
WAIT_COMPLETED = int(
5173+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED
5174+
)
5175+
"""Send a cancellation request and wait for the operation to complete.
5176+
Note that the operation may not complete as cancelled (for example, if it catches the
5177+
:py:exc:`asyncio.CancelledError` resulting from the cancellation request)."""
5178+
5179+
51405180
class NexusClient(ABC, Generic[ServiceT]):
51415181
"""A client for invoking Nexus operations.
51425182
@@ -5167,6 +5207,7 @@ async def start_operation(
51675207
*,
51685208
output_type: Optional[Type[OutputT]] = None,
51695209
schedule_to_close_timeout: Optional[timedelta] = None,
5210+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
51705211
headers: Optional[Mapping[str, str]] = None,
51715212
) -> NexusOperationHandle[OutputT]: ...
51725213

@@ -5180,6 +5221,7 @@ async def start_operation(
51805221
*,
51815222
output_type: Optional[Type[OutputT]] = None,
51825223
schedule_to_close_timeout: Optional[timedelta] = None,
5224+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
51835225
headers: Optional[Mapping[str, str]] = None,
51845226
) -> NexusOperationHandle[OutputT]: ...
51855227

@@ -5196,6 +5238,7 @@ async def start_operation(
51965238
*,
51975239
output_type: Optional[Type[OutputT]] = None,
51985240
schedule_to_close_timeout: Optional[timedelta] = None,
5241+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
51995242
headers: Optional[Mapping[str, str]] = None,
52005243
) -> NexusOperationHandle[OutputT]: ...
52015244

@@ -5212,6 +5255,7 @@ async def start_operation(
52125255
*,
52135256
output_type: Optional[Type[OutputT]] = None,
52145257
schedule_to_close_timeout: Optional[timedelta] = None,
5258+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52155259
headers: Optional[Mapping[str, str]] = None,
52165260
) -> NexusOperationHandle[OutputT]: ...
52175261

@@ -5228,6 +5272,7 @@ async def start_operation(
52285272
*,
52295273
output_type: Optional[Type[OutputT]] = None,
52305274
schedule_to_close_timeout: Optional[timedelta] = None,
5275+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52315276
headers: Optional[Mapping[str, str]] = None,
52325277
) -> NexusOperationHandle[OutputT]: ...
52335278

@@ -5239,6 +5284,7 @@ async def start_operation(
52395284
*,
52405285
output_type: Optional[Type[OutputT]] = None,
52415286
schedule_to_close_timeout: Optional[timedelta] = None,
5287+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52425288
headers: Optional[Mapping[str, str]] = None,
52435289
) -> Any:
52445290
"""Start a Nexus operation and return its handle.
@@ -5268,6 +5314,7 @@ async def execute_operation(
52685314
*,
52695315
output_type: Optional[Type[OutputT]] = None,
52705316
schedule_to_close_timeout: Optional[timedelta] = None,
5317+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52715318
headers: Optional[Mapping[str, str]] = None,
52725319
) -> OutputT: ...
52735320

@@ -5281,6 +5328,7 @@ async def execute_operation(
52815328
*,
52825329
output_type: Optional[Type[OutputT]] = None,
52835330
schedule_to_close_timeout: Optional[timedelta] = None,
5331+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
52845332
headers: Optional[Mapping[str, str]] = None,
52855333
) -> OutputT: ...
52865334

@@ -5297,6 +5345,7 @@ async def execute_operation(
52975345
*,
52985346
output_type: Optional[Type[OutputT]] = None,
52995347
schedule_to_close_timeout: Optional[timedelta] = None,
5348+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53005349
headers: Optional[Mapping[str, str]] = None,
53015350
) -> OutputT: ...
53025351

@@ -5316,6 +5365,7 @@ async def execute_operation(
53165365
*,
53175366
output_type: Optional[Type[OutputT]] = None,
53185367
schedule_to_close_timeout: Optional[timedelta] = None,
5368+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53195369
headers: Optional[Mapping[str, str]] = None,
53205370
) -> OutputT: ...
53215371

@@ -5332,6 +5382,7 @@ async def execute_operation(
53325382
*,
53335383
output_type: Optional[Type[OutputT]] = None,
53345384
schedule_to_close_timeout: Optional[timedelta] = None,
5385+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53355386
headers: Optional[Mapping[str, str]] = None,
53365387
) -> OutputT: ...
53375388

@@ -5343,6 +5394,7 @@ async def execute_operation(
53435394
*,
53445395
output_type: Optional[Type[OutputT]] = None,
53455396
schedule_to_close_timeout: Optional[timedelta] = None,
5397+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53465398
headers: Optional[Mapping[str, str]] = None,
53475399
) -> Any:
53485400
"""Execute a Nexus operation and return its result.
@@ -5394,6 +5446,7 @@ async def start_operation(
53945446
*,
53955447
output_type: Optional[Type] = None,
53965448
schedule_to_close_timeout: Optional[timedelta] = None,
5449+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
53975450
headers: Optional[Mapping[str, str]] = None,
53985451
) -> Any:
53995452
return (
@@ -5404,6 +5457,7 @@ async def start_operation(
54045457
input=input,
54055458
output_type=output_type,
54065459
schedule_to_close_timeout=schedule_to_close_timeout,
5460+
cancellation_type=cancellation_type,
54075461
headers=headers,
54085462
)
54095463
)
@@ -5415,13 +5469,15 @@ async def execute_operation(
54155469
*,
54165470
output_type: Optional[Type] = None,
54175471
schedule_to_close_timeout: Optional[timedelta] = None,
5472+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_COMPLETED,
54185473
headers: Optional[Mapping[str, str]] = None,
54195474
) -> Any:
54205475
handle = await self.start_operation(
54215476
operation,
54225477
input,
54235478
output_type=output_type,
54245479
schedule_to_close_timeout=schedule_to_close_timeout,
5480+
cancellation_type=cancellation_type,
54255481
headers=headers,
54265482
)
54275483
return await handle

tests/conftest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ async def env(env_type: str) -> AsyncGenerator[WorkflowEnvironment, None]:
118118
"system.enableDeploymentVersions=true",
119119
"--dynamic-config-value",
120120
"frontend.activityAPIsEnabled=true",
121+
"--dynamic-config-value",
122+
"component.nexusoperations.recordCancelRequestCompletionEvents=true",
121123
"--http-port",
122124
str(http_port),
123125
],

0 commit comments

Comments
 (0)