Skip to content

Commit 98d879e

Browse files
committed
Test Nexus cancellation
1 parent d5e6dad commit 98d879e

File tree

4 files changed

+265
-15
lines changed

4 files changed

+265
-15
lines changed

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
298298
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
299299
input: InputT
300300
schedule_to_close_timeout: Optional[timedelta]
301+
cancellation_type: temporalio.workflow.NexusOperationCancellationType
301302
headers: Optional[Mapping[str, str]]
302303
output_type: Optional[type[OutputT]] = None
303304

temporalio/worker/_workflow_instance.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import temporalio.bridge.proto.activity_result
5555
import temporalio.bridge.proto.child_workflow
5656
import temporalio.bridge.proto.common
57+
import temporalio.bridge.proto.nexus
5758
import temporalio.bridge.proto.workflow_activation
5859
import temporalio.bridge.proto.workflow_commands
5960
import temporalio.bridge.proto.workflow_completion
6061
import temporalio.common
6162
import temporalio.converter
6263
import temporalio.exceptions
63-
import temporalio.nexus
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

@@ -1502,9 +1502,10 @@ async def workflow_start_nexus_operation(
15021502
service: str,
15031503
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
15041504
input: Any,
1505-
output_type: Optional[Type[OutputT]] = None,
1506-
schedule_to_close_timeout: Optional[timedelta] = None,
1507-
headers: Optional[Mapping[str, str]] = None,
1505+
output_type: Optional[Type[OutputT]],
1506+
schedule_to_close_timeout: Optional[timedelta],
1507+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
1508+
headers: Optional[Mapping[str, str]],
15081509
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
15091510
# start_nexus_operation
15101511
return await self._outbound.start_nexus_operation(
@@ -1515,6 +1516,7 @@ async def workflow_start_nexus_operation(
15151516
input=input,
15161517
output_type=output_type,
15171518
schedule_to_close_timeout=schedule_to_close_timeout,
1519+
cancellation_type=cancellation_type,
15181520
headers=headers,
15191521
)
15201522
)
@@ -2757,7 +2759,7 @@ def _apply_schedule_command(
27572759
if self._input.retry_policy:
27582760
self._input.retry_policy.apply_to_proto(v.retry_policy)
27592761
v.cancellation_type = cast(
2760-
"temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType",
2762+
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27612763
int(self._input.cancellation_type),
27622764
)
27632765

@@ -2893,7 +2895,7 @@ def _apply_start_command(self) -> None:
28932895
if self._input.task_timeout:
28942896
v.workflow_task_timeout.FromTimedelta(self._input.task_timeout)
28952897
v.parent_close_policy = cast(
2896-
"temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType",
2898+
temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType,
28972899
int(self._input.parent_close_policy),
28982900
)
28992901
v.workflow_id_reuse_policy = cast(
@@ -2915,7 +2917,7 @@ def _apply_start_command(self) -> None:
29152917
self._input.search_attributes, v.search_attributes
29162918
)
29172919
v.cancellation_type = cast(
2918-
"temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType",
2920+
temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType,
29192921
int(self._input.cancellation_type),
29202922
)
29212923
if self._input.versioning_intent:
@@ -3011,11 +3013,6 @@ def __init__(
30113013

30123014
@property
30133015
def operation_token(self) -> Optional[str]:
3014-
# TODO(nexus-preview): How should this behave?
3015-
# Java has a separate class that only exists if the operation token exists:
3016-
# https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java#L26
3017-
# And Go similar:
3018-
# https://github.com/temporalio/sdk-go/blob/master/internal/workflow.go#L2770-L2771
30193016
try:
30203017
return self._start_fut.result()
30213018
except BaseException:
@@ -3064,6 +3061,11 @@ def _apply_schedule_command(self) -> None:
30643061
v.schedule_to_close_timeout.FromTimedelta(
30653062
self._input.schedule_to_close_timeout
30663063
)
3064+
v.cancellation_type = cast(
3065+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType,
3066+
int(self._input.cancellation_type),
3067+
)
3068+
30673069
if self._input.headers:
30683070
for key, val in self._input.headers.items():
30693071
v.nexus_header[key] = val

temporalio/workflow.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -858,9 +858,10 @@ async def workflow_start_nexus_operation(
858858
service: str,
859859
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
860860
input: Any,
861-
output_type: Optional[Type[OutputT]] = None,
862-
schedule_to_close_timeout: Optional[timedelta] = None,
863-
headers: Optional[Mapping[str, str]] = None,
861+
output_type: Optional[Type[OutputT]],
862+
schedule_to_close_timeout: Optional[timedelta],
863+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
864+
headers: Optional[Mapping[str, str]],
864865
) -> NexusOperationHandle[OutputT]: ...
865866

866867
@abstractmethod
@@ -5169,6 +5170,7 @@ async def start_operation(
51695170
*,
51705171
output_type: Optional[type[OutputT]] = None,
51715172
schedule_to_close_timeout: Optional[timedelta] = None,
5173+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
51725174
headers: Optional[Mapping[str, str]] = None,
51735175
) -> NexusOperationHandle[OutputT]: ...
51745176

@@ -5182,6 +5184,7 @@ async def start_operation(
51825184
*,
51835185
output_type: Optional[type[OutputT]] = None,
51845186
schedule_to_close_timeout: Optional[timedelta] = None,
5187+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
51855188
headers: Optional[Mapping[str, str]] = None,
51865189
) -> NexusOperationHandle[OutputT]: ...
51875190

@@ -5198,6 +5201,7 @@ async def start_operation(
51985201
*,
51995202
output_type: Optional[type[OutputT]] = None,
52005203
schedule_to_close_timeout: Optional[timedelta] = None,
5204+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52015205
headers: Optional[Mapping[str, str]] = None,
52025206
) -> NexusOperationHandle[OutputT]: ...
52035207

@@ -5214,6 +5218,7 @@ async def start_operation(
52145218
*,
52155219
output_type: Optional[type[OutputT]] = None,
52165220
schedule_to_close_timeout: Optional[timedelta] = None,
5221+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52175222
headers: Optional[Mapping[str, str]] = None,
52185223
) -> NexusOperationHandle[OutputT]: ...
52195224

@@ -5230,6 +5235,7 @@ async def start_operation(
52305235
*,
52315236
output_type: Optional[type[OutputT]] = None,
52325237
schedule_to_close_timeout: Optional[timedelta] = None,
5238+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52335239
headers: Optional[Mapping[str, str]] = None,
52345240
) -> NexusOperationHandle[OutputT]: ...
52355241

@@ -5270,6 +5276,7 @@ async def execute_operation(
52705276
*,
52715277
output_type: Optional[type[OutputT]] = None,
52725278
schedule_to_close_timeout: Optional[timedelta] = None,
5279+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52735280
headers: Optional[Mapping[str, str]] = None,
52745281
) -> OutputT: ...
52755282

@@ -5283,6 +5290,7 @@ async def execute_operation(
52835290
*,
52845291
output_type: Optional[type[OutputT]] = None,
52855292
schedule_to_close_timeout: Optional[timedelta] = None,
5293+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52865294
headers: Optional[Mapping[str, str]] = None,
52875295
) -> OutputT: ...
52885296

@@ -5315,6 +5323,7 @@ async def execute_operation(
53155323
*,
53165324
output_type: Optional[type[OutputT]] = None,
53175325
schedule_to_close_timeout: Optional[timedelta] = None,
5326+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
53185327
headers: Optional[Mapping[str, str]] = None,
53195328
) -> OutputT: ...
53205329

@@ -5331,6 +5340,7 @@ async def execute_operation(
53315340
*,
53325341
output_type: Optional[type[OutputT]] = None,
53335342
schedule_to_close_timeout: Optional[timedelta] = None,
5343+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
53345344
headers: Optional[Mapping[str, str]] = None,
53355345
) -> OutputT: ...
53365346

@@ -5393,6 +5403,7 @@ async def start_operation(
53935403
*,
53945404
output_type: Optional[type] = None,
53955405
schedule_to_close_timeout: Optional[timedelta] = None,
5406+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
53965407
headers: Optional[Mapping[str, str]] = None,
53975408
) -> Any:
53985409
return (
@@ -5403,6 +5414,7 @@ async def start_operation(
54035414
input=input,
54045415
output_type=output_type,
54055416
schedule_to_close_timeout=schedule_to_close_timeout,
5417+
cancellation_type=cancellation_type,
54065418
headers=headers,
54075419
)
54085420
)
@@ -5414,13 +5426,15 @@ async def execute_operation(
54145426
*,
54155427
output_type: Optional[type] = None,
54165428
schedule_to_close_timeout: Optional[timedelta] = None,
5429+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
54175430
headers: Optional[Mapping[str, str]] = None,
54185431
) -> Any:
54195432
handle = await self.start_operation(
54205433
operation,
54215434
input,
54225435
output_type=output_type,
54235436
schedule_to_close_timeout=schedule_to_close_timeout,
5437+
cancellation_type=cancellation_type,
54245438
headers=headers,
54255439
)
54265440
return await handle

0 commit comments

Comments
 (0)