Skip to content

Commit 1d74484

Browse files
committed
WIP: WAIT_REQUESTED
1 parent c8dffdc commit 1d74484

File tree

1 file changed

+31
-2
lines changed

1 file changed

+31
-2
lines changed

tests/nexus/test_workflow_caller_cancellation_types.py

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,28 @@ class TestContext:
4141
test_context: TestContext
4242

4343

44-
@workflow.defn
44+
@workflow.defn(sandboxed=False)
4545
class HandlerWorkflow:
46+
def __init__(self):
47+
self.caller_op_future_resolved = asyncio.Event()
48+
4649
@workflow.run
4750
async def run(self) -> None:
48-
await asyncio.Future()
51+
try:
52+
await asyncio.Future()
53+
except asyncio.CancelledError:
54+
if (
55+
test_context.cancellation_type
56+
== workflow.NexusOperationCancellationType.WAIT_REQUESTED
57+
):
58+
# We want to prove that the caller op future can be resolved before the operation
59+
# (i.e. its backing workflow) is cancelled.
60+
await self.caller_op_future_resolved.wait()
61+
raise
62+
63+
@workflow.signal
64+
def set_caller_op_future_resolved(self) -> None:
65+
self.caller_op_future_resolved.set()
4966

5067

5168
@nexusrpc.service
@@ -137,6 +154,7 @@ async def run(self, input: Input) -> CancellationResult:
137154
else self.nexus_client.start_operation(Service.workflow_op, input=None)
138155
)
139156
self.operation_token = op_handle.operation_token
157+
assert self.operation_token
140158
# Request cancellation of the asyncio task representing the nexus operation. When the handle
141159
# task is awaited, the resulting asyncio.CancelledError is caught, and a
142160
# RequestCancelNexusOperation command is emitted instead (see
@@ -167,6 +185,17 @@ async def run(self, input: Input) -> CancellationResult:
167185
# Block the thread for a bit to avoid flakiness in asserting that the future is
168186
# unblocked before the WFT completion is sent in the TryCancel case.
169187
time.sleep(1e-2)
188+
189+
# Notify handler workflow that caller op future has been resolved
190+
await workflow.get_external_workflow_handle_for(
191+
HandlerWorkflow.run,
192+
workflow_id=(
193+
nexus.WorkflowHandle[None]
194+
.from_token(self.operation_token)
195+
.workflow_id
196+
),
197+
).signal(HandlerWorkflow.set_caller_op_future_resolved)
198+
170199
await workflow.wait_condition(lambda: self.released)
171200
return CancellationResult(
172201
operation_token=op_handle.operation_token,

0 commit comments

Comments
 (0)