Skip to content

Commit 0cc178b

Browse files
committed
child workflow helper
1 parent 721f7d3 commit 0cc178b

File tree

1 file changed

+37
-9
lines changed

1 file changed

+37
-9
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,9 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
211211
self._workflow_input: Optional[ExecuteWorkflowInput] = None
212212
self._info = det.info
213213

214+
# converters
214215
self._payload_converter = det.payload_converter_class()
216+
self._failure_converter = det.failure_converter_class()
215217
self._serialization_context = temporalio.converter.WorkflowSerializationContext(
216218
namespace=self._info.namespace,
217219
workflow_id=self._info.workflow_id,
@@ -222,8 +224,6 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
222224
self._payload_converter = self._payload_converter.with_context(
223225
self._serialization_context
224226
)
225-
226-
self._failure_converter = det.failure_converter_class()
227227
if isinstance(
228228
self._failure_converter, temporalio.converter.WithSerializationContext
229229
):
@@ -845,15 +845,21 @@ def _apply_resolve_child_workflow_execution(
845845
ret = ret_vals[0]
846846
handle._resolve_success(ret)
847847
elif job.result.HasField("failed"):
848+
failure_converter, payload_converter = self._child_workflow_converters(
849+
handle
850+
)
848851
handle._resolve_failure(
849-
self._failure_converter.from_failure(
850-
job.result.failed.failure, self._payload_converter
852+
failure_converter.from_failure(
853+
job.result.failed.failure, payload_converter
851854
)
852855
)
853856
elif job.result.HasField("cancelled"):
857+
failure_converter, payload_converter = self._child_workflow_converters(
858+
handle
859+
)
854860
handle._resolve_failure(
855-
self._failure_converter.from_failure(
856-
job.result.cancelled.failure, self._payload_converter
861+
failure_converter.from_failure(
862+
job.result.cancelled.failure, payload_converter
857863
)
858864
)
859865
else:
@@ -889,10 +895,11 @@ def _apply_resolve_child_workflow_execution_start(
889895
)
890896
elif job.HasField("cancelled"):
891897
self._pending_child_workflows.pop(job.seq)
898+
failure_converter, payload_converter = self._child_workflow_converters(
899+
handle
900+
)
892901
handle._resolve_failure(
893-
self._failure_converter.from_failure(
894-
job.cancelled.failure, self._payload_converter
895-
)
902+
failure_converter.from_failure(job.cancelled.failure, payload_converter)
896903
)
897904
else:
898905
raise RuntimeError("Child workflow start did not have a known status")
@@ -2042,6 +2049,27 @@ def _check_condition(self, fn: Callable[[], bool], fut: asyncio.Future) -> bool:
20422049
return True
20432050
return False
20442051

2052+
def _child_workflow_converters(
2053+
self, handle: "_ChildWorkflowHandle"
2054+
) -> Tuple[
2055+
temporalio.converter.FailureConverter, temporalio.converter.PayloadConverter
2056+
]:
2057+
"""Get failure and payload converters with child workflow context.
2058+
2059+
The context applied here uses the child workflow ID.
2060+
"""
2061+
context = temporalio.converter.WorkflowSerializationContext(
2062+
namespace=self._info.namespace,
2063+
workflow_id=handle._input.id,
2064+
)
2065+
failure_converter = self._failure_converter
2066+
payload_converter = self._payload_converter
2067+
if isinstance(failure_converter, temporalio.converter.WithSerializationContext):
2068+
failure_converter = failure_converter.with_context(context)
2069+
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
2070+
payload_converter = payload_converter.with_context(context)
2071+
return failure_converter, payload_converter
2072+
20452073
def _convert_payloads(
20462074
self,
20472075
payloads: Sequence[temporalio.api.common.v1.Payload],

0 commit comments

Comments
 (0)