@@ -1515,18 +1515,21 @@ import temporalio.service
15151515class AuthenticationPlugin(Plugin):
15161516 def __init__(self, api_key: str):
15171517 self.api_key = api_key
1518+
1519+ def init_client_plugin(self, next: Plugin) -> None:
1520+ self.next_client_plugin = next
15181521
15191522 def configure_client(self, config: ClientConfig) -> ClientConfig:
15201523 # Modify client configuration
15211524 config["namespace"] = "my-secure-namespace"
1522- return super() .configure_client(config)
1525+ return self.next_client_plugin .configure_client(config)
15231526
15241527 async def connect_service_client(
15251528 self, config: temporalio.service.ConnectConfig
15261529 ) -> temporalio.service.ServiceClient:
15271530 # Add authentication to the connection
15281531 config.api_key = self.api_key
1529- return await super() .connect_service_client(config)
1532+ return await self.next_client_plugin .connect_service_client(config)
15301533
15311534# Use the plugin when connecting
15321535client = await Client.connect(
@@ -1538,31 +1541,55 @@ client = await Client.connect(
15381541#### Worker Plugins
15391542
15401543Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
1541- custom lifecycle management, or modifying worker settings.
1544+ custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
1545+ They should do this in the case that they modified the worker in a way which would also need to be present
1546+ for replay to function. For instance, changing the data converter or adding workflows.
15421547
15431548Here's an example of a worker plugin that adds custom monitoring:
15441549
15451550``` python
1546- from temporalio.worker import Plugin, WorkerConfig, Worker
1551+ import temporalio
1552+ from contextlib import asynccontextmanager
1553+ from typing import AsyncIterator
1554+ from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
15471555import logging
15481556
15491557class MonitoringPlugin (Plugin ):
15501558 def __init__ (self ):
15511559 self .logger = logging.getLogger(__name__ )
15521560
1561+ def init_worker_plugin (self , next : Plugin) -> None :
1562+ self .next_worker_plugin = next
1563+
15531564 def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
15541565 # Modify worker configuration
15551566 original_task_queue = config[" task_queue" ]
15561567 config[" task_queue" ] = f " monitored- { original_task_queue} "
15571568 self .logger.info(f " Worker created for task queue: { config[' task_queue' ]} " )
1558- return super () .configure_worker(config)
1569+ return self .next_worker_plugin .configure_worker(config)
15591570
15601571 async def run_worker (self , worker : Worker) -> None :
15611572 self .logger.info(" Starting worker execution" )
15621573 try :
1563- await super () .run_worker(worker)
1574+ await self .next_worker_plugin .run_worker(worker)
15641575 finally :
15651576 self .logger.info(" Worker execution completed" )
1577+
1578+ def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
1579+ return self .next_worker_plugin.configure_replayer(config)
1580+
1581+ @asynccontextmanager
1582+ async def run_replayer (
1583+ self ,
1584+ replayer : Replayer,
1585+ histories : AsyncIterator[temporalio.client.WorkflowHistory],
1586+ ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
1587+ self .logger.info(" Starting replay execution" )
1588+ try :
1589+ async with self .next_worker_plugin.run_replayer(replayer, histories) as results:
1590+ yield results
1591+ finally :
1592+ self .logger.info(" Replay execution completed" )
15661593
15671594# Use the plugin when creating a worker
15681595worker = Worker(
@@ -1577,38 +1604,63 @@ worker = Worker(
15771604For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
15781605
15791606``` python
1607+ import temporalio
1608+ from contextlib import AbstractAsyncContextManager
1609+ from typing import AsyncIterator
15801610from temporalio.client import Plugin as ClientPlugin, ClientConfig
1581- from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig
1611+ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
15821612
15831613
15841614class UnifiedPlugin (ClientPlugin , WorkerPlugin ):
1585- def configure_client (self , config : ClientConfig) -> ClientConfig:
1586- # Client-side customization
1587- config[" namespace" ] = " unified-namespace"
1588- return super ().configure_client(config)
1589-
1590- def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
1591- # Worker-side customization
1592- config[" max_cached_workflows" ] = 500
1593- return super ().configure_worker(config)
1594-
1595- async def run_worker (self , worker : Worker) -> None :
1596- print (" Starting unified worker" )
1597- await super ().run_worker(worker)
1598-
1615+ def init_client_plugin (self , next : ClientPlugin) -> None :
1616+ self .next_client_plugin = next
15991617
1618+ def init_worker_plugin (self , next : WorkerPlugin) -> None :
1619+ self .next_worker_plugin = next
1620+
1621+ def configure_client (self , config : ClientConfig) -> ClientConfig:
1622+ # Client-side customization
1623+ config[" data_converter" ] = pydantic_data_converter
1624+ return self .next_client_plugin.configure_client(config)
1625+
1626+ async def connect_service_client (
1627+ self , config : temporalio.service.ConnectConfig
1628+ ) -> temporalio.service.ServiceClient:
1629+ # Add authentication to the connection
1630+ config.api_key = self .api_key
1631+ return await self .next_client_plugin.connect_service_client(config)
1632+
1633+ def configure_worker (self , config : WorkerConfig) -> WorkerConfig:
1634+ # Worker-side customization
1635+ return self .next_worker_plugin.configure_worker(config)
1636+
1637+ async def run_worker (self , worker : Worker) -> None :
1638+ print (" Starting unified worker" )
1639+ await self .next_worker_plugin.run_worker(worker)
1640+
1641+ def configure_replayer (self , config : ReplayerConfig) -> ReplayerConfig:
1642+ config[" data_converter" ] = pydantic_data_converter
1643+ return config
1644+
1645+ async def run_replayer (
1646+ self ,
1647+ replayer : Replayer,
1648+ histories : AsyncIterator[temporalio.client.WorkflowHistory],
1649+ ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1650+ return self .next_worker_plugin.run_replayer(replayer, histories)
1651+
16001652# Create client with the unified plugin
16011653client = await Client.connect(
1602- " localhost:7233" ,
1603- plugins = [UnifiedPlugin()]
1654+ " localhost:7233" ,
1655+ plugins = [UnifiedPlugin()]
16041656)
16051657
16061658# Worker will automatically inherit the plugin from the client
16071659worker = Worker(
1608- client,
1609- task_queue = " my-task-queue" ,
1610- workflows = [MyWorkflow],
1611- activities = [my_activity]
1660+ client,
1661+ task_queue = " my-task-queue" ,
1662+ workflows = [MyWorkflow],
1663+ activities = [my_activity]
16121664)
16131665```
16141666
@@ -1617,7 +1669,7 @@ worker = Worker(
16171669- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
16181670- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
16191671- Avoid providing the same plugin to both client and worker to prevent double execution
1620- - Plugin methods should call ` super() ` to maintain the plugin chain
1672+ - Plugin methods should call the plugin provided during initialization to maintain the plugin chain
16211673- Each plugin's ` name() ` method returns a unique identifier for debugging purposes
16221674
16231675
0 commit comments