Skip to content

Commit 9c9a3fe

Browse files
committed
Cleanup: use converters from nexus / child workflow handles
1 parent 4a744fa commit 9c9a3fe

File tree

1 file changed

+29
-40
lines changed

1 file changed

+29
-40
lines changed

temporalio/worker/_workflow_instance.py

Lines changed: 29 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -836,33 +836,28 @@ def _apply_resolve_child_workflow_execution(
836836
raise RuntimeError(
837837
f"Failed finding child workflow handle for sequence {job.seq}"
838838
)
839-
payload_converter, failure_converter = self._converters_with_context(
840-
temporalio.converter.WorkflowSerializationContext(
841-
namespace=self._info.namespace,
842-
workflow_id=handle._input.id,
843-
)
844-
)
839+
845840
if job.result.HasField("completed"):
846841
ret: Optional[Any] = None
847842
if job.result.completed.HasField("result"):
848843
ret_types = [handle._input.ret_type] if handle._input.ret_type else None
849844
ret_vals = self._convert_payloads(
850845
[job.result.completed.result],
851846
ret_types,
852-
payload_converter,
847+
handle._payload_converter,
853848
)
854849
ret = ret_vals[0]
855850
handle._resolve_success(ret)
856851
elif job.result.HasField("failed"):
857852
handle._resolve_failure(
858-
failure_converter.from_failure(
859-
job.result.failed.failure, payload_converter
853+
handle._failure_converter.from_failure(
854+
job.result.failed.failure, handle._payload_converter
860855
)
861856
)
862857
elif job.result.HasField("cancelled"):
863858
handle._resolve_failure(
864-
failure_converter.from_failure(
865-
job.result.cancelled.failure, payload_converter
859+
handle._failure_converter.from_failure(
860+
job.result.cancelled.failure, handle._payload_converter
866861
)
867862
)
868863
else:
@@ -898,14 +893,10 @@ def _apply_resolve_child_workflow_execution_start(
898893
)
899894
elif job.HasField("cancelled"):
900895
self._pending_child_workflows.pop(job.seq)
901-
payload_converter, failure_converter = self._converters_with_context(
902-
temporalio.converter.WorkflowSerializationContext(
903-
namespace=self._info.namespace,
904-
workflow_id=handle._input.id,
905-
)
906-
)
907896
handle._resolve_failure(
908-
failure_converter.from_failure(job.cancelled.failure, payload_converter)
897+
handle._failure_converter.from_failure(
898+
job.cancelled.failure, handle._payload_converter
899+
)
909900
)
910901
else:
911902
raise RuntimeError("Child workflow start did not have a known status")
@@ -919,13 +910,6 @@ def _apply_resolve_nexus_operation_start(
919910
raise RuntimeError(
920911
f"Failed to find nexus operation handle for job sequence number {job.seq}"
921912
)
922-
# We don't set a serialization context for nexus operations on the caller side because it's
923-
# not possible to set a matching context on the handler side.
924-
payload_converter, failure_converter = (
925-
self._context_free_payload_converter,
926-
self._context_free_failure_converter,
927-
)
928-
929913
if job.HasField("operation_token"):
930914
# The nexus operation started asynchronously. A `ResolveNexusOperation` job
931915
# will follow in a future activation.
@@ -938,7 +922,9 @@ def _apply_resolve_nexus_operation_start(
938922
# The nexus operation start failed; no ResolveNexusOperation will follow.
939923
self._pending_nexus_operations.pop(job.seq, None)
940924
handle._resolve_failure(
941-
failure_converter.from_failure(job.failed, payload_converter)
925+
handle._failure_converter.from_failure(
926+
job.failed, handle._payload_converter
927+
)
942928
)
943929
else:
944930
raise ValueError(f"Unknown Nexus operation start status: {job}")
@@ -961,32 +947,32 @@ def _apply_resolve_nexus_operation(
961947
# completed / failed, but it has already been resolved.
962948
return
963949

964-
# We don't set a serialization context for nexus operations on the caller side because it is
965-
# not possible to set the same context on the handler side.
966-
payload_converter, failure_converter = (
967-
self._context_free_payload_converter,
968-
self._context_free_failure_converter,
969-
)
970950
# Handle the four oneof variants of NexusOperationResult
971951
result = job.result
972952
if result.HasField("completed"):
973953
[output] = self._convert_payloads(
974954
[result.completed],
975955
[handle._input.output_type] if handle._input.output_type else None,
976-
payload_converter,
956+
handle._payload_converter,
977957
)
978958
handle._resolve_success(output)
979959
elif result.HasField("failed"):
980960
handle._resolve_failure(
981-
failure_converter.from_failure(result.failed, payload_converter)
961+
handle._failure_converter.from_failure(
962+
result.failed, handle._payload_converter
963+
)
982964
)
983965
elif result.HasField("cancelled"):
984966
handle._resolve_failure(
985-
failure_converter.from_failure(result.cancelled, payload_converter)
967+
handle._failure_converter.from_failure(
968+
result.cancelled, handle._payload_converter
969+
)
986970
)
987971
elif result.HasField("timed_out"):
988972
handle._resolve_failure(
989-
failure_converter.from_failure(result.timed_out, payload_converter)
973+
handle._failure_converter.from_failure(
974+
result.timed_out, handle._payload_converter
975+
)
990976
)
991977
else:
992978
raise RuntimeError("Nexus operation did not have a result")
@@ -3083,10 +3069,12 @@ def __init__(
30833069
self._result_fut: asyncio.Future[Any] = instance.create_future()
30843070
self._first_execution_run_id = "<unknown>"
30853071
instance._register_task(self, name=f"child: {input.workflow}")
3086-
self._payload_converter, _ = self._instance._converters_with_context(
3087-
temporalio.converter.WorkflowSerializationContext(
3088-
namespace=self._instance._info.namespace,
3089-
workflow_id=self._input.id,
3072+
self._payload_converter, self._failure_converter = (
3073+
self._instance._converters_with_context(
3074+
temporalio.converter.WorkflowSerializationContext(
3075+
namespace=self._instance._info.namespace,
3076+
workflow_id=self._input.id,
3077+
)
30903078
)
30913079
)
30923080

@@ -3274,6 +3262,7 @@ def __init__(
32743262
self._start_fut: asyncio.Future[Optional[str]] = instance.create_future()
32753263
self._result_fut: asyncio.Future[Optional[OutputT]] = instance.create_future()
32763264
self._payload_converter = self._instance._context_free_payload_converter
3265+
self._failure_converter = self._instance._context_free_failure_converter
32773266

32783267
@property
32793268
def operation_token(self) -> Optional[str]:

0 commit comments

Comments
 (0)