From 1741a6e9c8bd88526aaee4c8fb5cc2a0dad65e52 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 30 Jul 2025 08:41:46 -0700 Subject: [PATCH 01/15] POC for replayer configuration from existing plugins --- temporalio/worker/_replayer.py | 27 +++++++++++++++++---- tests/test_plugins.py | 43 ++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 6e9761b58..b17aed847 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -7,7 +7,7 @@ import logging from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type +from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type, Union from typing_extensions import TypedDict @@ -19,9 +19,11 @@ import temporalio.runtime import temporalio.workflow + from ..common import HeaderCodecBehavior from ._interceptor import Interceptor -from ._worker import load_default_build_id +from ._worker import load_default_build_id, WorkerConfig +from temporalio.client import ClientConfig from ._workflow import _WorkflowWorker from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner from .workflow_sandbox import SandboxedWorkflowRunner @@ -42,6 +44,7 @@ def __init__( namespace: str = "ReplayNamespace", data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, interceptors: Sequence[Interceptor] = [], + plugins: Sequence[Union[temporalio.worker.Plugin, temporalio.client.Plugin]] = [], build_id: Optional[str] = None, identity: Optional[str] = None, workflow_failure_exception_types: Sequence[Type[BaseException]] = [], @@ -62,8 +65,6 @@ def __init__( will be shared across all replay calls and never explicitly shut down. Users are encouraged to provide their own if needing more control. """ - if not workflows: - raise ValueError("At least one workflow must be specified") self._config = ReplayerConfig( workflows=list(workflows), workflow_task_executor=( @@ -82,6 +83,24 @@ def __init__( disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, ) + root_worker_plugin: temporalio.worker.Plugin = temporalio.worker._worker._RootPlugin() + root_client_plugin: temporalio.client.Plugin = temporalio.client._RootPlugin() + for plugin in reversed(plugins): + root_worker_plugin = plugin.init_worker_plugin(root_worker_plugin) + root_client_plugin = plugin.init_client_plugin(root_client_plugin) + + # Allow plugins to configure shared configurations with worker + worker_config = WorkerConfig(**{k: v for k, v in self._config.items() if k in WorkerConfig.__annotations__}) + worker_config = root_worker_plugin.configure_worker(worker_config) + self._config.update({k: v for k, v in worker_config.items() if k in ReplayerConfig.__annotations__}) + + # Allow plugins to configure shared configurations with client + client_config = ClientConfig(**{k: v for k, v in self._config.items() if k in ClientConfig.__annotations__}) + client_config = root_client_plugin.configure_client(client_config) + self._config.update({k: v for k, v in client_config.items() if k in ReplayerConfig.__annotations__}) + + if not self._config["workflows"]: + raise ValueError("At least one workflow must be specified") def config(self) -> ReplayerConfig: """Config, as a dictionary, used to create this replayer. diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 4a60bba4d..42c221f26 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,4 +1,5 @@ import dataclasses +import uuid import warnings from typing import cast @@ -6,11 +7,15 @@ import temporalio.client import temporalio.worker +from temporalio import workflow from temporalio.client import Client, ClientConfig, OutboundInterceptor +from temporalio.contrib.pydantic import pydantic_data_converter from temporalio.testing import WorkflowEnvironment from temporalio.worker import Worker, WorkerConfig from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner from tests.worker.test_worker import never_run_activity +from temporalio.worker import Replayer +from tests.helpers import new_worker class TestClientInterceptor(temporalio.client.Interceptor): @@ -136,3 +141,41 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: SandboxedWorkflowRunner, worker.config().get("workflow_runner") ).restrictions.passthrough_modules ) + +class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + config["workflows"] = list(config["workflows"]) + [HelloWorkflow] + return super().configure_worker(config) + + def configure_client(self, config: ClientConfig) -> ClientConfig: + config["data_converter"] = pydantic_data_converter + return super().configure_client(config) + +@workflow.defn +class HelloWorkflow: + @workflow.run + async def run(self, name: str) -> str: + return f"Hello, {name}!" + +async def test_replay(client: Client) -> None: + plugin = ReplayCheckPlugin() + new_config = client.config() + new_config["plugins"] = [plugin] + client = Client(**new_config) + + async with new_worker(client) as worker: + handle = await client.start_workflow( + HelloWorkflow.run, + "Tim", + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + ) + await handle.result() + replayer = Replayer( + workflows=[], + plugins=[plugin] + ) + assert len(replayer.config()["workflows"])==1 + assert replayer.config()["data_converter"] == pydantic_data_converter + + await replayer.replay_workflow(await handle.fetch_history()) From 3c92c40b4b416b0ab4809312083522fb08ba691a Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 30 Jul 2025 09:22:25 -0700 Subject: [PATCH 02/15] Handle non-combined cases --- temporalio/worker/_replayer.py | 11 +++++++---- tests/test_plugins.py | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index b17aed847..bd9ac06d1 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -83,18 +83,21 @@ def __init__( disable_safe_workflow_eviction=disable_safe_workflow_eviction, header_codec_behavior=header_codec_behavior, ) + + # Allow plugins to configure shared configurations with worker root_worker_plugin: temporalio.worker.Plugin = temporalio.worker._worker._RootPlugin() - root_client_plugin: temporalio.client.Plugin = temporalio.client._RootPlugin() - for plugin in reversed(plugins): + for plugin in reversed([plugin for plugin in plugins if isinstance(plugin, temporalio.worker.Plugin)]): root_worker_plugin = plugin.init_worker_plugin(root_worker_plugin) - root_client_plugin = plugin.init_client_plugin(root_client_plugin) - # Allow plugins to configure shared configurations with worker worker_config = WorkerConfig(**{k: v for k, v in self._config.items() if k in WorkerConfig.__annotations__}) worker_config = root_worker_plugin.configure_worker(worker_config) self._config.update({k: v for k, v in worker_config.items() if k in ReplayerConfig.__annotations__}) # Allow plugins to configure shared configurations with client + root_client_plugin: temporalio.client.Plugin = temporalio.client._RootPlugin() + for plugin in reversed([plugin for plugin in plugins if isinstance(plugin, temporalio.client.Plugin)]): + root_client_plugin = plugin.init_client_plugin(root_client_plugin) + client_config = ClientConfig(**{k: v for k, v in self._config.items() if k in ClientConfig.__annotations__}) client_config = root_client_plugin.configure_client(client_config) self._config.update({k: v for k, v in client_config.items() if k in ReplayerConfig.__annotations__}) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 42c221f26..0d62b15b6 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -179,3 +179,20 @@ async def test_replay(client: Client) -> None: assert replayer.config()["data_converter"] == pydantic_data_converter await replayer.replay_workflow(await handle.fetch_history()) + + replayer = Replayer( + workflows=[HelloWorkflow], + plugins=[MyClientPlugin()] + ) + replayer = Replayer( + workflows=[HelloWorkflow], + plugins=[MyWorkerPlugin()] + ) + replayer = Replayer( + workflows=[HelloWorkflow], + plugins=[MyClientPlugin(), MyWorkerPlugin()] + ) + replayer = Replayer( + workflows=[HelloWorkflow], + plugins=[MyWorkerPlugin(), MyClientPlugin(), MyCombinedPlugin()] + ) \ No newline at end of file From 76cacf666350cbb5bf90d002fbb4e49bdc21a209 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 30 Jul 2025 09:58:12 -0700 Subject: [PATCH 03/15] Fixing type checking --- temporalio/worker/_replayer.py | 65 +++++++++++++++++++++++++++------- tests/test_plugins.py | 34 +++++++----------- 2 files changed, 65 insertions(+), 34 deletions(-) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index bd9ac06d1..0e2045e04 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -7,7 +7,7 @@ import logging from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type, Union +from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type, Union, cast from typing_extensions import TypedDict @@ -18,12 +18,11 @@ import temporalio.converter import temporalio.runtime import temporalio.workflow - +from temporalio.client import ClientConfig from ..common import HeaderCodecBehavior from ._interceptor import Interceptor -from ._worker import load_default_build_id, WorkerConfig -from temporalio.client import ClientConfig +from ._worker import WorkerConfig, load_default_build_id from ._workflow import _WorkflowWorker from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner from .workflow_sandbox import SandboxedWorkflowRunner @@ -44,7 +43,9 @@ def __init__( namespace: str = "ReplayNamespace", data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, interceptors: Sequence[Interceptor] = [], - plugins: Sequence[Union[temporalio.worker.Plugin, temporalio.client.Plugin]] = [], + plugins: Sequence[ + Union[temporalio.worker.Plugin, temporalio.client.Plugin] + ] = [], build_id: Optional[str] = None, identity: Optional[str] = None, workflow_failure_exception_types: Sequence[Type[BaseException]] = [], @@ -86,21 +87,59 @@ def __init__( # Allow plugins to configure shared configurations with worker root_worker_plugin: temporalio.worker.Plugin = temporalio.worker._worker._RootPlugin() - for plugin in reversed([plugin for plugin in plugins if isinstance(plugin, temporalio.worker.Plugin)]): + for plugin in reversed( + [ + plugin + for plugin in plugins + if isinstance(plugin, temporalio.worker.Plugin) + ] + ): root_worker_plugin = plugin.init_worker_plugin(root_worker_plugin) - worker_config = WorkerConfig(**{k: v for k, v in self._config.items() if k in WorkerConfig.__annotations__}) + worker_config = cast( + WorkerConfig, + { + k: v + for k, v in self._config.items() + if k in WorkerConfig.__annotations__ + }, + ) + worker_config = root_worker_plugin.configure_worker(worker_config) - self._config.update({k: v for k, v in worker_config.items() if k in ReplayerConfig.__annotations__}) + self._config.update( + cast(ReplayerConfig, { + k: v + for k, v in worker_config.items() + if k in ReplayerConfig.__annotations__ + }) + ) # Allow plugins to configure shared configurations with client root_client_plugin: temporalio.client.Plugin = temporalio.client._RootPlugin() - for plugin in reversed([plugin for plugin in plugins if isinstance(plugin, temporalio.client.Plugin)]): - root_client_plugin = plugin.init_client_plugin(root_client_plugin) - - client_config = ClientConfig(**{k: v for k, v in self._config.items() if k in ClientConfig.__annotations__}) + for client_plugin in reversed( + [ + plugin + for plugin in plugins + if isinstance(plugin, temporalio.client.Plugin) + ] + ): + root_client_plugin = client_plugin.init_client_plugin(root_client_plugin) + + client_config = cast(ClientConfig, + { + k: v + for k, v in self._config.items() + if k in ClientConfig.__annotations__ + } + ) client_config = root_client_plugin.configure_client(client_config) - self._config.update({k: v for k, v in client_config.items() if k in ReplayerConfig.__annotations__}) + self._config.update( + cast(ReplayerConfig, { + k: v + for k, v in client_config.items() + if k in ReplayerConfig.__annotations__ + }) + ) if not self._config["workflows"]: raise ValueError("At least one workflow must be specified") diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 0d62b15b6..a4ccd9244 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -11,11 +11,10 @@ from temporalio.client import Client, ClientConfig, OutboundInterceptor from temporalio.contrib.pydantic import pydantic_data_converter from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Worker, WorkerConfig +from temporalio.worker import Replayer, Worker, WorkerConfig from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner -from tests.worker.test_worker import never_run_activity -from temporalio.worker import Replayer from tests.helpers import new_worker +from tests.worker.test_worker import never_run_activity class TestClientInterceptor(temporalio.client.Interceptor): @@ -142,21 +141,24 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: ).restrictions.passthrough_modules ) + class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - config["workflows"] = list(config["workflows"]) + [HelloWorkflow] + config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] return super().configure_worker(config) def configure_client(self, config: ClientConfig) -> ClientConfig: config["data_converter"] = pydantic_data_converter return super().configure_client(config) + @workflow.defn class HelloWorkflow: @workflow.run async def run(self, name: str) -> str: return f"Hello, {name}!" + async def test_replay(client: Client) -> None: plugin = ReplayCheckPlugin() new_config = client.config() @@ -171,28 +173,18 @@ async def test_replay(client: Client) -> None: task_queue=worker.task_queue, ) await handle.result() - replayer = Replayer( - workflows=[], - plugins=[plugin] - ) - assert len(replayer.config()["workflows"])==1 - assert replayer.config()["data_converter"] == pydantic_data_converter + replayer = Replayer(workflows=[], plugins=[plugin]) + assert len(replayer.config().get("workflows") or []) == 1 + assert replayer.config().get("data_converter") == pydantic_data_converter await replayer.replay_workflow(await handle.fetch_history()) + replayer = Replayer(workflows=[HelloWorkflow], plugins=[MyClientPlugin()]) + replayer = Replayer(workflows=[HelloWorkflow], plugins=[MyWorkerPlugin()]) replayer = Replayer( - workflows=[HelloWorkflow], - plugins=[MyClientPlugin()] - ) - replayer = Replayer( - workflows=[HelloWorkflow], - plugins=[MyWorkerPlugin()] + workflows=[HelloWorkflow], plugins=[MyClientPlugin(), MyWorkerPlugin()] ) replayer = Replayer( workflows=[HelloWorkflow], - plugins=[MyClientPlugin(), MyWorkerPlugin()] + plugins=[MyWorkerPlugin(), MyClientPlugin(), MyCombinedPlugin()], ) - replayer = Replayer( - workflows=[HelloWorkflow], - plugins=[MyWorkerPlugin(), MyClientPlugin(), MyCombinedPlugin()] - ) \ No newline at end of file From f61b40082f537d259f9f1e536a9e32485ab4ab80 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 30 Jul 2025 12:33:30 -0700 Subject: [PATCH 04/15] Move shared configuration into plugin definition --- temporalio/worker/__init__.py | 2 + temporalio/worker/_replayer.py | 142 ++++++++++++++++++++++----------- tests/test_plugins.py | 14 ++-- 3 files changed, 108 insertions(+), 50 deletions(-) diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py index 6e062afcc..4c1138fd5 100644 --- a/temporalio/worker/__init__.py +++ b/temporalio/worker/__init__.py @@ -24,6 +24,7 @@ from ._replayer import ( Replayer, ReplayerConfig, + ReplayerPlugin, WorkflowReplayResult, WorkflowReplayResults, ) @@ -68,6 +69,7 @@ "WorkerDeploymentVersion", "Replayer", "ReplayerConfig", + "ReplayerPlugin", "WorkflowReplayResult", "WorkflowReplayResults", "PollerBehavior", diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 0e2045e04..e4cd1b3ad 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -7,7 +7,7 @@ import logging from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type, Union, cast +from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type, cast from typing_extensions import TypedDict @@ -18,7 +18,6 @@ import temporalio.converter import temporalio.runtime import temporalio.workflow -from temporalio.client import ClientConfig from ..common import HeaderCodecBehavior from ._interceptor import Interceptor @@ -30,6 +29,88 @@ logger = logging.getLogger(__name__) +class ReplayerPlugin: + """Base class for replayer plugins that can modify replayer configuration.""" + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + """Configure the replayer. + + Default implementation applies shared configuration from worker and client plugins. + + Args: + config: The replayer configuration to modify. + + Returns: + The modified replayer configuration. + """ + # If this plugin is also a worker plugin, apply shared worker config + if isinstance(self, temporalio.worker.Plugin): + # Create a minimal worker config with shared fields + worker_config = cast( + WorkerConfig, + { + "workflows": config["workflows"], + "workflow_task_executor": config["workflow_task_executor"], + "workflow_runner": config["workflow_runner"], + "unsandboxed_workflow_runner": config[ + "unsandboxed_workflow_runner" + ], + "interceptors": config["interceptors"], + "build_id": config["build_id"], + "identity": config["identity"], + "workflow_failure_exception_types": config[ + "workflow_failure_exception_types" + ], + "debug_mode": config["debug_mode"], + "disable_safe_workflow_eviction": config[ + "disable_safe_workflow_eviction" + ], + }, + ) + + modified_worker_config = self.configure_worker(worker_config) + config["workflows"] = modified_worker_config["workflows"] + config["workflow_task_executor"] = modified_worker_config[ + "workflow_task_executor" + ] + config["workflow_runner"] = modified_worker_config["workflow_runner"] + config["unsandboxed_workflow_runner"] = modified_worker_config[ + "unsandboxed_workflow_runner" + ] + config["interceptors"] = modified_worker_config["interceptors"] + config["build_id"] = modified_worker_config["build_id"] + config["identity"] = modified_worker_config["identity"] + config["workflow_failure_exception_types"] = modified_worker_config[ + "workflow_failure_exception_types" + ] + config["debug_mode"] = modified_worker_config["debug_mode"] + config["disable_safe_workflow_eviction"] = modified_worker_config[ + "disable_safe_workflow_eviction" + ] + + # If this plugin is also a client plugin, apply shared client config + if isinstance(self, temporalio.client.Plugin): + # Only include fields that exist in both ReplayerConfig and ClientConfig + # Note: interceptors are different types between client and worker, so excluded + client_config = cast( + temporalio.client.ClientConfig, + { + "namespace": config["namespace"], + "data_converter": config["data_converter"], + "header_codec_behavior": config["header_codec_behavior"], + }, + ) + + modified_client_config = self.configure_client(client_config) + config["namespace"] = modified_client_config["namespace"] + config["data_converter"] = modified_client_config["data_converter"] + config["header_codec_behavior"] = modified_client_config[ + "header_codec_behavior" + ] + + return config + + class Replayer: """Replayer to replay workflows from history.""" @@ -43,9 +124,7 @@ def __init__( namespace: str = "ReplayNamespace", data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, interceptors: Sequence[Interceptor] = [], - plugins: Sequence[ - Union[temporalio.worker.Plugin, temporalio.client.Plugin] - ] = [], + plugins: Sequence[ReplayerPlugin] = [], build_id: Optional[str] = None, identity: Optional[str] = None, workflow_failure_exception_types: Sequence[Type[BaseException]] = [], @@ -85,62 +164,35 @@ def __init__( header_codec_behavior=header_codec_behavior, ) - # Allow plugins to configure shared configurations with worker - root_worker_plugin: temporalio.worker.Plugin = temporalio.worker._worker._RootPlugin() - for plugin in reversed( + # Initialize all worker plugins + root_worker_plugin: temporalio.worker.Plugin = ( + temporalio.worker._worker._RootPlugin() + ) + for worker_plugin in reversed( [ - plugin + cast(temporalio.worker.Plugin, plugin) for plugin in plugins if isinstance(plugin, temporalio.worker.Plugin) ] ): - root_worker_plugin = plugin.init_worker_plugin(root_worker_plugin) - - worker_config = cast( - WorkerConfig, - { - k: v - for k, v in self._config.items() - if k in WorkerConfig.__annotations__ - }, - ) - - worker_config = root_worker_plugin.configure_worker(worker_config) - self._config.update( - cast(ReplayerConfig, { - k: v - for k, v in worker_config.items() - if k in ReplayerConfig.__annotations__ - }) - ) + root_worker_plugin = worker_plugin.init_worker_plugin(root_worker_plugin) - # Allow plugins to configure shared configurations with client + # Initialize all client plugins root_client_plugin: temporalio.client.Plugin = temporalio.client._RootPlugin() for client_plugin in reversed( [ - plugin + cast(temporalio.client.Plugin, plugin) for plugin in plugins if isinstance(plugin, temporalio.client.Plugin) ] ): root_client_plugin = client_plugin.init_client_plugin(root_client_plugin) - client_config = cast(ClientConfig, - { - k: v - for k, v in self._config.items() - if k in ClientConfig.__annotations__ - } - ) - client_config = root_client_plugin.configure_client(client_config) - self._config.update( - cast(ReplayerConfig, { - k: v - for k, v in client_config.items() - if k in ReplayerConfig.__annotations__ - }) - ) + # Apply plugin configuration + for plugin in plugins: + self._config = plugin.configure_replayer(self._config) + # Validate workflows after plugin configuration if not self._config["workflows"]: raise ValueError("At least one workflow must be specified") diff --git a/tests/test_plugins.py b/tests/test_plugins.py index a4ccd9244..49ddc23fa 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -11,7 +11,7 @@ from temporalio.client import Client, ClientConfig, OutboundInterceptor from temporalio.contrib.pydantic import pydantic_data_converter from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Replayer, Worker, WorkerConfig +from temporalio.worker import Replayer, ReplayerConfig, Worker, WorkerConfig from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner from tests.helpers import new_worker from tests.worker.test_worker import never_run_activity @@ -26,7 +26,7 @@ def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor: return super().intercept_client(next) -class MyClientPlugin(temporalio.client.Plugin): +class MyClientPlugin(temporalio.worker.ReplayerPlugin, temporalio.client.Plugin): def __init__(self): self.interceptor = TestClientInterceptor() @@ -62,13 +62,15 @@ async def test_client_plugin(client: Client, env: WorkflowEnvironment): assert new_client.service_client.config.api_key == "replaced key" -class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): +class MyCombinedPlugin( + temporalio.worker.ReplayerPlugin, temporalio.client.Plugin, temporalio.worker.Plugin +): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "combined" return super().configure_worker(config) -class MyWorkerPlugin(temporalio.worker.Plugin): +class MyWorkerPlugin(temporalio.worker.ReplayerPlugin, temporalio.worker.Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "replaced_queue" runner = config.get("workflow_runner") @@ -142,7 +144,9 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: ) -class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): +class ReplayCheckPlugin( + temporalio.worker.ReplayerPlugin, temporalio.client.Plugin, temporalio.worker.Plugin +): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] return super().configure_worker(config) From 1007d02c5a3a623c41c632bba7b98fd69c7d4f84 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Fri, 1 Aug 2025 16:59:39 -0700 Subject: [PATCH 05/15] Moving replay configuration to worker plugin, add replay execution hook --- .../openai_agents/_temporal_openai_agents.py | 30 +++- temporalio/worker/__init__.py | 2 - temporalio/worker/_replayer.py | 167 ++++-------------- temporalio/worker/_worker.py | 29 +++ .../worker/workflow_sandbox/_importer.py | 6 + .../openai_agents/test_openai_replay.py | 41 ++--- tests/test_plugins.py | 31 ++-- 7 files changed, 131 insertions(+), 175 deletions(-) diff --git a/temporalio/contrib/openai_agents/_temporal_openai_agents.py b/temporalio/contrib/openai_agents/_temporal_openai_agents.py index a1f71db71..6a90e5113 100644 --- a/temporalio/contrib/openai_agents/_temporal_openai_agents.py +++ b/temporalio/contrib/openai_agents/_temporal_openai_agents.py @@ -1,6 +1,6 @@ """Initialize Temporal OpenAI Agents overrides.""" -from contextlib import contextmanager +from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager from datetime import timedelta from typing import AsyncIterator, Callable, Optional, Union @@ -41,7 +41,13 @@ from temporalio.converter import ( DataConverter, ) -from temporalio.worker import Worker, WorkerConfig +from temporalio.worker import ( + Replayer, + ReplayerConfig, + Worker, + WorkerConfig, + WorkflowReplayResult, +) @contextmanager @@ -282,3 +288,23 @@ async def run_worker(self, worker: Worker) -> None: """ with set_open_ai_agent_temporal_overrides(self._model_params): await super().run_worker(worker) + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + """Configure the replayer for OpenAI Agents.""" + config["interceptors"] = list(config.get("interceptors") or []) + [ + OpenAIAgentsTracingInterceptor() + ] + config["data_converter"] = DataConverter( + payload_converter_class=_OpenAIPayloadConverter + ) + return config + + @asynccontextmanager + async def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + with set_open_ai_agent_temporal_overrides(self._model_params): + async with super().workflow_replay(replayer, histories) as results: + yield results diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py index 4c1138fd5..6e062afcc 100644 --- a/temporalio/worker/__init__.py +++ b/temporalio/worker/__init__.py @@ -24,7 +24,6 @@ from ._replayer import ( Replayer, ReplayerConfig, - ReplayerPlugin, WorkflowReplayResult, WorkflowReplayResults, ) @@ -69,7 +68,6 @@ "WorkerDeploymentVersion", "Replayer", "ReplayerConfig", - "ReplayerPlugin", "WorkflowReplayResult", "WorkflowReplayResults", "PollerBehavior", diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index e4cd1b3ad..c30b5fb16 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -5,9 +5,10 @@ import asyncio import concurrent.futures import logging -from contextlib import asynccontextmanager +import typing +from contextlib import AbstractAsyncContextManager, asynccontextmanager from dataclasses import dataclass -from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type, cast +from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type from typing_extensions import TypedDict @@ -21,7 +22,6 @@ from ..common import HeaderCodecBehavior from ._interceptor import Interceptor -from ._worker import WorkerConfig, load_default_build_id from ._workflow import _WorkflowWorker from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner from .workflow_sandbox import SandboxedWorkflowRunner @@ -29,86 +29,23 @@ logger = logging.getLogger(__name__) -class ReplayerPlugin: - """Base class for replayer plugins that can modify replayer configuration.""" - - def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - """Configure the replayer. - - Default implementation applies shared configuration from worker and client plugins. - - Args: - config: The replayer configuration to modify. - - Returns: - The modified replayer configuration. - """ - # If this plugin is also a worker plugin, apply shared worker config - if isinstance(self, temporalio.worker.Plugin): - # Create a minimal worker config with shared fields - worker_config = cast( - WorkerConfig, - { - "workflows": config["workflows"], - "workflow_task_executor": config["workflow_task_executor"], - "workflow_runner": config["workflow_runner"], - "unsandboxed_workflow_runner": config[ - "unsandboxed_workflow_runner" - ], - "interceptors": config["interceptors"], - "build_id": config["build_id"], - "identity": config["identity"], - "workflow_failure_exception_types": config[ - "workflow_failure_exception_types" - ], - "debug_mode": config["debug_mode"], - "disable_safe_workflow_eviction": config[ - "disable_safe_workflow_eviction" - ], - }, - ) - - modified_worker_config = self.configure_worker(worker_config) - config["workflows"] = modified_worker_config["workflows"] - config["workflow_task_executor"] = modified_worker_config[ - "workflow_task_executor" - ] - config["workflow_runner"] = modified_worker_config["workflow_runner"] - config["unsandboxed_workflow_runner"] = modified_worker_config[ - "unsandboxed_workflow_runner" - ] - config["interceptors"] = modified_worker_config["interceptors"] - config["build_id"] = modified_worker_config["build_id"] - config["identity"] = modified_worker_config["identity"] - config["workflow_failure_exception_types"] = modified_worker_config[ - "workflow_failure_exception_types" - ] - config["debug_mode"] = modified_worker_config["debug_mode"] - config["disable_safe_workflow_eviction"] = modified_worker_config[ - "disable_safe_workflow_eviction" - ] - - # If this plugin is also a client plugin, apply shared client config - if isinstance(self, temporalio.client.Plugin): - # Only include fields that exist in both ReplayerConfig and ClientConfig - # Note: interceptors are different types between client and worker, so excluded - client_config = cast( - temporalio.client.ClientConfig, - { - "namespace": config["namespace"], - "data_converter": config["data_converter"], - "header_codec_behavior": config["header_codec_behavior"], - }, - ) - - modified_client_config = self.configure_client(client_config) - config["namespace"] = modified_client_config["namespace"] - config["data_converter"] = modified_client_config["data_converter"] - config["header_codec_behavior"] = modified_client_config[ - "header_codec_behavior" - ] +class ReplayerConfig(TypedDict, total=False): + """TypedDict of config originally passed to :py:class:`Replayer`.""" - return config + workflows: Sequence[Type] + workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] + workflow_runner: WorkflowRunner + unsandboxed_workflow_runner: WorkflowRunner + namespace: str + data_converter: temporalio.converter.DataConverter + interceptors: Sequence[Interceptor] + build_id: Optional[str] + identity: Optional[str] + workflow_failure_exception_types: Sequence[Type[BaseException]] + debug_mode: bool + runtime: Optional[temporalio.runtime.Runtime] + disable_safe_workflow_eviction: bool + header_codec_behavior: HeaderCodecBehavior class Replayer: @@ -124,7 +61,7 @@ def __init__( namespace: str = "ReplayNamespace", data_converter: temporalio.converter.DataConverter = temporalio.converter.DataConverter.default, interceptors: Sequence[Interceptor] = [], - plugins: Sequence[ReplayerPlugin] = [], + plugins: Sequence[temporalio.worker.Plugin] = [], build_id: Optional[str] = None, identity: Optional[str] = None, workflow_failure_exception_types: Sequence[Type[BaseException]] = [], @@ -164,32 +101,16 @@ def __init__( header_codec_behavior=header_codec_behavior, ) - # Initialize all worker plugins - root_worker_plugin: temporalio.worker.Plugin = ( - temporalio.worker._worker._RootPlugin() - ) - for worker_plugin in reversed( - [ - cast(temporalio.worker.Plugin, plugin) - for plugin in plugins - if isinstance(plugin, temporalio.worker.Plugin) - ] - ): - root_worker_plugin = worker_plugin.init_worker_plugin(root_worker_plugin) - - # Initialize all client plugins - root_client_plugin: temporalio.client.Plugin = temporalio.client._RootPlugin() - for client_plugin in reversed( - [ - cast(temporalio.client.Plugin, plugin) - for plugin in plugins - if isinstance(plugin, temporalio.client.Plugin) - ] - ): - root_client_plugin = client_plugin.init_client_plugin(root_client_plugin) + from ._worker import _RootPlugin + + root_plugin: temporalio.worker.Plugin = _RootPlugin() + for plugin in reversed(plugins): + root_plugin = plugin.init_worker_plugin(root_plugin) + self._config = root_plugin.configure_replayer(self._config) + self._plugin = root_plugin # Apply plugin configuration - for plugin in plugins: + for plugin in reversed(plugins): self._config = plugin.configure_replayer(self._config) # Validate workflows after plugin configuration @@ -262,10 +183,9 @@ async def replay_workflows( replay_failures[result.history.run_id] = result.replay_failure return WorkflowReplayResults(replay_failures=replay_failures) - @asynccontextmanager - async def workflow_replay_iterator( + def workflow_replay_iterator( self, histories: AsyncIterator[temporalio.client.WorkflowHistory] - ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: """Replay workflows for the given histories. This is a context manager for use via ``async with``. The value is an @@ -278,6 +198,12 @@ async def workflow_replay_iterator( An async iterator that returns replayed workflow results as they are replayed. """ + return self._plugin.workflow_replay(self, histories) + + @asynccontextmanager + async def _workflow_replay_iterator( + self, histories: AsyncIterator[temporalio.client.WorkflowHistory] + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: try: last_replay_failure: Optional[Exception] last_replay_complete = asyncio.Event() @@ -337,6 +263,8 @@ def on_eviction_hook( != HeaderCodecBehavior.NO_CODEC, ) # Create bridge worker + from ._worker import load_default_build_id + bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay( runtime._core_runtime, temporalio.bridge.worker.WorkerConfig( @@ -440,25 +368,6 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]: logger.warning("Failed to finalize shutdown", exc_info=True) -class ReplayerConfig(TypedDict, total=False): - """TypedDict of config originally passed to :py:class:`Replayer`.""" - - workflows: Sequence[Type] - workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] - workflow_runner: WorkflowRunner - unsandboxed_workflow_runner: WorkflowRunner - namespace: str - data_converter: temporalio.converter.DataConverter - interceptors: Sequence[Interceptor] - build_id: Optional[str] - identity: Optional[str] - workflow_failure_exception_types: Sequence[Type[BaseException]] - debug_mode: bool - runtime: Optional[temporalio.runtime.Runtime] - disable_safe_workflow_eviction: bool - header_codec_behavior: HeaderCodecBehavior - - @dataclass(frozen=True) class WorkflowReplayResult: """Single workflow replay result.""" diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index 58f881c04..f96f4199b 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -9,10 +9,12 @@ import logging import sys import warnings +from contextlib import AbstractAsyncContextManager, asynccontextmanager from dataclasses import dataclass from datetime import timedelta from typing import ( Any, + AsyncIterator, Awaitable, Callable, List, @@ -36,6 +38,7 @@ WorkerDeploymentVersion, ) +from . import Replayer, ReplayerConfig, WorkflowReplayResult from ._activity import SharedStateManager, _ActivityWorker from ._interceptor import Interceptor from ._nexus import _NexusWorker @@ -149,6 +152,25 @@ async def run_worker(self, worker: Worker) -> None: """ await self.next_worker_plugin.run_worker(worker) + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + """Hook called when creating a replayer to allow modification of configuration. + + This should be used to configure anything in ReplayerConfig needed to make execution match + the original. This could include interceptors, DataConverter, workflows, and more. + + Uniquely does not rely on a chain, and is instead called sequentially on the plugins + because the replayer cannot instantiate the worker/client component. + """ + return config + + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + """Hook called when running a replayer to allow interception of execution.""" + return self.next_worker_plugin.workflow_replay(replayer, histories) + class _RootPlugin(Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: @@ -157,6 +179,13 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig: async def run_worker(self, worker: Worker) -> None: await worker._run() + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + return replayer._workflow_replay_iterator(histories) + class Worker: """Worker to process workflows and/or activities. diff --git a/temporalio/worker/workflow_sandbox/_importer.py b/temporalio/worker/workflow_sandbox/_importer.py index 462bd44c2..944f2929a 100644 --- a/temporalio/worker/workflow_sandbox/_importer.py +++ b/temporalio/worker/workflow_sandbox/_importer.py @@ -285,6 +285,12 @@ def module_configured_passthrough(self, name: str) -> bool: def _maybe_passthrough_module(self, name: str) -> Optional[types.ModuleType]: # If imports not passed through and all modules are not passed through # and name not in passthrough modules, check parents + logger.debug( + "Check passthrough module: %s - %s", + name, + temporalio.workflow.unsafe.is_imports_passed_through() + or self.module_configured_passthrough(name), + ) if ( not temporalio.workflow.unsafe.is_imports_passed_through() and not self.module_configured_passthrough(name) diff --git a/tests/contrib/openai_agents/test_openai_replay.py b/tests/contrib/openai_agents/test_openai_replay.py index d3ac92c5e..c6ac1ea68 100644 --- a/tests/contrib/openai_agents/test_openai_replay.py +++ b/tests/contrib/openai_agents/test_openai_replay.py @@ -1,16 +1,15 @@ +from contextlib import ( + AbstractAsyncContextManager, + AbstractContextManager, + asynccontextmanager, +) from pathlib import Path +from typing import AsyncGenerator import pytest from temporalio.client import WorkflowHistory -from temporalio.contrib.openai_agents import ModelActivityParameters -from temporalio.contrib.openai_agents._temporal_openai_agents import ( - set_open_ai_agent_temporal_overrides, -) -from temporalio.contrib.openai_agents._trace_interceptor import ( - OpenAIAgentsTracingInterceptor, -) -from temporalio.contrib.pydantic import pydantic_data_converter +from temporalio.contrib.openai_agents import ModelActivityParameters, OpenAIAgentsPlugin from temporalio.worker import Replayer from tests.contrib.openai_agents.test_openai import ( AgentsAsToolsWorkflow, @@ -39,17 +38,15 @@ async def test_replay(file_name: str) -> None: with (Path(__file__).with_name("histories") / file_name).open("r") as f: history_json = f.read() - with set_open_ai_agent_temporal_overrides(ModelActivityParameters()): - await Replayer( - workflows=[ - ResearchWorkflow, - ToolsWorkflow, - CustomerServiceWorkflow, - AgentsAsToolsWorkflow, - HelloWorldAgent, - InputGuardrailWorkflow, - OutputGuardrailWorkflow, - ], - data_converter=pydantic_data_converter, - interceptors=[OpenAIAgentsTracingInterceptor()], - ).replay_workflow(WorkflowHistory.from_json("fake", history_json)) + await Replayer( + workflows=[ + ResearchWorkflow, + ToolsWorkflow, + CustomerServiceWorkflow, + AgentsAsToolsWorkflow, + HelloWorldAgent, + InputGuardrailWorkflow, + OutputGuardrailWorkflow, + ], + plugins=[OpenAIAgentsPlugin()], + ).replay_workflow(WorkflowHistory.from_json("fake", history_json)) diff --git a/tests/test_plugins.py b/tests/test_plugins.py index 49ddc23fa..ced9ba668 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -26,7 +26,7 @@ def intercept_client(self, next: OutboundInterceptor) -> OutboundInterceptor: return super().intercept_client(next) -class MyClientPlugin(temporalio.worker.ReplayerPlugin, temporalio.client.Plugin): +class MyClientPlugin(temporalio.client.Plugin): def __init__(self): self.interceptor = TestClientInterceptor() @@ -62,15 +62,13 @@ async def test_client_plugin(client: Client, env: WorkflowEnvironment): assert new_client.service_client.config.api_key == "replaced key" -class MyCombinedPlugin( - temporalio.worker.ReplayerPlugin, temporalio.client.Plugin, temporalio.worker.Plugin -): +class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "combined" return super().configure_worker(config) -class MyWorkerPlugin(temporalio.worker.ReplayerPlugin, temporalio.worker.Plugin): +class MyWorkerPlugin(temporalio.worker.Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "replaced_queue" runner = config.get("workflow_runner") @@ -144,16 +142,19 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: ) -class ReplayCheckPlugin( - temporalio.worker.ReplayerPlugin, temporalio.client.Plugin, temporalio.worker.Plugin -): +class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def configure_client(self, config: ClientConfig) -> ClientConfig: + config["data_converter"] = pydantic_data_converter + return super().configure_client(config) + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] return super().configure_worker(config) - def configure_client(self, config: ClientConfig) -> ClientConfig: + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: config["data_converter"] = pydantic_data_converter - return super().configure_client(config) + config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] + return config @workflow.defn @@ -182,13 +183,3 @@ async def test_replay(client: Client) -> None: assert replayer.config().get("data_converter") == pydantic_data_converter await replayer.replay_workflow(await handle.fetch_history()) - - replayer = Replayer(workflows=[HelloWorkflow], plugins=[MyClientPlugin()]) - replayer = Replayer(workflows=[HelloWorkflow], plugins=[MyWorkerPlugin()]) - replayer = Replayer( - workflows=[HelloWorkflow], plugins=[MyClientPlugin(), MyWorkerPlugin()] - ) - replayer = Replayer( - workflows=[HelloWorkflow], - plugins=[MyWorkerPlugin(), MyClientPlugin(), MyCombinedPlugin()], - ) From 673afbec647e76a86cc352b61aedd54ac4631103 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 4 Aug 2025 09:30:43 -0700 Subject: [PATCH 06/15] Readme and some cleanup --- README.md | 30 +++++++++++++ .../openai_agents/_temporal_openai_agents.py | 2 +- temporalio/worker/_replayer.py | 44 +++++++++---------- temporalio/worker/_worker.py | 14 ++++-- 4 files changed, 61 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 546c436f0..494978b5a 100644 --- a/README.md +++ b/README.md @@ -1612,6 +1612,36 @@ worker = Worker( ) ``` +Worker plugins can also configure replay. They should do this in the case that they modified the +worker in a way which would also need to be present for replay to function. For instance, changing the data converter +or adding workflows. + +```python +class ReplayPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def configure_client(self, config: ClientConfig) -> ClientConfig: + config["data_converter"] = pydantic_data_converter + return super().configure_client(config) + + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] + return super().configure_worker(config) + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + config["data_converter"] = pydantic_data_converter + config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] + return config + + @asynccontextmanager + async def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + with set_some_context(): + async with super().workflow_replay(replayer, histories) as results: + yield results +``` + **Important Notes:** - Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility diff --git a/temporalio/contrib/openai_agents/_temporal_openai_agents.py b/temporalio/contrib/openai_agents/_temporal_openai_agents.py index 6a90e5113..ecc3a72f7 100644 --- a/temporalio/contrib/openai_agents/_temporal_openai_agents.py +++ b/temporalio/contrib/openai_agents/_temporal_openai_agents.py @@ -1,6 +1,6 @@ """Initialize Temporal OpenAI Agents overrides.""" -from contextlib import AbstractAsyncContextManager, asynccontextmanager, contextmanager +from contextlib import asynccontextmanager, contextmanager from datetime import timedelta from typing import AsyncIterator, Callable, Optional, Union diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index c30b5fb16..d16eebb4c 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -5,7 +5,6 @@ import asyncio import concurrent.futures import logging -import typing from contextlib import AbstractAsyncContextManager, asynccontextmanager from dataclasses import dataclass from typing import AsyncIterator, Dict, Mapping, Optional, Sequence, Type @@ -29,25 +28,6 @@ logger = logging.getLogger(__name__) -class ReplayerConfig(TypedDict, total=False): - """TypedDict of config originally passed to :py:class:`Replayer`.""" - - workflows: Sequence[Type] - workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] - workflow_runner: WorkflowRunner - unsandboxed_workflow_runner: WorkflowRunner - namespace: str - data_converter: temporalio.converter.DataConverter - interceptors: Sequence[Interceptor] - build_id: Optional[str] - identity: Optional[str] - workflow_failure_exception_types: Sequence[Type[BaseException]] - debug_mode: bool - runtime: Optional[temporalio.runtime.Runtime] - disable_safe_workflow_eviction: bool - header_codec_behavior: HeaderCodecBehavior - - class Replayer: """Replayer to replay workflows from history.""" @@ -103,16 +83,13 @@ def __init__( from ._worker import _RootPlugin + # Apply plugin configuration root_plugin: temporalio.worker.Plugin = _RootPlugin() for plugin in reversed(plugins): root_plugin = plugin.init_worker_plugin(root_plugin) self._config = root_plugin.configure_replayer(self._config) self._plugin = root_plugin - # Apply plugin configuration - for plugin in reversed(plugins): - self._config = plugin.configure_replayer(self._config) - # Validate workflows after plugin configuration if not self._config["workflows"]: raise ValueError("At least one workflow must be specified") @@ -368,6 +345,25 @@ async def replay_iterator() -> AsyncIterator[WorkflowReplayResult]: logger.warning("Failed to finalize shutdown", exc_info=True) +class ReplayerConfig(TypedDict, total=False): + """TypedDict of config originally passed to :py:class:`Replayer`.""" + + workflows: Sequence[Type] + workflow_task_executor: Optional[concurrent.futures.ThreadPoolExecutor] + workflow_runner: WorkflowRunner + unsandboxed_workflow_runner: WorkflowRunner + namespace: str + data_converter: temporalio.converter.DataConverter + interceptors: Sequence[Interceptor] + build_id: Optional[str] + identity: Optional[str] + workflow_failure_exception_types: Sequence[Type[BaseException]] + debug_mode: bool + runtime: Optional[temporalio.runtime.Runtime] + disable_safe_workflow_eviction: bool + header_codec_behavior: HeaderCodecBehavior + + @dataclass(frozen=True) class WorkflowReplayResult: """Single workflow replay result.""" diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index f96f4199b..e97ec1fcf 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -9,7 +9,7 @@ import logging import sys import warnings -from contextlib import AbstractAsyncContextManager, asynccontextmanager +from contextlib import AbstractAsyncContextManager from dataclasses import dataclass from datetime import timedelta from typing import ( @@ -158,10 +158,13 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: This should be used to configure anything in ReplayerConfig needed to make execution match the original. This could include interceptors, DataConverter, workflows, and more. - Uniquely does not rely on a chain, and is instead called sequentially on the plugins - because the replayer cannot instantiate the worker/client component. + Args: + config: The replayer configuration dictionary to potentially modify. + + Returns: + The modified replayer configuration. """ - return config + return self.next_worker_plugin.configure_replayer(config) def workflow_replay( self, @@ -176,6 +179,9 @@ class _RootPlugin(Plugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: return config + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + return config + async def run_worker(self, worker: Worker) -> None: await worker._run() From 9812c95ea52445ac070fb78192d36f7e9ffb2d9c Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 4 Aug 2025 09:31:57 -0700 Subject: [PATCH 07/15] Remove debug logging --- temporalio/worker/workflow_sandbox/_importer.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/temporalio/worker/workflow_sandbox/_importer.py b/temporalio/worker/workflow_sandbox/_importer.py index 944f2929a..462bd44c2 100644 --- a/temporalio/worker/workflow_sandbox/_importer.py +++ b/temporalio/worker/workflow_sandbox/_importer.py @@ -285,12 +285,6 @@ def module_configured_passthrough(self, name: str) -> bool: def _maybe_passthrough_module(self, name: str) -> Optional[types.ModuleType]: # If imports not passed through and all modules are not passed through # and name not in passthrough modules, check parents - logger.debug( - "Check passthrough module: %s - %s", - name, - temporalio.workflow.unsafe.is_imports_passed_through() - or self.module_configured_passthrough(name), - ) if ( not temporalio.workflow.unsafe.is_imports_passed_through() and not self.module_configured_passthrough(name) From 2ae78b10ae5e1c03c042263453b83711d4897543 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 4 Aug 2025 12:52:02 -0700 Subject: [PATCH 08/15] Make plugin interface experimental and abstract --- temporalio/client.py | 10 +- .../openai_agents/_temporal_openai_agents.py | 33 +++++- temporalio/worker/_worker.py | 16 ++- tests/test_plugins.py | 109 ++++++++++++++++-- 4 files changed, 142 insertions(+), 26 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index b4b5453d7..b75669258 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -7398,6 +7398,7 @@ def name(self) -> str: """ return type(self).__module__ + "." + type(self).__qualname__ + @abstractmethod def init_client_plugin(self, next: Plugin) -> Plugin: """Initialize this plugin in the plugin chain. @@ -7411,9 +7412,8 @@ def init_client_plugin(self, next: Plugin) -> Plugin: Returns: This plugin instance for method chaining. """ - self.next_client_plugin = next - return self + @abstractmethod def configure_client(self, config: ClientConfig) -> ClientConfig: """Hook called when creating a client to allow modification of configuration. @@ -7427,8 +7427,8 @@ def configure_client(self, config: ClientConfig) -> ClientConfig: Returns: The modified client configuration. """ - return self.next_client_plugin.configure_client(config) + @abstractmethod async def connect_service_client( self, config: temporalio.service.ConnectConfig ) -> temporalio.service.ServiceClient: @@ -7444,10 +7444,12 @@ async def connect_service_client( Returns: The connected service client. """ - return await self.next_client_plugin.connect_service_client(config) class _RootPlugin(Plugin): + def init_client_plugin(self, next: Plugin) -> Plugin: + raise NotImplementedError() + def configure_client(self, config: ClientConfig) -> ClientConfig: return config diff --git a/temporalio/contrib/openai_agents/_temporal_openai_agents.py b/temporalio/contrib/openai_agents/_temporal_openai_agents.py index ecc3a72f7..714d4a59a 100644 --- a/temporalio/contrib/openai_agents/_temporal_openai_agents.py +++ b/temporalio/contrib/openai_agents/_temporal_openai_agents.py @@ -24,7 +24,7 @@ import temporalio.client import temporalio.worker -from temporalio.client import ClientConfig +from temporalio.client import ClientConfig, Plugin from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner @@ -237,6 +237,26 @@ def __init__( self._model_params = model_params self._model_provider = model_provider + def init_client_plugin( + self, next: temporalio.client.Plugin + ) -> temporalio.client.Plugin: + """Set the next client plugin""" + self.next_client_plugin = next + return self + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + """No modifications to service client""" + return await self.next_client_plugin.connect_service_client(config) + + def init_worker_plugin( + self, next: temporalio.worker.Plugin + ) -> temporalio.worker.Plugin: + """Set the next worker plugin""" + self.next_worker_plugin = next + return self + def configure_client(self, config: ClientConfig) -> ClientConfig: """Configure the Temporal client for OpenAI agents integration. @@ -252,7 +272,7 @@ def configure_client(self, config: ClientConfig) -> ClientConfig: config["data_converter"] = DataConverter( payload_converter_class=_OpenAIPayloadConverter ) - return super().configure_client(config) + return self.next_client_plugin.configure_client(config) def configure_worker(self, config: WorkerConfig) -> WorkerConfig: """Configure the Temporal worker for OpenAI agents integration. @@ -274,7 +294,7 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["activities"] = list(config.get("activities") or []) + [ ModelActivity(self._model_provider).invoke_model_activity ] - return super().configure_worker(config) + return self.next_worker_plugin.configure_worker(config) async def run_worker(self, worker: Worker) -> None: """Run the worker with OpenAI agents temporal overrides. @@ -287,7 +307,7 @@ async def run_worker(self, worker: Worker) -> None: worker: The worker instance to run. """ with set_open_ai_agent_temporal_overrides(self._model_params): - await super().run_worker(worker) + await self.next_worker_plugin.run_worker(worker) def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: """Configure the replayer for OpenAI Agents.""" @@ -305,6 +325,9 @@ async def workflow_replay( replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + """Set the OpenAI Overrides during replay""" with set_open_ai_agent_temporal_overrides(self._model_params): - async with super().workflow_replay(replayer, histories) as results: + async with self.next_worker_plugin.workflow_replay( + replayer, histories + ) as results: yield results diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index e97ec1fcf..b7d01fed8 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -98,6 +98,8 @@ class Plugin(abc.ABC): Plugins allow customization of worker creation and execution processes through a chain of responsibility pattern. Each plugin can modify the worker configuration or intercept worker execution. + + WARNING: This is an experimental feature and may change in the future. """ def name(self) -> str: @@ -108,6 +110,7 @@ def name(self) -> str: """ return type(self).__module__ + "." + type(self).__qualname__ + @abc.abstractmethod def init_worker_plugin(self, next: Plugin) -> Plugin: """Initialize this plugin in the plugin chain. @@ -121,9 +124,8 @@ def init_worker_plugin(self, next: Plugin) -> Plugin: Returns: This plugin instance for method chaining. """ - self.next_worker_plugin = next - return self + @abc.abstractmethod def configure_worker(self, config: WorkerConfig) -> WorkerConfig: """Hook called when creating a worker to allow modification of configuration. @@ -138,8 +140,8 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig: Returns: The modified worker configuration. """ - return self.next_worker_plugin.configure_worker(config) + @abc.abstractmethod async def run_worker(self, worker: Worker) -> None: """Hook called when running a worker to allow interception of execution. @@ -150,8 +152,8 @@ async def run_worker(self, worker: Worker) -> None: Args: worker: The worker instance to run. """ - await self.next_worker_plugin.run_worker(worker) + @abc.abstractmethod def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: """Hook called when creating a replayer to allow modification of configuration. @@ -164,18 +166,20 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: Returns: The modified replayer configuration. """ - return self.next_worker_plugin.configure_replayer(config) + @abc.abstractmethod def workflow_replay( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: """Hook called when running a replayer to allow interception of execution.""" - return self.next_worker_plugin.workflow_replay(replayer, histories) class _RootPlugin(Plugin): + def init_worker_plugin(self, next: Plugin) -> Plugin: + raise NotImplementedError() + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: return config diff --git a/tests/test_plugins.py b/tests/test_plugins.py index ced9ba668..de2533230 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,17 +1,24 @@ import dataclasses import uuid import warnings -from typing import cast +from contextlib import AbstractAsyncContextManager +from typing import AsyncIterator, cast import pytest import temporalio.client import temporalio.worker from temporalio import workflow -from temporalio.client import Client, ClientConfig, OutboundInterceptor +from temporalio.client import Client, ClientConfig, OutboundInterceptor, Plugin from temporalio.contrib.pydantic import pydantic_data_converter from temporalio.testing import WorkflowEnvironment -from temporalio.worker import Replayer, ReplayerConfig, Worker, WorkerConfig +from temporalio.worker import ( + Replayer, + ReplayerConfig, + Worker, + WorkerConfig, + WorkflowReplayResult, +) from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner from tests.helpers import new_worker from tests.worker.test_worker import never_run_activity @@ -30,18 +37,22 @@ class MyClientPlugin(temporalio.client.Plugin): def __init__(self): self.interceptor = TestClientInterceptor() + def init_client_plugin(self, next: Plugin) -> Plugin: + self.next_client_plugin = next + return self + def configure_client(self, config: ClientConfig) -> ClientConfig: config["namespace"] = "replaced_namespace" config["interceptors"] = list(config.get("interceptors") or []) + [ self.interceptor ] - return super().configure_client(config) + return self.next_client_plugin.configure_client(config) async def connect_service_client( self, config: temporalio.service.ConnectConfig ) -> temporalio.service.ServiceClient: config.api_key = "replaced key" - return await super().connect_service_client(config) + return await self.next_client_plugin.connect_service_client(config) async def test_client_plugin(client: Client, env: WorkflowEnvironment): @@ -63,12 +74,51 @@ async def test_client_plugin(client: Client, env: WorkflowEnvironment): class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def init_worker_plugin( + self, next: temporalio.worker.Plugin + ) -> temporalio.worker.Plugin: + self.next_worker_plugin = next + return self + + def init_client_plugin( + self, next: temporalio.client.Plugin + ) -> temporalio.client.Plugin: + self.next_client_plugin = next + return self + + def configure_client(self, config: ClientConfig) -> ClientConfig: + return self.next_client_plugin.configure_client(config) + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "combined" - return super().configure_worker(config) + return self.next_worker_plugin.configure_worker(config) + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + return await self.next_client_plugin.connect_service_client(config) + + async def run_worker(self, worker: Worker) -> None: + await self.next_worker_plugin.run_worker(worker) + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + return self.next_worker_plugin.configure_replayer(config) + + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + return self.next_worker_plugin.workflow_replay(replayer, histories) class MyWorkerPlugin(temporalio.worker.Plugin): + def init_worker_plugin( + self, next: temporalio.worker.Plugin + ) -> temporalio.worker.Plugin: + self.next_worker_plugin = next + return self + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "replaced_queue" runner = config.get("workflow_runner") @@ -77,10 +127,20 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig: runner, restrictions=runner.restrictions.with_passthrough_modules("my_module"), ) - return super().configure_worker(config) + return self.next_worker_plugin.configure_worker(config) async def run_worker(self, worker: Worker) -> None: - await super().run_worker(worker) + await self.next_worker_plugin.run_worker(worker) + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + return self.next_worker_plugin.configure_replayer(config) + + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + return self.next_worker_plugin.workflow_replay(replayer, histories) async def test_worker_plugin_basic_config(client: Client) -> None: @@ -143,18 +203,45 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): + def init_worker_plugin( + self, next: temporalio.worker.Plugin + ) -> temporalio.worker.Plugin: + self.next_worker_plugin = next + return self + + def init_client_plugin( + self, next: temporalio.client.Plugin + ) -> temporalio.client.Plugin: + self.next_client_plugin = next + return self + def configure_client(self, config: ClientConfig) -> ClientConfig: config["data_converter"] = pydantic_data_converter - return super().configure_client(config) + return self.next_client_plugin.configure_client(config) def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] - return super().configure_worker(config) + return self.next_worker_plugin.configure_worker(config) def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: config["data_converter"] = pydantic_data_converter config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] - return config + return self.next_worker_plugin.configure_replayer(config) + + async def run_worker(self, worker: Worker) -> None: + await self.next_worker_plugin.run_worker(worker) + + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + return await self.next_client_plugin.connect_service_client(config) + + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + return self.next_worker_plugin.workflow_replay(replayer, histories) @workflow.defn From beec4bd1e20735aba8d3236d3acac8bdf9d6b0ac Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 4 Aug 2025 12:59:49 -0700 Subject: [PATCH 09/15] Update readme --- README.md | 74 +++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 494978b5a..ba29666e0 100644 --- a/README.md +++ b/README.md @@ -1515,18 +1515,22 @@ import temporalio.service class AuthenticationPlugin(Plugin): def __init__(self, api_key: str): self.api_key = api_key + + def init_client_plugin(self, next: Plugin) -> Plugin: + self.next_client_plugin = next + return self def configure_client(self, config: ClientConfig) -> ClientConfig: # Modify client configuration config["namespace"] = "my-secure-namespace" - return super().configure_client(config) + return self.next_client_plugin.configure_client(config) async def connect_service_client( self, config: temporalio.service.ConnectConfig ) -> temporalio.service.ServiceClient: # Add authentication to the connection config.api_key = self.api_key - return await super().connect_service_client(config) + return await self.next_client_plugin.connect_service_client(config) # Use the plugin when connecting client = await Client.connect( @@ -1543,26 +1547,48 @@ custom lifecycle management, or modifying worker settings. Here's an example of a worker plugin that adds custom monitoring: ```python -from temporalio.worker import Plugin, WorkerConfig, Worker +import temporalio +from contextlib import asynccontextmanager +from typing import AsyncIterator +from temporalio.worker import Plugin, WorkerConfig, Worker, ReplayerConfig, Worker, Replayer, WorkflowReplayResult import logging class MonitoringPlugin(Plugin): def __init__(self): self.logger = logging.getLogger(__name__) + def init_worker_plugin(self, next: Plugin) -> Plugin: + self.next_worker_plugin = next + return self + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Modify worker configuration original_task_queue = config["task_queue"] config["task_queue"] = f"monitored-{original_task_queue}" self.logger.info(f"Worker created for task queue: {config['task_queue']}") - return super().configure_worker(config) + return self.next_worker_plugin.configure_worker(config) async def run_worker(self, worker: Worker) -> None: self.logger.info("Starting worker execution") try: - await super().run_worker(worker) + await self.next_worker_plugin.run_worker(worker) finally: self.logger.info("Worker execution completed") + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + config["data_converter"] = pydantic_data_converter + config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] + return config + + @asynccontextmanager + async def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + with set_some_context(): + async with super().workflow_replay(replayer, histories) as results: + yield results # Use the plugin when creating a worker worker = Worker( @@ -1577,16 +1603,34 @@ worker = Worker( For plugins that need to work with both clients and workers, you can implement both interfaces in a single class: ```python +import temporalio +from contextlib import asynccontextmanager +from typing import AsyncIterator from temporalio.client import Plugin as ClientPlugin, ClientConfig -from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig +from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult class UnifiedPlugin(ClientPlugin, WorkerPlugin): + def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin: + self.next_client_plugin = next + return self + + def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin: + self.next_worker_plugin = next + return self + def configure_client(self, config: ClientConfig) -> ClientConfig: # Client-side customization config["namespace"] = "unified-namespace" return super().configure_client(config) + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + # Add authentication to the connection + config.api_key = self.api_key + return await self.next_client_plugin.connect_service_client(config) + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Worker-side customization config["max_cached_workflows"] = 500 @@ -1595,8 +1639,22 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin): async def run_worker(self, worker: Worker) -> None: print("Starting unified worker") await super().run_worker(worker) - - + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + config["data_converter"] = pydantic_data_converter + config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] + return config + + @asynccontextmanager + async def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + with set_some_context(): + async with super().workflow_replay(replayer, histories) as results: + yield results + # Create client with the unified plugin client = await Client.connect( "localhost:7233", From 91c229f073e1215d34bac9613c7d252c2a76a432 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Mon, 4 Aug 2025 13:00:27 -0700 Subject: [PATCH 10/15] Update readme --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index ba29666e0..f520775e1 100644 --- a/README.md +++ b/README.md @@ -1542,7 +1542,9 @@ client = await Client.connect( #### Worker Plugins Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring, -custom lifecycle management, or modifying worker settings. +custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay. +They should do this in the case that they modified the worker in a way which would also need to be present +for replay to function. For instance, changing the data converter or adding workflows. Here's an example of a worker plugin that adds custom monitoring: @@ -1670,9 +1672,7 @@ worker = Worker( ) ``` -Worker plugins can also configure replay. They should do this in the case that they modified the -worker in a way which would also need to be present for replay to function. For instance, changing the data converter -or adding workflows. + ```python class ReplayPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): From cd2d12ffab5d9ece543f7cb3402d53528ab8d8ce Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 5 Aug 2025 08:53:27 -0700 Subject: [PATCH 11/15] Readme fixes, move plugin to its own file --- README.md | 100 +++++++-------- temporalio/worker/__init__.py | 2 +- temporalio/worker/_plugin.py | 121 ++++++++++++++++++ temporalio/worker/_replayer.py | 6 +- temporalio/worker/_worker.py | 110 +--------------- .../openai_agents/test_openai_replay.py | 6 - tests/test_plugins.py | 12 +- 7 files changed, 181 insertions(+), 176 deletions(-) create mode 100644 temporalio/worker/_plugin.py diff --git a/README.md b/README.md index f520775e1..fff0cfa7e 100644 --- a/README.md +++ b/README.md @@ -1578,9 +1578,7 @@ class MonitoringPlugin(Plugin): self.logger.info("Worker execution completed") def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - config["data_converter"] = pydantic_data_converter - config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] - return config + return self.next_worker_plugin.configure_replayer(config) @asynccontextmanager async def workflow_replay( @@ -1588,9 +1586,12 @@ class MonitoringPlugin(Plugin): replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: - with set_some_context(): - async with super().workflow_replay(replayer, histories) as results: + self.logger.info("Starting replay execution") + try: + async with self.next_worker_plugin.workflow_replay(replayer, histories) as results: yield results + finally: + self.logger.info("Replay execution completed") # Use the plugin when creating a worker worker = Worker( @@ -1606,69 +1607,64 @@ For plugins that need to work with both clients and workers, you can implement b ```python import temporalio -from contextlib import asynccontextmanager +from contextlib import AbstractAsyncContextManager from typing import AsyncIterator from temporalio.client import Plugin as ClientPlugin, ClientConfig from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult class UnifiedPlugin(ClientPlugin, WorkerPlugin): - def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin: - self.next_client_plugin = next - return self + def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin: + self.next_client_plugin = next + return self - def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin: - self.next_worker_plugin = next - return self + def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin: + self.next_worker_plugin = next + return self - def configure_client(self, config: ClientConfig) -> ClientConfig: - # Client-side customization - config["namespace"] = "unified-namespace" - return super().configure_client(config) - - async def connect_service_client( - self, config: temporalio.service.ConnectConfig - ) -> temporalio.service.ServiceClient: - # Add authentication to the connection - config.api_key = self.api_key - return await self.next_client_plugin.connect_service_client(config) - - def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - # Worker-side customization - config["max_cached_workflows"] = 500 - return super().configure_worker(config) - - async def run_worker(self, worker: Worker) -> None: - print("Starting unified worker") - await super().run_worker(worker) - - def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - config["data_converter"] = pydantic_data_converter - config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] - return config + def configure_client(self, config: ClientConfig) -> ClientConfig: + # Client-side customization + config["data_converter"] = pydantic_data_converter + return super().configure_client(config) - @asynccontextmanager - async def workflow_replay( - self, - replayer: Replayer, - histories: AsyncIterator[temporalio.client.WorkflowHistory], - ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: - with set_some_context(): - async with super().workflow_replay(replayer, histories) as results: - yield results + async def connect_service_client( + self, config: temporalio.service.ConnectConfig + ) -> temporalio.service.ServiceClient: + # Add authentication to the connection + config.api_key = self.api_key + return await self.next_client_plugin.connect_service_client(config) + + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + # Worker-side customization + return super().configure_worker(config) + + async def run_worker(self, worker: Worker) -> None: + print("Starting unified worker") + await super().run_worker(worker) + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + config["data_converter"] = pydantic_data_converter + return config + + async def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[temporalio.client.WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + return self.next_worker_plugin.workflow_replay(replayer, histories) # Create client with the unified plugin client = await Client.connect( - "localhost:7233", - plugins=[UnifiedPlugin()] + "localhost:7233", + plugins=[UnifiedPlugin()] ) # Worker will automatically inherit the plugin from the client worker = Worker( - client, - task_queue="my-task-queue", - workflows=[MyWorkflow], - activities=[my_activity] + client, + task_queue="my-task-queue", + workflows=[MyWorkflow], + activities=[my_activity] ) ``` diff --git a/temporalio/worker/__init__.py b/temporalio/worker/__init__.py index 6e062afcc..08686dcb3 100644 --- a/temporalio/worker/__init__.py +++ b/temporalio/worker/__init__.py @@ -21,6 +21,7 @@ WorkflowInterceptorClassInput, WorkflowOutboundInterceptor, ) +from ._plugin import Plugin from ._replayer import ( Replayer, ReplayerConfig, @@ -44,7 +45,6 @@ WorkflowSlotInfo, ) from ._worker import ( - Plugin, PollerBehavior, PollerBehaviorAutoscaling, PollerBehaviorSimpleMaximum, diff --git a/temporalio/worker/_plugin.py b/temporalio/worker/_plugin.py new file mode 100644 index 000000000..e880e0e31 --- /dev/null +++ b/temporalio/worker/_plugin.py @@ -0,0 +1,121 @@ +from __future__ import annotations + +import abc +from contextlib import AbstractAsyncContextManager +from typing import TYPE_CHECKING, AsyncIterator + +from temporalio.client import WorkflowHistory + +if TYPE_CHECKING: + from temporalio.worker import ( + Replayer, + ReplayerConfig, + Worker, + WorkerConfig, + WorkflowReplayResult, + ) + + +class Plugin(abc.ABC): + """Base class for worker plugins that can intercept and modify worker behavior. + + Plugins allow customization of worker creation and execution processes + through a chain of responsibility pattern. Each plugin can modify the worker + configuration or intercept worker execution. + + WARNING: This is an experimental feature and may change in the future. + """ + + def name(self) -> str: + """Get the qualified name of this plugin. Can be overridden if desired to provide a more appropriate name. + + Returns: + The fully qualified name of the plugin class (module.classname). + """ + return type(self).__module__ + "." + type(self).__qualname__ + + @abc.abstractmethod + def init_worker_plugin(self, next: Plugin) -> Plugin: + """Initialize this plugin in the plugin chain. + + This method sets up the chain of responsibility pattern by storing a reference + to the next plugin in the chain. It is called during worker creation to build + the plugin chain. + + Args: + next: The next plugin in the chain to delegate to. + + Returns: + This plugin instance for method chaining. + """ + + @abc.abstractmethod + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + """Hook called when creating a worker to allow modification of configuration. + + This method is called during worker creation and allows plugins to modify + the worker configuration before the worker is fully initialized. Plugins + can modify task queue names, adjust concurrency settings, add interceptors, + or change other worker settings. + + Args: + config: The worker configuration dictionary to potentially modify. + + Returns: + The modified worker configuration. + """ + + @abc.abstractmethod + async def run_worker(self, worker: Worker) -> None: + """Hook called when running a worker to allow interception of execution. + + This method is called when the worker is started and allows plugins to + intercept or wrap the worker execution. Plugins can add monitoring, + custom lifecycle management, or other execution-time behavior. + + Args: + worker: The worker instance to run. + """ + + @abc.abstractmethod + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + """Hook called when creating a replayer to allow modification of configuration. + + This should be used to configure anything in ReplayerConfig needed to make execution match + the original. This could include interceptors, DataConverter, workflows, and more. + + Args: + config: The replayer configuration dictionary to potentially modify. + + Returns: + The modified replayer configuration. + """ + + @abc.abstractmethod + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + """Hook called when running a replayer to allow interception of execution.""" + + +class _RootPlugin(Plugin): + def init_worker_plugin(self, next: Plugin) -> Plugin: + raise NotImplementedError() + + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: + return config + + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: + return config + + async def run_worker(self, worker: Worker) -> None: + await worker._run() + + def workflow_replay( + self, + replayer: Replayer, + histories: AsyncIterator[WorkflowHistory], + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: + return replayer._workflow_replay_iterator(histories) diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index d16eebb4c..55fbcdff4 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -21,6 +21,8 @@ from ..common import HeaderCodecBehavior from ._interceptor import Interceptor +from ._plugin import _RootPlugin +from ._worker import load_default_build_id from ._workflow import _WorkflowWorker from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner from .workflow_sandbox import SandboxedWorkflowRunner @@ -81,8 +83,6 @@ def __init__( header_codec_behavior=header_codec_behavior, ) - from ._worker import _RootPlugin - # Apply plugin configuration root_plugin: temporalio.worker.Plugin = _RootPlugin() for plugin in reversed(plugins): @@ -240,8 +240,6 @@ def on_eviction_hook( != HeaderCodecBehavior.NO_CODEC, ) # Create bridge worker - from ._worker import load_default_build_id - bridge_worker, pusher = temporalio.bridge.worker.Worker.for_replay( runtime._core_runtime, temporalio.bridge.worker.WorkerConfig( diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index b7d01fed8..ebee1c2c5 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -2,19 +2,16 @@ from __future__ import annotations -import abc import asyncio import concurrent.futures import hashlib import logging import sys import warnings -from contextlib import AbstractAsyncContextManager from dataclasses import dataclass from datetime import timedelta from typing import ( Any, - AsyncIterator, Awaitable, Callable, List, @@ -38,10 +35,10 @@ WorkerDeploymentVersion, ) -from . import Replayer, ReplayerConfig, WorkflowReplayResult from ._activity import SharedStateManager, _ActivityWorker from ._interceptor import Interceptor from ._nexus import _NexusWorker +from ._plugin import Plugin, _RootPlugin from ._tuning import WorkerTuner from ._workflow import _WorkflowWorker from ._workflow_instance import UnsandboxedWorkflowRunner, WorkflowRunner @@ -92,111 +89,6 @@ def _to_bridge(self) -> temporalio.bridge.worker.PollerBehavior: ] -class Plugin(abc.ABC): - """Base class for worker plugins that can intercept and modify worker behavior. - - Plugins allow customization of worker creation and execution processes - through a chain of responsibility pattern. Each plugin can modify the worker - configuration or intercept worker execution. - - WARNING: This is an experimental feature and may change in the future. - """ - - def name(self) -> str: - """Get the qualified name of this plugin. Can be overridden if desired to provide a more appropriate name. - - Returns: - The fully qualified name of the plugin class (module.classname). - """ - return type(self).__module__ + "." + type(self).__qualname__ - - @abc.abstractmethod - def init_worker_plugin(self, next: Plugin) -> Plugin: - """Initialize this plugin in the plugin chain. - - This method sets up the chain of responsibility pattern by storing a reference - to the next plugin in the chain. It is called during worker creation to build - the plugin chain. - - Args: - next: The next plugin in the chain to delegate to. - - Returns: - This plugin instance for method chaining. - """ - - @abc.abstractmethod - def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - """Hook called when creating a worker to allow modification of configuration. - - This method is called during worker creation and allows plugins to modify - the worker configuration before the worker is fully initialized. Plugins - can modify task queue names, adjust concurrency settings, add interceptors, - or change other worker settings. - - Args: - config: The worker configuration dictionary to potentially modify. - - Returns: - The modified worker configuration. - """ - - @abc.abstractmethod - async def run_worker(self, worker: Worker) -> None: - """Hook called when running a worker to allow interception of execution. - - This method is called when the worker is started and allows plugins to - intercept or wrap the worker execution. Plugins can add monitoring, - custom lifecycle management, or other execution-time behavior. - - Args: - worker: The worker instance to run. - """ - - @abc.abstractmethod - def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - """Hook called when creating a replayer to allow modification of configuration. - - This should be used to configure anything in ReplayerConfig needed to make execution match - the original. This could include interceptors, DataConverter, workflows, and more. - - Args: - config: The replayer configuration dictionary to potentially modify. - - Returns: - The modified replayer configuration. - """ - - @abc.abstractmethod - def workflow_replay( - self, - replayer: Replayer, - histories: AsyncIterator[temporalio.client.WorkflowHistory], - ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - """Hook called when running a replayer to allow interception of execution.""" - - -class _RootPlugin(Plugin): - def init_worker_plugin(self, next: Plugin) -> Plugin: - raise NotImplementedError() - - def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - return config - - def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - return config - - async def run_worker(self, worker: Worker) -> None: - await worker._run() - - def workflow_replay( - self, - replayer: Replayer, - histories: AsyncIterator[temporalio.client.WorkflowHistory], - ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - return replayer._workflow_replay_iterator(histories) - - class Worker: """Worker to process workflows and/or activities. diff --git a/tests/contrib/openai_agents/test_openai_replay.py b/tests/contrib/openai_agents/test_openai_replay.py index c6ac1ea68..d625343b8 100644 --- a/tests/contrib/openai_agents/test_openai_replay.py +++ b/tests/contrib/openai_agents/test_openai_replay.py @@ -1,10 +1,4 @@ -from contextlib import ( - AbstractAsyncContextManager, - AbstractContextManager, - asynccontextmanager, -) from pathlib import Path -from typing import AsyncGenerator import pytest diff --git a/tests/test_plugins.py b/tests/test_plugins.py index de2533230..e923cbc6a 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -1,7 +1,7 @@ import dataclasses import uuid import warnings -from contextlib import AbstractAsyncContextManager +from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import AsyncIterator, cast import pytest @@ -236,12 +236,16 @@ async def connect_service_client( ) -> temporalio.service.ServiceClient: return await self.next_client_plugin.connect_service_client(config) - def workflow_replay( + @asynccontextmanager + async def workflow_replay( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], - ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - return self.next_worker_plugin.workflow_replay(replayer, histories) + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: + async with self.next_worker_plugin.workflow_replay( + replayer, histories + ) as result: + yield result @workflow.defn From 7671abe7efe65567529e060610e08eac20797590 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Tue, 5 Aug 2025 09:38:29 -0700 Subject: [PATCH 12/15] Fix readme --- README.md | 36 ++++-------------------------------- 1 file changed, 4 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index fff0cfa7e..813310ac6 100644 --- a/README.md +++ b/README.md @@ -1625,7 +1625,7 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin): def configure_client(self, config: ClientConfig) -> ClientConfig: # Client-side customization config["data_converter"] = pydantic_data_converter - return super().configure_client(config) + return self.next_client_plugin.configure_client(config) async def connect_service_client( self, config: temporalio.service.ConnectConfig @@ -1636,11 +1636,11 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin): def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Worker-side customization - return super().configure_worker(config) + return self.next_worker_plugin.configure_worker(config) async def run_worker(self, worker: Worker) -> None: print("Starting unified worker") - await super().run_worker(worker) + await self.next_worker_plugin.run_worker(worker) def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: config["data_converter"] = pydantic_data_converter @@ -1668,40 +1668,12 @@ worker = Worker( ) ``` - - -```python -class ReplayPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): - def configure_client(self, config: ClientConfig) -> ClientConfig: - config["data_converter"] = pydantic_data_converter - return super().configure_client(config) - - def configure_worker(self, config: WorkerConfig) -> WorkerConfig: - config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] - return super().configure_worker(config) - - def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: - config["data_converter"] = pydantic_data_converter - config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow] - return config - - @asynccontextmanager - async def workflow_replay( - self, - replayer: Replayer, - histories: AsyncIterator[temporalio.client.WorkflowHistory], - ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: - with set_some_context(): - async with super().workflow_replay(replayer, histories) as results: - yield results -``` - **Important Notes:** - Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility - Client plugins that also implement worker plugin interfaces are automatically propagated to workers - Avoid providing the same plugin to both client and worker to prevent double execution -- Plugin methods should call `super()` to maintain the plugin chain +- Plugin methods should call the plugin provided during initialization to maintain the plugin chain - Each plugin's `name()` method returns a unique identifier for debugging purposes From 602fd4200ebcc484f8ce8e280d032d6c81880171 Mon Sep 17 00:00:00 2001 From: tconley1428 Date: Wed, 6 Aug 2025 13:57:57 -0700 Subject: [PATCH 13/15] Update temporalio/worker/_plugin.py Co-authored-by: Spencer Judge --- temporalio/worker/_plugin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/temporalio/worker/_plugin.py b/temporalio/worker/_plugin.py index e880e0e31..df5592814 100644 --- a/temporalio/worker/_plugin.py +++ b/temporalio/worker/_plugin.py @@ -82,7 +82,7 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: """Hook called when creating a replayer to allow modification of configuration. This should be used to configure anything in ReplayerConfig needed to make execution match - the original. This could include interceptors, DataConverter, workflows, and more. + the worker and client config. This could include interceptors, DataConverter, workflows, and more. Args: config: The replayer configuration dictionary to potentially modify. From 28545467ec4bc58be0df3f43e7e8cab8f2ddd441 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 6 Aug 2025 13:38:24 -0700 Subject: [PATCH 14/15] Rename run_replayer, remove need to return self from init --- README.md | 22 ++++------ temporalio/client.py | 13 +++--- .../openai_agents/_temporal_openai_agents.py | 14 ++----- temporalio/worker/_plugin.py | 11 ++--- temporalio/worker/_replayer.py | 5 ++- temporalio/worker/_worker.py | 3 +- tests/test_plugins.py | 42 ++++++------------- 7 files changed, 40 insertions(+), 70 deletions(-) diff --git a/README.md b/README.md index 813310ac6..3f6a37ef1 100644 --- a/README.md +++ b/README.md @@ -1516,9 +1516,8 @@ class AuthenticationPlugin(Plugin): def __init__(self, api_key: str): self.api_key = api_key - def init_client_plugin(self, next: Plugin) -> Plugin: + def init_client_plugin(self, next: Plugin) -> None: self.next_client_plugin = next - return self def configure_client(self, config: ClientConfig) -> ClientConfig: # Modify client configuration @@ -1552,16 +1551,15 @@ Here's an example of a worker plugin that adds custom monitoring: import temporalio from contextlib import asynccontextmanager from typing import AsyncIterator -from temporalio.worker import Plugin, WorkerConfig, Worker, ReplayerConfig, Worker, Replayer, WorkflowReplayResult +from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult import logging class MonitoringPlugin(Plugin): def __init__(self): self.logger = logging.getLogger(__name__) - def init_worker_plugin(self, next: Plugin) -> Plugin: + def init_worker_plugin(self, next: Plugin) -> None: self.next_worker_plugin = next - return self def configure_worker(self, config: WorkerConfig) -> WorkerConfig: # Modify worker configuration @@ -1581,14 +1579,14 @@ class MonitoringPlugin(Plugin): return self.next_worker_plugin.configure_replayer(config) @asynccontextmanager - async def workflow_replay( + async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: self.logger.info("Starting replay execution") try: - async with self.next_worker_plugin.workflow_replay(replayer, histories) as results: + async with self.next_worker_plugin.run_replayer(replayer, histories) as results: yield results finally: self.logger.info("Replay execution completed") @@ -1614,13 +1612,11 @@ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConf class UnifiedPlugin(ClientPlugin, WorkerPlugin): - def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin: + def init_client_plugin(self, next: ClientPlugin) -> None: self.next_client_plugin = next - return self - def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin: + def init_worker_plugin(self, next: WorkerPlugin) -> None: self.next_worker_plugin = next - return self def configure_client(self, config: ClientConfig) -> ClientConfig: # Client-side customization @@ -1646,12 +1642,12 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin): config["data_converter"] = pydantic_data_converter return config - async def workflow_replay( + async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - return self.next_worker_plugin.workflow_replay(replayer, histories) + return self.next_worker_plugin.run_replayer(replayer, histories) # Create client with the unified plugin client = await Client.connect( diff --git a/temporalio/client.py b/temporalio/client.py index b75669258..d1c7c4ac9 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -191,7 +191,8 @@ async def connect( root_plugin: Plugin = _RootPlugin() for plugin in reversed(plugins): - root_plugin = plugin.init_client_plugin(root_plugin) + plugin.init_client_plugin(root_plugin) + root_plugin = plugin service_client = await root_plugin.connect_service_client(connect_config) @@ -235,7 +236,8 @@ def __init__( root_plugin: Plugin = _RootPlugin() for plugin in reversed(plugins): - root_plugin = plugin.init_client_plugin(root_plugin) + plugin.init_client_plugin(root_plugin) + root_plugin = plugin self._init_from_config(root_plugin.configure_client(config)) @@ -7399,7 +7401,7 @@ def name(self) -> str: return type(self).__module__ + "." + type(self).__qualname__ @abstractmethod - def init_client_plugin(self, next: Plugin) -> Plugin: + def init_client_plugin(self, next: Plugin) -> None: """Initialize this plugin in the plugin chain. This method sets up the chain of responsibility pattern by storing a reference @@ -7408,9 +7410,6 @@ def init_client_plugin(self, next: Plugin) -> Plugin: Args: next: The next plugin in the chain to delegate to. - - Returns: - This plugin instance for method chaining. """ @abstractmethod @@ -7447,7 +7446,7 @@ async def connect_service_client( class _RootPlugin(Plugin): - def init_client_plugin(self, next: Plugin) -> Plugin: + def init_client_plugin(self, next: Plugin) -> None: raise NotImplementedError() def configure_client(self, config: ClientConfig) -> ClientConfig: diff --git a/temporalio/contrib/openai_agents/_temporal_openai_agents.py b/temporalio/contrib/openai_agents/_temporal_openai_agents.py index 714d4a59a..21defd4a8 100644 --- a/temporalio/contrib/openai_agents/_temporal_openai_agents.py +++ b/temporalio/contrib/openai_agents/_temporal_openai_agents.py @@ -237,12 +237,9 @@ def __init__( self._model_params = model_params self._model_provider = model_provider - def init_client_plugin( - self, next: temporalio.client.Plugin - ) -> temporalio.client.Plugin: + def init_client_plugin(self, next: temporalio.client.Plugin) -> None: """Set the next client plugin""" self.next_client_plugin = next - return self async def connect_service_client( self, config: temporalio.service.ConnectConfig @@ -250,12 +247,9 @@ async def connect_service_client( """No modifications to service client""" return await self.next_client_plugin.connect_service_client(config) - def init_worker_plugin( - self, next: temporalio.worker.Plugin - ) -> temporalio.worker.Plugin: + def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None: """Set the next worker plugin""" self.next_worker_plugin = next - return self def configure_client(self, config: ClientConfig) -> ClientConfig: """Configure the Temporal client for OpenAI agents integration. @@ -320,14 +314,14 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: return config @asynccontextmanager - async def workflow_replay( + async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: """Set the OpenAI Overrides during replay""" with set_open_ai_agent_temporal_overrides(self._model_params): - async with self.next_worker_plugin.workflow_replay( + async with self.next_worker_plugin.run_replayer( replayer, histories ) as results: yield results diff --git a/temporalio/worker/_plugin.py b/temporalio/worker/_plugin.py index df5592814..19ac3abab 100644 --- a/temporalio/worker/_plugin.py +++ b/temporalio/worker/_plugin.py @@ -35,7 +35,7 @@ def name(self) -> str: return type(self).__module__ + "." + type(self).__qualname__ @abc.abstractmethod - def init_worker_plugin(self, next: Plugin) -> Plugin: + def init_worker_plugin(self, next: Plugin) -> None: """Initialize this plugin in the plugin chain. This method sets up the chain of responsibility pattern by storing a reference @@ -44,9 +44,6 @@ def init_worker_plugin(self, next: Plugin) -> Plugin: Args: next: The next plugin in the chain to delegate to. - - Returns: - This plugin instance for method chaining. """ @abc.abstractmethod @@ -92,7 +89,7 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: """ @abc.abstractmethod - def workflow_replay( + def run_replayer( self, replayer: Replayer, histories: AsyncIterator[WorkflowHistory], @@ -101,7 +98,7 @@ def workflow_replay( class _RootPlugin(Plugin): - def init_worker_plugin(self, next: Plugin) -> Plugin: + def init_worker_plugin(self, next: Plugin) -> None: raise NotImplementedError() def configure_worker(self, config: WorkerConfig) -> WorkerConfig: @@ -113,7 +110,7 @@ def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: async def run_worker(self, worker: Worker) -> None: await worker._run() - def workflow_replay( + def run_replayer( self, replayer: Replayer, histories: AsyncIterator[WorkflowHistory], diff --git a/temporalio/worker/_replayer.py b/temporalio/worker/_replayer.py index 55fbcdff4..240429bf7 100644 --- a/temporalio/worker/_replayer.py +++ b/temporalio/worker/_replayer.py @@ -86,7 +86,8 @@ def __init__( # Apply plugin configuration root_plugin: temporalio.worker.Plugin = _RootPlugin() for plugin in reversed(plugins): - root_plugin = plugin.init_worker_plugin(root_plugin) + plugin.init_worker_plugin(root_plugin) + root_plugin = plugin self._config = root_plugin.configure_replayer(self._config) self._plugin = root_plugin @@ -175,7 +176,7 @@ def workflow_replay_iterator( An async iterator that returns replayed workflow results as they are replayed. """ - return self._plugin.workflow_replay(self, histories) + return self._plugin.run_replayer(self, histories) @asynccontextmanager async def _workflow_replay_iterator( diff --git a/temporalio/worker/_worker.py b/temporalio/worker/_worker.py index ebee1c2c5..f93848496 100644 --- a/temporalio/worker/_worker.py +++ b/temporalio/worker/_worker.py @@ -374,7 +374,8 @@ def __init__( root_plugin: Plugin = _RootPlugin() for plugin in reversed(plugins): - root_plugin = plugin.init_worker_plugin(root_plugin) + plugin.init_worker_plugin(root_plugin) + root_plugin = plugin config = root_plugin.configure_worker(config) self._plugin = root_plugin diff --git a/tests/test_plugins.py b/tests/test_plugins.py index e923cbc6a..eb08bba2d 100644 --- a/tests/test_plugins.py +++ b/tests/test_plugins.py @@ -37,9 +37,8 @@ class MyClientPlugin(temporalio.client.Plugin): def __init__(self): self.interceptor = TestClientInterceptor() - def init_client_plugin(self, next: Plugin) -> Plugin: + def init_client_plugin(self, next: Plugin) -> None: self.next_client_plugin = next - return self def configure_client(self, config: ClientConfig) -> ClientConfig: config["namespace"] = "replaced_namespace" @@ -74,17 +73,11 @@ async def test_client_plugin(client: Client, env: WorkflowEnvironment): class MyCombinedPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): - def init_worker_plugin( - self, next: temporalio.worker.Plugin - ) -> temporalio.worker.Plugin: + def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None: self.next_worker_plugin = next - return self - def init_client_plugin( - self, next: temporalio.client.Plugin - ) -> temporalio.client.Plugin: + def init_client_plugin(self, next: temporalio.client.Plugin) -> None: self.next_client_plugin = next - return self def configure_client(self, config: ClientConfig) -> ClientConfig: return self.next_client_plugin.configure_client(config) @@ -104,20 +97,17 @@ async def run_worker(self, worker: Worker) -> None: def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: return self.next_worker_plugin.configure_replayer(config) - def workflow_replay( + def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - return self.next_worker_plugin.workflow_replay(replayer, histories) + return self.next_worker_plugin.run_replayer(replayer, histories) class MyWorkerPlugin(temporalio.worker.Plugin): - def init_worker_plugin( - self, next: temporalio.worker.Plugin - ) -> temporalio.worker.Plugin: + def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None: self.next_worker_plugin = next - return self def configure_worker(self, config: WorkerConfig) -> WorkerConfig: config["task_queue"] = "replaced_queue" @@ -135,12 +125,12 @@ async def run_worker(self, worker: Worker) -> None: def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: return self.next_worker_plugin.configure_replayer(config) - def workflow_replay( + def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: - return self.next_worker_plugin.workflow_replay(replayer, histories) + return self.next_worker_plugin.run_replayer(replayer, histories) async def test_worker_plugin_basic_config(client: Client) -> None: @@ -203,17 +193,11 @@ async def test_worker_sandbox_restrictions(client: Client) -> None: class ReplayCheckPlugin(temporalio.client.Plugin, temporalio.worker.Plugin): - def init_worker_plugin( - self, next: temporalio.worker.Plugin - ) -> temporalio.worker.Plugin: + def init_worker_plugin(self, next: temporalio.worker.Plugin) -> None: self.next_worker_plugin = next - return self - def init_client_plugin( - self, next: temporalio.client.Plugin - ) -> temporalio.client.Plugin: + def init_client_plugin(self, next: temporalio.client.Plugin) -> None: self.next_client_plugin = next - return self def configure_client(self, config: ClientConfig) -> ClientConfig: config["data_converter"] = pydantic_data_converter @@ -237,14 +221,12 @@ async def connect_service_client( return await self.next_client_plugin.connect_service_client(config) @asynccontextmanager - async def workflow_replay( + async def run_replayer( self, replayer: Replayer, histories: AsyncIterator[temporalio.client.WorkflowHistory], ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: - async with self.next_worker_plugin.workflow_replay( - replayer, histories - ) as result: + async with self.next_worker_plugin.run_replayer(replayer, histories) as result: yield result From afa77688a69848e8a5a4f60c44d11db4df0b4ec1 Mon Sep 17 00:00:00 2001 From: Tim Conley Date: Wed, 6 Aug 2025 13:59:48 -0700 Subject: [PATCH 15/15] Docstring change --- temporalio/client.py | 4 +++- temporalio/worker/_plugin.py | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/temporalio/client.py b/temporalio/client.py index d1c7c4ac9..6b206a912 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -7404,9 +7404,11 @@ def name(self) -> str: def init_client_plugin(self, next: Plugin) -> None: """Initialize this plugin in the plugin chain. - This method sets up the chain of responsibility pattern by storing a reference + This method sets up the chain of responsibility pattern by providing a reference to the next plugin in the chain. It is called during client creation to build the plugin chain. Note, this may be called twice in the case of :py:meth:`connect`. + Implementations should store this reference and call the corresponding method + of the next plugin on method calls. Args: next: The next plugin in the chain to delegate to. diff --git a/temporalio/worker/_plugin.py b/temporalio/worker/_plugin.py index 19ac3abab..0e696a2dd 100644 --- a/temporalio/worker/_plugin.py +++ b/temporalio/worker/_plugin.py @@ -38,9 +38,10 @@ def name(self) -> str: def init_worker_plugin(self, next: Plugin) -> None: """Initialize this plugin in the plugin chain. - This method sets up the chain of responsibility pattern by storing a reference + This method sets up the chain of responsibility pattern by providing a reference to the next plugin in the chain. It is called during worker creation to build - the plugin chain. + the plugin chain. Implementations should store this reference and call the corresponding method + of the next plugin on method calls. Args: next: The next plugin in the chain to delegate to.