Skip to content

Commit 126bcd8

Browse files
authored
Plugin Support (#952)
* Initial rough framework for plugins * Ensure plugin modification happen before any other initialization * Format * Use local tuner for type inference * Format * Some PR updates, refactoring tests, added name * Added docstrings * Update readme * Change on_*_create to configure_* * Update README * Fix merge problem * Make plugins inherit from ABC, warn about double initialization * Make it easier to modify sandbox restrictions * Freeze sandbox runner * Freeze sandbox runner
1 parent 60f67d9 commit 126bcd8

File tree

8 files changed

+631
-115
lines changed

8 files changed

+631
-115
lines changed

README.md

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ informal introduction to the features and their implementation.
9696
- [Testing](#testing-1)
9797
- [Interceptors](#interceptors)
9898
- [Nexus](#nexus)
99+
- [Plugins](#plugins)
100+
- [Client Plugins](#client-plugins)
101+
- [Worker Plugins](#worker-plugins)
99102
- [Workflow Replay](#workflow-replay)
100103
- [Observability](#observability)
101104
- [Metrics](#metrics)
@@ -1482,6 +1485,140 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus).
14821485
```
14831486
14841487
1488+
### Plugins
1489+
1490+
Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of
1491+
responsibility pattern. They allow you to intercept and modify client creation, service connections, worker
1492+
configuration, and worker execution. Common customizations may include but are not limited to:
1493+
1494+
1. DataConverter
1495+
2. Activities
1496+
3. Workflows
1497+
4. Interceptors
1498+
1499+
A single plugin class can implement both client and worker plugin interfaces to share common logic between both
1500+
contexts. When used with a client, it will automatically be propagated to any workers created with that client.
1501+
1502+
#### Client Plugins
1503+
1504+
Client plugins can intercept and modify client configuration and service connections. They are useful for adding
1505+
authentication, modifying connection parameters, or adding custom behavior during client creation.
1506+
1507+
Here's an example of a client plugin that adds custom authentication:
1508+
1509+
```python
1510+
from temporalio.client import Plugin, ClientConfig
1511+
import temporalio.service
1512+
1513+
class AuthenticationPlugin(Plugin):
1514+
def __init__(self, api_key: str):
1515+
self.api_key = api_key
1516+
1517+
def configure_client(self, config: ClientConfig) -> ClientConfig:
1518+
# Modify client configuration
1519+
config["namespace"] = "my-secure-namespace"
1520+
return super().configure_client(config)
1521+
1522+
async def connect_service_client(
1523+
self, config: temporalio.service.ConnectConfig
1524+
) -> temporalio.service.ServiceClient:
1525+
# Add authentication to the connection
1526+
config.api_key = self.api_key
1527+
return await super().connect_service_client(config)
1528+
1529+
# Use the plugin when connecting
1530+
client = await Client.connect(
1531+
"my-server.com:7233",
1532+
plugins=[AuthenticationPlugin("my-api-key")]
1533+
)
1534+
```
1535+
1536+
#### Worker Plugins
1537+
1538+
Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
1539+
custom lifecycle management, or modifying worker settings.
1540+
1541+
Here's an example of a worker plugin that adds custom monitoring:
1542+
1543+
```python
1544+
from temporalio.worker import Plugin, WorkerConfig, Worker
1545+
import logging
1546+
1547+
class MonitoringPlugin(Plugin):
1548+
def __init__(self):
1549+
self.logger = logging.getLogger(__name__)
1550+
1551+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1552+
# Modify worker configuration
1553+
original_task_queue = config["task_queue"]
1554+
config["task_queue"] = f"monitored-{original_task_queue}"
1555+
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
1556+
return super().configure_worker(config)
1557+
1558+
async def run_worker(self, worker: Worker) -> None:
1559+
self.logger.info("Starting worker execution")
1560+
try:
1561+
await super().run_worker(worker)
1562+
finally:
1563+
self.logger.info("Worker execution completed")
1564+
1565+
# Use the plugin when creating a worker
1566+
worker = Worker(
1567+
client,
1568+
task_queue="my-task-queue",
1569+
workflows=[MyWorkflow],
1570+
activities=[my_activity],
1571+
plugins=[MonitoringPlugin()]
1572+
)
1573+
```
1574+
1575+
For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
1576+
1577+
```python
1578+
from temporalio.client import Plugin as ClientPlugin, ClientConfig
1579+
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig
1580+
1581+
1582+
class UnifiedPlugin(ClientPlugin, WorkerPlugin):
1583+
def configure_client(self, config: ClientConfig) -> ClientConfig:
1584+
# Client-side customization
1585+
config["namespace"] = "unified-namespace"
1586+
return super().configure_client(config)
1587+
1588+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1589+
# Worker-side customization
1590+
config["max_cached_workflows"] = 500
1591+
return super().configure_worker(config)
1592+
1593+
async def run_worker(self, worker: Worker) -> None:
1594+
print("Starting unified worker")
1595+
await super().run_worker(worker)
1596+
1597+
1598+
# Create client with the unified plugin
1599+
client = await Client.connect(
1600+
"localhost:7233",
1601+
plugins=[UnifiedPlugin()]
1602+
)
1603+
1604+
# Worker will automatically inherit the plugin from the client
1605+
worker = Worker(
1606+
client,
1607+
task_queue="my-task-queue",
1608+
workflows=[MyWorkflow],
1609+
activities=[my_activity]
1610+
)
1611+
```
1612+
1613+
**Important Notes:**
1614+
1615+
- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
1616+
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
1617+
- Avoid providing the same plugin to both client and worker to prevent double execution
1618+
- Plugin methods should call `super()` to maintain the plugin chain
1619+
- Each plugin's `name()` method returns a unique identifier for debugging purposes
1620+
1621+
14851622
### Workflow Replay
14861623

14871624
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

temporalio/client.py

Lines changed: 115 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
import abc
56
import asyncio
67
import copy
78
import dataclasses
@@ -107,6 +108,7 @@ async def connect(
107108
namespace: str = "default",
108109
api_key: Optional[str] = None,
109110
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
111+
plugins: Sequence[Plugin] = [],
110112
interceptors: Sequence[Interceptor] = [],
111113
default_workflow_query_reject_condition: Optional[
112114
temporalio.common.QueryRejectCondition
@@ -132,6 +134,14 @@ async def connect(
132134
metadata doesn't already have an "authorization" key.
133135
data_converter: Data converter to use for all data conversions
134136
to/from payloads.
137+
plugins: Set of plugins that are chained together to allow
138+
intercepting and modifying client creation and service connection.
139+
The earlier plugins wrap the later ones.
140+
141+
Any plugins that also implement
142+
:py:class:`temporalio.worker.Plugin` will be used as worker
143+
plugins too so they should not be given when creating a
144+
worker.
135145
interceptors: Set of interceptors that are chained together to allow
136146
intercepting of client calls. The earlier interceptors wrap the
137147
later ones.
@@ -178,13 +188,21 @@ async def connect(
178188
runtime=runtime,
179189
http_connect_proxy_config=http_connect_proxy_config,
180190
)
191+
192+
root_plugin: Plugin = _RootPlugin()
193+
for plugin in reversed(plugins):
194+
root_plugin = plugin.init_client_plugin(root_plugin)
195+
196+
service_client = await root_plugin.connect_service_client(connect_config)
197+
181198
return Client(
182-
await temporalio.service.ServiceClient.connect(connect_config),
199+
service_client,
183200
namespace=namespace,
184201
data_converter=data_converter,
185202
interceptors=interceptors,
186203
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
187204
header_codec_behavior=header_codec_behavior,
205+
plugins=plugins,
188206
)
189207

190208
def __init__(
@@ -193,6 +211,7 @@ def __init__(
193211
*,
194212
namespace: str = "default",
195213
data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default,
214+
plugins: Sequence[Plugin] = [],
196215
interceptors: Sequence[Interceptor] = [],
197216
default_workflow_query_reject_condition: Optional[
198217
temporalio.common.QueryRejectCondition
@@ -203,21 +222,31 @@ def __init__(
203222
204223
See :py:meth:`connect` for details on the parameters.
205224
"""
206-
# Iterate over interceptors in reverse building the impl
207-
self._impl: OutboundInterceptor = _ClientImpl(self)
208-
for interceptor in reversed(list(interceptors)):
209-
self._impl = interceptor.intercept_client(self._impl)
210-
211225
# Store the config for tracking
212-
self._config = ClientConfig(
226+
config = ClientConfig(
213227
service_client=service_client,
214228
namespace=namespace,
215229
data_converter=data_converter,
216230
interceptors=interceptors,
217231
default_workflow_query_reject_condition=default_workflow_query_reject_condition,
218232
header_codec_behavior=header_codec_behavior,
233+
plugins=plugins,
219234
)
220235

236+
root_plugin: Plugin = _RootPlugin()
237+
for plugin in reversed(plugins):
238+
root_plugin = plugin.init_client_plugin(root_plugin)
239+
240+
self._init_from_config(root_plugin.configure_client(config))
241+
242+
def _init_from_config(self, config: ClientConfig):
243+
self._config = config
244+
245+
# Iterate over interceptors in reverse building the impl
246+
self._impl: OutboundInterceptor = _ClientImpl(self)
247+
for interceptor in reversed(list(self._config["interceptors"])):
248+
self._impl = interceptor.intercept_client(self._impl)
249+
221250
def config(self) -> ClientConfig:
222251
"""Config, as a dictionary, used to create this client.
223252
@@ -1507,6 +1536,7 @@ class ClientConfig(TypedDict, total=False):
15071536
Optional[temporalio.common.QueryRejectCondition]
15081537
]
15091538
header_codec_behavior: Required[HeaderCodecBehavior]
1539+
plugins: Required[Sequence[Plugin]]
15101540

15111541

15121542
class WorkflowHistoryEventFilterType(IntEnum):
@@ -7356,3 +7386,81 @@ async def _decode_user_metadata(
73567386
if not metadata.HasField("details")
73577387
else (await converter.decode([metadata.details]))[0],
73587388
)
7389+
7390+
7391+
class Plugin(abc.ABC):
7392+
"""Base class for client plugins that can intercept and modify client behavior.
7393+
7394+
Plugins allow customization of client creation and service connection processes
7395+
through a chain of responsibility pattern. Each plugin can modify the client
7396+
configuration or intercept service client connections.
7397+
7398+
If the plugin is also a temporalio.worker.Plugin, it will additionally be propagated as a worker plugin.
7399+
You should likley not also provide it to the worker as that will result in the plugin being applied twice.
7400+
"""
7401+
7402+
def name(self) -> str:
7403+
"""Get the name of this plugin. Can be overridden if desired to provide a more appropriate name.
7404+
7405+
Returns:
7406+
The fully qualified name of the plugin class (module.classname).
7407+
"""
7408+
return type(self).__module__ + "." + type(self).__qualname__
7409+
7410+
def init_client_plugin(self, next: Plugin) -> Plugin:
7411+
"""Initialize this plugin in the plugin chain.
7412+
7413+
This method sets up the chain of responsibility pattern by storing a reference
7414+
to the next plugin in the chain. It is called during client creation to build
7415+
the plugin chain. Note, this may be called twice in the case of :py:meth:`connect`.
7416+
7417+
Args:
7418+
next: The next plugin in the chain to delegate to.
7419+
7420+
Returns:
7421+
This plugin instance for method chaining.
7422+
"""
7423+
self.next_client_plugin = next
7424+
return self
7425+
7426+
def configure_client(self, config: ClientConfig) -> ClientConfig:
7427+
"""Hook called when creating a client to allow modification of configuration.
7428+
7429+
This method is called during client creation and allows plugins to modify
7430+
the client configuration before the client is fully initialized. Plugins
7431+
can add interceptors, modify connection parameters, or change other settings.
7432+
7433+
Args:
7434+
config: The client configuration dictionary to potentially modify.
7435+
7436+
Returns:
7437+
The modified client configuration.
7438+
"""
7439+
return self.next_client_plugin.configure_client(config)
7440+
7441+
async def connect_service_client(
7442+
self, config: temporalio.service.ConnectConfig
7443+
) -> temporalio.service.ServiceClient:
7444+
"""Hook called when connecting to the Temporal service.
7445+
7446+
This method is called during service client connection and allows plugins
7447+
to intercept or modify the connection process. Plugins can modify connection
7448+
parameters, add authentication, or provide custom connection logic.
7449+
7450+
Args:
7451+
config: The service connection configuration.
7452+
7453+
Returns:
7454+
The connected service client.
7455+
"""
7456+
return await self.next_client_plugin.connect_service_client(config)
7457+
7458+
7459+
class _RootPlugin(Plugin):
7460+
def configure_client(self, config: ClientConfig) -> ClientConfig:
7461+
return config
7462+
7463+
async def connect_service_client(
7464+
self, config: temporalio.service.ConnectConfig
7465+
) -> temporalio.service.ServiceClient:
7466+
return await temporalio.service.ServiceClient.connect(config)

temporalio/worker/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
WorkflowSlotInfo,
4545
)
4646
from ._worker import (
47+
Plugin,
4748
PollerBehavior,
4849
PollerBehaviorAutoscaling,
4950
PollerBehaviorSimpleMaximum,
@@ -78,6 +79,7 @@
7879
"ActivityOutboundInterceptor",
7980
"WorkflowInboundInterceptor",
8081
"WorkflowOutboundInterceptor",
82+
"Plugin",
8183
# Interceptor input
8284
"ContinueAsNewInput",
8385
"ExecuteActivityInput",

0 commit comments

Comments
 (0)