Skip to content

Commit cd2d12f

Browse files
committed
Readme fixes, move plugin to its own file
1 parent 91c229f commit cd2d12f

File tree

7 files changed

+181
-176
lines changed

7 files changed

+181
-176
lines changed

README.md

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,19 +1578,20 @@ class MonitoringPlugin(Plugin):
15781578
self.logger.info("Worker execution completed")
15791579

15801580
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
1581-
config["data_converter"] = pydantic_data_converter
1582-
config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow]
1583-
return config
1581+
return self.next_worker_plugin.configure_replayer(config)
15841582

15851583
@asynccontextmanager
15861584
async def workflow_replay(
15871585
self,
15881586
replayer: Replayer,
15891587
histories: AsyncIterator[temporalio.client.WorkflowHistory],
15901588
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
1591-
with set_some_context():
1592-
async with super().workflow_replay(replayer, histories) as results:
1589+
self.logger.info("Starting replay execution")
1590+
try:
1591+
async with self.next_worker_plugin.workflow_replay(replayer, histories) as results:
15931592
yield results
1593+
finally:
1594+
self.logger.info("Replay execution completed")
15941595

15951596
# Use the plugin when creating a worker
15961597
worker = Worker(
@@ -1606,69 +1607,64 @@ For plugins that need to work with both clients and workers, you can implement b
16061607

16071608
```python
16081609
import temporalio
1609-
from contextlib import asynccontextmanager
1610+
from contextlib import AbstractAsyncContextManager
16101611
from typing import AsyncIterator
16111612
from temporalio.client import Plugin as ClientPlugin, ClientConfig
16121613
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
16131614

16141615

16151616
class UnifiedPlugin(ClientPlugin, WorkerPlugin):
1616-
def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin:
1617-
self.next_client_plugin = next
1618-
return self
1617+
def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin:
1618+
self.next_client_plugin = next
1619+
return self
16191620

1620-
def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin:
1621-
self.next_worker_plugin = next
1622-
return self
1621+
def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin:
1622+
self.next_worker_plugin = next
1623+
return self
16231624

1624-
def configure_client(self, config: ClientConfig) -> ClientConfig:
1625-
# Client-side customization
1626-
config["namespace"] = "unified-namespace"
1627-
return super().configure_client(config)
1628-
1629-
async def connect_service_client(
1630-
self, config: temporalio.service.ConnectConfig
1631-
) -> temporalio.service.ServiceClient:
1632-
# Add authentication to the connection
1633-
config.api_key = self.api_key
1634-
return await self.next_client_plugin.connect_service_client(config)
1635-
1636-
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1637-
# Worker-side customization
1638-
config["max_cached_workflows"] = 500
1639-
return super().configure_worker(config)
1640-
1641-
async def run_worker(self, worker: Worker) -> None:
1642-
print("Starting unified worker")
1643-
await super().run_worker(worker)
1644-
1645-
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
1646-
config["data_converter"] = pydantic_data_converter
1647-
config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow]
1648-
return config
1625+
def configure_client(self, config: ClientConfig) -> ClientConfig:
1626+
# Client-side customization
1627+
config["data_converter"] = pydantic_data_converter
1628+
return super().configure_client(config)
16491629

1650-
@asynccontextmanager
1651-
async def workflow_replay(
1652-
self,
1653-
replayer: Replayer,
1654-
histories: AsyncIterator[temporalio.client.WorkflowHistory],
1655-
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
1656-
with set_some_context():
1657-
async with super().workflow_replay(replayer, histories) as results:
1658-
yield results
1630+
async def connect_service_client(
1631+
self, config: temporalio.service.ConnectConfig
1632+
) -> temporalio.service.ServiceClient:
1633+
# Add authentication to the connection
1634+
config.api_key = self.api_key
1635+
return await self.next_client_plugin.connect_service_client(config)
1636+
1637+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
1638+
# Worker-side customization
1639+
return super().configure_worker(config)
1640+
1641+
async def run_worker(self, worker: Worker) -> None:
1642+
print("Starting unified worker")
1643+
await super().run_worker(worker)
1644+
1645+
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
1646+
config["data_converter"] = pydantic_data_converter
1647+
return config
1648+
1649+
async def workflow_replay(
1650+
self,
1651+
replayer: Replayer,
1652+
histories: AsyncIterator[temporalio.client.WorkflowHistory],
1653+
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
1654+
return self.next_worker_plugin.workflow_replay(replayer, histories)
16591655

16601656
# Create client with the unified plugin
16611657
client = await Client.connect(
1662-
"localhost:7233",
1663-
plugins=[UnifiedPlugin()]
1658+
"localhost:7233",
1659+
plugins=[UnifiedPlugin()]
16641660
)
16651661

16661662
# Worker will automatically inherit the plugin from the client
16671663
worker = Worker(
1668-
client,
1669-
task_queue="my-task-queue",
1670-
workflows=[MyWorkflow],
1671-
activities=[my_activity]
1664+
client,
1665+
task_queue="my-task-queue",
1666+
workflows=[MyWorkflow],
1667+
activities=[my_activity]
16721668
)
16731669
```
16741670

temporalio/worker/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
WorkflowInterceptorClassInput,
2222
WorkflowOutboundInterceptor,
2323
)
24+
from ._plugin import Plugin
2425
from ._replayer import (
2526
Replayer,
2627
ReplayerConfig,
@@ -44,7 +45,6 @@
4445
WorkflowSlotInfo,
4546
)
4647
from ._worker import (
47-
Plugin,
4848
PollerBehavior,
4949
PollerBehaviorAutoscaling,
5050
PollerBehaviorSimpleMaximum,

temporalio/worker/_plugin.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
from __future__ import annotations
2+
3+
import abc
4+
from contextlib import AbstractAsyncContextManager
5+
from typing import TYPE_CHECKING, AsyncIterator
6+
7+
from temporalio.client import WorkflowHistory
8+
9+
if TYPE_CHECKING:
10+
from temporalio.worker import (
11+
Replayer,
12+
ReplayerConfig,
13+
Worker,
14+
WorkerConfig,
15+
WorkflowReplayResult,
16+
)
17+
18+
19+
class Plugin(abc.ABC):
20+
"""Base class for worker plugins that can intercept and modify worker behavior.
21+
22+
Plugins allow customization of worker creation and execution processes
23+
through a chain of responsibility pattern. Each plugin can modify the worker
24+
configuration or intercept worker execution.
25+
26+
WARNING: This is an experimental feature and may change in the future.
27+
"""
28+
29+
def name(self) -> str:
30+
"""Get the qualified name of this plugin. Can be overridden if desired to provide a more appropriate name.
31+
32+
Returns:
33+
The fully qualified name of the plugin class (module.classname).
34+
"""
35+
return type(self).__module__ + "." + type(self).__qualname__
36+
37+
@abc.abstractmethod
38+
def init_worker_plugin(self, next: Plugin) -> Plugin:
39+
"""Initialize this plugin in the plugin chain.
40+
41+
This method sets up the chain of responsibility pattern by storing a reference
42+
to the next plugin in the chain. It is called during worker creation to build
43+
the plugin chain.
44+
45+
Args:
46+
next: The next plugin in the chain to delegate to.
47+
48+
Returns:
49+
This plugin instance for method chaining.
50+
"""
51+
52+
@abc.abstractmethod
53+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
54+
"""Hook called when creating a worker to allow modification of configuration.
55+
56+
This method is called during worker creation and allows plugins to modify
57+
the worker configuration before the worker is fully initialized. Plugins
58+
can modify task queue names, adjust concurrency settings, add interceptors,
59+
or change other worker settings.
60+
61+
Args:
62+
config: The worker configuration dictionary to potentially modify.
63+
64+
Returns:
65+
The modified worker configuration.
66+
"""
67+
68+
@abc.abstractmethod
69+
async def run_worker(self, worker: Worker) -> None:
70+
"""Hook called when running a worker to allow interception of execution.
71+
72+
This method is called when the worker is started and allows plugins to
73+
intercept or wrap the worker execution. Plugins can add monitoring,
74+
custom lifecycle management, or other execution-time behavior.
75+
76+
Args:
77+
worker: The worker instance to run.
78+
"""
79+
80+
@abc.abstractmethod
81+
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
82+
"""Hook called when creating a replayer to allow modification of configuration.
83+
84+
This should be used to configure anything in ReplayerConfig needed to make execution match
85+
the original. This could include interceptors, DataConverter, workflows, and more.
86+
87+
Args:
88+
config: The replayer configuration dictionary to potentially modify.
89+
90+
Returns:
91+
The modified replayer configuration.
92+
"""
93+
94+
@abc.abstractmethod
95+
def workflow_replay(
96+
self,
97+
replayer: Replayer,
98+
histories: AsyncIterator[WorkflowHistory],
99+
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
100+
"""Hook called when running a replayer to allow interception of execution."""
101+
102+
103+
class _RootPlugin(Plugin):
104+
def init_worker_plugin(self, next: Plugin) -> Plugin:
105+
raise NotImplementedError()
106+
107+
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
108+
return config
109+
110+
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
111+
return config
112+
113+
async def run_worker(self, worker: Worker) -> None:
114+
await worker._run()
115+
116+
def workflow_replay(
117+
self,
118+
replayer: Replayer,
119+
histories: AsyncIterator[WorkflowHistory],
120+
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
121+
return replayer._workflow_replay_iterator(histories)

temporalio/worker/_replayer.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
from ..common import HeaderCodecBehavior
2323
from ._interceptor import Interceptor
24+
from ._plugin import _RootPlugin
25+
from ._worker import load_default_build_id
2426
from ._workflow import _WorkflowWorker
2527
from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner
2628
from .workflow_sandbox import SandboxedWorkflowRunner
@@ -81,8 +83,6 @@ def __init__(
8183
header_codec_behavior=header_codec_behavior,
8284
)
8385

84-
from ._worker import _RootPlugin
85-
8686
# Apply plugin configuration
8787
root_plugin: temporalio.worker.Plugin = _RootPlugin()
8888
for plugin in reversed(plugins):
@@ -240,8 +240,6 @@ def on_eviction_hook(
240240
!= HeaderCodecBehavior.NO_CODEC,
241241
)
242242
# Create bridge worker
243-
from ._worker import load_default_build_id
244-
245243
bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay(
246244
runtime._core_runtime,
247245
temporalio.bridge.worker.WorkerConfig(

0 commit comments

Comments
 (0)