Skip to content

Commit 71ed93c

Browse files
💥 Replayer configuration from plugins (#1011)
* POC for replayer configuration from existing plugins * Handle non-combined cases * Fixing type checking * Move shared configuration into plugin definition * Moving replay configuration to worker plugin, add replay execution hook * Readme and some cleanup * Remove debug logging * Make plugin interface experimental and abstract * Update readme * Update readme * Readme fixes, move plugin to its own file * Fix readme * Update temporalio/worker/_plugin.py Co-authored-by: Spencer Judge <[email protected]> * Rename run_replayer, remove need to return self from init * Docstring change --------- Co-authored-by: Spencer Judge <[email protected]>
1 parent b59c555 commit 71ed93c

File tree

9 files changed

+430
-153
lines changed

9 files changed

+430
-153
lines changed

README.md

Lines changed: 80 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1515,18 +1515,21 @@ import temporalio.service
15151515
class AuthenticationPlugin(Plugin):
15161516
def __init__(self, api_key: str):
15171517
self.api_key = api_key
1518+
1519+
def init_client_plugin(self, next: Plugin) -> None:
1520+
self.next_client_plugin = next
15181521
15191522
def configure_client(self, config: ClientConfig) -> ClientConfig:
15201523
# Modify client configuration
15211524
config["namespace"] = "my-secure-namespace"
1522-
return super().configure_client(config)
1525+
return self.next_client_plugin.configure_client(config)
15231526
15241527
async def connect_service_client(
15251528
self, config: temporalio.service.ConnectConfig
15261529
) -> temporalio.service.ServiceClient:
15271530
# Add authentication to the connection
15281531
config.api_key = self.api_key
1529-
return await super().connect_service_client(config)
1532+
return await self.next_client_plugin.connect_service_client(config)
15301533
15311534
# Use the plugin when connecting
15321535
client = await Client.connect(
@@ -1538,31 +1541,55 @@ client = await Client.connect(
15381541
#### Worker Plugins
15391542

15401543
Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
1541-
custom lifecycle management, or modifying worker settings.
1544+
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
1545+
They should do this in the case that they modified the worker in a way which would also need to be present
1546+
for replay to function. For instance, changing the data converter or adding workflows.
15421547

15431548
Here's an example of a worker plugin that adds custom monitoring:
15441549

15451550
```python
1546-
from temporalio.worker import Plugin, WorkerConfig, Worker
1551+
import temporalio
1552+
from contextlib import asynccontextmanager
1553+
from typing import AsyncIterator
1554+
from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
15471555
import logging
15481556

15491557
class MonitoringPlugin(Plugin):
15501558
def __init__(self):
15511559
self.logger = logging.getLogger(__name__)
15521560

1561+
def init_worker_plugin(self, next: Plugin) -> None:
1562+
self.next_worker_plugin = next
1563+
15531564
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
15541565
# Modify worker configuration
15551566
original_task_queue = config["task_queue"]
15561567
config["task_queue"] = f"monitored-{original_task_queue}"
15571568
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
1558-
return super().configure_worker(config)
1569+
return self.next_worker_plugin.configure_worker(config)
15591570

15601571
async def run_worker(self, worker: Worker) -> None:
15611572
self.logger.info("Starting worker execution")
15621573
try:
1563-
await super().run_worker(worker)
1574+
await self.next_worker_plugin.run_worker(worker)
15641575
finally:
15651576
self.logger.info("Worker execution completed")
1577+
1578+
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
1579+
return self.next_worker_plugin.configure_replayer(config)
1580+
1581+
@asynccontextmanager
1582+
async def run_replayer(
1583+
self,
1584+
replayer: Replayer,
1585+
histories: AsyncIterator[temporalio.client.WorkflowHistory],
1586+
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
1587+
self.logger.info("Starting replay execution")
1588+
try:
1589+
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
1590+
yield results
1591+
finally:
1592+
self.logger.info("Replay execution completed")
15661593

15671594
# Use the plugin when creating a worker
15681595
worker = Worker(
@@ -1577,38 +1604,63 @@ worker = Worker(
15771604
For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
15781605

15791606
```python
1607+
import temporalio
1608+
from contextlib import AbstractAsyncContextManager
1609+
from typing import AsyncIterator
15801610
from temporalio.client import Plugin as ClientPlugin, ClientConfig
1581-
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig
1611+
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
15821612

15831613

15841614
class UnifiedPlugin(ClientPlugin, WorkerPlugin):
1585-
def configure_client(self, config: ClientConfig) -> ClientConfig:
1586-
# Client-side customization
1587-
config["namespace"] = "unified-namespace"
1588-
return super().configure_client(config)
1589-
1590-
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1591-
# Worker-side customization
1592-
config["max_cached_workflows"] = 500
1593-
return super().configure_worker(config)
1594-
1595-
async def run_worker(self, worker: Worker) -> None:
1596-
print("Starting unified worker")
1597-
await super().run_worker(worker)
1598-
1615+
def init_client_plugin(self, next: ClientPlugin) -> None:
1616+
self.next_client_plugin = next
15991617

1618+
def init_worker_plugin(self, next: WorkerPlugin) -> None:
1619+
self.next_worker_plugin = next
1620+
1621+
def configure_client(self, config: ClientConfig) -> ClientConfig:
1622+
# Client-side customization
1623+
config["data_converter"] = pydantic_data_converter
1624+
return self.next_client_plugin.configure_client(config)
1625+
1626+
async def connect_service_client(
1627+
self, config: temporalio.service.ConnectConfig
1628+
) -> temporalio.service.ServiceClient:
1629+
# Add authentication to the connection
1630+
config.api_key = self.api_key
1631+
return await self.next_client_plugin.connect_service_client(config)
1632+
1633+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1634+
# Worker-side customization
1635+
return self.next_worker_plugin.configure_worker(config)
1636+
1637+
async def run_worker(self, worker: Worker) -> None:
1638+
print("Starting unified worker")
1639+
await self.next_worker_plugin.run_worker(worker)
1640+
1641+
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
1642+
config["data_converter"] = pydantic_data_converter
1643+
return config
1644+
1645+
async def run_replayer(
1646+
self,
1647+
replayer: Replayer,
1648+
histories: AsyncIterator[temporalio.client.WorkflowHistory],
1649+
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1650+
return self.next_worker_plugin.run_replayer(replayer, histories)
1651+
16001652
# Create client with the unified plugin
16011653
client = await Client.connect(
1602-
"localhost:7233",
1603-
plugins=[UnifiedPlugin()]
1654+
"localhost:7233",
1655+
plugins=[UnifiedPlugin()]
16041656
)
16051657

16061658
# Worker will automatically inherit the plugin from the client
16071659
worker = Worker(
1608-
client,
1609-
task_queue="my-task-queue",
1610-
workflows=[MyWorkflow],
1611-
activities=[my_activity]
1660+
client,
1661+
task_queue="my-task-queue",
1662+
workflows=[MyWorkflow],
1663+
activities=[my_activity]
16121664
)
16131665
```
16141666

@@ -1617,7 +1669,7 @@ worker = Worker(
16171669
- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
16181670
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
16191671
- Avoid providing the same plugin to both client and worker to prevent double execution
1620-
- Plugin methods should call `super()` to maintain the plugin chain
1672+
- Plugin methods should call the plugin provided during initialization to maintain the plugin chain
16211673
- Each plugin's `name()` method returns a unique identifier for debugging purposes
16221674

16231675

temporalio/client.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ async def connect(
191191

192192
root_plugin: Plugin = _RootPlugin()
193193
for plugin in reversed(plugins):
194-
root_plugin = plugin.init_client_plugin(root_plugin)
194+
plugin.init_client_plugin(root_plugin)
195+
root_plugin = plugin
195196

196197
service_client = await root_plugin.connect_service_client(connect_config)
197198

@@ -235,7 +236,8 @@ def __init__(
235236

236237
root_plugin: Plugin = _RootPlugin()
237238
for plugin in reversed(plugins):
238-
root_plugin = plugin.init_client_plugin(root_plugin)
239+
plugin.init_client_plugin(root_plugin)
240+
root_plugin = plugin
239241

240242
self._init_from_config(root_plugin.configure_client(config))
241243

@@ -7398,22 +7400,21 @@ def name(self) -> str:
73987400
"""
73997401
return type(self).__module__ + "." + type(self).__qualname__
74007402

7401-
def init_client_plugin(self, next: Plugin) -> Plugin:
7403+
@abstractmethod
7404+
def init_client_plugin(self, next: Plugin) -> None:
74027405
"""Initialize this plugin in the plugin chain.
74037406
7404-
This method sets up the chain of responsibility pattern by storing a reference
7407+
This method sets up the chain of responsibility pattern by providing a reference
74057408
to the next plugin in the chain. It is called during client creation to build
74067409
the plugin chain. Note, this may be called twice in the case of :py:meth:`connect`.
7410+
Implementations should store this reference and call the corresponding method
7411+
of the next plugin on method calls.
74077412
74087413
Args:
74097414
next: The next plugin in the chain to delegate to.
7410-
7411-
Returns:
7412-
This plugin instance for method chaining.
74137415
"""
7414-
self.next_client_plugin = next
7415-
return self
74167416

7417+
@abstractmethod
74177418
def configure_client(self, config: ClientConfig) -> ClientConfig:
74187419
"""Hook called when creating a client to allow modification of configuration.
74197420
@@ -7427,8 +7428,8 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
74277428
Returns:
74287429
The modified client configuration.
74297430
"""
7430-
return self.next_client_plugin.configure_client(config)
74317431

7432+
@abstractmethod
74327433
async def connect_service_client(
74337434
self, config: temporalio.service.ConnectConfig
74347435
) -> temporalio.service.ServiceClient:
@@ -7444,10 +7445,12 @@ async def connect_service_client(
74447445
Returns:
74457446
The connected service client.
74467447
"""
7447-
return await self.next_client_plugin.connect_service_client(config)
74487448

74497449

74507450
class _RootPlugin(Plugin):
7451+
def init_client_plugin(self, next: Plugin) -> None:
7452+
raise NotImplementedError()
7453+
74517454
def configure_client(self, config: ClientConfig) -> ClientConfig:
74527455
return config
74537456

temporalio/contrib/openai_agents/_temporal_openai_agents.py

Lines changed: 49 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Initialize Temporal OpenAI Agents overrides."""
22

3-
from contextlib import contextmanager
3+
from contextlib import asynccontextmanager, contextmanager
44
from datetime import timedelta
55
from typing import AsyncIterator, Callable, Optional, Union
66

@@ -24,7 +24,7 @@
2424

2525
import temporalio.client
2626
import temporalio.worker
27-
from temporalio.client import ClientConfig
27+
from temporalio.client import ClientConfig, Plugin
2828
from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity
2929
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
3030
from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner
@@ -41,7 +41,13 @@
4141
from temporalio.converter import (
4242
DataConverter,
4343
)
44-
from temporalio.worker import Worker, WorkerConfig
44+
from temporalio.worker import (
45+
Replayer,
46+
ReplayerConfig,
47+
Worker,
48+
WorkerConfig,
49+
WorkflowReplayResult,
50+
)
4551

4652

4753
@contextmanager
@@ -231,6 +237,20 @@ def __init__(
231237
self._model_params = model_params
232238
self._model_provider = model_provider
233239

240+
def init_client_plugin(self, next: temporalio.client.Plugin) -> None:
241+
"""Set the next client plugin"""
242+
self.next_client_plugin = next
243+
244+
async def connect_service_client(
245+
self, config: temporalio.service.ConnectConfig
246+
) -> temporalio.service.ServiceClient:
247+
"""No modifications to service client"""
248+
return await self.next_client_plugin.connect_service_client(config)
249+
250+
def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None:
251+
"""Set the next worker plugin"""
252+
self.next_worker_plugin = next
253+
234254
def configure_client(self, config: ClientConfig) -> ClientConfig:
235255
"""Configure the Temporal client for OpenAI agents integration.
236256
@@ -246,7 +266,7 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
246266
config["data_converter"] = DataConverter(
247267
payload_converter_class=_OpenAIPayloadConverter
248268
)
249-
return super().configure_client(config)
269+
return self.next_client_plugin.configure_client(config)
250270

251271
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
252272
"""Configure the Temporal worker for OpenAI agents integration.
@@ -268,7 +288,7 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
268288
config["activities"] = list(config.get("activities") or []) + [
269289
ModelActivity(self._model_provider).invoke_model_activity
270290
]
271-
return super().configure_worker(config)
291+
return self.next_worker_plugin.configure_worker(config)
272292

273293
async def run_worker(self, worker: Worker) -> None:
274294
"""Run the worker with OpenAI agents temporal overrides.
@@ -281,4 +301,27 @@ async def run_worker(self, worker: Worker) -> None:
281301
worker: The worker instance to run.
282302
"""
283303
with set_open_ai_agent_temporal_overrides(self._model_params):
284-
await super().run_worker(worker)
304+
await self.next_worker_plugin.run_worker(worker)
305+
306+
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
307+
"""Configure the replayer for OpenAI Agents."""
308+
config["interceptors"] = list(config.get("interceptors") or []) + [
309+
OpenAIAgentsTracingInterceptor()
310+
]
311+
config["data_converter"] = DataConverter(
312+
payload_converter_class=_OpenAIPayloadConverter
313+
)
314+
return config
315+
316+
@asynccontextmanager
317+
async def run_replayer(
318+
self,
319+
replayer: Replayer,
320+
histories: AsyncIterator[temporalio.client.WorkflowHistory],
321+
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
322+
"""Set the OpenAI Overrides during replay"""
323+
with set_open_ai_agent_temporal_overrides(self._model_params):
324+
async with self.next_worker_plugin.run_replayer(
325+
replayer, histories
326+
) as results:
327+
yield results

temporalio/worker/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
WorkflowInterceptorClassInput,
2222
WorkflowOutboundInterceptor,
2323
)
24+
from ._plugin import Plugin
2425
from ._replayer import (
2526
Replayer,
2627
ReplayerConfig,
@@ -44,7 +45,6 @@
4445
WorkflowSlotInfo,
4546
)
4647
from ._worker import (
47-
Plugin,
4848
PollerBehavior,
4949
PollerBehaviorAutoscaling,
5050
PollerBehaviorSimpleMaximum,

0 commit comments

Comments
 (0)