Skip to content

Commit d7e13af

Browse files
committed
Fix external workflow signal test [AI]
1 parent 4c6127b commit d7e13af

File tree

1 file changed

+70
-25
lines changed

1 file changed

+70
-25
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,8 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
242242
self._pending_activities: Dict[int, _ActivityHandle] = {}
243243
self._pending_child_workflows: Dict[int, _ChildWorkflowHandle] = {}
244244
self._pending_nexus_operations: Dict[int, _NexusOperationHandle] = {}
245-
self._pending_external_signals: Dict[int, asyncio.Future] = {}
246-
self._pending_external_cancels: Dict[int, asyncio.Future] = {}
245+
self._pending_external_signals: Dict[int, Tuple[asyncio.Future, str]] = {}
246+
self._pending_external_cancels: Dict[int, Tuple[asyncio.Future, str]] = {}
247247
# Keyed by type
248248
self._curr_seqs: Dict[str, int] = {}
249249
# TODO(cretz): Any concerns about not sharing this? Maybe the types I
@@ -985,18 +985,31 @@ def _apply_resolve_request_cancel_external_workflow(
985985
self,
986986
job: temporalio.bridge.proto.workflow_activation.ResolveRequestCancelExternalWorkflow,
987987
) -> None:
988-
fut = self._pending_external_cancels.pop(job.seq, None)
989-
if not fut:
988+
pending = self._pending_external_cancels.pop(job.seq, None)
989+
if not pending:
990990
raise RuntimeError(
991991
f"Failed finding pending external cancel for sequence {job.seq}"
992992
)
993+
fut, target_workflow_id = pending
993994
# We intentionally let this error if future is already done
994995
if job.HasField("failure"):
995-
# TODO: which workflow ID should be in serialization context?
996+
# Use the target workflow's context when deserializing failures
997+
context = temporalio.converter.WorkflowSerializationContext(
998+
namespace=self._info.namespace,
999+
workflow_id=target_workflow_id,
1000+
)
1001+
failure_converter = self._failure_converter
1002+
payload_converter = self._payload_converter
1003+
if isinstance(
1004+
failure_converter, temporalio.converter.WithSerializationContext
1005+
):
1006+
failure_converter = failure_converter.with_context(context)
1007+
if isinstance(
1008+
payload_converter, temporalio.converter.WithSerializationContext
1009+
):
1010+
payload_converter = payload_converter.with_context(context)
9961011
fut.set_exception(
997-
self._failure_converter.from_failure(
998-
job.failure, self._payload_converter
999-
)
1012+
failure_converter.from_failure(job.failure, payload_converter)
10001013
)
10011014
else:
10021015
fut.set_result(None)
@@ -1005,18 +1018,31 @@ def _apply_resolve_signal_external_workflow(
10051018
self,
10061019
job: temporalio.bridge.proto.workflow_activation.ResolveSignalExternalWorkflow,
10071020
) -> None:
1008-
fut = self._pending_external_signals.pop(job.seq, None)
1009-
if not fut:
1021+
pending = self._pending_external_signals.pop(job.seq, None)
1022+
if not pending:
10101023
raise RuntimeError(
10111024
f"Failed finding pending external signal for sequence {job.seq}"
10121025
)
1026+
fut, target_workflow_id = pending
10131027
# We intentionally let this error if future is already done
10141028
if job.HasField("failure"):
1015-
# TODO: which workflow ID should be in serialization context?
1029+
# Use the target workflow's context when deserializing failures
1030+
context = temporalio.converter.WorkflowSerializationContext(
1031+
namespace=self._info.namespace,
1032+
workflow_id=target_workflow_id,
1033+
)
1034+
failure_converter = self._failure_converter
1035+
payload_converter = self._payload_converter
1036+
if isinstance(
1037+
failure_converter, temporalio.converter.WithSerializationContext
1038+
):
1039+
failure_converter = failure_converter.with_context(context)
1040+
if isinstance(
1041+
payload_converter, temporalio.converter.WithSerializationContext
1042+
):
1043+
payload_converter = payload_converter.with_context(context)
10161044
fut.set_exception(
1017-
self._failure_converter.from_failure(
1018-
job.failure, self._payload_converter
1019-
)
1045+
failure_converter.from_failure(job.failure, payload_converter)
10201046
)
10211047
else:
10221048
fut.set_result(None)
@@ -1864,10 +1890,15 @@ async def run_activity() -> Any:
18641890
async def _outbound_signal_child_workflow(
18651891
self, input: SignalChildWorkflowInput
18661892
) -> None:
1867-
# TODO: which workflow ID in serialization context?
1868-
payloads = (
1869-
self._payload_converter.to_payloads(input.args) if input.args else None
1893+
# Use the child workflow's context for serialization
1894+
context = temporalio.converter.WorkflowSerializationContext(
1895+
namespace=self._info.namespace,
1896+
workflow_id=input.child_workflow_id,
18701897
)
1898+
payload_converter = self._payload_converter
1899+
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
1900+
payload_converter = payload_converter.with_context(context)
1901+
payloads = payload_converter.to_payloads(input.args) if input.args else None
18711902
command = self._add_command()
18721903
v = command.signal_external_workflow_execution
18731904
v.child_workflow_id = input.child_workflow_id
@@ -1881,10 +1912,15 @@ async def _outbound_signal_child_workflow(
18811912
async def _outbound_signal_external_workflow(
18821913
self, input: SignalExternalWorkflowInput
18831914
) -> None:
1884-
# TODO: which workflow ID in serialization context?
1885-
payloads = (
1886-
self._payload_converter.to_payloads(input.args) if input.args else None
1915+
# Use the target workflow's context for serialization
1916+
context = temporalio.converter.WorkflowSerializationContext(
1917+
namespace=input.namespace,
1918+
workflow_id=input.workflow_id,
18871919
)
1920+
payload_converter = self._payload_converter
1921+
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
1922+
payload_converter = payload_converter.with_context(context)
1923+
payloads = payload_converter.to_payloads(input.args) if input.args else None
18881924
command = self._add_command()
18891925
v = command.signal_external_workflow_execution
18901926
v.workflow_execution.namespace = input.namespace
@@ -1918,7 +1954,10 @@ def apply_child_cancel_error() -> None:
19181954
# TODO(cretz): Nothing waits on this future, so how
19191955
# if at all should we report child-workflow cancel
19201956
# request failure?
1921-
self._pending_external_cancels[cancel_seq] = self.create_future()
1957+
self._pending_external_cancels[cancel_seq] = (
1958+
self.create_future(),
1959+
input.id,
1960+
)
19221961

19231962
# Function that runs in the handle
19241963
async def run_child() -> Any:
@@ -2024,8 +2063,9 @@ async def _cancel_external_workflow(
20242063
done_fut = self.create_future()
20252064
command.request_cancel_external_workflow_execution.seq = seq
20262065

2027-
# Set as pending
2028-
self._pending_external_cancels[seq] = done_fut
2066+
# Set as pending with the target workflow ID for later context use
2067+
target_workflow_id = command.request_cancel_external_workflow_execution.workflow_execution.workflow_id
2068+
self._pending_external_cancels[seq] = (done_fut, target_workflow_id)
20292069

20302070
# Wait until done (there is no cancelling a cancel request)
20312071
await done_fut
@@ -2366,8 +2406,13 @@ async def _signal_external_workflow(
23662406
done_fut = self.create_future()
23672407
command.signal_external_workflow_execution.seq = seq
23682408

2369-
# Set as pending
2370-
self._pending_external_signals[seq] = done_fut
2409+
# Set as pending with the target workflow ID for later context use
2410+
# Extract the workflow ID from the command
2411+
target_workflow_id = (
2412+
command.signal_external_workflow_execution.child_workflow_id
2413+
or command.signal_external_workflow_execution.workflow_execution.workflow_id
2414+
)
2415+
self._pending_external_signals[seq] = (done_fut, target_workflow_id)
23712416

23722417
# Wait until completed or cancelled
23732418
while True:

0 commit comments

Comments
 (0)