22
33from __future__ import annotations
44
5+ import abc
56import asyncio
67import copy
78import dataclasses
@@ -107,6 +108,7 @@ async def connect(
107108 namespace : str = "default" ,
108109 api_key : Optional [str ] = None ,
109110 data_converter : temporalio .converter .DataConverter = temporalio .converter .DataConverter .default ,
111+ plugins : Sequence [Plugin ] = [],
110112 interceptors : Sequence [Interceptor ] = [],
111113 default_workflow_query_reject_condition : Optional [
112114 temporalio .common .QueryRejectCondition
@@ -132,6 +134,14 @@ async def connect(
132134 metadata doesn't already have an "authorization" key.
133135 data_converter: Data converter to use for all data conversions
134136 to/from payloads.
137+ plugins: Set of plugins that are chained together to allow
138+ intercepting and modifying client creation and service connection.
139+ The earlier plugins wrap the later ones.
140+
141+ Any plugins that also implement
142+ :py:class:`temporalio.worker.Plugin` will be used as worker
143+ plugins too so they should not be given when creating a
144+ worker.
135145 interceptors: Set of interceptors that are chained together to allow
136146 intercepting of client calls. The earlier interceptors wrap the
137147 later ones.
@@ -178,13 +188,21 @@ async def connect(
178188 runtime = runtime ,
179189 http_connect_proxy_config = http_connect_proxy_config ,
180190 )
191+
192+ root_plugin : Plugin = _RootPlugin ()
193+ for plugin in reversed (plugins ):
194+ root_plugin = plugin .init_client_plugin (root_plugin )
195+
196+ service_client = await root_plugin .connect_service_client (connect_config )
197+
181198 return Client (
182- await temporalio . service . ServiceClient . connect ( connect_config ) ,
199+ service_client ,
183200 namespace = namespace ,
184201 data_converter = data_converter ,
185202 interceptors = interceptors ,
186203 default_workflow_query_reject_condition = default_workflow_query_reject_condition ,
187204 header_codec_behavior = header_codec_behavior ,
205+ plugins = plugins ,
188206 )
189207
190208 def __init__ (
@@ -193,6 +211,7 @@ def __init__(
193211 * ,
194212 namespace : str = "default" ,
195213 data_converter : temporalio .converter .DataConverter = temporalio .converter .DataConverter .default ,
214+ plugins : Sequence [Plugin ] = [],
196215 interceptors : Sequence [Interceptor ] = [],
197216 default_workflow_query_reject_condition : Optional [
198217 temporalio .common .QueryRejectCondition
@@ -203,21 +222,31 @@ def __init__(
203222
204223 See :py:meth:`connect` for details on the parameters.
205224 """
206- # Iterate over interceptors in reverse building the impl
207- self ._impl : OutboundInterceptor = _ClientImpl (self )
208- for interceptor in reversed (list (interceptors )):
209- self ._impl = interceptor .intercept_client (self ._impl )
210-
211225 # Store the config for tracking
212- self . _config = ClientConfig (
226+ config = ClientConfig (
213227 service_client = service_client ,
214228 namespace = namespace ,
215229 data_converter = data_converter ,
216230 interceptors = interceptors ,
217231 default_workflow_query_reject_condition = default_workflow_query_reject_condition ,
218232 header_codec_behavior = header_codec_behavior ,
233+ plugins = plugins ,
219234 )
220235
236+ root_plugin : Plugin = _RootPlugin ()
237+ for plugin in reversed (plugins ):
238+ root_plugin = plugin .init_client_plugin (root_plugin )
239+
240+ self ._init_from_config (root_plugin .configure_client (config ))
241+
242+ def _init_from_config (self , config : ClientConfig ):
243+ self ._config = config
244+
245+ # Iterate over interceptors in reverse building the impl
246+ self ._impl : OutboundInterceptor = _ClientImpl (self )
247+ for interceptor in reversed (list (self ._config ["interceptors" ])):
248+ self ._impl = interceptor .intercept_client (self ._impl )
249+
221250 def config (self ) -> ClientConfig :
222251 """Config, as a dictionary, used to create this client.
223252
@@ -1507,6 +1536,7 @@ class ClientConfig(TypedDict, total=False):
15071536 Optional [temporalio .common .QueryRejectCondition ]
15081537 ]
15091538 header_codec_behavior : Required [HeaderCodecBehavior ]
1539+ plugins : Required [Sequence [Plugin ]]
15101540
15111541
15121542class WorkflowHistoryEventFilterType (IntEnum ):
@@ -7357,3 +7387,81 @@ async def _decode_user_metadata(
73577387 if not metadata .HasField ("details" )
73587388 else (await converter .decode ([metadata .details ]))[0 ],
73597389 )
7390+
7391+
7392+ class Plugin (abc .ABC ):
7393+ """Base class for client plugins that can intercept and modify client behavior.
7394+
7395+ Plugins allow customization of client creation and service connection processes
7396+ through a chain of responsibility pattern. Each plugin can modify the client
7397+ configuration or intercept service client connections.
7398+
7399+ If the plugin is also a temporalio.worker.Plugin, it will additionally be propagated as a worker plugin.
7400+ You should likley not also provide it to the worker as that will result in the plugin being applied twice.
7401+ """
7402+
7403+ def name (self ) -> str :
7404+ """Get the name of this plugin. Can be overridden if desired to provide a more appropriate name.
7405+
7406+ Returns:
7407+ The fully qualified name of the plugin class (module.classname).
7408+ """
7409+ return type (self ).__module__ + "." + type (self ).__qualname__
7410+
7411+ def init_client_plugin (self , next : Plugin ) -> Plugin :
7412+ """Initialize this plugin in the plugin chain.
7413+
7414+ This method sets up the chain of responsibility pattern by storing a reference
7415+ to the next plugin in the chain. It is called during client creation to build
7416+ the plugin chain. Note, this may be called twice in the case of :py:meth:`connect`.
7417+
7418+ Args:
7419+ next: The next plugin in the chain to delegate to.
7420+
7421+ Returns:
7422+ This plugin instance for method chaining.
7423+ """
7424+ self .next_client_plugin = next
7425+ return self
7426+
7427+ def configure_client (self , config : ClientConfig ) -> ClientConfig :
7428+ """Hook called when creating a client to allow modification of configuration.
7429+
7430+ This method is called during client creation and allows plugins to modify
7431+ the client configuration before the client is fully initialized. Plugins
7432+ can add interceptors, modify connection parameters, or change other settings.
7433+
7434+ Args:
7435+ config: The client configuration dictionary to potentially modify.
7436+
7437+ Returns:
7438+ The modified client configuration.
7439+ """
7440+ return self .next_client_plugin .configure_client (config )
7441+
7442+ async def connect_service_client (
7443+ self , config : temporalio .service .ConnectConfig
7444+ ) -> temporalio .service .ServiceClient :
7445+ """Hook called when connecting to the Temporal service.
7446+
7447+ This method is called during service client connection and allows plugins
7448+ to intercept or modify the connection process. Plugins can modify connection
7449+ parameters, add authentication, or provide custom connection logic.
7450+
7451+ Args:
7452+ config: The service connection configuration.
7453+
7454+ Returns:
7455+ The connected service client.
7456+ """
7457+ return await self .next_client_plugin .connect_service_client (config )
7458+
7459+
7460+ class _RootPlugin (Plugin ):
7461+ def configure_client (self , config : ClientConfig ) -> ClientConfig :
7462+ return config
7463+
7464+ async def connect_service_client (
7465+ self , config : temporalio .service .ConnectConfig
7466+ ) -> temporalio .service .ServiceClient :
7467+ return await temporalio .service .ServiceClient .connect (config )
0 commit comments