11import asyncio
22import uuid
33from dataclasses import dataclass
4- from datetime import datetime
5- from typing import Optional
4+ from datetime import datetime , timezone
5+ from typing import Any , Optional
66
77import nexusrpc
88import pytest
@@ -52,6 +52,7 @@ class Input:
5252@dataclass
5353class CancellationResult :
5454 operation_token : str
55+ caller_unblock_time : datetime
5556
5657
5758@workflow .defn (sandboxed = False )
@@ -78,8 +79,12 @@ async def run(self, input: Input) -> CancellationResult:
7879 try :
7980 await op_handle
8081 except exceptions .NexusOperationError :
82+ caller_unblock_time = workflow .now ()
8183 assert op_handle .operation_token
82- return CancellationResult (operation_token = op_handle .operation_token )
84+ return CancellationResult (
85+ operation_token = op_handle .operation_token ,
86+ caller_unblock_time = caller_unblock_time ,
87+ )
8388 else :
8489 pytest .fail ("Expected NexusOperationError" )
8590
@@ -131,12 +136,12 @@ async def check_behavior_for_try_cancel(
131136
132137
133138async def check_behavior_for_wait_cancellation_completed (
134- caller_wf : WorkflowHandle ,
139+ caller_wf : WorkflowHandle [ Any , CancellationResult ] ,
135140 handler_wf : WorkflowHandle ,
136141) -> None :
137142 """
138- Check that a cancellation request is sent and the caller workflow exits after the operation is
139- canceled.
143+ Check that a cancellation request is sent and the caller workflow nexus operation future is
144+ unblocked after the operation is canceled.
140145 """
141146 handler_status = (await handler_wf .describe ()).status
142147 assert handler_status == WorkflowExecutionStatus .CANCELED
@@ -153,6 +158,14 @@ async def check_behavior_for_wait_cancellation_completed(
153158 (caller_wf , EventType .EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED ),
154159 ]
155160 )
161+ result = await caller_wf .result ()
162+ handler_wf_canceled_event_time = await _get_event_time (
163+ handler_wf ,
164+ EventType .EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED ,
165+ )
166+ assert (
167+ result .caller_unblock_time > handler_wf_canceled_event_time
168+ ), "For WAIT_COMPLETED, the future should be unblocked after handler workflow cancellation. "
156169
157170
158171@pytest .mark .parametrize (
@@ -225,6 +238,16 @@ async def _has_event(wf_handle: WorkflowHandle, event_type: EventType.ValueType)
225238 return False
226239
227240
241+ async def _get_event_time (
242+ wf_handle : WorkflowHandle ,
243+ event_type : EventType .ValueType ,
244+ ) -> datetime :
245+ event = await anext (
246+ e async for e in wf_handle .fetch_history_events () if e .event_type == event_type
247+ )
248+ return event .event_time .ToDatetime ().replace (tzinfo = timezone .utc )
249+
250+
228251async def _assert_event_subsequence (
229252 expected_events : list [tuple [WorkflowHandle , EventType .ValueType ]],
230253) -> None :
0 commit comments