Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 97 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1515,18 +1515,22 @@ import temporalio.service
class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: Plugin) -> Plugin:
self.next_client_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return super().configure_client(config)
return self.next_client_plugin.configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await super().connect_service_client(config)
return await self.next_client_plugin.connect_service_client(config)

# Use the plugin when connecting
client = await Client.connect(
Expand All @@ -1538,31 +1542,55 @@ client = await Client.connect(
#### Worker Plugins

Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings.
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
They should do this in the case that they modified the worker in a way which would also need to be present
for replay to function. For instance, changing the data converter or adding workflows.

Here's an example of a worker plugin that adds custom monitoring:

```python
from temporalio.worker import Plugin, WorkerConfig, Worker
import temporalio
from contextlib import asynccontextmanager
from typing import AsyncIterator
from temporalio.worker import Plugin, WorkerConfig, Worker, ReplayerConfig, Worker, Replayer, WorkflowReplayResult
import logging

class MonitoringPlugin(Plugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: Plugin) -> Plugin:
self.next_worker_plugin = next
return self

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return super().configure_worker(config)
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
self.logger.info("Starting worker execution")
try:
await super().run_worker(worker)
await self.next_worker_plugin.run_worker(worker)
finally:
self.logger.info("Worker execution completed")

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow]
return config

@asynccontextmanager
async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
with set_some_context():
async with super().workflow_replay(replayer, histories) as results:
yield results

# Use the plugin when creating a worker
worker = Worker(
Expand All @@ -1577,16 +1605,34 @@ worker = Worker(
For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:

```python
import temporalio
from contextlib import asynccontextmanager
from typing import AsyncIterator
from temporalio.client import Plugin as ClientPlugin, ClientConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig
from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult


class UnifiedPlugin(ClientPlugin, WorkerPlugin):
def init_client_plugin(self, next: ClientPlugin) -> ClientPlugin:
self.next_client_plugin = next
return self

def init_worker_plugin(self, next: WorkerPlugin) -> WorkerPlugin:
self.next_worker_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["namespace"] = "unified-namespace"
return super().configure_client(config)

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
config["max_cached_workflows"] = 500
Expand All @@ -1595,8 +1641,22 @@ class UnifiedPlugin(ClientPlugin, WorkerPlugin):
async def run_worker(self, worker: Worker) -> None:
print("Starting unified worker")
await super().run_worker(worker)



def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow]
return config

@asynccontextmanager
async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
with set_some_context():
async with super().workflow_replay(replayer, histories) as results:
yield results

# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
Expand All @@ -1612,6 +1672,34 @@ worker = Worker(
)
```



```python
class ReplayPlugin(temporalio.client.Plugin, temporalio.worker.Plugin):
def configure_client(self, config: ClientConfig) -> ClientConfig:
config["data_converter"] = pydantic_data_converter
return super().configure_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow]
return super().configure_worker(config)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
config["workflows"] = list(config.get("workflows") or []) + [HelloWorkflow]
return config

@asynccontextmanager
async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
with set_some_context():
async with super().workflow_replay(replayer, histories) as results:
yield results
```

**Important Notes:**

- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
Expand Down
10 changes: 6 additions & 4 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7398,6 +7398,7 @@ def name(self) -> str:
"""
return type(self).__module__ + "." + type(self).__qualname__

@abstractmethod
def init_client_plugin(self, next: Plugin) -> Plugin:
"""Initialize this plugin in the plugin chain.

Expand All @@ -7411,9 +7412,8 @@ def init_client_plugin(self, next: Plugin) -> Plugin:
Returns:
This plugin instance for method chaining.
"""
self.next_client_plugin = next
return self

@abstractmethod
def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Hook called when creating a client to allow modification of configuration.

Expand All @@ -7427,8 +7427,8 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
Returns:
The modified client configuration.
"""
return self.next_client_plugin.configure_client(config)

@abstractmethod
async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
Expand All @@ -7444,10 +7444,12 @@ async def connect_service_client(
Returns:
The connected service client.
"""
return await self.next_client_plugin.connect_service_client(config)


class _RootPlugin(Plugin):
def init_client_plugin(self, next: Plugin) -> Plugin:
raise NotImplementedError()

def configure_client(self, config: ClientConfig) -> ClientConfig:
return config

Expand Down
61 changes: 55 additions & 6 deletions temporalio/contrib/openai_agents/_temporal_openai_agents.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Initialize Temporal OpenAI Agents overrides."""

from contextlib import contextmanager
from contextlib import asynccontextmanager, contextmanager
from datetime import timedelta
from typing import AsyncIterator, Callable, Optional, Union

Expand All @@ -24,7 +24,7 @@

import temporalio.client
import temporalio.worker
from temporalio.client import ClientConfig
from temporalio.client import ClientConfig, Plugin
from temporalio.contrib.openai_agents._invoke_model_activity import ModelActivity
from temporalio.contrib.openai_agents._model_parameters import ModelActivityParameters
from temporalio.contrib.openai_agents._openai_runner import TemporalOpenAIRunner
Expand All @@ -41,7 +41,13 @@
from temporalio.converter import (
DataConverter,
)
from temporalio.worker import Worker, WorkerConfig
from temporalio.worker import (
Replayer,
ReplayerConfig,
Worker,
WorkerConfig,
WorkflowReplayResult,
)


@contextmanager
Expand Down Expand Up @@ -231,6 +237,26 @@ def __init__(
self._model_params = model_params
self._model_provider = model_provider

def init_client_plugin(
self, next: temporalio.client.Plugin
) -> temporalio.client.Plugin:
"""Set the next client plugin"""
self.next_client_plugin = next
return self

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
) -> temporalio.service.ServiceClient:
"""No modifications to service client"""
return await self.next_client_plugin.connect_service_client(config)

def init_worker_plugin(
self, next: temporalio.worker.Plugin
) -> temporalio.worker.Plugin:
"""Set the next worker plugin"""
self.next_worker_plugin = next
return self

def configure_client(self, config: ClientConfig) -> ClientConfig:
"""Configure the Temporal client for OpenAI agents integration.

Expand All @@ -246,7 +272,7 @@ def configure_client(self, config: ClientConfig) -> ClientConfig:
config["data_converter"] = DataConverter(
payload_converter_class=_OpenAIPayloadConverter
)
return super().configure_client(config)
return self.next_client_plugin.configure_client(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
"""Configure the Temporal worker for OpenAI agents integration.
Expand All @@ -268,7 +294,7 @@ def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
config["activities"] = list(config.get("activities") or []) + [
ModelActivity(self._model_provider).invoke_model_activity
]
return super().configure_worker(config)
return self.next_worker_plugin.configure_worker(config)

async def run_worker(self, worker: Worker) -> None:
"""Run the worker with OpenAI agents temporal overrides.
Expand All @@ -281,4 +307,27 @@ async def run_worker(self, worker: Worker) -> None:
worker: The worker instance to run.
"""
with set_open_ai_agent_temporal_overrides(self._model_params):
await super().run_worker(worker)
await self.next_worker_plugin.run_worker(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
"""Configure the replayer for OpenAI Agents."""
config["interceptors"] = list(config.get("interceptors") or []) + [

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a very surprising footgun and introduces coupling, and requires that users understand a concept (Replayer) that few are familiar with. It feels like a very large chance that users will mess this up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is why we chose to require that plugin authors configure replayers and therefore become familiar with them. Regular users don't need to become familiar with replayers, but plugin authors do to ensure their plugin works right in replayer scenarios and not just client and worker scenarios.

OpenAIAgentsTracingInterceptor()
]
config["data_converter"] = DataConverter(
payload_converter_class=_OpenAIPayloadConverter
)
return config

@asynccontextmanager
async def workflow_replay(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
"""Set the OpenAI Overrides during replay"""
with set_open_ai_agent_temporal_overrides(self._model_params):
async with self.next_worker_plugin.workflow_replay(
replayer, histories
) as results:
yield results
Loading
Loading