Skip to content

Commit 651557c

Browse files
authored
Merge branch 'main' into openai/rebuild
2 parents b1669c7 + 33b4a43 commit 651557c

File tree

12 files changed

+337
-72
lines changed

12 files changed

+337
-72
lines changed

README.md

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ informal introduction to the features and their implementation.
9494
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
9595
- [Worker Shutdown](#worker-shutdown)
9696
- [Testing](#testing-1)
97+
- [Interceptors](#interceptors)
9798
- [Nexus](#nexus)
9899
- [Workflow Replay](#workflow-replay)
99100
- [Observability](#observability)
@@ -1256,6 +1257,7 @@ calls in the `temporalio.activity` package make use of it. Specifically:
12561257

12571258
* `in_activity()` - Whether an activity context is present
12581259
* `info()` - Returns the immutable info of the currently running activity
1260+
* `client()` - Returns the Temporal client used by this worker. Only available in `async def` activities.
12591261
* `heartbeat(*details)` - Record a heartbeat
12601262
* `is_cancelled()` - Whether a cancellation has been requested on this activity
12611263
* `wait_for_cancelled()` - `async` call to wait for cancellation request
@@ -1310,6 +1312,70 @@ affect calls activity code might make to functions on the `temporalio.activity`
13101312
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
13111313

13121314

1315+
### Interceptors
1316+
1317+
The behavior of the SDK can be customized in many useful ways by modifying inbound and outbound calls using
1318+
interceptors. This is similar to the use of middleware in other frameworks.
1319+
1320+
There are five categories of inbound and outbound calls that you can modify in this way:
1321+
1322+
1. Outbound client calls, such as `start_workflow()`, `signal_workflow()`, `list_workflows()`, `update_schedule()`, etc.
1323+
1324+
2. Inbound workflow calls: `execute_workflow()`, `handle_signal()`, `handle_update_handler()`, etc
1325+
1326+
3. Outbound workflow calls: `start_activity()`, `start_child_workflow()`, `start_nexus_operation()`, etc
1327+
1328+
4. Inbound call to execute an activity: `execute_activity()`
1329+
1330+
5. Outbound activity calls: `info()` and `heartbeat()`
1331+
1332+
1333+
To modify outbound client calls, define a class inheriting from
1334+
[`client.Interceptor`](https://python.temporal.io/temporalio.client.Interceptor.html), and implement the method
1335+
`intercept_client()` to return an instance of
1336+
[`OutboundInterceptor`](https://python.temporal.io/temporalio.client.OutboundInterceptor.html) that implements the
1337+
subset of outbound client calls that you wish to modify.
1338+
1339+
Then, pass a list containing an instance of your `client.Interceptor` class as the
1340+
`interceptors` argument of [`Client.connect()`](https://python.temporal.io/temporalio.client.Client.html#connect).
1341+
1342+
The purpose of the interceptor framework is that the methods you implement on your interceptor classes can perform
1343+
arbitrary side effects and/or arbitrary modifications to the data, before it is received by the SDK's "real"
1344+
implementation. The `interceptors` list can contain multiple interceptors. In this case they form a chain: a method
1345+
implemented on an interceptor instance in the list can perform side effects, and modify the data, before passing it on
1346+
to the corresponding method on the next interceptor in the list. Your interceptor classes need not implement every
1347+
method; the default implementation is always to pass the data on to the next method in the interceptor chain.
1348+
1349+
The remaining four categories are worker calls. To modify these, define a class inheriting from
1350+
[`worker.Interceptor`](https://python.temporal.io/temporalio.worker.Interceptor.html) and implement methods on that
1351+
class to define the
1352+
[`ActivityInboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityInboundInterceptor.html),
1353+
[`ActivityOutboundInterceptor`](https://python.temporal.io/temporalio.worker.ActivityOutboundInterceptor.html),
1354+
[`WorkflowInboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowInboundInterceptor.html), and
1355+
[`WorkflowOutboundInterceptor`](https://python.temporal.io/temporalio.worker.WorkflowOutboundInterceptor.html) classes
1356+
that you wish to use to effect your modifications. Then, pass a list containing an instance of your `worker.Interceptor`
1357+
class as the `interceptors` argument of the [`Worker()`](https://python.temporal.io/temporalio.worker.Worker.html)
1358+
constructor.
1359+
1360+
It often happens that your worker and client interceptors will share code because they implement closely related logic.
1361+
For convenience, you can create an interceptor class that inherits from _both_ `client.Interceptor` and
1362+
`worker.Interceptor` (their method sets do not overlap). You can then pass this in the `interceptors` argument of
1363+
`Client.connect()` when starting your worker _as well as_ in your client/starter code. If you do this, your worker will
1364+
automatically pick up the interceptors from its underlying client (and you should not pass them directly to the
1365+
`Worker()` constructor).
1366+
1367+
This is best explained by example. The [Context Propagation Interceptor
1368+
Sample](https://github.com/temporalio/samples-python/tree/main/context_propagation) is a good starting point. In
1369+
[context_propagation/interceptor.py](https://github.com/temporalio/samples-python/blob/main/context_propagation/interceptor.py)
1370+
a class is defined that inherits from both `client.Interceptor` and `worker.Interceptor`. It implements the various
1371+
methods such that the outbound client and workflow calls set a certain key in the outbound `headers` field, and the
1372+
inbound workflow and activity calls retrieve the header value from the inbound workflow/activity input data. An instance
1373+
of this interceptor class is passed to `Client.connect()` when [starting the
1374+
worker](https://github.com/temporalio/samples-python/blob/main/context_propagation/worker.py) and when connecting the
1375+
client in the [workflow starter
1376+
code](https://github.com/temporalio/samples-python/blob/main/context_propagation/starter.py).
1377+
1378+
13131379
### Nexus
13141380

13151381
⚠️ **Nexus support is currently at an experimental release stage. Backwards-incompatible changes are anticipated until a stable release is announced.** ⚠️

temporalio/activity.py

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from dataclasses import dataclass
2020
from datetime import datetime, timedelta
2121
from typing import (
22+
TYPE_CHECKING,
2223
Any,
2324
Callable,
2425
Iterator,
@@ -42,6 +43,9 @@
4243

4344
from .types import CallableType
4445

46+
if TYPE_CHECKING:
47+
from temporalio.client import Client
48+
4549

4650
@overload
4751
def defn(fn: CallableType) -> CallableType: ...
@@ -179,6 +183,7 @@ class _Context:
179183
temporalio.converter.PayloadConverter,
180184
]
181185
runtime_metric_meter: Optional[temporalio.common.MetricMeter]
186+
client: Optional[Client]
182187
cancellation_details: _ActivityCancellationDetailsHolder
183188
_logger_details: Optional[Mapping[str, Any]] = None
184189
_payload_converter: Optional[temporalio.converter.PayloadConverter] = None
@@ -271,13 +276,37 @@ def wait_sync(self, timeout: Optional[float] = None) -> None:
271276
self.thread_event.wait(timeout)
272277

273278

279+
def client() -> Client:
280+
"""Return a Temporal Client for use in the current activity.
281+
282+
The client is only available in `async def` activities.
283+
284+
In tests it is not available automatically, but you can pass a client when creating a
285+
:py:class:`temporalio.testing.ActivityEnvironment`.
286+
287+
Returns:
288+
:py:class:`temporalio.client.Client` for use in the current activity.
289+
290+
Raises:
291+
RuntimeError: When the client is not available.
292+
"""
293+
client = _Context.current().client
294+
if not client:
295+
raise RuntimeError(
296+
"No client available. The client is only available in `async def` "
297+
"activities; not in `def` activities. In tests you can pass a "
298+
"client when creating ActivityEnvironment."
299+
)
300+
return client
301+
302+
274303
def in_activity() -> bool:
275304
"""Whether the current code is inside an activity.
276305
277306
Returns:
278307
True if in an activity, False otherwise.
279308
"""
280-
return not _current_context.get(None) is None
309+
return _current_context.get(None) is not None
281310

282311

283312
def info() -> Info:
@@ -574,8 +603,10 @@ def _apply_to_callable(
574603
fn=fn,
575604
# iscoroutinefunction does not return true for async __call__
576605
# TODO(cretz): Why can't MyPy handle this?
577-
is_async=inspect.iscoroutinefunction(fn)
578-
or inspect.iscoroutinefunction(fn.__call__), # type: ignore
606+
is_async=(
607+
inspect.iscoroutinefunction(fn)
608+
or inspect.iscoroutinefunction(fn.__call__) # type: ignore
609+
),
579610
no_thread_cancel_exception=no_thread_cancel_exception,
580611
),
581612
)

temporalio/client.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -936,9 +936,6 @@ async def execute_update_with_start_workflow(
936936
the call will not return successfully until the update has been delivered to a
937937
worker.
938938
939-
.. warning::
940-
This API is experimental
941-
942939
Args:
943940
update: Update function or name on the workflow. arg: Single argument to the
944941
update.
@@ -1542,6 +1539,12 @@ def __init__(
15421539
result_run_id: Optional[str] = None,
15431540
first_execution_run_id: Optional[str] = None,
15441541
result_type: Optional[Type] = None,
1542+
start_workflow_response: Optional[
1543+
Union[
1544+
temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse,
1545+
temporalio.api.workflowservice.v1.SignalWithStartWorkflowExecutionResponse,
1546+
]
1547+
] = None,
15451548
) -> None:
15461549
"""Create workflow handle."""
15471550
self._client = client
@@ -1550,6 +1553,7 @@ def __init__(
15501553
self._result_run_id = result_run_id
15511554
self._first_execution_run_id = first_execution_run_id
15521555
self._result_type = result_type
1556+
self._start_workflow_response = start_workflow_response
15531557
self.__temporal_eagerly_started = False
15541558

15551559
@property
@@ -5347,11 +5351,7 @@ class StartWorkflowUpdateInput:
53475351

53485352
@dataclass
53495353
class UpdateWithStartUpdateWorkflowInput:
5350-
"""Update input for :py:meth:`OutboundInterceptor.start_update_with_start_workflow`.
5351-
5352-
.. warning::
5353-
This API is experimental
5354-
"""
5354+
"""Update input for :py:meth:`OutboundInterceptor.start_update_with_start_workflow`."""
53555355

53565356
update_id: Optional[str]
53575357
update: str
@@ -5365,11 +5365,7 @@ class UpdateWithStartUpdateWorkflowInput:
53655365

53665366
@dataclass
53675367
class UpdateWithStartStartWorkflowInput:
5368-
"""StartWorkflow input for :py:meth:`OutboundInterceptor.start_update_with_start_workflow`.
5369-
5370-
.. warning::
5371-
This API is experimental
5372-
"""
5368+
"""StartWorkflow input for :py:meth:`OutboundInterceptor.start_update_with_start_workflow`."""
53735369

53745370
# Similar to StartWorkflowInput but without e.g. run_id, start_signal,
53755371
# start_signal_args, request_eager_start.
@@ -5405,11 +5401,7 @@ class UpdateWithStartStartWorkflowInput:
54055401

54065402
@dataclass
54075403
class StartWorkflowUpdateWithStartInput:
5408-
"""Input for :py:meth:`OutboundInterceptor.start_update_with_start_workflow`.
5409-
5410-
.. warning::
5411-
This API is experimental
5412-
"""
5404+
"""Input for :py:meth:`OutboundInterceptor.start_update_with_start_workflow`."""
54135405

54145406
start_workflow_input: UpdateWithStartStartWorkflowInput
54155407
update_workflow_input: UpdateWithStartUpdateWorkflowInput
@@ -5683,11 +5675,7 @@ async def start_workflow_update(
56835675
async def start_update_with_start_workflow(
56845676
self, input: StartWorkflowUpdateWithStartInput
56855677
) -> WorkflowUpdateHandle[Any]:
5686-
"""Called for every :py:meth:`Client.start_update_with_start_workflow` and :py:meth:`Client.execute_update_with_start_workflow` call.
5687-
5688-
.. warning::
5689-
This API is experimental
5690-
"""
5678+
"""Called for every :py:meth:`Client.start_update_with_start_workflow` and :py:meth:`Client.execute_update_with_start_workflow` call."""
56915679
return await self.next.start_update_with_start_workflow(input)
56925680

56935681
### Async activity calls
@@ -5832,6 +5820,7 @@ async def start_workflow(
58325820
result_run_id=resp.run_id,
58335821
first_execution_run_id=first_execution_run_id,
58345822
result_type=input.ret_type,
5823+
start_workflow_response=resp,
58355824
)
58365825
setattr(handle, "__temporal_eagerly_started", eagerly_started)
58375826
return handle

temporalio/contrib/openai_agents/workflow.py

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
11
"""Workflow-specific primitives for working with the OpenAI Agents SDK in a workflow context"""
22

3+
import functools
4+
import inspect
35
import json
46
from datetime import timedelta
5-
from typing import Any, Callable, Optional, Type
7+
from typing import Any, Callable, Optional, Type, Union, overload
68

79
import nexusrpc
810
from agents import (
11+
Agent,
912
RunContextWrapper,
1013
Tool,
1114
)
12-
from agents.function_schema import function_schema
15+
from agents.function_schema import DocstringStyle, function_schema
1316
from agents.tool import (
1417
FunctionTool,
18+
ToolErrorFunction,
19+
ToolFunction,
20+
ToolParams,
21+
default_tool_error_function,
22+
function_tool,
1523
)
24+
from agents.util._types import MaybeAwaitable
1625

1726
from temporalio import activity
1827
from temporalio import workflow as temporal_workflow
@@ -78,6 +87,25 @@ def activity_as_tool(
7887
"Bare function without tool and activity decorators is not supported",
7988
"invalid_tool",
8089
)
90+
if ret.name is None:
91+
raise ApplicationError(
92+
"Input activity must have a name to be made into a tool",
93+
"invalid_tool",
94+
)
95+
# If the provided callable has a first argument of `self`, partially apply it with the same metadata
96+
# The actual instance will be picked up by the activity execution, the partially applied function will never actually be executed
97+
params = list(inspect.signature(fn).parameters.keys())
98+
if len(params) > 0 and params[0] == "self":
99+
partial = functools.partial(fn, None)
100+
setattr(partial, "__name__", fn.__name__)
101+
partial.__annotations__ = getattr(fn, "__annotations__")
102+
setattr(
103+
partial,
104+
"__temporal_activity_definition",
105+
getattr(fn, "__temporal_activity_definition"),
106+
)
107+
partial.__doc__ = fn.__doc__
108+
fn = partial
81109
schema = function_schema(fn)
82110

83111
async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
@@ -94,9 +122,8 @@ async def run_activity(ctx: RunContextWrapper[Any], input: str) -> Any:
94122
# Add the context to the arguments if it takes that
95123
if schema.takes_context:
96124
args = [ctx] + args
97-
98125
result = await temporal_workflow.execute_activity(
99-
fn,
126+
ret.name, # type: ignore
100127
args=args,
101128
task_queue=task_queue,
102129
schedule_to_close_timeout=schedule_to_close_timeout,

temporalio/nexus/_link_conversion.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
LINK_EVENT_TYPE_PARAM_NAME = "eventType"
2424

2525

26-
def workflow_handle_to_workflow_execution_started_event_link(
26+
def workflow_execution_started_event_link_from_workflow_handle(
2727
handle: temporalio.client.WorkflowHandle[Any, Any],
2828
) -> temporalio.api.common.v1.Link.WorkflowEvent:
2929
"""Create a WorkflowEvent link corresponding to a started workflow"""

temporalio/nexus/_operation_context.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from typing_extensions import Concatenate
1919

2020
import temporalio.api.common.v1
21+
import temporalio.api.workflowservice.v1
2122
import temporalio.client
2223
import temporalio.common
2324
from temporalio.nexus import _link_conversion
@@ -142,25 +143,30 @@ def _get_workflow_event_links(
142143
def _add_outbound_links(
143144
self, workflow_handle: temporalio.client.WorkflowHandle[Any, Any]
144145
):
146+
# If links were not sent in StartWorkflowExecutionResponse then construct them.
147+
wf_event_links: list[temporalio.api.common.v1.Link.WorkflowEvent] = []
145148
try:
146-
link = _link_conversion.workflow_event_to_nexus_link(
147-
_link_conversion.workflow_handle_to_workflow_execution_started_event_link(
148-
workflow_handle
149-
)
149+
if isinstance(
150+
workflow_handle._start_workflow_response,
151+
temporalio.api.workflowservice.v1.StartWorkflowExecutionResponse,
152+
):
153+
if workflow_handle._start_workflow_response.HasField("link"):
154+
if link := workflow_handle._start_workflow_response.link:
155+
if link.HasField("workflow_event"):
156+
wf_event_links.append(link.workflow_event)
157+
if not wf_event_links:
158+
wf_event_links = [
159+
_link_conversion.workflow_execution_started_event_link_from_workflow_handle(
160+
workflow_handle
161+
)
162+
]
163+
self.nexus_context.outbound_links.extend(
164+
_link_conversion.workflow_event_to_nexus_link(link)
165+
for link in wf_event_links
150166
)
151167
except Exception as e:
152168
logger.warning(
153-
f"Failed to create WorkflowExecutionStarted event link for workflow {id}: {e}"
154-
)
155-
else:
156-
self.nexus_context.outbound_links.append(
157-
# TODO(nexus-prerelease): Before, WorkflowRunOperation was generating an EventReference
158-
# link to send back to the caller. Now, it checks if the server returned
159-
# the link in the StartWorkflowExecutionResponse, and if so, send the link
160-
# from the response to the caller. Fallback to generating the link for
161-
# backwards compatibility. PR reference in Go SDK:
162-
# https://github.com/temporalio/sdk-go/pull/1934
163-
link
169+
f"Failed to create WorkflowExecutionStarted event links for workflow {workflow_handle}: {e}"
164170
)
165171
return workflow_handle
166172

0 commit comments

Comments
 (0)