Skip to content

Commit a8e3c67

Browse files
committed
Reduce unnecessary instantiations
1 parent 09be82e commit a8e3c67

File tree

1 file changed

+70
-58
lines changed

1 file changed

+70
-58
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 70 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -229,14 +229,15 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
229229
self._info = det.info
230230
self._context_free_payload_converter = det.payload_converter_class()
231231
self._context_free_failure_converter = det.failure_converter_class()
232-
(
233-
self._workflow_context_payload_converter,
234-
self._workflow_context_failure_converter,
235-
) = self._converters_with_context(
236-
temporalio.converter.WorkflowSerializationContext(
237-
namespace=det.info.namespace,
238-
workflow_id=det.info.workflow_id,
239-
)
232+
workflow_context = temporalio.converter.WorkflowSerializationContext(
233+
namespace=det.info.namespace,
234+
workflow_id=det.info.workflow_id,
235+
)
236+
self._workflow_context_payload_converter = self._payload_converter_with_context(
237+
workflow_context
238+
)
239+
self._workflow_context_failure_converter = self._failure_converter_with_context(
240+
workflow_context
240241
)
241242

242243
self._extern_functions = det.extern_functions
@@ -785,20 +786,20 @@ def _apply_resolve_activity(
785786
handle = self._pending_activities.pop(job.seq, None)
786787
if not handle:
787788
raise RuntimeError(f"Failed finding activity handle for sequence {job.seq}")
788-
payload_converter, failure_converter = self._converters_with_context(
789-
temporalio.converter.ActivitySerializationContext(
790-
namespace=self._info.namespace,
791-
workflow_id=self._info.workflow_id,
792-
workflow_type=self._info.workflow_type,
793-
activity_type=handle._input.activity,
794-
activity_task_queue=(
795-
handle._input.task_queue or self._info.task_queue
796-
if isinstance(handle._input, StartActivityInput)
797-
else self._info.task_queue
798-
),
799-
is_local=isinstance(handle._input, StartLocalActivityInput),
800-
)
801-
)
789+
activity_context = temporalio.converter.ActivitySerializationContext(
790+
namespace=self._info.namespace,
791+
workflow_id=self._info.workflow_id,
792+
workflow_type=self._info.workflow_type,
793+
activity_type=handle._input.activity,
794+
activity_task_queue=(
795+
handle._input.task_queue or self._info.task_queue
796+
if isinstance(handle._input, StartActivityInput)
797+
else self._info.task_queue
798+
),
799+
is_local=isinstance(handle._input, StartLocalActivityInput),
800+
)
801+
payload_converter = self._payload_converter_with_context(activity_context)
802+
failure_converter = self._failure_converter_with_context(activity_context)
802803
if job.result.HasField("completed"):
803804
ret: Optional[Any] = None
804805
if job.result.completed.HasField("result"):
@@ -989,12 +990,12 @@ def _apply_resolve_request_cancel_external_workflow(
989990
fut, external_workflow_id = pending
990991
# We intentionally let this error if future is already done
991992
if job.HasField("failure"):
992-
payload_converter, failure_converter = self._converters_with_context(
993-
temporalio.converter.WorkflowSerializationContext(
994-
namespace=self._info.namespace,
995-
workflow_id=external_workflow_id,
996-
)
993+
workflow_context = temporalio.converter.WorkflowSerializationContext(
994+
namespace=self._info.namespace,
995+
workflow_id=external_workflow_id,
997996
)
997+
payload_converter = self._payload_converter_with_context(workflow_context)
998+
failure_converter = self._failure_converter_with_context(workflow_context)
998999
fut.set_exception(
9991000
failure_converter.from_failure(job.failure, payload_converter)
10001001
)
@@ -1013,12 +1014,12 @@ def _apply_resolve_signal_external_workflow(
10131014
fut, external_workflow_id = pending
10141015
# We intentionally let this error if future is already done
10151016
if job.HasField("failure"):
1016-
payload_converter, failure_converter = self._converters_with_context(
1017-
temporalio.converter.WorkflowSerializationContext(
1018-
namespace=self._info.namespace,
1019-
workflow_id=external_workflow_id,
1020-
)
1017+
workflow_context = temporalio.converter.WorkflowSerializationContext(
1018+
namespace=self._info.namespace,
1019+
workflow_id=external_workflow_id,
10211020
)
1021+
payload_converter = self._payload_converter_with_context(workflow_context)
1022+
failure_converter = self._failure_converter_with_context(workflow_context)
10221023
fut.set_exception(
10231024
failure_converter.from_failure(job.failure, payload_converter)
10241025
)
@@ -1874,7 +1875,7 @@ async def run_activity() -> Any:
18741875
async def _outbound_signal_child_workflow(
18751876
self, input: SignalChildWorkflowInput
18761877
) -> None:
1877-
payload_converter, _ = self._converters_with_context(
1878+
payload_converter = self._payload_converter_with_context(
18781879
temporalio.converter.WorkflowSerializationContext(
18791880
namespace=self._info.namespace,
18801881
workflow_id=input.child_workflow_id,
@@ -1894,7 +1895,7 @@ async def _outbound_signal_child_workflow(
18941895
async def _outbound_signal_external_workflow(
18951896
self, input: SignalExternalWorkflowInput
18961897
) -> None:
1897-
payload_converter, _ = self._converters_with_context(
1898+
payload_converter = self._payload_converter_with_context(
18981899
temporalio.converter.WorkflowSerializationContext(
18991900
namespace=input.namespace,
19001901
workflow_id=input.workflow_id,
@@ -2077,39 +2078,45 @@ def _convert_payloads(
20772078
raise
20782079
raise RuntimeError("Failed decoding arguments") from err
20792080

2080-
def _converters_with_context(
2081+
def _payload_converter_with_context(
20812082
self,
20822083
context: temporalio.converter.SerializationContext,
2083-
) -> Tuple[
2084-
temporalio.converter.PayloadConverter,
2085-
temporalio.converter.FailureConverter,
2086-
]:
2087-
"""Construct workflow payload and failure converters with the given context.
2084+
) -> temporalio.converter.PayloadConverter:
2085+
"""Construct workflow payload converter with the given context.
20882086
20892087
This plays a similar role to DataConverter._with_context, but operates on PayloadConverter
2090-
and FailureConverter only (since payload encoding/decoding is done by the worker, outside
2091-
the workflow sandbox).
2088+
only (payload encoding/decoding is done by the worker, outside the workflow sandbox).
20922089
"""
20932090
payload_converter = self._context_free_payload_converter
2094-
failure_converter = self._context_free_failure_converter
20952091
if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
20962092
payload_converter = payload_converter.with_context(context)
2093+
return payload_converter
2094+
2095+
def _failure_converter_with_context(
2096+
self,
2097+
context: temporalio.converter.SerializationContext,
2098+
) -> temporalio.converter.FailureConverter:
2099+
"""Construct workflow failure converter with the given context.
2100+
2101+
This plays a similar role to DataConverter._with_context, but operates on FailureConverter
2102+
only (payload encoding/decoding is done by the worker, outside the workflow sandbox).
2103+
"""
2104+
failure_converter = self._context_free_failure_converter
20972105
if isinstance(failure_converter, temporalio.converter.WithSerializationContext):
20982106
failure_converter = failure_converter.with_context(context)
2099-
return payload_converter, failure_converter
2107+
return failure_converter
21002108

21012109
def get_serialization_context(
21022110
self,
21032111
command_info: Optional[_command_aware_visitor.CommandInfo],
21042112
) -> Optional[temporalio.converter.SerializationContext]:
2105-
workflow_context = temporalio.converter.WorkflowSerializationContext(
2106-
namespace=self._info.namespace,
2107-
workflow_id=self._info.workflow_id,
2108-
)
21092113
if command_info is None:
21102114
# Use payload codec with workflow context by default (i.e. for payloads not associated
21112115
# with a pending command)
2112-
return workflow_context
2116+
return temporalio.converter.WorkflowSerializationContext(
2117+
namespace=self._info.namespace,
2118+
workflow_id=self._info.workflow_id,
2119+
)
21132120

21142121
if (
21152122
command_info.command_type
@@ -2170,7 +2177,10 @@ def get_serialization_context(
21702177

21712178
else:
21722179
# Use payload codec with workflow context for all other payloads
2173-
return workflow_context
2180+
return temporalio.converter.WorkflowSerializationContext(
2181+
namespace=self._info.namespace,
2182+
workflow_id=self._info.workflow_id,
2183+
)
21742184

21752185
def _instantiate_workflow_object(self) -> Any:
21762186
if not self._workflow_input:
@@ -2908,7 +2918,7 @@ def __init__(
29082918
self._result_fut = instance.create_future()
29092919
self._started = False
29102920
instance._register_task(self, name=f"activity: {input.activity}")
2911-
self._payload_converter, _ = self._instance._converters_with_context(
2921+
self._payload_converter = self._instance._payload_converter_with_context(
29122922
temporalio.converter.ActivitySerializationContext(
29132923
namespace=self._instance._info.namespace,
29142924
workflow_id=self._instance._info.workflow_id,
@@ -3069,13 +3079,15 @@ def __init__(
30693079
self._result_fut: asyncio.Future[Any] = instance.create_future()
30703080
self._first_execution_run_id = "<unknown>"
30713081
instance._register_task(self, name=f"child: {input.workflow}")
3072-
self._payload_converter, self._failure_converter = (
3073-
self._instance._converters_with_context(
3074-
temporalio.converter.WorkflowSerializationContext(
3075-
namespace=self._instance._info.namespace,
3076-
workflow_id=self._input.id,
3077-
)
3078-
)
3082+
workflow_context = temporalio.converter.WorkflowSerializationContext(
3083+
namespace=self._instance._info.namespace,
3084+
workflow_id=self._input.id,
3085+
)
3086+
self._payload_converter = self._instance._payload_converter_with_context(
3087+
workflow_context
3088+
)
3089+
self._failure_converter = self._instance._failure_converter_with_context(
3090+
workflow_context
30793091
)
30803092

30813093
@property

0 commit comments

Comments
 (0)