Skip to content

Commit 0b3e68e

Browse files
committed
Reuse and rename helper
1 parent 177dbd9 commit 0b3e68e

File tree

1 file changed

+32
-27
lines changed

1 file changed

+32
-27
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,14 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
210210
self._defn = det.defn
211211
self._workflow_input: Optional[ExecuteWorkflowInput] = None
212212
self._info = det.info
213-
self._payload_converter = det.payload_converter_class()
214-
self._failure_converter = det.failure_converter_class()
215-
self._payload_converter, self._failure_converter = self._converters(
216-
temporalio.converter.WorkflowSerializationContext(
217-
namespace=det.info.namespace,
218-
workflow_id=det.info.workflow_id,
213+
self._payload_converter, self._failure_converter = (
214+
self._converters_with_context(
215+
temporalio.converter.WorkflowSerializationContext(
216+
namespace=det.info.namespace,
217+
workflow_id=det.info.workflow_id,
218+
),
219+
det.payload_converter_class(),
220+
det.failure_converter_class(),
219221
)
220222
)
221223

@@ -762,7 +764,7 @@ def _apply_resolve_activity(
762764
handle = self._pending_activities.pop(job.seq, None)
763765
if not handle:
764766
raise RuntimeError(f"Failed finding activity handle for sequence {job.seq}")
765-
payload_converter, failure_converter = self._converters(
767+
payload_converter, failure_converter = self._converters_with_context(
766768
temporalio.converter.ActivitySerializationContext(
767769
namespace=self._info.namespace,
768770
workflow_id=self._info.workflow_id,
@@ -813,7 +815,7 @@ def _apply_resolve_child_workflow_execution(
813815
raise RuntimeError(
814816
f"Failed finding child workflow handle for sequence {job.seq}"
815817
)
816-
payload_converter, failure_converter = self._converters(
818+
payload_converter, failure_converter = self._converters_with_context(
817819
temporalio.converter.WorkflowSerializationContext(
818820
namespace=self._info.namespace,
819821
workflow_id=handle._input.id,
@@ -875,7 +877,7 @@ def _apply_resolve_child_workflow_execution_start(
875877
)
876878
elif job.HasField("cancelled"):
877879
self._pending_child_workflows.pop(job.seq)
878-
payload_converter, failure_converter = self._converters(
880+
payload_converter, failure_converter = self._converters_with_context(
879881
temporalio.converter.WorkflowSerializationContext(
880882
namespace=self._info.namespace,
881883
workflow_id=handle._input.id,
@@ -898,7 +900,7 @@ def _apply_resolve_nexus_operation_start(
898900
)
899901
# We not set a serialization context for nexus operations on the caller side because it is
900902
# not possible to do so on the handler side.
901-
payload_converter, failure_converter = self._converters(None)
903+
payload_converter, failure_converter = self._converters_with_context(None)
902904

903905
if job.HasField("operation_token"):
904906
# The nexus operation started asynchronously. A `ResolveNexusOperation` job
@@ -937,7 +939,7 @@ def _apply_resolve_nexus_operation(
937939

938940
# We not set a serialization context for nexus operations on the caller side because it is
939941
# not possible to do so on the handler side.
940-
payload_converter, failure_converter = self._converters(None)
942+
payload_converter, failure_converter = self._converters_with_context(None)
941943
# Handle the four oneof variants of NexusOperationResult
942944
result = job.result
943945
if result.HasField("completed"):
@@ -974,7 +976,7 @@ def _apply_resolve_request_cancel_external_workflow(
974976
fut, external_workflow_id = pending
975977
# We intentionally let this error if future is already done
976978
if job.HasField("failure"):
977-
payload_converter, failure_converter = self._converters(
979+
payload_converter, failure_converter = self._converters_with_context(
978980
temporalio.converter.WorkflowSerializationContext(
979981
namespace=self._info.namespace,
980982
workflow_id=external_workflow_id,
@@ -998,7 +1000,7 @@ def _apply_resolve_signal_external_workflow(
9981000
fut, external_workflow_id = pending
9991001
# We intentionally let this error if future is already done
10001002
if job.HasField("failure"):
1001-
payload_converter, failure_converter = self._converters(
1003+
payload_converter, failure_converter = self._converters_with_context(
10021004
temporalio.converter.WorkflowSerializationContext(
10031005
namespace=self._info.namespace,
10041006
workflow_id=external_workflow_id,
@@ -1853,7 +1855,7 @@ async def run_activity() -> Any:
18531855
async def _outbound_signal_child_workflow(
18541856
self, input: SignalChildWorkflowInput
18551857
) -> None:
1856-
payload_converter, _ = self._converters(
1858+
payload_converter, _ = self._converters_with_context(
18571859
temporalio.converter.WorkflowSerializationContext(
18581860
namespace=self._info.namespace,
18591861
workflow_id=input.child_workflow_id,
@@ -1873,7 +1875,7 @@ async def _outbound_signal_child_workflow(
18731875
async def _outbound_signal_external_workflow(
18741876
self, input: SignalExternalWorkflowInput
18751877
) -> None:
1876-
payload_converter, _ = self._converters(
1878+
payload_converter, _ = self._converters_with_context(
18771879
temporalio.converter.WorkflowSerializationContext(
18781880
namespace=input.namespace,
18791881
workflow_id=input.workflow_id,
@@ -2056,25 +2058,28 @@ def _convert_payloads(
20562058
raise
20572059
raise RuntimeError("Failed decoding arguments") from err
20582060

2059-
def _converters(
2060-
self, context: Optional[temporalio.converter.SerializationContext]
2061+
def _converters_with_context(
2062+
self,
2063+
context: Optional[temporalio.converter.SerializationContext],
2064+
base_payload_converter: Optional[temporalio.converter.PayloadConverter] = None,
2065+
base_failure_converter: Optional[temporalio.converter.FailureConverter] = None,
20612066
) -> Tuple[
20622067
temporalio.converter.PayloadConverter,
20632068
temporalio.converter.FailureConverter,
20642069
]:
20652070
"""Construct workflow payload and failure converters with the given context."""
2066-
payload_converter = self._payload_converter
2067-
failure_converter = self._failure_converter
2071+
base_payload_converter = base_payload_converter or self._payload_converter
2072+
base_failure_converter = base_failure_converter or self._failure_converter
20682073
if context:
20692074
if isinstance(
2070-
payload_converter, temporalio.converter.WithSerializationContext
2075+
base_payload_converter, temporalio.converter.WithSerializationContext
20712076
):
2072-
payload_converter = payload_converter.with_context(context)
2077+
base_payload_converter = base_payload_converter.with_context(context)
20732078
if isinstance(
2074-
failure_converter, temporalio.converter.WithSerializationContext
2079+
base_failure_converter, temporalio.converter.WithSerializationContext
20752080
):
2076-
failure_converter = failure_converter.with_context(context)
2077-
return payload_converter, failure_converter
2081+
base_failure_converter = base_failure_converter.with_context(context)
2082+
return base_payload_converter, base_failure_converter
20782083

20792084
def _instantiate_workflow_object(self) -> Any:
20802085
if not self._workflow_input:
@@ -2812,7 +2817,7 @@ def __init__(
28122817
self._result_fut = instance.create_future()
28132818
self._started = False
28142819
instance._register_task(self, name=f"activity: {input.activity}")
2815-
self._payload_converter, _ = self._instance._converters(
2820+
self._payload_converter, _ = self._instance._converters_with_context(
28162821
temporalio.converter.ActivitySerializationContext(
28172822
namespace=self._instance._info.namespace,
28182823
workflow_id=self._instance._info.workflow_id,
@@ -2973,7 +2978,7 @@ def __init__(
29732978
self._result_fut: asyncio.Future[Any] = instance.create_future()
29742979
self._first_execution_run_id = "<unknown>"
29752980
instance._register_task(self, name=f"child: {input.workflow}")
2976-
self._payload_converter, _ = self._instance._converters(
2981+
self._payload_converter, _ = self._instance._converters_with_context(
29772982
temporalio.converter.WorkflowSerializationContext(
29782983
namespace=self._instance._info.namespace,
29792984
workflow_id=self._input.id,
@@ -3163,7 +3168,7 @@ def __init__(
31633168
self._task = asyncio.Task(fn)
31643169
self._start_fut: asyncio.Future[Optional[str]] = instance.create_future()
31653170
self._result_fut: asyncio.Future[Optional[OutputT]] = instance.create_future()
3166-
self._payload_converter, _ = self._instance._converters(None)
3171+
self._payload_converter, _ = self._instance._converters_with_context(None)
31673172

31683173
@property
31693174
def operation_token(self) -> Optional[str]:

0 commit comments

Comments
 (0)