Skip to content

Commit 0b90258

Browse files
committed
Rename converters for explicitness
1 parent 3bb9950 commit 0b90258

File tree

1 file changed

+52
-35
lines changed

1 file changed

+52
-35
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -232,12 +232,13 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
232232
self._info = det.info
233233
self._context_free_payload_converter = det.payload_converter_class()
234234
self._context_free_failure_converter = det.failure_converter_class()
235-
self._payload_converter, self._failure_converter = (
236-
self._converters_with_context(
237-
temporalio.converter.WorkflowSerializationContext(
238-
namespace=det.info.namespace,
239-
workflow_id=det.info.workflow_id,
240-
)
235+
(
236+
self._workflow_context_payload_converter,
237+
self._workflow_context_failure_converter,
238+
) = self._converters_with_context(
239+
temporalio.converter.WorkflowSerializationContext(
240+
namespace=det.info.namespace,
241+
workflow_id=det.info.workflow_id,
241242
)
242243
)
243244

@@ -495,9 +496,9 @@ def activate(
495496
# Set completion failure
496497
self._current_completion.failed.failure.SetInParent()
497498
try:
498-
self._failure_converter.to_failure(
499+
self._workflow_context_failure_converter.to_failure(
499500
activation_err,
500-
self._payload_converter,
501+
self._workflow_context_payload_converter,
501502
self._current_completion.failed.failure,
502503
)
503504
except Exception as inner_err:
@@ -639,7 +640,9 @@ async def run_update() -> None:
639640

640641
# Run the handler
641642
success = await self._inbound.handle_update_handler(handler_input)
642-
result_payloads = self._payload_converter.to_payloads([success])
643+
result_payloads = self._workflow_context_payload_converter.to_payloads(
644+
[success]
645+
)
643646
if len(result_payloads) != 1:
644647
raise ValueError(
645648
f"Expected 1 result payload, got {len(result_payloads)}"
@@ -671,9 +674,9 @@ async def run_update() -> None:
671674
job.protocol_instance_id
672675
)
673676
command.update_response.rejected.SetInParent()
674-
self._failure_converter.to_failure(
677+
self._workflow_context_failure_converter.to_failure(
675678
err,
676-
self._payload_converter,
679+
self._workflow_context_payload_converter,
677680
command.update_response.rejected,
678681
)
679682
else:
@@ -735,7 +738,9 @@ async def run_query() -> None:
735738
headers=job.headers,
736739
)
737740
success = await self._inbound.handle_query(input)
738-
result_payloads = self._payload_converter.to_payloads([success])
741+
result_payloads = (
742+
self._workflow_context_payload_converter.to_payloads([success])
743+
)
739744
if len(result_payloads) != 1:
740745
raise ValueError(
741746
f"Expected 1 result payload, got {len(result_payloads)}"
@@ -747,9 +752,9 @@ async def run_query() -> None:
747752
try:
748753
command = self._add_command()
749754
command.respond_to_query.query_id = job.query_id
750-
self._failure_converter.to_failure(
755+
self._workflow_context_failure_converter.to_failure(
751756
err,
752-
self._payload_converter,
757+
self._workflow_context_payload_converter,
753758
command.respond_to_query.failed,
754759
)
755760
except Exception as inner_err:
@@ -1055,7 +1060,9 @@ def _apply_initialize_workflow(
10551060
async def run_workflow(input: ExecuteWorkflowInput) -> None:
10561061
try:
10571062
result = await self._inbound.execute_workflow(input)
1058-
result_payloads = self._payload_converter.to_payloads([result])
1063+
result_payloads = self._workflow_context_payload_converter.to_payloads(
1064+
[result]
1065+
)
10591066
if len(result_payloads) != 1:
10601067
raise ValueError(
10611068
f"Expected 1 result payload, got {len(result_payloads)}"
@@ -1095,7 +1102,7 @@ def _make_workflow_input(
10951102
arg_types = [temporalio.common.RawValue] * len(init_job.arguments)
10961103

10971104
args = self._convert_payloads(
1098-
init_job.arguments, arg_types, self._payload_converter
1105+
init_job.arguments, arg_types, self._workflow_context_payload_converter
10991106
)
11001107
# Put args in a list if dynamic
11011108
if not self._defn.name:
@@ -1235,7 +1242,7 @@ def workflow_is_replaying(self) -> bool:
12351242
def workflow_memo(self) -> Mapping[str, Any]:
12361243
if self._untyped_converted_memo is None:
12371244
self._untyped_converted_memo = {
1238-
k: self._payload_converter.from_payload(v)
1245+
k: self._workflow_context_payload_converter.from_payload(v)
12391246
for k, v in self._info.raw_memo.items()
12401247
}
12411248
return self._untyped_converted_memo
@@ -1248,7 +1255,7 @@ def workflow_memo_value(
12481255
if default is temporalio.common._arg_unset:
12491256
raise KeyError(f"Memo does not have a value for key {key}")
12501257
return default
1251-
return self._payload_converter.from_payload(
1258+
return self._workflow_context_payload_converter.from_payload(
12521259
payload,
12531260
type_hint, # type: ignore[arg-type]
12541261
)
@@ -1262,7 +1269,9 @@ def workflow_upsert_memo(self, updates: Mapping[str, Any]) -> None:
12621269
# Intentionally not checking if memo exists, so that no-op removals show up in history too.
12631270
removals.append(k)
12641271
else:
1265-
update_payloads[k] = self._payload_converter.to_payload(v)
1272+
update_payloads[k] = (
1273+
self._workflow_context_payload_converter.to_payload(v)
1274+
)
12661275

12671276
if not update_payloads and not removals:
12681277
return
@@ -1281,16 +1290,16 @@ def workflow_upsert_memo(self, updates: Mapping[str, Any]) -> None:
12811290
mut_raw_memo[k] = v
12821291

12831292
if removals:
1284-
null_payload = self._payload_converter.to_payload(None)
1293+
null_payload = self._workflow_context_payload_converter.to_payload(None)
12851294
for k in removals:
12861295
fields[k].CopyFrom(null_payload)
12871296
mut_raw_memo.pop(k, None)
12881297

12891298
# Keeping deserialized memo dict in sync, if exists
12901299
if self._untyped_converted_memo is not None:
12911300
for k, v in update_payloads.items():
1292-
self._untyped_converted_memo[k] = self._payload_converter.from_payload(
1293-
v
1301+
self._untyped_converted_memo[k] = (
1302+
self._workflow_context_payload_converter.from_payload(v)
12941303
)
12951304
for k in removals:
12961305
self._untyped_converted_memo.pop(k, None)
@@ -1328,7 +1337,7 @@ def workflow_patch(self, id: str, *, deprecated: bool) -> bool:
13281337
return use_patch
13291338

13301339
def workflow_payload_converter(self) -> temporalio.converter.PayloadConverter:
1331-
return self._payload_converter
1340+
return self._workflow_context_payload_converter
13321341

13331342
def workflow_random(self) -> random.Random:
13341343
self._assert_not_read_only("random")
@@ -1723,7 +1732,7 @@ async def workflow_sleep(
17231732
) -> None:
17241733
user_metadata = (
17251734
temporalio.api.sdk.v1.UserMetadata(
1726-
summary=self._payload_converter.to_payload(summary)
1735+
summary=self._workflow_context_payload_converter.to_payload(summary)
17271736
)
17281737
if summary
17291738
else None
@@ -1748,7 +1757,9 @@ async def workflow_wait_condition(
17481757
self._conditions.append((fn, fut))
17491758
user_metadata = (
17501759
temporalio.api.sdk.v1.UserMetadata(
1751-
summary=self._payload_converter.to_payload(timeout_summary)
1760+
summary=self._workflow_context_payload_converter.to_payload(
1761+
timeout_summary
1762+
)
17521763
)
17531764
if timeout_summary
17541765
else None
@@ -1800,18 +1811,18 @@ def workflow_last_completion_result(
18001811
return None
18011812

18021813
if type_hint is None:
1803-
return self._payload_converter.from_payload(
1814+
return self._workflow_context_payload_converter.from_payload(
18041815
self._last_completion_result.payloads[0]
18051816
)
18061817
else:
1807-
return self._payload_converter.from_payload(
1818+
return self._workflow_context_payload_converter.from_payload(
18081819
self._last_completion_result.payloads[0], type_hint
18091820
)
18101821

18111822
def workflow_last_failure(self) -> Optional[BaseException]:
18121823
if self._last_failure:
1813-
return self._failure_converter.from_failure(
1814-
self._last_failure, self._payload_converter
1824+
return self._workflow_context_failure_converter.from_failure(
1825+
self._last_failure, self._workflow_context_payload_converter
18151826
)
18161827

18171828
return None
@@ -2256,19 +2267,19 @@ def _process_handler_args(
22562267
# Take off the string type hint for conversion
22572268
arg_types = defn_arg_types[1:] if defn_arg_types else None
22582269
return [job_name] + self._convert_payloads(
2259-
job_input, arg_types, self._payload_converter
2270+
job_input, arg_types, self._workflow_context_payload_converter
22602271
)
22612272
if not defn_name:
22622273
return [
22632274
job_name,
22642275
self._convert_payloads(
22652276
job_input,
22662277
[temporalio.common.RawValue] * len(job_input),
2267-
self._payload_converter,
2278+
self._workflow_context_payload_converter,
22682279
),
22692280
]
22702281
return self._convert_payloads(
2271-
job_input, defn_arg_types, self._payload_converter
2282+
job_input, defn_arg_types, self._workflow_context_payload_converter
22722283
)
22732284

22742285
def _process_signal_job(
@@ -2422,7 +2433,9 @@ def _set_workflow_failure(self, err: BaseException) -> None:
24222433
failure = self._add_command().fail_workflow_execution.failure
24232434
failure.SetInParent()
24242435
try:
2425-
self._failure_converter.to_failure(err, self._payload_converter, failure)
2436+
self._workflow_context_failure_converter.to_failure(
2437+
err, self._workflow_context_payload_converter, failure
2438+
)
24262439
except Exception as inner_err:
24272440
raise ValueError("Failed converting workflow exception") from inner_err
24282441

@@ -3329,13 +3342,17 @@ def __init__(
33293342
def _apply_command(self) -> None:
33303343
# Convert arguments before creating command in case it raises error
33313344
payloads = (
3332-
self._instance._payload_converter.to_payloads(self._input.args)
3345+
self._instance._workflow_context_payload_converter.to_payloads(
3346+
self._input.args
3347+
)
33333348
if self._input.args
33343349
else None
33353350
)
33363351
memo_payloads = (
33373352
{
3338-
k: self._instance._payload_converter.to_payloads([val])[0]
3353+
k: self._instance._workflow_context_payload_converter.to_payloads(
3354+
[val]
3355+
)[0]
33393356
for k, val in self._input.memo.items()
33403357
}
33413358
if self._input.memo

0 commit comments

Comments
 (0)