Skip to content

Commit ae4402c

Browse files
committed
Helper
1 parent 0cc178b commit ae4402c

File tree

1 file changed

+28
-19
lines changed

1 file changed

+28
-19
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -210,26 +210,13 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
210210
self._defn = det.defn
211211
self._workflow_input: Optional[ExecuteWorkflowInput] = None
212212
self._info = det.info
213-
214-
# converters
215-
self._payload_converter = det.payload_converter_class()
216-
self._failure_converter = det.failure_converter_class()
217-
self._serialization_context = temporalio.converter.WorkflowSerializationContext(
218-
namespace=self._info.namespace,
219-
workflow_id=self._info.workflow_id,
213+
(
214+
self._payload_converter,
215+
self._failure_converter,
216+
self._serialization_context,
217+
) = self._workflow_converters(
218+
det.payload_converter_class(), det.failure_converter_class()
220219
)
221-
if isinstance(
222-
self._payload_converter, temporalio.converter.WithSerializationContext
223-
):
224-
self._payload_converter = self._payload_converter.with_context(
225-
self._serialization_context
226-
)
227-
if isinstance(
228-
self._failure_converter, temporalio.converter.WithSerializationContext
229-
):
230-
self._failure_converter = self._failure_converter.with_context(
231-
self._serialization_context
232-
)
233220

234221
self._extern_functions = det.extern_functions
235222
self._disable_eager_activity_execution = det.disable_eager_activity_execution
@@ -2288,6 +2275,28 @@ def _run_once(self, *, check_conditions: bool) -> None:
22882275
finally:
22892276
asyncio._set_running_loop(None)
22902277

2278+
def _workflow_converters(
2279+
self,
2280+
payload_converter: temporalio.converter.PayloadConverter,
2281+
failure_converter: temporalio.converter.FailureConverter,
2282+
) -> Tuple[
2283+
temporalio.converter.PayloadConverter,
2284+
temporalio.converter.FailureConverter,
2285+
temporalio.converter.WorkflowSerializationContext,
2286+
]:
2287+
"""Get workflow failure and payload converters with workflow context.
2288+
2289+
The context applied here includes the workflow ID of this workflow."""
2290+
context = temporalio.converter.WorkflowSerializationContext(
2291+
namespace=self._info.namespace,
2292+
workflow_id=self._info.workflow_id,
2293+
)
2294+
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
2295+
payload_converter = payload_converter.with_context(context)
2296+
if isinstance(failure_converter, temporalio.converter.WithSerializationContext):
2297+
failure_converter = failure_converter.with_context(context)
2298+
return payload_converter, failure_converter, context
2299+
22912300
# This is used for the primary workflow function and signal handlers in
22922301
# order to apply common exception handling to each
22932302
async def _run_top_level_workflow_function(self, coro: Awaitable[None]) -> None:

0 commit comments

Comments
 (0)