Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
349 changes: 349 additions & 0 deletions docs/develop/plugins-guide.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
---
id: plugins-guide
title: Plugins Guide
sidebar_label: Plugins guide
description: Best practices for creating plugins for AI Agents
toc_max_heading_level: 4
hide_table_of_contents: true
keywords:
- ai agents
- best practices
tags:
- Best Practices
- AI Agents
---

import SdkTabs from '@site/src/components';

# Plugins

A **Plugin** is an abstraction that will let your users add your feature to their Workflows in one line. It makes it seamless for platform developers to add their own custom functionality to many Workflows. Using plugins, you can build reusable open source libraries or build add-ons for engineers at your company.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe re-word this to be more generic than adding stuff to workflows, since you can do more than that?


Here are some common use cases for plugins:

- AI Agent SDKs
- Observability, tracing, or logging middleware
- Adding reliable built-in functionality such as LLM calls, corporate messaging, and payments infrastructure
- Encryption or compliance middleware

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's frame the guide:

This guide will

  • introduce you to the mechanics of creating plugins
  • provide general advice for platform engineers in harnessing and maintaining Temporal's primitives such as Workflows and Activities.

## How to build a Plugin

The recommended way to start building plugins is with a `SimplePlugin`. This abstraction will tackle the vast majority of plugins people want to write.

For advanced use cases, you can extend the methods in lower-level classes that Simple Plugin is based on without re-implementing what you’ve done. See the [Advanced Topics section](#advanced-topics-for-plugins) for more information.

### Example Plugins

If you prefer to learn by getting hands-on with code, check out some existing plugins.

- Temporal's Python SDK ships with an [OpenAI Agents SDK](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/openai_agents) plugin

## What you can provide to users in a plugin

There are a number of features you can give your users with a plugin. Here's a short list of some of the things you can do.

- [Built-in Activities](#built-in-activity)
- [Workflow libraries](#workflow-libraries)
- [Built-in Workflows](#built-in-workflows)
- [Built-in Nexus Operations](#built-in-nexus-operations)
- [Custom Data Converters](#custom-data-converters)
- [Interceptors](#interceptors)

### Built-in Activity

You can provide built-in Activities in a Plugin for users to call from their Workflows. Activities are the most common Temporal primitive and should be scoped to:

- Single write operations
- Batches of similar writes
- One or more read operations followed by a write operation
- A read that should be memoized, like an LLM call, a large download, or a slow-polling read
Comment on lines +56 to +59
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reads awkwardly to me. Each line sort of contradicts the last. Presumably we have advice elsewhere in the docs we can link to about what an activity should be scoped to?

This could be just "Activities should be scoped to a small, ideally idempotent, unit of work" or something

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Presumably we have advice elsewhere in the docs we can link to about what an activity should be scoped to?

We actually don't AFAICT! I'd love to link out to the general activity docs, but I figured, let's get the content in here for starters and then refactor next.

Let me suggest a rewording.

re: idempotent, we have to be careful there since LLMs are not idempotent by some definitions.


Larger pieces of functionality should be broken it up into multiple activities. This makes it easier to do failure recovery, have short timeouts, and be idempotent.

Here are some best practices you can use when you are making Activity plugins:

- Activity arguments and return values must be serializable.
- Activities that perform writes should be idempotent.
- Activities have [timeouts](https://docs.temporal.io/develop/python/failure-detection#heartbeat-timeout) and [retry policies](https://docs.temporal.io/encyclopedia/retry-policies). To be Activity-friendly, your operation should either complete within a few minutes or it should support the ability to heartbeat or poll for a result. This way it will be clear to the Workflow when the Activity is still making progress.
- You need to specify at least one timeout, typically the [start_to_close_timeout](https://docs.temporal.io/encyclopedia/detecting-activity-failures#start-to-close-timeout). Keep in mind that the shorter the timeout, the faster Temporal will retry upon failure. See the [timeouts and retry policies](#timeouts-and-retry-policies) to learn more.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely seems like we should link to general advice on activities somewhere in this section at least

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but we don't actually have almost any of this advice anywhere that I could find. I've opened a task as a follow-up to figure out where to put it.


#### Timeouts and retry policies

Temporal's Activity retry mechanism gives applications the benefits of durable execution.
For example, Temporal will keep track of the exponential backoff delay even if the Worker crashes. Since Temporal can’t tell when a Worker crashes, Workflows rely on the [start_to_close_timeout](https://docs.temporal.io/encyclopedia/detecting-activity-failures#start-to-close-timeout) to know how long to wait before assuming that an Activity is inactive.

Be cautious when doing retries within your Activity because it lengthens the needed Activity timeout. Such internal retries also prevent users from counting failure metrics and make it harder for users to debug in Temporal UI when something is wrong.

Follow the example for your SDK below:

<SdkTabs>
<SdkTabs.Python>
```python
@activity.defn
async def some_activity() -> None:
return None

plugin = SimplePlugin(
activities = [some_activity]
)
```
</SdkTabs.Python>
</SdkTabs>

### Workflow libraries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear what workflow "library" means here. Simply stating that it's some code you define on the plugin that users can call from their workflows would help. Or maybe a small code example too.


You can provide a library with functionality for use within a Workflow. Your library will call elements you include in your Plugin: Activities, Child Workflows, Signals, Updates, Queries, Nexus Operations, Interceptors, Data Converters, and any other code as long as it follows these requirements:

- It should be [deterministic](https://docs.temporal.io/workflows#deterministic-constraints), running the same way every time it’s executed. Non-deterministic code should go in Activities or Nexus Operations.
- It should be used in the Python [sandbox](https://docs.temporal.io/develop/python/python-sdk-sandbox).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is python-specific. Could we maybe do this in a tab? Or otherwise reformat into an SDK-specific section or something?

s/used/usable/

- It should be designed to handle being restarted, resumed, or executed in a different process from where it originally began without losing correctness or state consistency.
- It should run quickly since it may be replayed many times during a long Workflow execution. More expensive code should go in Activities or Nexus Operations.

A Plugin should allow a user to decompose their Workflows into Activities, as well as Child Workflows and Nexus Calls when needed. This gives users granular control through retries and timeouts, debuggability through the Temporal UI, operability with resets, pauses, and cancels, memoization for efficiency and resumability, and scalability using task queues and Workers.

Users use Workflows for:

- Orchestration and decision-making
- Interactivity via [message-passing](https://docs.temporal.io/evaluate/development-production-features/workflow-message-passing)
- Tracing and observability.

#### Making changes to your Workflow Library

Your users may want to keep their Workflows running across deployments of their Worker code. If their deployment includes a new version of your Plugin, changes to your Plugin could break Workflow code that started before the new version was deployed. This can be due to [non-deterministic behavior from code changes](https://docs.temporal.io/workflow-definition#non-deterministic-change) in your Plugin.

Therefore, it's recommended that you set up [replay testing](https://docs.temporal.io/develop/python/testing-suite#replay) to make sure that you’re not causing non-determinism errors for your users. If you make substantive changes, you need to use [patching](https://docs.temporal.io/patching).

#### Example of a Workflow library that uses a Plugin in Python

- [Implementation of the `OpenAIAgentsPlugin`](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/openai_agents)
- [Example of replay testing](https://github.com/temporalio/sdk-python/blob/main/tests/contrib/openai_agents/test_openai_replay.py)

### Built-in Workflows

You can provide a built-in Workflow in a `SimplePlugin`. It’s callable as a Child Workflow or standalone. When you want to provide a piece of functionality that's more complex than an Activity, you can:

- Use a [Workflow Library](#workflow-libraries) that runs directly in the end user’s Workflow
- Add a Child Workflow

Consider adding a Child Workflow when one or more of these conditions applies:

- That child should outlive the parent.
- The Workflow Event History would otherwise [not scale](https://docs.temporal.io/workflow-execution/event#event-history-limits) in parent Workflows.
- When you want a separate Workflow ID for the child so that it can be operated independently of the parent's state (canceled, terminated, paused).

Any Workflow can be run as a standalone Workflow or as a Child Workflow, so registering a Child Workflow in a `SimplePlugin` is the same as registering any Workflow.

Follow the example for your SDK below:

<SdkTabs>
<SdkTabs.Python>
```python
@workflow.defn
class HelloWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return f"Hello, {name}!"

plugin = SimplePlugin(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tconley1428 my initial reaction was that this is unrealistic, since plugins are generally designed to be reused. I'd expect people to create a plugin class that declares the Workfows rather than make a global variable for people to use. If you agree, can you help update the code, here and in the other examples?

workflows = [HelloWorkflow]
)

...

client = await Client.connect(
"localhost:7233",
plugins=[
plugin,
],
)
async with Worker(
client,
task_queue="task-queue",
):
client.execute_workflow(
HelloWorkflow.run,
"Tim",
task_queue=worker.task_queue,
)
```
</SdkTabs.Python>
</SdkTabs>

### Built-in Nexus Operations

Nexus calls are used from Workflows similar to Activities and you can check out some common [Nexus Use Cases](https://docs.temporal.io/nexus/use-cases). Like Activities, Nexus Call arguments and return values must be serializable.

To register Nexus handlers in Workflows, follow the example for your SDK below:

<SdkTabs>
<SdkTabs.Python>
```python
@nexusrpc.service
class WeatherService:
get_weather_nexus_operation: nexusrpc.Operation[WeatherInput, Weather]

@nexusrpc.handler.service_handler(service=WeatherService)
class WeatherServiceHandler:
@nexusrpc.handler.sync_operation
async def get_weather_nexus_operation(
self, ctx: nexusrpc.handler.StartOperationContext, input: WeatherInput
) -> Weather:
return Weather(
city=input.city, temperature_range="14-20C", conditions="Sunny with wind."
)

plugin = SimplePlugin(
nexus_service_handlers = [WeatherServiceHandler()]
)
```
</SdkTabs.Python>
</SdkTabs>

### Custom Data Converters

A [Custom Data Converter](https://docs.temporal.io/default-custom-data-converters#custom-data-converter) can alter data formats or provide compression or encryption.

Note that you can use an existing Data Converter such as, in python, `PydanticPayloadConverter` in your Plugin.

To add a Custom Data Converter to a Plugin, follow the example for your SDK below:

<SdkTabs>
<SdkTabs.Python>
```python
def add_converter(converter: Optional[DataConverter]) -> DataConverter
if converter is None or converter == temporalio.converter.DataConverter.default
return pydantic_data_converter
# Should consider interactions with other plugins,
# as this will override the data converter.
# This may mean failing, warning, or something else
Comment on lines +216 to +218
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tconley1428

  • This doesn't quite match what we did for OpenAI -- please double check that this is our best up-to-date advice.
  • I can't find it, but I remember seeing a snippet where we pull the user's existing codecs into the payload. Should we do something like that here?

return converter

plugin = SimplePlugin(
data_converter = add_converter
)
```
</SdkTabs.Python>
</SdkTabs>

### Interceptors

Interceptors are middleware that can run before and after various calls such as Activities, Workflows, and Signals. You can [learn more about interceptors](https://docs.temporal.io/develop/python/interceptors) for the details of implementing them. They're used to:

- Create side effects such as logging and tracing.
- Modify arguments, such as adding headers for authorization or tracing propagation.

To add one to a Plugin, follow the example for your SDK below::

<SdkTabs>
<SdkTabs.Python>
```python
class SomeWorkerInterceptor(
temporalio.worker.Interceptor
):
pass # Your implementation

plugin = SimplePlugin(
worker_interceptors = [SomeWorkerInterceptor()]
)
```
</SdkTabs.Python>
</SdkTabs>

## Advanced Topics for Plugins

If you go deeper into `SimplePlugin`, you'll see it aggregates a pair of raw Plugin classes that you can use for a higher level of flexibility: a Worker Plugin and a client Plugin.

- Worker Plugins contain functionality that runs inside your users’ Workflows.
- Client Plugins contain functionality that runs when Workflows are created and return results.

If your Plugin implements both of them, registering it in the client will also register it in Workers created with that client.

### Client Plugin

Client Plugins are provided to the Temporal client on creation. They can change client configurations and service client configurations. `ClientConfig` contains settings like client Interceptors and DataConverters. `ConnectConfig` configures the actual network connections to the local or cloud Temporal server with values like an API key. This is the basic implementation of a client Plugin:

<SdkTabs>
<SdkTabs.Python>
```python
class MyAdvancedClientPlugin(temporalio.client.Plugin):

def configure_client(self, config: ClientConfig) -> ClientConfig:
return config

async def connect_service_client(
self,
config: ConnectConfig,
next: Callable[[ConnectConfig], Awaitable[ServiceClient]],
) -> temporalio.service.ServiceClient:
return await next(config)
```
</SdkTabs.Python>
</SdkTabs>

The primary use case for integrations so far is setting a `DataConverter`, like in the [Data Converter example](#custom-data-converters).

### Worker Plugin

Worker Plugins are provided at Worker creation and have more capabilities and corresponding implementation than client Plugins. They can change Worker configurations, run code during the Worker lifetime, and manage the Replayer in a similar way. You can learn more about the [Replayer](#replayer) in a later section.

Similar to `configure_client` above, you implement `configure_worker` and `configure_replayer` to change any necessary configurations. In addition, `run_worker` allows you to execute code before and after the Worker runs. This can be used to set up resources or globals for use during the Worker execution. `run_replayer` does the same for the Replayer, but keep in mind that the Replayer has a more complex return type. This is a basic implementation of a Worker plugin:

<SdkTabs>
<SdkTabs.Python>
```python
class MyAdvancedWorkerPlugin(temporalio.worker.Plugin):
def configure_worker(self, config: WorkerConfig) -> WorkerConfig:
return config

async def run_worker(
self, worker: Worker, next: Callable[[Worker], Awaitable[None]]
) -> None:
next(worker)

def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return config

def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
next: Callable[
[Replayer, AsyncIterator[WorkflowHistory]],
AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return next(replayer, histories)
```
</SdkTabs.Python>
</SdkTabs>

### Replayer

The Replayer allows Workflow authors to validate that their Workflows will work after changes to either the Workflow or a library they depend on. It’s normally used in test runs or when testing Workers before they roll out in production.

The Replayer runs on a Workflow History created by a previous Workflow run. Suppose something in the Workflow or underlying code has changed in a way which could potentially cause a non-determinism error. In that case, the Replayer will notice the change in the way it runs compared to the history provided.

The Replayer is typically configured identically to the Worker and client. Ff you’re using `SimplePlugin`, this is already handled for you.

If you need to do something custom for the Replayer, you can configure it directly:

<SdkTabs>
<SdkTabs.Python>
```python
class MyAdvancedWorkerPlugin(temporalio.worker.Plugin):
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
return config

def run_replayer(
self,
replayer: Replayer,
histories: AsyncIterator[temporalio.client.WorkflowHistory],
next: Callable[
[Replayer, AsyncIterator[WorkflowHistory]],
AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]],
],
) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]:
return next(replayer, histories)
```
</SdkTabs.Python>
</SdkTabs>
1 change: 1 addition & 0 deletions sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ module.exports = {
'develop/activity-retry-simulator',
'develop/worker-performance',
'develop/safe-deployments',
'develop/plugins-guide',
],
},
{
Expand Down