2727 Mapping ,
2828 Optional ,
2929 Sequence ,
30+ Text ,
3031 Tuple ,
3132 Type ,
3233 Union ,
3738import google .protobuf .duration_pb2
3839import google .protobuf .json_format
3940import google .protobuf .timestamp_pb2
41+ from google .protobuf .internal .containers import MessageMap
4042from typing_extensions import Concatenate , Required , TypedDict
4143
4244import temporalio .api .common .v1
6668 TLSConfig ,
6769)
6870
71+ from .common import HeaderCodecBehavior
6972from .types import (
7073 AnyType ,
7174 LocalReturnType ,
@@ -116,6 +119,7 @@ async def connect(
116119 lazy : bool = False ,
117120 runtime : Optional [temporalio .runtime .Runtime ] = None ,
118121 http_connect_proxy_config : Optional [HttpConnectProxyConfig ] = None ,
122+ header_codec_behavior : HeaderCodecBehavior = HeaderCodecBehavior .NO_CODEC ,
119123 ) -> Client :
120124 """Connect to a Temporal server.
121125
@@ -160,6 +164,7 @@ async def connect(
160164 used for workers.
161165 runtime: The runtime for this client, or the default if unset.
162166 http_connect_proxy_config: Configuration for HTTP CONNECT proxy.
167+ header_codec_behavior: Encoding behavior for headers sent by the client.
163168 """
164169 connect_config = temporalio .service .ConnectConfig (
165170 target_host = target_host ,
@@ -179,6 +184,7 @@ async def connect(
179184 data_converter = data_converter ,
180185 interceptors = interceptors ,
181186 default_workflow_query_reject_condition = default_workflow_query_reject_condition ,
187+ header_codec_behavior = header_codec_behavior ,
182188 )
183189
184190 def __init__ (
@@ -191,6 +197,7 @@ def __init__(
191197 default_workflow_query_reject_condition : Optional [
192198 temporalio .common .QueryRejectCondition
193199 ] = None ,
200+ header_codec_behavior : HeaderCodecBehavior = HeaderCodecBehavior .NO_CODEC ,
194201 ):
195202 """Create a Temporal client from a service client.
196203
@@ -208,6 +215,7 @@ def __init__(
208215 data_converter = data_converter ,
209216 interceptors = interceptors ,
210217 default_workflow_query_reject_condition = default_workflow_query_reject_condition ,
218+ header_codec_behavior = header_codec_behavior ,
211219 )
212220
213221 def config (self ) -> ClientConfig :
@@ -1501,6 +1509,7 @@ class ClientConfig(TypedDict, total=False):
15011509 default_workflow_query_reject_condition : Required [
15021510 Optional [temporalio .common .QueryRejectCondition ]
15031511 ]
1512+ header_codec_behavior : Required [HeaderCodecBehavior ]
15041513
15051514
15061515class WorkflowHistoryEventFilterType (IntEnum ):
@@ -3859,6 +3868,10 @@ class ScheduleActionStartWorkflow(ScheduleAction):
38593868 priority : temporalio .common .Priority
38603869
38613870 headers : Optional [Mapping [str , temporalio .api .common .v1 .Payload ]]
3871+ """
3872+ Headers may still be encoded by the payload codec if present.
3873+ """
3874+ _from_raw : bool = dataclasses .field (compare = False , init = False )
38623875
38633876 @staticmethod
38643877 def _from_proto ( # pyright: ignore
@@ -3985,6 +3998,7 @@ def __init__(
39853998 """
39863999 super ().__init__ ()
39874000 if raw_info :
4001+ self ._from_raw = True
39884002 # Ignore other fields
39894003 self .workflow = raw_info .workflow_type .name
39904004 self .args = raw_info .input .payloads if raw_info .input else []
@@ -4044,6 +4058,7 @@ def __init__(
40444058 else temporalio .common .Priority .default
40454059 )
40464060 else :
4061+ self ._from_raw = False
40474062 if not id :
40484063 raise ValueError ("ID required" )
40494064 if not task_queue :
@@ -4067,7 +4082,7 @@ def __init__(
40674082 self .memo = memo
40684083 self .typed_search_attributes = typed_search_attributes
40694084 self .untyped_search_attributes = untyped_search_attributes
4070- self .headers = headers
4085+ self .headers = headers # encode here
40714086 self .static_summary = static_summary
40724087 self .static_details = static_details
40734088 self .priority = priority
@@ -4145,8 +4160,12 @@ async def _to_proto(
41454160 self .typed_search_attributes , action .start_workflow .search_attributes
41464161 )
41474162 if self .headers :
4148- temporalio .common ._apply_headers (
4149- self .headers , action .start_workflow .header .fields
4163+ await _apply_headers (
4164+ self .headers ,
4165+ action .start_workflow .header .fields ,
4166+ client .config ()["header_codec_behavior" ] == HeaderCodecBehavior .CODEC
4167+ and not self ._from_raw ,
4168+ client .data_converter .payload_codec ,
41504169 )
41514170 return action
41524171
@@ -5920,7 +5939,7 @@ async def _populate_start_workflow_execution_request(
59205939 if input .start_delay is not None :
59215940 req .workflow_start_delay .FromTimedelta (input .start_delay )
59225941 if input .headers is not None :
5923- temporalio . common ._apply_headers (input .headers , req .header .fields )
5942+ await self ._apply_headers (input .headers , req .header .fields )
59245943 if input .priority is not None :
59255944 req .priority .CopyFrom (input .priority ._to_proto ())
59265945 if input .versioning_override is not None :
@@ -6006,7 +6025,7 @@ async def query_workflow(self, input: QueryWorkflowInput) -> Any:
60066025 await self ._client .data_converter .encode (input .args )
60076026 )
60086027 if input .headers is not None :
6009- temporalio . common ._apply_headers (input .headers , req .query .header .fields )
6028+ await self ._apply_headers (input .headers , req .query .header .fields )
60106029 try :
60116030 resp = await self ._client .workflow_service .query_workflow (
60126031 req , retry = True , metadata = input .rpc_metadata , timeout = input .rpc_timeout
@@ -6052,7 +6071,7 @@ async def signal_workflow(self, input: SignalWorkflowInput) -> None:
60526071 await self ._client .data_converter .encode (input .args )
60536072 )
60546073 if input .headers is not None :
6055- temporalio . common ._apply_headers (input .headers , req .header .fields )
6074+ await self ._apply_headers (input .headers , req .header .fields )
60566075 await self ._client .workflow_service .signal_workflow_execution (
60576076 req , retry = True , metadata = input .rpc_metadata , timeout = input .rpc_timeout
60586077 )
@@ -6163,9 +6182,7 @@ async def _build_update_workflow_execution_request(
61636182 await self ._client .data_converter .encode (input .args )
61646183 )
61656184 if input .headers is not None :
6166- temporalio .common ._apply_headers (
6167- input .headers , req .request .input .header .fields
6168- )
6185+ await self ._apply_headers (input .headers , req .request .input .header .fields )
61696186 return req
61706187
61716188 async def start_update_with_start_workflow (
@@ -6721,6 +6738,33 @@ async def get_worker_task_reachability(
67216738 )
67226739 return WorkerTaskReachability ._from_proto (resp )
67236740
6741+ async def _apply_headers (
6742+ self ,
6743+ source : Optional [Mapping [str , temporalio .api .common .v1 .Payload ]],
6744+ dest : MessageMap [Text , temporalio .api .common .v1 .Payload ],
6745+ ) -> None :
6746+ await _apply_headers (
6747+ source ,
6748+ dest ,
6749+ self ._client .config ()["header_codec_behavior" ] == HeaderCodecBehavior .CODEC ,
6750+ self ._client .data_converter .payload_codec ,
6751+ )
6752+
6753+
6754+ async def _apply_headers (
6755+ source : Optional [Mapping [str , temporalio .api .common .v1 .Payload ]],
6756+ dest : MessageMap [Text , temporalio .api .common .v1 .Payload ],
6757+ encode_headers : bool ,
6758+ codec : Optional [temporalio .converter .PayloadCodec ],
6759+ ) -> None :
6760+ if source is None :
6761+ return
6762+ if encode_headers and codec is not None :
6763+ for payload in source .values ():
6764+ new_payload = (await codec .encode ([payload ]))[0 ]
6765+ payload .CopyFrom (new_payload )
6766+ temporalio .common ._apply_headers (source , dest )
6767+
67246768
67256769def _history_from_json (
67266770 history : Union [str , Dict [str , Any ]],
0 commit comments