Commit 1a4e622

Update readme
1 parent cfc9b4c commit 1a4e622

File tree

  • src/agentex/lib/core/temporal/plugins/openai_agents

1 file changed: +234 −48 lines changed


src/agentex/lib/core/temporal/plugins/openai_agents/README.md

Lines changed: 234 additions & 48 deletions

```diff
@@ -16,8 +16,9 @@ We modified Temporal's OpenAI Agents plugin to add real-time streaming to Redis/
 3. [The Streaming Challenge](#the-streaming-challenge)
 4. [Our Streaming Solution](#our-streaming-solution)
 5. [Implementation Details](#implementation-details)
-6. [Usage](#usage)
-7. [Drawbacks and Maintenance](#drawbacks-and-maintenance)
+6. [Streaming Lifecycle Events with Hooks](#streaming-lifecycle-events-with-hooks)
+7. [Usage](#usage)
+8. [Drawbacks and Maintenance](#drawbacks-and-maintenance)
 
 ---
```

```diff
@@ -431,38 +432,13 @@ class StreamingModel(Model):
         )
         await streaming_context.stream_update(update)
 
-        # 5. Handle tool calls (sent as complete messages, not streamed)
-        if tool_calls:
-            for tool_call_data in tool_calls.values():
-                tool_request = ToolRequestContent(
-                    author="agent",
-                    tool_call_id=tool_call_data["id"],
-                    name=tool_call_data["function"]["name"],
-                    arguments=json.loads(tool_call_data["function"]["arguments"])
-                )
+        # 5. Handle reasoning tokens - streamed as they're generated
+        # Reasoning summaries (for o1/o3 models) are streamed token-by-token
+        # as ReasoningSummaryDelta events during the response stream
 
-                # Tool calls use StreamTaskMessageFull (complete message)
-                async with adk.streaming.streaming_task_message_context(
-                    task_id=task_id,
-                    initial_content=tool_request
-                ) as tool_context:
-                    await tool_context.stream_update(
-                        StreamTaskMessageFull(
-                            parent_task_message=tool_context.task_message,
-                            content=tool_request,
-                            type="full"
-                        )
-                    )
-
-        # 6. Handle reasoning tokens (o1 models)
-        if reasoning_content:  # For o1 models
-            reasoning = ReasoningContent(
-                author="agent",
-                summary=[reasoning_content],
-                type="reasoning"
-            )
-            # Stream reasoning as complete message
-            await stream_reasoning_update(reasoning)
+        # Note: Tool calls are NOT streamed from get_response
+        # They are collected during streaming and returned in the final response
+        # Use TemporalStreamingHooks to stream tool lifecycle events separately
 
         # 7. Context auto-closes and saves to DB
         # The streaming_task_message_context:
```

````diff
@@ -501,28 +477,22 @@ await streaming_context.stream_update(
     StreamTaskMessageDelta(delta=TextDelta(text_delta=chunk))
 )
 
-# Tool calls - sent as complete messages
-await streaming_context.stream_update(
-    StreamTaskMessageFull(content=ToolRequestContent(...))
-)
-
-# Reasoning (o1 models) - sent as complete
-await streaming_context.stream_update(
-    StreamTaskMessageFull(content=ReasoningContent(...))
+# Reasoning summaries (o1/o3 models) - streamed token by token
+await reasoning_context.stream_update(
+    StreamTaskMessageDelta(delta=ReasoningSummaryDelta(summary_delta=chunk))
 )
 
-# Guardrails - sent as complete
-await streaming_context.stream_update(
-    StreamTaskMessageFull(content=GuardrailContent(...))
-)
+# NOTE: Tool calls and handoffs are NOT streamed from streaming_model.py
+# They are collected during streaming and returned in the final ModelResponse
+# To stream tool lifecycle events, use TemporalStreamingHooks (see below)
 ```
 
 #### UI Subscription
 
 The frontend subscribes to `stream:{task_id}` and receives:
-1. Real-time text chunks as they're generated
-2. Complete tool calls when they're ready
-3. Reasoning summaries for o1 models
+1. Real-time text chunks as they're generated (from StreamingModel)
+2. Reasoning summaries for o1/o3 models (from StreamingModel)
+3. Tool lifecycle events (from TemporalStreamingHooks - see section below)
 4. DONE signal when complete
 
 This decoupling means we can stream anything we want through Redis!
````
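
As a rough illustration (not part of this commit), a consumer of that channel could look like the following sketch. It assumes a plain Redis pub/sub channel named `stream:{task_id}` and a textual `DONE` sentinel; the actual AgentEx payloads are serialized `StreamTaskMessage*` events, so a real client would parse them accordingly.

```python
import asyncio

import redis.asyncio as redis

async def subscribe_to_task_stream(task_id: str) -> None:
    client = redis.Redis()
    pubsub = client.pubsub()
    await pubsub.subscribe(f"stream:{task_id}")  # channel name from the README above
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue  # skip subscription confirmations
        data = message["data"].decode()
        if data == "DONE":  # hypothetical completion sentinel
            break
        print(data)  # in practice: parse a serialized StreamTaskMessage* event

asyncio.run(subscribe_to_task_stream("abc123"))
```
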
@@ -543,6 +513,222 @@ class ExampleWorkflow:

---

## Streaming Lifecycle Events with Hooks

### Overview: Two Types of Streaming

The streaming implementation described above (StreamingModel) only streams **LLM text responses and reasoning tokens**. It does NOT stream tool calls or agent handoffs - those are collected during execution and returned in the final response.

To stream **agent lifecycle events** (tool requests, tool responses, handoffs), we provide **`TemporalStreamingHooks`** - a simpler, complementary approach that works alongside the streaming model.

| What Gets Streamed | StreamingModel | TemporalStreamingHooks |
|--------------------|----------------|------------------------|
| **LLM text responses** | ✅ Token-by-token | ❌ |
| **Reasoning summaries (o1/o3)** | ✅ Token-by-token | ❌ |
| **Tool requests** | ❌ | ✅ When tool starts |
| **Tool responses** | ❌ | ✅ When tool completes |
| **Agent handoffs** | ❌ | ✅ When handoff occurs |

**Best practice**: Use both together for complete streaming visibility!

### What are Hooks?

Hooks are callbacks provided by the OpenAI Agents SDK that fire at key points in the agent execution lifecycle. They provide interception points for the following events (signatures sketched below):
- `on_agent_start` - When an agent begins execution
- `on_agent_end` - When an agent completes execution
- `on_tool_start` - When a tool is about to be invoked
- `on_tool_end` - When a tool completes execution
- `on_handoff` - When control transfers between agents
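
For reference, the full callback signatures look roughly like this no-op subclass. This is a sketch based on the SDK's `RunHooks` interface; verify the exact signatures against your installed `agents` version:

```python
from typing import Any

from agents import Agent, RunContextWrapper, RunHooks, Tool

class PrintHooks(RunHooks):
    """No-op hooks that print each lifecycle event (illustration only)."""

    async def on_agent_start(self, context: RunContextWrapper, agent: Agent) -> None:
        print(f"agent started: {agent.name}")

    async def on_agent_end(self, context: RunContextWrapper, agent: Agent, output: Any) -> None:
        print(f"agent finished: {agent.name}")

    async def on_handoff(self, context: RunContextWrapper, from_agent: Agent, to_agent: Agent) -> None:
        print(f"handoff: {from_agent.name} -> {to_agent.name}")

    async def on_tool_start(self, context: RunContextWrapper, agent: Agent, tool: Tool) -> None:
        print(f"tool started: {tool.name}")

    async def on_tool_end(self, context: RunContextWrapper, agent: Agent, tool: Tool, result: str) -> None:
        print(f"tool finished: {tool.name}")
```
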
### Why Use Hooks vs. Streaming Model?

The streaming model approach operates at the LLM response level - it sees tokens as they're generated but has no visibility into tool lifecycle events. Hooks provide a simpler, more configurable way to track what the agent is doing, without needing to understand the plugin architecture's internals.

### Quick Start with TemporalStreamingHooks

```python
from agentex.lib.core.temporal.plugins.openai_agents import TemporalStreamingHooks
from agents import Agent, Runner

# Create an agent
agent = Agent(
    name="Assistant",
    model="gpt-4o",
    instructions="You are a helpful assistant",
    tools=[my_tool]  # Assume we have some tools
)

# Initialize hooks with your task_id
hooks = TemporalStreamingHooks(task_id="abc123")

# Run the agent - lifecycle events automatically stream to the UI!
result = await Runner.run(agent, "Hello", hooks=hooks)
```

That's it! Tool requests, tool responses, and handoffs are now automatically streamed to the AgentEx UI in real time.
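
The snippet above assumes `my_tool` already exists. For completeness, a minimal tool definition using the SDK's `function_tool` decorator might look like:

```python
from agents import function_tool

@function_tool
def my_tool(city: str) -> str:
    """Toy tool: report the weather for a city."""
    return f"It is sunny in {city}."
```
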
### What Gets Streamed by Hooks

The `TemporalStreamingHooks` class automatically streams:

1. **Tool Requests** (`on_tool_start`):
   - Fires when a tool is about to execute
   - Streams `ToolRequestContent` with tool name and call ID
   - Shows in the UI that a tool is being invoked
   - **Note**: Tool arguments are not available due to the OpenAI SDK architecture

2. **Tool Responses** (`on_tool_end`):
   - Fires when a tool completes execution
   - Streams `ToolResponseContent` with the tool result (sketched after this list)
   - Shows the tool output in the UI

3. **Agent Handoffs** (`on_handoff`):
   - Fires when control transfers between agents
   - Streams `TextContent` with a "Handoff from AgentA to AgentB" message
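
The `on_tool_start` side is shown in the implementation details section below. For the `on_tool_end` side, a sketch might look like the following; the `ToolResponseContent` field names here are assumptions for illustration, not the actual model definition:

```python
# Sketch only - mirrors the on_tool_start implementation shown later.
# ToolResponseContent field names are assumed; check the actual model.
async def on_tool_end(self, context, agent, tool, result):
    tool_call_id = context.tool_call_id if isinstance(context, ToolContext) else f"call_{id(tool)}"
    await workflow.execute_activity_method(
        stream_lifecycle_content,
        args=[
            self.task_id,
            ToolResponseContent(
                author="agent",
                tool_call_id=tool_call_id,
                name=tool.name,
                content=str(result),  # assumed field name for the tool output
            ),
        ],
        start_to_close_timeout=self.timeout,
    )
```
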
### Using Hooks in Temporal Workflows

When using hooks inside a Temporal workflow, combine them with the streaming context:

```python
from agents import Agent
from agents.run import get_default_agent_runner
from temporalio import workflow

from agentex.lib.core.temporal.plugins.openai_agents import TemporalStreamingHooks

@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, params):
        agent = Agent(
            name="Assistant",
            instructions="You are a helpful assistant",
            model="gpt-4o",
            tools=[my_search_tool, my_calculator_tool]
        )

        # Create hooks with the task_id for lifecycle event streaming
        hooks = TemporalStreamingHooks(task_id=params.task.id)

        # Pass context for LLM response streaming (from StreamingModel)
        context = {"task_id": params.task.id}

        # Run with BOTH hooks AND context for complete streaming coverage
        runner = get_default_agent_runner()
        result = await runner.run(
            agent,
            params.event.content,
            context=context,  # Enables LLM text/reasoning streaming
            hooks=hooks       # Enables tool/handoff lifecycle streaming
        )

        return result.final_output
```

### Advanced: Custom Hooks

Need custom behavior? Subclass `TemporalStreamingHooks` and override methods:

```python
from agents import Runner

from agentex.lib.core.temporal.plugins.openai_agents import TemporalStreamingHooks

class MyCustomHooks(TemporalStreamingHooks):
    async def on_tool_start(self, context, agent, tool):
        # Add custom logic before streaming
        print(f"About to call tool: {tool.name}")
        await self.my_custom_logging(tool)

        # Call parent to stream to UI
        await super().on_tool_start(context, agent, tool)

    async def on_agent_start(self, context, agent):
        # Override empty methods for additional tracking
        print(f"Agent {agent.name} started")
        await self.track_agent_metrics(agent)

    async def on_agent_end(self, context, agent, output):
        # Track completion metrics
        print(f"Agent {agent.name} completed")
        await self.save_agent_output(agent, output)

# Use your custom hooks
hooks = MyCustomHooks(task_id="abc123")
result = await Runner.run(agent, input, hooks=hooks)
```

### Configuration Options

```python
from datetime import timedelta

# Customize timeout for streaming activities
hooks = TemporalStreamingHooks(
    task_id="abc123",
    timeout=timedelta(seconds=30)  # Default is 10 seconds
)
```

### Hooks Implementation Details

Under the hood, `TemporalStreamingHooks` uses Temporal activities to stream events:

```python
# From hooks.py - simplified for illustration
class TemporalStreamingHooks(RunHooks):
    async def on_tool_start(self, context, agent, tool):
        # Extract tool_call_id from context
        tool_call_id = context.tool_call_id if isinstance(context, ToolContext) else f"call_{id(tool)}"

        # Stream via Temporal activity
        await workflow.execute_activity_method(
            stream_lifecycle_content,
            args=[
                self.task_id,
                ToolRequestContent(
                    author="agent",
                    tool_call_id=tool_call_id,
                    name=tool.name,
                    arguments={},  # Not available in hook context
                ),
            ],
            start_to_close_timeout=self.timeout,
        )
```

The `stream_lifecycle_content` activity then uses the AgentEx streaming infrastructure to push events to Redis, just like the streaming model does.
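
For intuition, such an activity could be implemented with the same streaming context the StreamingModel uses for complete messages. This is a sketch, not the actual `activities.py`; the import paths for `adk` and `StreamTaskMessageFull` are assumptions:

```python
from temporalio import activity

from agentex.lib import adk  # assumed import path
from agentex.types import StreamTaskMessageFull  # assumed import path

@activity.defn
async def stream_lifecycle_content(task_id: str, content) -> None:
    # Open a streaming context for the task and publish the lifecycle
    # event as one complete message, mirroring how the StreamingModel
    # publishes non-delta content (see the walkthrough above).
    async with adk.streaming.streaming_task_message_context(
        task_id=task_id,
        initial_content=content,
    ) as streaming_context:
        await streaming_context.stream_update(
            StreamTaskMessageFull(
                parent_task_message=streaming_context.task_message,
                content=content,
                type="full",
            )
        )
```
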
### Limitations

**Important**: Tool arguments are not available in `on_tool_start` hooks due to the OpenAI SDK architecture. The hook signature doesn't include tool arguments - they're only passed to the actual tool function. This is why `arguments={}` appears in `ToolRequestContent`.

If you need tool arguments in your streaming data, you'll need to:
1. Stream them from within the tool function itself (sketched below), or
2. Wait for `on_tool_end`, where you can log the full tool context
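
A sketch of option 1, streaming the real arguments from inside the tool itself. It reuses the streaming context pattern from above, reads the `task_id` out of the run context passed to the runner, and glosses over Temporal activity boundaries for brevity; the import paths and the placeholder `tool_call_id` are assumptions:

```python
from agents import RunContextWrapper, function_tool

from agentex.lib import adk  # assumed import path
from agentex.types import StreamTaskMessageFull, ToolRequestContent  # assumed import paths

@function_tool
async def search(ctx: RunContextWrapper, query: str) -> str:
    """Search tool that streams its own arguments before running."""
    task_id = ctx.context["task_id"]  # the context dict passed to runner.run

    content = ToolRequestContent(
        author="agent",
        tool_call_id="manual",  # the hook-provided call ID is not visible here
        name="search",
        arguments={"query": query},  # the real arguments, unavailable to on_tool_start
    )
    async with adk.streaming.streaming_task_message_context(
        task_id=task_id,
        initial_content=content,
    ) as sctx:
        await sctx.stream_update(
            StreamTaskMessageFull(parent_task_message=sctx.task_message, content=content, type="full")
        )

    return f"Results for {query}"
```
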
### Power Users: Direct RunHooks Subclassing

If you need complete control, skip `TemporalStreamingHooks` and subclass `agents.RunHooks` directly:

```python
from datetime import timedelta

from agents import RunHooks
from temporalio import workflow

from agentex.lib.core.temporal.plugins.openai_agents.activities import stream_lifecycle_content

class MyPowerUserHooks(RunHooks):
    def __init__(self, task_id: str):
        super().__init__()
        self.task_id = task_id

    async def on_tool_start(self, context, agent, tool):
        # Implement completely custom streaming logic
        await workflow.execute_activity_method(
            stream_lifecycle_content,
            args=[self.task_id, my_custom_content],  # my_custom_content: whatever content you build
            start_to_close_timeout=timedelta(seconds=10)
        )
```

---

## Usage

### Installation
