Skip to content

Commit 85af2dd

Browse files
committed
Test Nexus cancellation
1 parent 2275943 commit 85af2dd

File tree

4 files changed

+265
-15
lines changed

4 files changed

+265
-15
lines changed

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,7 @@ class StartNexusOperationInput(Generic[InputT, OutputT]):
298298
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]]
299299
input: InputT
300300
schedule_to_close_timeout: Optional[timedelta]
301+
cancellation_type: temporalio.workflow.NexusOperationCancellationType
301302
headers: Optional[Mapping[str, str]]
302303
output_type: Optional[type[OutputT]] = None
303304

temporalio/worker/_workflow_instance.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@
5454
import temporalio.bridge.proto.activity_result
5555
import temporalio.bridge.proto.child_workflow
5656
import temporalio.bridge.proto.common
57+
import temporalio.bridge.proto.nexus
5758
import temporalio.bridge.proto.workflow_activation
5859
import temporalio.bridge.proto.workflow_commands
5960
import temporalio.bridge.proto.workflow_completion
6061
import temporalio.common
6162
import temporalio.converter
6263
import temporalio.exceptions
63-
import temporalio.nexus
6464
import temporalio.workflow
6565
from temporalio.service import __version__
6666

@@ -1502,9 +1502,10 @@ async def workflow_start_nexus_operation(
15021502
service: str,
15031503
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
15041504
input: Any,
1505-
output_type: Optional[Type[OutputT]] = None,
1506-
schedule_to_close_timeout: Optional[timedelta] = None,
1507-
headers: Optional[Mapping[str, str]] = None,
1505+
output_type: Optional[Type[OutputT]],
1506+
schedule_to_close_timeout: Optional[timedelta],
1507+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
1508+
headers: Optional[Mapping[str, str]],
15081509
) -> temporalio.workflow.NexusOperationHandle[OutputT]:
15091510
# start_nexus_operation
15101511
return await self._outbound.start_nexus_operation(
@@ -1515,6 +1516,7 @@ async def workflow_start_nexus_operation(
15151516
input=input,
15161517
output_type=output_type,
15171518
schedule_to_close_timeout=schedule_to_close_timeout,
1519+
cancellation_type=cancellation_type,
15181520
headers=headers,
15191521
)
15201522
)
@@ -2757,7 +2759,7 @@ def _apply_schedule_command(
27572759
if self._input.retry_policy:
27582760
self._input.retry_policy.apply_to_proto(v.retry_policy)
27592761
v.cancellation_type = cast(
2760-
"temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType",
2762+
temporalio.bridge.proto.workflow_commands.ActivityCancellationType.ValueType,
27612763
int(self._input.cancellation_type),
27622764
)
27632765

@@ -2893,7 +2895,7 @@ def _apply_start_command(self) -> None:
28932895
if self._input.task_timeout:
28942896
v.workflow_task_timeout.FromTimedelta(self._input.task_timeout)
28952897
v.parent_close_policy = cast(
2896-
"temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType",
2898+
temporalio.bridge.proto.child_workflow.ParentClosePolicy.ValueType,
28972899
int(self._input.parent_close_policy),
28982900
)
28992901
v.workflow_id_reuse_policy = cast(
@@ -2915,7 +2917,7 @@ def _apply_start_command(self) -> None:
29152917
self._input.search_attributes, v.search_attributes
29162918
)
29172919
v.cancellation_type = cast(
2918-
"temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType",
2920+
temporalio.bridge.proto.child_workflow.ChildWorkflowCancellationType.ValueType,
29192921
int(self._input.cancellation_type),
29202922
)
29212923
if self._input.versioning_intent:
@@ -3011,11 +3013,6 @@ def __init__(
30113013

30123014
@property
30133015
def operation_token(self) -> Optional[str]:
3014-
# TODO(nexus-preview): How should this behave?
3015-
# Java has a separate class that only exists if the operation token exists:
3016-
# https://github.com/temporalio/sdk-java/blob/master/temporal-sdk/src/main/java/io/temporal/internal/sync/NexusOperationExecutionImpl.java#L26
3017-
# And Go similar:
3018-
# https://github.com/temporalio/sdk-go/blob/master/internal/workflow.go#L2770-L2771
30193016
try:
30203017
return self._start_fut.result()
30213018
except BaseException:
@@ -3064,6 +3061,11 @@ def _apply_schedule_command(self) -> None:
30643061
v.schedule_to_close_timeout.FromTimedelta(
30653062
self._input.schedule_to_close_timeout
30663063
)
3064+
v.cancellation_type = cast(
3065+
temporalio.bridge.proto.nexus.NexusOperationCancellationType.ValueType,
3066+
int(self._input.cancellation_type),
3067+
)
3068+
30673069
if self._input.headers:
30683070
for key, val in self._input.headers.items():
30693071
v.nexus_header[key] = val

temporalio/workflow.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -858,9 +858,10 @@ async def workflow_start_nexus_operation(
858858
service: str,
859859
operation: Union[nexusrpc.Operation[InputT, OutputT], str, Callable[..., Any]],
860860
input: Any,
861-
output_type: Optional[Type[OutputT]] = None,
862-
schedule_to_close_timeout: Optional[timedelta] = None,
863-
headers: Optional[Mapping[str, str]] = None,
861+
output_type: Optional[Type[OutputT]],
862+
schedule_to_close_timeout: Optional[timedelta],
863+
cancellation_type: temporalio.workflow.NexusOperationCancellationType,
864+
headers: Optional[Mapping[str, str]],
864865
) -> NexusOperationHandle[OutputT]: ...
865866

866867
@abstractmethod
@@ -5169,6 +5170,7 @@ async def start_operation(
51695170
*,
51705171
output_type: Optional[type[OutputT]] = None,
51715172
schedule_to_close_timeout: Optional[timedelta] = None,
5173+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
51725174
headers: Optional[Mapping[str, str]] = None,
51735175
) -> NexusOperationHandle[OutputT]: ...
51745176

@@ -5182,6 +5184,7 @@ async def start_operation(
51825184
*,
51835185
output_type: Optional[type[OutputT]] = None,
51845186
schedule_to_close_timeout: Optional[timedelta] = None,
5187+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
51855188
headers: Optional[Mapping[str, str]] = None,
51865189
) -> NexusOperationHandle[OutputT]: ...
51875190

@@ -5198,6 +5201,7 @@ async def start_operation(
51985201
*,
51995202
output_type: Optional[type[OutputT]] = None,
52005203
schedule_to_close_timeout: Optional[timedelta] = None,
5204+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52015205
headers: Optional[Mapping[str, str]] = None,
52025206
) -> NexusOperationHandle[OutputT]: ...
52035207

@@ -5214,6 +5218,7 @@ async def start_operation(
52145218
*,
52155219
output_type: Optional[type[OutputT]] = None,
52165220
schedule_to_close_timeout: Optional[timedelta] = None,
5221+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52175222
headers: Optional[Mapping[str, str]] = None,
52185223
) -> NexusOperationHandle[OutputT]: ...
52195224

@@ -5230,6 +5235,7 @@ async def start_operation(
52305235
*,
52315236
output_type: Optional[type[OutputT]] = None,
52325237
schedule_to_close_timeout: Optional[timedelta] = None,
5238+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52335239
headers: Optional[Mapping[str, str]] = None,
52345240
) -> NexusOperationHandle[OutputT]: ...
52355241

@@ -5270,6 +5276,7 @@ async def execute_operation(
52705276
*,
52715277
output_type: Optional[type[OutputT]] = None,
52725278
schedule_to_close_timeout: Optional[timedelta] = None,
5279+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52735280
headers: Optional[Mapping[str, str]] = None,
52745281
) -> OutputT: ...
52755282

@@ -5283,6 +5290,7 @@ async def execute_operation(
52835290
*,
52845291
output_type: Optional[type[OutputT]] = None,
52855292
schedule_to_close_timeout: Optional[timedelta] = None,
5293+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
52865294
headers: Optional[Mapping[str, str]] = None,
52875295
) -> OutputT: ...
52885296

@@ -5318,6 +5326,7 @@ async def execute_operation(
53185326
*,
53195327
output_type: Optional[type[OutputT]] = None,
53205328
schedule_to_close_timeout: Optional[timedelta] = None,
5329+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
53215330
headers: Optional[Mapping[str, str]] = None,
53225331
) -> OutputT: ...
53235332

@@ -5334,6 +5343,7 @@ async def execute_operation(
53345343
*,
53355344
output_type: Optional[type[OutputT]] = None,
53365345
schedule_to_close_timeout: Optional[timedelta] = None,
5346+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
53375347
headers: Optional[Mapping[str, str]] = None,
53385348
) -> OutputT: ...
53395349

@@ -5396,6 +5406,7 @@ async def start_operation(
53965406
*,
53975407
output_type: Optional[type] = None,
53985408
schedule_to_close_timeout: Optional[timedelta] = None,
5409+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
53995410
headers: Optional[Mapping[str, str]] = None,
54005411
) -> Any:
54015412
return (
@@ -5406,6 +5417,7 @@ async def start_operation(
54065417
input=input,
54075418
output_type=output_type,
54085419
schedule_to_close_timeout=schedule_to_close_timeout,
5420+
cancellation_type=cancellation_type,
54095421
headers=headers,
54105422
)
54115423
)
@@ -5417,13 +5429,15 @@ async def execute_operation(
54175429
*,
54185430
output_type: Optional[type] = None,
54195431
schedule_to_close_timeout: Optional[timedelta] = None,
5432+
cancellation_type: NexusOperationCancellationType = NexusOperationCancellationType.WAIT_CANCELLATION_COMPLETED,
54205433
headers: Optional[Mapping[str, str]] = None,
54215434
) -> Any:
54225435
handle = await self.start_operation(
54235436
operation,
54245437
input,
54255438
output_type=output_type,
54265439
schedule_to_close_timeout=schedule_to_close_timeout,
5440+
cancellation_type=cancellation_type,
54275441
headers=headers,
54285442
)
54295443
return await handle

0 commit comments

Comments
 (0)