Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 101 additions & 71 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1498,10 +1498,80 @@ configuration, and worker execution. Common customizations may include but are n
3. Workflows
4. Interceptors

**Important Notes:**

- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
- Avoid providing the same plugin to both client and worker to prevent double execution
- Each plugin's `name()` method returns a unique identifier for debugging purposes

#### Usage

Plugins can be provided to both `Client` and `Worker`.

```python
# Use the plugin when connecting
client = await Client.connect(
"my-server.com:7233",
plugins=[SomePlugin()]
)
```
```python
# Use the plugin when creating a worker
worker = Worker(
client,
plugins=[SomePlugin()]
)
```
In the case of `Client`, any plugins will also be provided to any workers created with that client.
```python
# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
plugins=[SomePlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
```
#### Plugin Implementations

The easiest way to create your own plugin is to use `SimplePlugin`. This takes a number of possible configurations to produce
a relatively straightforward plugin.

```python
plugin = SimplePlugin(
"MyPlugin",
data_converter=converter,
)
```

It is also possible to subclass `SimplePlugin` for some additional controls. This is what we do for `OpenAIAgentsPlugin`.

```python
class MediumPlugin(SimplePlugin):
def __init__(self):
super().__init__("MediumPlugin", data_converter=pydantic_data_converter)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
config = super().configure_worker(config)
config["task_queue"] = "override"
return config
```

#### Advanced Plugin Implementations

`SimplePlugin` doesn't cover all possible uses of plugins. For more unusual use cases, an implementor can implement
the underlying plugin interfaces directly.

A single plugin class can implement both client and worker plugin interfaces to share common logic between both
contexts. When used with a client, it will automatically be propagated to any workers created with that client.

#### Client Plugins
##### Client Plugins

Client plugins can intercept and modify client configuration and service connections. They are useful for adding
authentication, modifying connection parameters, or adding custom behavior during client creation.
Expand All @@ -1516,29 +1586,21 @@ class AuthenticationPlugin(Plugin):
def __init__(self, api_key: str):
self.api_key = api_key

def init_client_plugin(self, next: Plugin) -> None:
self.next_client_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Modify client configuration
config["namespace"] = "my-secure-namespace"
return self.next_client_plugin.configure_client(config)
return config

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
self,
config: temporalio.service.ConnectConfig,
next: Callable[[ConnectConfig], Awaitable[ServiceClient]]
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)

# Use the plugin when connecting
client = await Client.connect(
"my-server.com:7233",
plugins=[AuthenticationPlugin("my-api-key")]
)
return await next(config)
```

#### Worker Plugins
##### Worker Plugins

Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring,
custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay.
Expand All @@ -1558,47 +1620,39 @@ class MonitoringPlugin(Plugin):
def __init__(self):
self.logger = logging.getLogger(__name__)

def init_worker_plugin(self, next: Plugin) -> None:
self.next_worker_plugin = next

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Modify worker configuration
original_task_queue = config["task_queue"]
config["task_queue"] = f"monitored-{original_task_queue}"
self.logger.info(f"Worker created for task queue: {config['task_queue']}")
return self.next_worker_plugin.configure_worker(config)
return config

async def run_worker(self, worker: Worker) -> None:
async def run_worker(self, worker: Worker, next: Callable[[Worker], Awaitable[None]]) -> None:
self.logger.info("Starting worker execution")
try:
await self.next_worker_plugin.run_worker(worker)
await next(worker)
finally:
self.logger.info("Worker execution completed")

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return self.next_worker_plugin.configure_replayer(config)
return config

@asynccontextmanager
async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
next: Callable[
[Replayer, AsyncIterator[WorkflowHistory]],
AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
]
) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]:
self.logger.info("Starting replay execution")
try:
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
async with self.next_worker_plugin.run_replayer(replayer, histories) as results:
yield results
finally:
self.logger.info("Replay execution completed")

# Use the plugin when creating a worker
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity],
plugins=[MonitoringPlugin()]
)
```

For plugins that need to work with both clients and workers, you can implement both interfaces in a single class:
Expand All @@ -1612,67 +1666,43 @@ from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConf


class UnifiedPlugin(ClientPlugin, WorkerPlugin):
def init_client_plugin(self, next: ClientPlugin) -> None:
self.next_client_plugin = next

def init_worker_plugin(self, next: WorkerPlugin) -> None:
self.next_worker_plugin = next

def configure_client(self, config: ClientConfig) -> ClientConfig:
# Client-side customization
config["data_converter"] = pydantic_data_converter
return self.next_client_plugin.configure_client(config)
return config

async def connect_service_client(
self, config: temporalio.service.ConnectConfig
self,
config: temporalio.service.ConnectConfig,
next: Callable[[ConnectConfig], Awaitable[ServiceClient]]
) -> temporalio.service.ServiceClient:
# Add authentication to the connection
config.api_key = self.api_key
return await self.next_client_plugin.connect_service_client(config)
return await next(config)

def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
# Worker-side customization
return self.next_worker_plugin.configure_worker(config)
return config

async def run_worker(self, worker: Worker) -> None:
async def run_worker(self, worker: Worker, next: Callable[[Worker], Awaitable[None]]) -> None:
print("Starting unified worker")
await self.next_worker_plugin.run_worker(worker)
await next(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
config["data_converter"] = pydantic_data_converter
return self.next_worker_plugin.configure_replayer(config)
return config

async def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
next: Callable[
[Replayer, AsyncIterator[WorkflowHistory]],
AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
]
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return self.next_worker_plugin.run_replayer(replayer, histories)

# Create client with the unified plugin
client = await Client.connect(
"localhost:7233",
plugins=[UnifiedPlugin()]
)

# Worker will automatically inherit the plugin from the client
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[MyWorkflow],
activities=[my_activity]
)
return next(replayer, histories)
```

**Important Notes:**

- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility
- Client plugins that also implement worker plugin interfaces are automatically propagated to workers
- Avoid providing the same plugin to both client and worker to prevent double execution
- Plugin methods should call the plugin provided during initialization to maintain the plugin chain
- Each plugin's `name()` method returns a unique identifier for debugging purposes


### Workflow Replay

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
Expand Down
Loading