Skip to content

Commit 2f2a6ed

Browse files
committed
Add custom streaming to open ai agents sdk and temporal integration
1 parent b5642eb commit 2f2a6ed

File tree

19 files changed

+3695
-4
lines changed

19 files changed

+3695
-4
lines changed

src/agentex/lib/core/clients/temporal/utils.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any
44

55
from temporalio.client import Client, Plugin as ClientPlugin
6+
from temporalio.worker import Interceptor
67
from temporalio.runtime import Runtime, TelemetryConfig, OpenTelemetryConfig
78
from temporalio.contrib.pydantic import pydantic_data_converter
89
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
@@ -61,6 +62,24 @@ def validate_client_plugins(plugins: list[Any]) -> None:
6162
)
6263

6364

65+
def validate_worker_interceptors(interceptors: list[Any]) -> None:
66+
"""
67+
Validate that all items in the interceptors list are valid Temporal worker interceptors.
68+
69+
Args:
70+
interceptors: List of interceptors to validate
71+
72+
Raises:
73+
TypeError: If any interceptor is not a valid Interceptor instance
74+
"""
75+
for i, interceptor in enumerate(interceptors):
76+
if not isinstance(interceptor, Interceptor):
77+
raise TypeError(
78+
f"Interceptor at index {i} must be an instance of temporalio.worker.Interceptor, "
79+
f"got {type(interceptor).__name__}"
80+
)
81+
82+
6483
async def get_temporal_client(temporal_address: str, metrics_url: str | None = None, plugins: list[Any] = []) -> Client:
6584
"""
6685
Create a Temporal client with plugin integration.
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
"""OpenAI Agents SDK Temporal Plugin with Streaming Support.
2+
3+
This module provides streaming capabilities for the OpenAI Agents SDK in Temporal
4+
using interceptors to thread task_id through workflows to activities.
5+
6+
The streaming implementation works by:
7+
1. Using Temporal interceptors to thread task_id through the execution
8+
2. Streaming LLM responses to Redis in real-time from activities
9+
3. Returning complete responses to maintain Temporal determinism
10+
11+
Example:
12+
>>> from agentex.lib.core.temporal.plugins.openai_agents import (
13+
... TemporalStreamingModelProvider,
14+
... TemporalTracingModelProvider,
15+
... ContextInterceptor,
16+
... )
17+
>>> from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
18+
>>> from datetime import timedelta
19+
>>>
20+
>>> # Create streaming model provider
21+
>>> model_provider = TemporalStreamingModelProvider()
22+
>>>
23+
>>> # Create STANDARD plugin with streaming model provider
24+
>>> plugin = OpenAIAgentsPlugin(
25+
... model_params=ModelActivityParameters(
26+
... start_to_close_timeout=timedelta(seconds=120),
27+
... ),
28+
... model_provider=model_provider,
29+
... )
30+
>>>
31+
>>> # Register interceptor with worker
32+
>>> interceptor = ContextInterceptor()
33+
>>> # Add interceptor to worker configuration
34+
"""
35+
36+
from agentex.lib.core.temporal.plugins.openai_agents import (
37+
ContextInterceptor,
38+
TemporalStreamingHooks,
39+
TemporalStreamingModel,
40+
TemporalTracingModelProvider,
41+
TemporalStreamingModelProvider,
42+
streaming_task_id,
43+
streaming_trace_id,
44+
stream_lifecycle_content,
45+
streaming_parent_span_id,
46+
)
47+
48+
__all__ = [
49+
"TemporalStreamingModel",
50+
"TemporalStreamingModelProvider",
51+
"TemporalTracingModelProvider",
52+
"ContextInterceptor",
53+
"streaming_task_id",
54+
"streaming_trace_id",
55+
"streaming_parent_span_id",
56+
"TemporalStreamingHooks",
57+
"stream_lifecycle_content",
58+
]

0 commit comments

Comments
 (0)