
Commit 5c8dfd7

Add hooks and tests
1 parent 62c252e commit 5c8dfd7


10 files changed: +2271 −422 lines

src/agentex/lib/core/temporal/plugins/openai_agents/README.md

Lines changed: 46 additions & 0 deletions
@@ -700,6 +700,52 @@ The most important innovation is **runtime task_id extraction**. Unlike passing

---

## Testing

### Running Tests

The streaming model implementation has comprehensive tests in `tests/test_streaming_model.py` that verify all configurations, tool types, and edge cases.

#### From Repository Root

```bash
# Run all tests
rye run pytest src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py -v

# Run without parallel execution (more stable)
rye run pytest src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py -v -n0

# Run specific test
rye run pytest src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py::TestStreamingModelSettings::test_temperature_setting -v
```

#### From Test Directory

```bash
cd src/agentex/lib/core/temporal/plugins/openai_agents/tests

# Run all tests
rye run pytest test_streaming_model.py -v

# Run without parallel execution (recommended)
rye run pytest test_streaming_model.py -v -n0

# Run specific test class
rye run pytest test_streaming_model.py::TestStreamingModelSettings -v
```

#### Test Coverage

The test suite covers:
- **ModelSettings**: All configuration parameters (temperature, reasoning, truncation, etc.)
- **Tool Types**: Function tools, web search, file search, computer tools, MCP tools, etc.
- **Streaming**: Redis context creation, task ID threading, error handling
- **Edge Cases**: Missing task IDs, multiple computer tools, handoffs

**Note**: Tests run faster without parallel execution (`-n0` flag) and avoid potential state pollution between test workers. All 29 tests pass individually; parallel execution may show 4-6 intermittent failures due to shared mock state.

---

## Conclusion

This implementation threads task_id through Temporal's OpenAI plugin architecture to enable real-time streaming while maintaining workflow determinism. The key innovation is extracting task_id at runtime rather than at plugin initialization, allowing dynamic streaming channels per workflow execution.

src/agentex/lib/core/temporal/plugins/openai_agents/__init__.py

Lines changed: 27 additions & 6 deletions
@@ -6,30 +6,43 @@
 The streaming implementation works by:
 1. Threading a task_id through the workflow execution
 2. Streaming LLM responses to Redis in real-time from activities
-3. Returning complete responses to maintain Temporal determinism
+3. Streaming lifecycle events (tool calls, handoffs) via hooks and activities
+4. Returning complete responses to maintain Temporal determinism

-Example:
+Example - Complete Setup:
 >>> from agentex.lib.core.temporal.plugins.openai_agents import (
 ... CustomStreamingOpenAIAgentsPlugin,
 ... StreamingModelProvider,
+... TemporalStreamingHooks,
 ... )
 >>> from temporalio.contrib.openai_agents import ModelActivityParameters
 >>> from datetime import timedelta
+>>> from agents import Agent, Runner
 >>>
->>> # Create streaming model provider
+>>> # 1. Create streaming model provider
 >>> model_provider = StreamingModelProvider()
 >>>
->>> # Create plugin with streaming support
+>>> # 2. Create plugin with streaming support
 >>> plugin = CustomStreamingOpenAIAgentsPlugin(
 ... model_params=ModelActivityParameters(
 ... start_to_close_timeout=timedelta(seconds=120),
 ... ),
 ... model_provider=model_provider,
 ... )
 >>>
->>> # In workflow, pass task_id through context
+>>> # 3. In workflow, create hooks for streaming lifecycle events
+>>> # The hooks automatically use the built-in streaming activity
+>>> hooks = TemporalStreamingHooks(task_id="your-task-id")
+>>>
+>>> # 4. Run agent with streaming context and hooks
 >>> context = {"task_id": "your-task-id"}
->>> result = await runner.run(agent, input, context=context)
+>>> result = await Runner.run(agent, input, context=context, hooks=hooks)
+
+This gives you:
+- Real-time streaming of LLM responses (via StreamingModel)
+- Real-time streaming of tool calls (via TemporalStreamingHooks)
+- Real-time streaming of agent handoffs (via TemporalStreamingHooks)
+- Full Temporal durability and observability
 """

 from agentex.lib.core.temporal.plugins.openai_agents.streaming_model import (

@@ -45,6 +58,12 @@
     StreamingTemporalRunner,
     StreamingTemporalModelStub,
 )
+from agentex.lib.core.temporal.plugins.openai_agents.hooks import (
+    TemporalStreamingHooks,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.activities import (
+    stream_lifecycle_content,
+)

 __all__ = [
     "CustomStreamingOpenAIAgentsPlugin",

@@ -54,4 +73,6 @@
     "StreamingActivityModelInput",
     "StreamingTemporalRunner",
     "StreamingTemporalModelStub",
+    "TemporalStreamingHooks",
+    "stream_lifecycle_content",
 ]
src/agentex/lib/core/temporal/plugins/openai_agents/activities.py

Lines changed: 78 additions & 0 deletions

@@ -0,0 +1,78 @@
"""Temporal activities for streaming agent lifecycle events.

This module provides reusable Temporal activities for streaming content
to the AgentEx UI, designed to work with TemporalStreamingHooks.
"""

from typing import Union

from temporalio import activity

from agentex.lib import adk
from agentex.types.text_content import TextContent
from agentex.types.task_message_update import StreamTaskMessageFull
from agentex.types.task_message_content import (
    TaskMessageContent,
    ToolRequestContent,
    ToolResponseContent,
)


@activity.defn(name="stream_lifecycle_content")
async def stream_lifecycle_content(
    task_id: str,
    content: Union[TextContent, ToolRequestContent, ToolResponseContent, TaskMessageContent],
) -> None:
    """Stream agent lifecycle content to the AgentEx UI.

    This is a universal streaming activity that can handle any type of agent
    lifecycle content (text messages, tool requests, tool responses, etc.).
    It uses the AgentEx streaming context to send updates to the UI in real time.

    Designed to work seamlessly with TemporalStreamingHooks. The hooks class
    will call this activity automatically when lifecycle events occur.

    Args:
        task_id: The AgentEx task ID for routing the content to the correct UI session
        content: The content to stream - can be any of:
            - TextContent: Plain text messages (e.g., handoff notifications)
            - ToolRequestContent: Tool invocation requests with call_id and name
            - ToolResponseContent: Tool execution results with call_id and output
            - TaskMessageContent: Generic task message content

    Example:
        Use together with TemporalStreamingHooks in a workflow (the hooks call
        this activity automatically), and register the activity with your
        Temporal worker::

            from agentex.lib.core.temporal.plugins.openai_agents import (
                TemporalStreamingHooks,
                stream_lifecycle_content,
            )

            # In your workflow
            hooks = TemporalStreamingHooks(task_id=params.task.id)
            result = await Runner.run(agent, input, hooks=hooks)

    Note:
        This activity is non-blocking and will not raise exceptions to the workflow.
        Any streaming errors are logged but do not fail the activity. This ensures
        that streaming failures don't break the agent execution.
    """
    try:
        async with adk.streaming.streaming_task_message_context(
            task_id=task_id,
            initial_content=content,
        ) as streaming_context:
            # Send the content as a full message update
            await streaming_context.stream_update(
                StreamTaskMessageFull(
                    parent_task_message=streaming_context.task_message,
                    content=content,
                    type="full",
                )
            )
    except Exception as e:
        # Log the error but don't fail the activity - streaming failures shouldn't break execution
        activity.logger.warning(f"Failed to stream content to task {task_id}: {e}")
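
Worker-side registration isn't shown in the diff above, so here is a rough sketch of what it might look like with the Temporal Python SDK (not part of this commit; the task queue name and workflow list are placeholders for whatever the project already uses):

```python
# Sketch only - not part of this commit. The task queue name and workflow list
# are placeholders; add stream_lifecycle_content alongside your existing activities.
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from agentex.lib.core.temporal.plugins.openai_agents import stream_lifecycle_content


async def main() -> None:
    # Assumes a locally reachable Temporal server
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="agentex-task-queue",        # placeholder task queue
        workflows=[],                           # your workflow classes go here
        activities=[stream_lifecycle_content],  # plus any existing activities
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```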
src/agentex/lib/core/temporal/plugins/openai_agents/hooks.py

Lines changed: 193 additions & 0 deletions

@@ -0,0 +1,193 @@
"""Temporal streaming hooks for OpenAI Agents SDK lifecycle events.

This module provides a convenience class for streaming agent lifecycle events
to the AgentEx UI via Temporal activities.
"""

from typing import Any
from datetime import timedelta

from agents import Agent, RunContextWrapper, RunHooks, Tool
from agents.tool_context import ToolContext
from temporalio import workflow

from agentex.types.text_content import TextContent
from agentex.types.task_message_content import ToolRequestContent, ToolResponseContent
from agentex.lib.core.temporal.plugins.openai_agents.activities import stream_lifecycle_content


class TemporalStreamingHooks(RunHooks):
    """Convenience hooks class for streaming OpenAI Agent lifecycle events to the AgentEx UI.

    This class automatically streams agent lifecycle events (tool calls, handoffs) to the
    AgentEx UI via Temporal activities. It subclasses the OpenAI Agents SDK's RunHooks
    to intercept lifecycle events and forward them for real-time UI updates.

    Lifecycle events streamed:
    - Tool requests (on_tool_start): Streams when a tool is about to be invoked
    - Tool responses (on_tool_end): Streams the tool's execution result
    - Agent handoffs (on_handoff): Streams when control transfers between agents

    Usage:
        Basic usage - streams all lifecycle events::

            from agentex.lib.core.temporal.plugins.openai_agents import TemporalStreamingHooks

            hooks = TemporalStreamingHooks(task_id="abc123")
            result = await Runner.run(agent, input, hooks=hooks)

        Advanced - subclass for custom behavior::

            class MyCustomHooks(TemporalStreamingHooks):
                async def on_tool_start(self, context, agent, tool):
                    # Add custom logic before streaming
                    await self.my_custom_logging(tool)
                    # Call parent to stream to UI
                    await super().on_tool_start(context, agent, tool)

                async def on_agent_start(self, context, agent):
                    # Override empty methods for additional tracking
                    print(f"Agent {agent.name} started")

        Power users can ignore this class and subclass agents.RunHooks directly for full control.

    Note:
        Tool arguments are not available in hooks due to OpenAI SDK architecture.
        The SDK's hook signature doesn't include tool arguments - they're only passed
        to the actual tool function. This is why arguments={} in ToolRequestContent.

    Attributes:
        task_id: The AgentEx task ID for routing streamed events
        timeout: Timeout for streaming activity calls (default: 10 seconds)
    """

    def __init__(
        self,
        task_id: str,
        timeout: timedelta = timedelta(seconds=10),
    ):
        """Initialize the streaming hooks.

        Args:
            task_id: AgentEx task ID for routing streamed events to the correct UI session
            timeout: Timeout for streaming activity invocations (default: 10 seconds)
        """
        super().__init__()
        self.task_id = task_id
        self.timeout = timeout

    async def on_agent_start(self, context: RunContextWrapper, agent: Agent) -> None:
        """Called when an agent starts execution.

        Default implementation does nothing. Override to add custom behavior.

        Args:
            context: The run context wrapper
            agent: The agent that is starting
        """
        pass

    async def on_agent_end(self, context: RunContextWrapper, agent: Agent, output: Any) -> None:
        """Called when an agent completes execution.

        Default implementation does nothing. Override to add custom behavior.

        Args:
            context: The run context wrapper
            agent: The agent that completed
            output: The agent's output
        """
        pass

    async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: Tool) -> None:
        """Stream tool request when a tool starts execution.

        Extracts the tool_call_id from the context and streams a ToolRequestContent
        message to the UI showing that the tool is about to execute.

        Note: Tool arguments are not available in the hook context due to OpenAI SDK
        design. The hook signature doesn't include tool arguments - they're passed
        directly to the tool function instead. We send an empty dict as a placeholder.

        Args:
            context: The run context wrapper (will be a ToolContext with tool_call_id)
            agent: The agent executing the tool
            tool: The tool being executed
        """
        tool_context = context if isinstance(context, ToolContext) else None
        tool_call_id = tool_context.tool_call_id if tool_context else f"call_{id(tool)}"

        await workflow.execute_activity_method(
            stream_lifecycle_content,
            args=[
                self.task_id,
                ToolRequestContent(
                    author="agent",
                    tool_call_id=tool_call_id,
                    name=tool.name,
                    arguments={},  # Not available in hook context - SDK limitation
                ),
            ],
            start_to_close_timeout=self.timeout,
        )

    async def on_tool_end(
        self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str
    ) -> None:
        """Stream tool response when a tool completes execution.

        Extracts the tool_call_id and streams a ToolResponseContent message to the UI
        showing the tool's execution result.

        Args:
            context: The run context wrapper (will be a ToolContext with tool_call_id)
            agent: The agent that executed the tool
            tool: The tool that was executed
            result: The tool's execution result
        """
        tool_context = context if isinstance(context, ToolContext) else None
        tool_call_id = (
            getattr(tool_context, "tool_call_id", f"call_{id(tool)}")
            if tool_context
            else f"call_{id(tool)}"
        )

        await workflow.execute_activity_method(
            stream_lifecycle_content,
            args=[
                self.task_id,
                ToolResponseContent(
                    author="agent",
                    tool_call_id=tool_call_id,
                    name=tool.name,
                    content=result,
                ),
            ],
            start_to_close_timeout=self.timeout,
        )

    async def on_handoff(
        self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent
    ) -> None:
        """Stream handoff message when control transfers between agents.

        Sends a text message to the UI indicating that one agent is handing off
        to another agent.

        Args:
            context: The run context wrapper
            from_agent: The agent transferring control
            to_agent: The agent receiving control
        """
        await workflow.execute_activity_method(
            stream_lifecycle_content,
            args=[
                self.task_id,
                TextContent(
                    author="agent",
                    content=f"Handoff from {from_agent.name} to {to_agent.name}",
                    type="text",
                ),
            ],
            start_to_close_timeout=self.timeout,
        )
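
Putting the pieces together, a workflow that uses these hooks alongside the streaming context might look roughly like the sketch below (not part of this commit; the workflow class, agent definition, and parameters are illustrative placeholders):

```python
# Sketch only - not part of this commit. Workflow name, agent definition, and
# inputs are placeholders for whatever the project already defines.
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from agents import Agent, Runner
    from agentex.lib.core.temporal.plugins.openai_agents import TemporalStreamingHooks


@workflow.defn
class StreamingAgentWorkflow:
    @workflow.run
    async def run(self, task_id: str, user_input: str) -> str:
        agent = Agent(name="assistant", instructions="You are a helpful assistant.")

        # task_id is threaded twice: through the run context (picked up by the
        # streaming model activity) and through the hooks (tool calls, handoffs).
        hooks = TemporalStreamingHooks(task_id=task_id)
        result = await Runner.run(
            agent,
            user_input,
            context={"task_id": task_id},
            hooks=hooks,
        )
        return result.final_output
```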
