Skip to content

Commit 48fb29d

Browse files
committed
Wire context for signal, query, and update
1 parent 3235a40 commit 48fb29d

File tree

2 files changed

+56
-8
lines changed

2 files changed

+56
-8
lines changed

temporalio/client.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import temporalio.service
6363
import temporalio.workflow
6464
from temporalio.activity import ActivityCancellationDetails
65+
from temporalio.converter import WorkflowSerializationContext
6566
from temporalio.service import (
6667
HttpConnectProxyConfig,
6768
KeepAliveConfig,
@@ -6062,8 +6063,14 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
60626063
)
60636064
req.query.query_type = input.query
60646065
if input.args:
6066+
context = WorkflowSerializationContext(
6067+
namespace=self._client.namespace,
6068+
workflow_id=input.id,
6069+
)
60656070
req.query.query_args.payloads.extend(
6066-
await self._client.data_converter.encode(input.args)
6071+
await self._client.data_converter._with_context(context).encode(
6072+
input.args
6073+
)
60676074
)
60686075
if input.headers is not None:
60696076
await self._apply_headers(input.headers, req.query.header.fields)
@@ -6087,7 +6094,11 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
60876094
if not resp.query_result.payloads:
60886095
return None
60896096
type_hints = [input.ret_type] if input.ret_type else None
6090-
results = await self._client.data_converter.decode(
6097+
context = WorkflowSerializationContext(
6098+
namespace=self._client.namespace,
6099+
workflow_id=input.id,
6100+
)
6101+
results = await self._client.data_converter._with_context(context).decode(
60916102
resp.query_result.payloads, type_hints
60926103
)
60936104
if not results:
@@ -6108,8 +6119,14 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None:
61086119
request_id=str(uuid.uuid4()),
61096120
)
61106121
if input.args:
6122+
context = temporalio.converter.WorkflowSerializationContext(
6123+
namespace=self._client.namespace,
6124+
workflow_id=input.id,
6125+
)
61116126
req.input.payloads.extend(
6112-
await self._client.data_converter.encode(input.args)
6127+
await self._client.data_converter._with_context(context).encode(
6128+
input.args
6129+
)
61136130
)
61146131
if input.headers is not None:
61156132
await self._apply_headers(input.headers, req.header.fields)

temporalio/worker/_workflow_instance.py

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -576,12 +576,17 @@ async def run_update() -> None:
576576
self._in_progress_updates[job.id] = HandlerExecution(
577577
job.name, defn.unfinished_policy, job.id
578578
)
579+
context = temporalio.converter.WorkflowSerializationContext(
580+
namespace=self._info.namespace,
581+
workflow_id=self._info.workflow_id,
582+
)
579583
args = self._process_handler_args(
580584
job.name,
581585
job.input,
582586
defn.name,
583587
defn.arg_types,
584588
defn.dynamic_vararg,
589+
context,
585590
)
586591
handler_input = HandleUpdateInput(
587592
id=job.id,
@@ -600,6 +605,7 @@ async def run_update() -> None:
600605
defn.name,
601606
defn.arg_types,
602607
defn.dynamic_vararg,
608+
context,
603609
)
604610
handler_input.args = args
605611

@@ -692,12 +698,17 @@ async def run_query() -> None:
692698
)
693699

694700
# Create input
701+
context = temporalio.converter.WorkflowSerializationContext(
702+
namespace=self._info.namespace,
703+
workflow_id=self._info.workflow_id,
704+
)
695705
args = self._process_handler_args(
696706
job.query_type,
697707
job.arguments,
698708
defn.name,
699709
defn.arg_types,
700710
defn.dynamic_vararg,
711+
context,
701712
)
702713
input = HandleQueryInput(
703714
id=job.query_id,
@@ -706,7 +717,12 @@ async def run_query() -> None:
706717
headers=job.headers,
707718
)
708719
success = await self._inbound.handle_query(input)
709-
result_payloads = self._payload_converter.to_payloads([success])
720+
converter = self._payload_converter
721+
if isinstance(
722+
converter, temporalio.converter.WithSerializationContext
723+
):
724+
converter = converter.with_context(context)
725+
result_payloads = converter.to_payloads([success])
710726
if len(result_payloads) != 1:
711727
raise ValueError(
712728
f"Expected 1 result payload, got {len(result_payloads)}"
@@ -716,11 +732,20 @@ async def run_query() -> None:
716732
command.respond_to_query.succeeded.response.CopyFrom(result_payloads[0])
717733
except Exception as err:
718734
try:
735+
context = temporalio.converter.WorkflowSerializationContext(
736+
namespace=self._info.namespace,
737+
workflow_id=self._info.workflow_id,
738+
)
739+
converter = self._payload_converter
740+
if isinstance(
741+
converter, temporalio.converter.WithSerializationContext
742+
):
743+
converter = converter.with_context(context)
719744
command = self._add_command()
720745
command.respond_to_query.query_id = job.query_id
721746
self._failure_converter.to_failure(
722747
err,
723-
self._payload_converter,
748+
converter,
724749
command.respond_to_query.failed,
725750
)
726751
except Exception as inner_err:
@@ -2106,35 +2131,41 @@ def _process_handler_args(
21062131
defn_name: Optional[str],
21072132
defn_arg_types: Optional[List[Type]],
21082133
defn_dynamic_vararg: bool,
2134+
context: temporalio.converter.SerializationContext,
21092135
) -> List[Any]:
21102136
# If dynamic old-style vararg, args become name + varargs of given arg
21112137
# types. If dynamic new-style raw value sequence, args become name +
21122138
# seq of raw values.
21132139
if not defn_name and defn_dynamic_vararg:
21142140
# Take off the string type hint for conversion
21152141
arg_types = defn_arg_types[1:] if defn_arg_types else None
2116-
return [job_name] + self._convert_payloads(job_input, arg_types)
2142+
return [job_name] + self._convert_payloads(job_input, arg_types, context)
21172143
if not defn_name:
21182144
return [
21192145
job_name,
21202146
self._convert_payloads(
2121-
job_input, [temporalio.common.RawValue] * len(job_input)
2147+
job_input, [temporalio.common.RawValue] * len(job_input), context
21222148
),
21232149
]
2124-
return self._convert_payloads(job_input, defn_arg_types)
2150+
return self._convert_payloads(job_input, defn_arg_types, context)
21252151

21262152
def _process_signal_job(
21272153
self,
21282154
defn: temporalio.workflow._SignalDefinition,
21292155
job: temporalio.bridge.proto.workflow_activation.SignalWorkflow,
21302156
) -> None:
21312157
try:
2158+
context = temporalio.converter.WorkflowSerializationContext(
2159+
namespace=self._info.namespace,
2160+
workflow_id=self._info.workflow_id,
2161+
)
21322162
args = self._process_handler_args(
21332163
job.signal_name,
21342164
job.input,
21352165
defn.name,
21362166
defn.arg_types,
21372167
defn.dynamic_vararg,
2168+
context,
21382169
)
21392170
except Exception:
21402171
logger.exception(

0 commit comments

Comments
 (0)