Skip to content

Commit 7574c23

Browse files
committed
Fix external workflow signal test [AI]
1 parent 421476d commit 7574c23

File tree

1 file changed

+70
-25
lines changed

1 file changed

+70
-25
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
255255
self._pending_activities: Dict[int, _ActivityHandle] = {}
256256
self._pending_child_workflows: Dict[int, _ChildWorkflowHandle] = {}
257257
self._pending_nexus_operations: Dict[int, _NexusOperationHandle] = {}
258-
self._pending_external_signals: Dict[int, asyncio.Future] = {}
259-
self._pending_external_cancels: Dict[int, asyncio.Future] = {}
258+
self._pending_external_signals: Dict[int, Tuple[asyncio.Future, str]] = {}
259+
self._pending_external_cancels: Dict[int, Tuple[asyncio.Future, str]] = {}
260260
# Keyed by type
261261
self._curr_seqs: Dict[str, int] = {}
262262
# TODO(cretz): Any concerns about not sharing this? Maybe the types I
@@ -991,18 +991,31 @@ def _apply_resolve_request_cancel_external_workflow(
991991
self,
992992
job: temporalio.bridge.proto.workflow_activation.ResolveRequestCancelExternalWorkflow,
993993
) -> None:
994-
fut = self._pending_external_cancels.pop(job.seq, None)
995-
if not fut:
994+
pending = self._pending_external_cancels.pop(job.seq, None)
995+
if not pending:
996996
raise RuntimeError(
997997
f"Failed finding pending external cancel for sequence {job.seq}"
998998
)
999+
fut, target_workflow_id = pending
9991000
# We intentionally let this error if future is already done
10001001
if job.HasField("failure"):
1001-
# TODO: which workflow ID should be in serialization context?
1002+
# Use the target workflow's context when deserializing failures
1003+
context = temporalio.converter.WorkflowSerializationContext(
1004+
namespace=self._info.namespace,
1005+
workflow_id=target_workflow_id,
1006+
)
1007+
failure_converter = self._failure_converter
1008+
payload_converter = self._payload_converter
1009+
if isinstance(
1010+
failure_converter, temporalio.converter.WithSerializationContext
1011+
):
1012+
failure_converter = failure_converter.with_context(context)
1013+
if isinstance(
1014+
payload_converter, temporalio.converter.WithSerializationContext
1015+
):
1016+
payload_converter = payload_converter.with_context(context)
10021017
fut.set_exception(
1003-
self._failure_converter.from_failure(
1004-
job.failure, self._payload_converter
1005-
)
1018+
failure_converter.from_failure(job.failure, payload_converter)
10061019
)
10071020
else:
10081021
fut.set_result(None)
@@ -1011,18 +1024,31 @@ def _apply_resolve_signal_external_workflow(
10111024
self,
10121025
job: temporalio.bridge.proto.workflow_activation.ResolveSignalExternalWorkflow,
10131026
) -> None:
1014-
fut = self._pending_external_signals.pop(job.seq, None)
1015-
if not fut:
1027+
pending = self._pending_external_signals.pop(job.seq, None)
1028+
if not pending:
10161029
raise RuntimeError(
10171030
f"Failed finding pending external signal for sequence {job.seq}"
10181031
)
1032+
fut, target_workflow_id = pending
10191033
# We intentionally let this error if future is already done
10201034
if job.HasField("failure"):
1021-
# TODO: which workflow ID should be in serialization context?
1035+
# Use the target workflow's context when deserializing failures
1036+
context = temporalio.converter.WorkflowSerializationContext(
1037+
namespace=self._info.namespace,
1038+
workflow_id=target_workflow_id,
1039+
)
1040+
failure_converter = self._failure_converter
1041+
payload_converter = self._payload_converter
1042+
if isinstance(
1043+
failure_converter, temporalio.converter.WithSerializationContext
1044+
):
1045+
failure_converter = failure_converter.with_context(context)
1046+
if isinstance(
1047+
payload_converter, temporalio.converter.WithSerializationContext
1048+
):
1049+
payload_converter = payload_converter.with_context(context)
10221050
fut.set_exception(
1023-
self._failure_converter.from_failure(
1024-
job.failure, self._payload_converter
1025-
)
1051+
failure_converter.from_failure(job.failure, payload_converter)
10261052
)
10271053
else:
10281054
fut.set_result(None)
@@ -1870,10 +1896,15 @@ async def run_activity() -> Any:
18701896
async def _outbound_signal_child_workflow(
18711897
self, input: SignalChildWorkflowInput
18721898
) -> None:
1873-
# TODO: which workflow ID in serialization context?
1874-
payloads = (
1875-
self._payload_converter.to_payloads(input.args) if input.args else None
1899+
# Use the child workflow's context for serialization
1900+
context = temporalio.converter.WorkflowSerializationContext(
1901+
namespace=self._info.namespace,
1902+
workflow_id=input.child_workflow_id,
18761903
)
1904+
payload_converter = self._payload_converter
1905+
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
1906+
payload_converter = payload_converter.with_context(context)
1907+
payloads = payload_converter.to_payloads(input.args) if input.args else None
18771908
command = self._add_command()
18781909
v = command.signal_external_workflow_execution
18791910
v.child_workflow_id = input.child_workflow_id
@@ -1887,10 +1918,15 @@ async def _outbound_signal_child_workflow(
18871918
async def _outbound_signal_external_workflow(
18881919
self, input: SignalExternalWorkflowInput
18891920
) -> None:
1890-
# TODO: which workflow ID in serialization context?
1891-
payloads = (
1892-
self._payload_converter.to_payloads(input.args) if input.args else None
1921+
# Use the target workflow's context for serialization
1922+
context = temporalio.converter.WorkflowSerializationContext(
1923+
namespace=input.namespace,
1924+
workflow_id=input.workflow_id,
18931925
)
1926+
payload_converter = self._payload_converter
1927+
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
1928+
payload_converter = payload_converter.with_context(context)
1929+
payloads = payload_converter.to_payloads(input.args) if input.args else None
18941930
command = self._add_command()
18951931
v = command.signal_external_workflow_execution
18961932
v.workflow_execution.namespace = input.namespace
@@ -1924,7 +1960,10 @@ def apply_child_cancel_error() -> None:
19241960
# TODO(cretz): Nothing waits on this future, so how
19251961
# if at all should we report child-workflow cancel
19261962
# request failure?
1927-
self._pending_external_cancels[cancel_seq] = self.create_future()
1963+
self._pending_external_cancels[cancel_seq] = (
1964+
self.create_future(),
1965+
input.id,
1966+
)
19281967

19291968
# Function that runs in the handle
19301969
async def run_child() -> Any:
@@ -2030,8 +2069,9 @@ async def _cancel_external_workflow(
20302069
done_fut = self.create_future()
20312070
command.request_cancel_external_workflow_execution.seq = seq
20322071

2033-
# Set as pending
2034-
self._pending_external_cancels[seq] = done_fut
2072+
# Set as pending with the target workflow ID for later context use
2073+
target_workflow_id = command.request_cancel_external_workflow_execution.workflow_execution.workflow_id
2074+
self._pending_external_cancels[seq] = (done_fut, target_workflow_id)
20352075

20362076
# Wait until done (there is no cancelling a cancel request)
20372077
await done_fut
@@ -2329,8 +2369,13 @@ async def _signal_external_workflow(
23292369
done_fut = self.create_future()
23302370
command.signal_external_workflow_execution.seq = seq
23312371

2332-
# Set as pending
2333-
self._pending_external_signals[seq] = done_fut
2372+
# Set as pending with the target workflow ID for later context use
2373+
# Extract the workflow ID from the command
2374+
target_workflow_id = (
2375+
command.signal_external_workflow_execution.child_workflow_id
2376+
or command.signal_external_workflow_execution.workflow_execution.workflow_id
2377+
)
2378+
self._pending_external_signals[seq] = (done_fut, target_workflow_id)
23342379

23352380
# Wait until completed or cancelled
23362381
while True:

0 commit comments

Comments
 (0)