|
4 | 4 | [](https://pypi.org/project/temporalio) |
5 | 5 | [](LICENSE) |
6 | 6 |
|
| 7 | +**📣 News: Integration between OpenAI Agents SDK and Temporal is now in public preview. [Learn more](temporalio/contrib/openai_agents/README.md).** |
| 8 | + |
7 | 9 | [Temporal](https://temporal.io/) is a distributed, scalable, durable, and highly available orchestration engine used to |
8 | 10 | execute asynchronous, long-running business logic in a scalable and resilient way. |
9 | 11 |
|
@@ -94,7 +96,11 @@ informal introduction to the features and their implementation. |
94 | 96 | - [Heartbeating and Cancellation](#heartbeating-and-cancellation) |
95 | 97 | - [Worker Shutdown](#worker-shutdown) |
96 | 98 | - [Testing](#testing-1) |
| 99 | + - [Interceptors](#interceptors) |
97 | 100 | - [Nexus](#nexus) |
| 101 | + - [Plugins](#plugins) |
| 102 | + - [Client Plugins](#client-plugins) |
| 103 | + - [Worker Plugins](#worker-plugins) |
98 | 104 | - [Workflow Replay](#workflow-replay) |
99 | 105 | - [Observability](#observability) |
100 | 106 | - [Metrics](#metrics) |
@@ -1256,6 +1262,7 @@ calls in the `temporalio.activity` package make use of it. Specifically: |
1256 | 1262 |
|
1257 | 1263 | * `in_activity()` - Whether an activity context is present |
1258 | 1264 | * `info()` - Returns the immutable info of the currently running activity |
| 1265 | +* `client()` - Returns the Temporal client used by this worker. Only available in `async def` activities. |
1259 | 1266 | * `heartbeat(*details)` - Record a heartbeat |
1260 | 1267 | * `is_cancelled()` - Whether a cancellation has been requested on this activity |
1261 | 1268 | * `wait_for_cancelled()` - `async` call to wait for cancellation request |
@@ -1310,6 +1317,70 @@ affect calls activity code might make to functions on the `temporalio.activity` |
1310 | 1317 | * `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity |
1311 | 1318 |
|
1312 | 1319 |
|
| 1320 | +### Interceptors |
| 1321 | + |
| 1322 | +The behavior of the SDK can be customized in many useful ways by modifying inbound and outbound calls using |
| 1323 | +interceptors. This is similar to the use of middleware in other frameworks. |
| 1324 | + |
| 1325 | +There are five categories of inbound and outbound calls that you can modify in this way: |
| 1326 | + |
| 1327 | +1. Outbound client calls, such as `start_workflow()`, `signal_workflow()`, `list_workflows()`, `update_schedule()`, etc. |
| 1328 | + |
| 1329 | +2. Inbound workflow calls: `execute_workflow()`, `handle_signal()`, `handle_update_handler()`, etc |
| 1330 | + |
| 1331 | +3. Outbound workflow calls: `start_activity()`, `start_child_workflow()`, `start_nexus_operation()`, etc |
| 1332 | + |
| 1333 | +4. Inbound call to execute an activity: `execute_activity()` |
| 1334 | + |
| 1335 | +5. Outbound activity calls: `info()` and `heartbeat()` |
| 1336 | + |
| 1337 | + |
| 1338 | +To modify outbound client calls, define a class inheriting from |
| 1339 | +[`client.Interceptor`](https://python.temporal.io/temporalio.client.Interceptor.html), and implement the method |
| 1340 | +`intercept_client()` to return an instance of |
| 1341 | +[`OutboundInterceptor`](https://python.temporal.io/temporalio.client.OutboundInterceptor.html) that implements the |
| 1342 | +subset of outbound client calls that you wish to modify. |
| 1343 | + |
| 1344 | +Then, pass a list containing an instance of your `client.Interceptor` class as the |
| 1345 | +`interceptors` argument of [`Client.connect()`](https://python.temporal.io/temporalio.client.Client.html#connect). |
| 1346 | + |
| 1347 | +The purpose of the interceptor framework is that the methods you implement on your interceptor classes can perform |
| 1348 | +arbitrary side effects and/or arbitrary modifications to the data, before it is received by the SDK's "real" |
| 1349 | +implementation. The `interceptors` list can contain multiple interceptors. In this case they form a chain: a method |
| 1350 | +implemented on an interceptor instance in the list can perform side effects, and modify the data, before passing it on |
| 1351 | +to the corresponding method on the next interceptor in the list. Your interceptor classes need not implement every |
| 1352 | +method; the default implementation is always to pass the data on to the next method in the interceptor chain. |
| 1353 | + |
| 1354 | +The remaining four categories are worker calls. To modify these, define a class inheriting from |
| 1355 | +[`worker.Interceptor`](https://python.temporal.io/temporalio.worker.Interceptor.html) and implement methods on that |
| 1356 | +class to define the |
| 1357 | +[`ActivityInboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityInboundInterceptor.html), |
| 1358 | +[`ActivityOutboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityOutboundInterceptor.html), |
| 1359 | +[`WorkflowInboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowInboundInterceptor.html), and |
| 1360 | +[`WorkflowOutboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowOutboundInterceptor.html) classes |
| 1361 | +that you wish to use to effect your modifications. Then, pass a list containing an instance of your `worker.Interceptor` |
| 1362 | +class as the `interceptors` argument of the [`Worker()`](https://python.temporal.io/temporalio.worker.Worker.html) |
| 1363 | +constructor. |
| 1364 | + |
| 1365 | +It often happens that your worker and client interceptors will share code because they implement closely related logic. |
| 1366 | +For convenience, you can create an interceptor class that inherits from _both_ `client.Interceptor` and |
| 1367 | +`worker.Interceptor` (their method sets do not overlap). You can then pass this in the `interceptors` argument of |
| 1368 | +`Client.connect()` when starting your worker _as well as_ in your client/starter code. If you do this, your worker will |
| 1369 | +automatically pick up the interceptors from its underlying client (and you should not pass them directly to the |
| 1370 | +`Worker()` constructor). |
| 1371 | + |
| 1372 | +This is best explained by example. The [Context Propagation Interceptor |
| 1373 | +Sample](https://github.com/temporalio/samples-python/tree/main/context_propagation) is a good starting point. In |
| 1374 | +[context_propagation/interceptor.py](https://github.com/temporalio/samples-python/blob/main/context_propagation/interceptor.py) |
| 1375 | +a class is defined that inherits from both `client.Interceptor` and `worker.Interceptor`. It implements the various |
| 1376 | +methods such that the outbound client and workflow calls set a certain key in the outbound `headers` field, and the |
| 1377 | +inbound workflow and activity calls retrieve the header value from the inbound workflow/activity input data. An instance |
| 1378 | +of this interceptor class is passed to `Client.connect()` when [starting the |
| 1379 | +worker](https://github.com/temporalio/samples-python/blob/main/context_propagation/worker.py) and when connecting the |
| 1380 | +client in the [workflow starter |
| 1381 | +code](https://github.com/temporalio/samples-python/blob/main/context_propagation/starter.py). |
| 1382 | + |
| 1383 | + |
1313 | 1384 | ### Nexus |
1314 | 1385 |
|
1315 | 1386 | ⚠️ **Nexus support is currently at an experimental release stage. Backwards-incompatible changes are anticipated until a stable release is announced.** ⚠️ |
@@ -1416,6 +1487,192 @@ https://github.com/temporalio/samples-python/tree/nexus/hello_nexus). |
1416 | 1487 | ``` |
1417 | 1488 |
|
1418 | 1489 |
|
| 1490 | +### Plugins |
| 1491 | +
|
| 1492 | +Plugins provide a way to extend and customize the behavior of Temporal clients and workers through a chain of |
| 1493 | +responsibility pattern. They allow you to intercept and modify client creation, service connections, worker |
| 1494 | +configuration, and worker execution. Common customizations may include but are not limited to: |
| 1495 | +
|
| 1496 | +1. DataConverter |
| 1497 | +2. Activities |
| 1498 | +3. Workflows |
| 1499 | +4. Interceptors |
| 1500 | +
|
| 1501 | +A single plugin class can implement both client and worker plugin interfaces to share common logic between both |
| 1502 | +contexts. When used with a client, it will automatically be propagated to any workers created with that client. |
| 1503 | +
|
| 1504 | +#### Client Plugins |
| 1505 | +
|
| 1506 | +Client plugins can intercept and modify client configuration and service connections. They are useful for adding |
| 1507 | +authentication, modifying connection parameters, or adding custom behavior during client creation. |
| 1508 | +
|
| 1509 | +Here's an example of a client plugin that adds custom authentication: |
| 1510 | +
|
| 1511 | +```python |
| 1512 | +from temporalio.client import Plugin, ClientConfig |
| 1513 | +import temporalio.service |
| 1514 | +
|
| 1515 | +class AuthenticationPlugin(Plugin): |
| 1516 | + def __init__(self, api_key: str): |
| 1517 | + self.api_key = api_key |
| 1518 | + |
| 1519 | + def init_client_plugin(self, next: Plugin) -> None: |
| 1520 | + self.next_client_plugin = next |
| 1521 | +
|
| 1522 | + def configure_client(self, config: ClientConfig) -> ClientConfig: |
| 1523 | + # Modify client configuration |
| 1524 | + config["namespace"] = "my-secure-namespace" |
| 1525 | + return self.next_client_plugin.configure_client(config) |
| 1526 | +
|
| 1527 | + async def connect_service_client( |
| 1528 | + self, config: temporalio.service.ConnectConfig |
| 1529 | + ) -> temporalio.service.ServiceClient: |
| 1530 | + # Add authentication to the connection |
| 1531 | + config.api_key = self.api_key |
| 1532 | + return await self.next_client_plugin.connect_service_client(config) |
| 1533 | +
|
| 1534 | +# Use the plugin when connecting |
| 1535 | +client = await Client.connect( |
| 1536 | + "my-server.com:7233", |
| 1537 | + plugins=[AuthenticationPlugin("my-api-key")] |
| 1538 | +) |
| 1539 | +``` |
| 1540 | + |
| 1541 | +#### Worker Plugins |
| 1542 | + |
| 1543 | +Worker plugins can modify worker configuration and intercept worker execution. They are useful for adding monitoring, |
| 1544 | +custom lifecycle management, or modifying worker settings. Worker plugins can also configure replay. |
| 1545 | +They should do this in the case that they modified the worker in a way which would also need to be present |
| 1546 | +for replay to function. For instance, changing the data converter or adding workflows. |
| 1547 | + |
| 1548 | +Here's an example of a worker plugin that adds custom monitoring: |
| 1549 | + |
| 1550 | +```python |
| 1551 | +import temporalio |
| 1552 | +from contextlib import asynccontextmanager |
| 1553 | +from typing import AsyncIterator |
| 1554 | +from temporalio.worker import Plugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult |
| 1555 | +import logging |
| 1556 | + |
| 1557 | +class MonitoringPlugin(Plugin): |
| 1558 | + def __init__(self): |
| 1559 | + self.logger = logging.getLogger(__name__) |
| 1560 | + |
| 1561 | + def init_worker_plugin(self, next: Plugin) -> None: |
| 1562 | + self.next_worker_plugin = next |
| 1563 | + |
| 1564 | + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: |
| 1565 | + # Modify worker configuration |
| 1566 | + original_task_queue = config["task_queue"] |
| 1567 | + config["task_queue"] = f"monitored-{original_task_queue}" |
| 1568 | + self.logger.info(f"Worker created for task queue: {config['task_queue']}") |
| 1569 | + return self.next_worker_plugin.configure_worker(config) |
| 1570 | + |
| 1571 | + async def run_worker(self, worker: Worker) -> None: |
| 1572 | + self.logger.info("Starting worker execution") |
| 1573 | + try: |
| 1574 | + await self.next_worker_plugin.run_worker(worker) |
| 1575 | + finally: |
| 1576 | + self.logger.info("Worker execution completed") |
| 1577 | + |
| 1578 | + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: |
| 1579 | + return self.next_worker_plugin.configure_replayer(config) |
| 1580 | + |
| 1581 | + @asynccontextmanager |
| 1582 | + async def run_replayer( |
| 1583 | + self, |
| 1584 | + replayer: Replayer, |
| 1585 | + histories: AsyncIterator[temporalio.client.WorkflowHistory], |
| 1586 | + ) -> AsyncIterator[AsyncIterator[WorkflowReplayResult]]: |
| 1587 | + self.logger.info("Starting replay execution") |
| 1588 | + try: |
| 1589 | + async with self.next_worker_plugin.run_replayer(replayer, histories) as results: |
| 1590 | + yield results |
| 1591 | + finally: |
| 1592 | + self.logger.info("Replay execution completed") |
| 1593 | + |
| 1594 | +# Use the plugin when creating a worker |
| 1595 | +worker = Worker( |
| 1596 | + client, |
| 1597 | + task_queue="my-task-queue", |
| 1598 | + workflows=[MyWorkflow], |
| 1599 | + activities=[my_activity], |
| 1600 | + plugins=[MonitoringPlugin()] |
| 1601 | +) |
| 1602 | +``` |
| 1603 | + |
| 1604 | +For plugins that need to work with both clients and workers, you can implement both interfaces in a single class: |
| 1605 | + |
| 1606 | +```python |
| 1607 | +import temporalio |
| 1608 | +from contextlib import AbstractAsyncContextManager |
| 1609 | +from typing import AsyncIterator |
| 1610 | +from temporalio.client import Plugin as ClientPlugin, ClientConfig |
| 1611 | +from temporalio.worker import Plugin as WorkerPlugin, WorkerConfig, ReplayerConfig, Worker, Replayer, WorkflowReplayResult |
| 1612 | + |
| 1613 | + |
| 1614 | +class UnifiedPlugin(ClientPlugin, WorkerPlugin): |
| 1615 | + def init_client_plugin(self, next: ClientPlugin) -> None: |
| 1616 | + self.next_client_plugin = next |
| 1617 | + |
| 1618 | + def init_worker_plugin(self, next: WorkerPlugin) -> None: |
| 1619 | + self.next_worker_plugin = next |
| 1620 | + |
| 1621 | + def configure_client(self, config: ClientConfig) -> ClientConfig: |
| 1622 | + # Client-side customization |
| 1623 | + config["data_converter"] = pydantic_data_converter |
| 1624 | + return self.next_client_plugin.configure_client(config) |
| 1625 | + |
| 1626 | + async def connect_service_client( |
| 1627 | + self, config: temporalio.service.ConnectConfig |
| 1628 | + ) -> temporalio.service.ServiceClient: |
| 1629 | + # Add authentication to the connection |
| 1630 | + config.api_key = self.api_key |
| 1631 | + return await self.next_client_plugin.connect_service_client(config) |
| 1632 | + |
| 1633 | + def configure_worker(self, config: WorkerConfig) -> WorkerConfig: |
| 1634 | + # Worker-side customization |
| 1635 | + return self.next_worker_plugin.configure_worker(config) |
| 1636 | + |
| 1637 | + async def run_worker(self, worker: Worker) -> None: |
| 1638 | + print("Starting unified worker") |
| 1639 | + await self.next_worker_plugin.run_worker(worker) |
| 1640 | + |
| 1641 | + def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig: |
| 1642 | + config["data_converter"] = pydantic_data_converter |
| 1643 | + return config |
| 1644 | + |
| 1645 | + async def run_replayer( |
| 1646 | + self, |
| 1647 | + replayer: Replayer, |
| 1648 | + histories: AsyncIterator[temporalio.client.WorkflowHistory], |
| 1649 | + ) -> AbstractAsyncContextManager[AsyncIterator[WorkflowReplayResult]]: |
| 1650 | + return self.next_worker_plugin.run_replayer(replayer, histories) |
| 1651 | + |
| 1652 | +# Create client with the unified plugin |
| 1653 | +client = await Client.connect( |
| 1654 | + "localhost:7233", |
| 1655 | + plugins=[UnifiedPlugin()] |
| 1656 | +) |
| 1657 | + |
| 1658 | +# Worker will automatically inherit the plugin from the client |
| 1659 | +worker = Worker( |
| 1660 | + client, |
| 1661 | + task_queue="my-task-queue", |
| 1662 | + workflows=[MyWorkflow], |
| 1663 | + activities=[my_activity] |
| 1664 | +) |
| 1665 | +``` |
| 1666 | + |
| 1667 | +**Important Notes:** |
| 1668 | + |
| 1669 | +- Plugins are executed in reverse order (last plugin wraps the first), forming a chain of responsibility |
| 1670 | +- Client plugins that also implement worker plugin interfaces are automatically propagated to workers |
| 1671 | +- Avoid providing the same plugin to both client and worker to prevent double execution |
| 1672 | +- Plugin methods should call the plugin provided during initialization to maintain the plugin chain |
| 1673 | +- Each plugin's `name()` method returns a unique identifier for debugging purposes |
| 1674 | + |
| 1675 | + |
1419 | 1676 | ### Workflow Replay |
1420 | 1677 |
|
1421 | 1678 | Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example, |
|
0 commit comments