Skip to content

Commit de48d62

Browse files
committed
Fix tutorials and add tests
1 parent a25ab9f commit de48d62

File tree

18 files changed

+1082
-407
lines changed

18 files changed

+1082
-407
lines changed

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@
3434

3535
from agentex.lib.types.fastacp import TemporalACPConfig
3636
from agentex.lib.sdk.fastacp.fastacp import FastACP
37+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
38+
TemporalStreamingModelProvider,
39+
)
40+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
41+
42+
context_interceptor = ContextInterceptor()
43+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
3744

3845
# Create the ACP server
3946
acp = FastACP.create(
@@ -44,7 +51,8 @@
4451
# We are also adding the Open AI Agents SDK plugin to the ACP.
4552
type="temporal",
4653
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
47-
plugins=[OpenAIAgentsPlugin()]
54+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
55+
interceptors=[context_interceptor]
4856
)
4957
)
5058

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
from agentex.lib.environment_variables import EnvironmentVariables
99
from agentex.lib.core.temporal.activities import get_all_activities
1010
from agentex.lib.core.temporal.workers.worker import AgentexWorker
11+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
12+
TemporalStreamingModelProvider,
13+
)
14+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
1115

1216
environment_variables = EnvironmentVariables.refresh()
1317

@@ -24,12 +28,36 @@ async def main():
2428

2529
# Add activities to the worker
2630
all_activities = get_all_activities() + [] # add your own activities here
27-
31+
32+
# ============================================================================
33+
# STREAMING SETUP: Interceptor + Model Provider
34+
# ============================================================================
35+
# This is where the streaming magic is configured! Two key components:
36+
#
37+
# 1. ContextInterceptor (StreamingInterceptor)
38+
# - Threads task_id through activity headers using Temporal's interceptor pattern
39+
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
40+
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
41+
# - This enables runtime context without forking the Temporal plugin!
42+
#
43+
# 2. TemporalStreamingModelProvider
44+
# - Returns StreamingModel instances that read task_id from ContextVar
45+
# - StreamingModel.get_response() streams tokens to Redis in real-time
46+
# - Still returns complete response to Temporal for determinism/replay safety
47+
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
48+
#
49+
# Together, these enable real-time LLM streaming while maintaining Temporal's
50+
# durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin!
51+
context_interceptor = ContextInterceptor()
52+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
53+
2854
# Create a worker with automatic tracing
29-
# We are also adding the Open AI Agents SDK plugin to the worker.
55+
# IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
56+
# No forking needed! The interceptor + model provider handle all streaming logic.
3057
worker = AgentexWorker(
3158
task_queue=task_queue_name,
32-
plugins=[OpenAIAgentsPlugin()]
59+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
60+
interceptors=[context_interceptor]
3361
)
3462

3563
await worker.run(

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py

Lines changed: 150 additions & 64 deletions
Large diffs are not rendered by default.

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/pyproject.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@ version = "0.1.0"
88
description = "An AgentEx agent"
99
requires-python = ">=3.12"
1010
dependencies = [
11-
"agentex-sdk>=0.4.18",
11+
"agentex-sdk==0.6.0",
12+
"openai-agents-sdk==0.4.2",
13+
"temporalio==1.18.2",
1214
"scale-gp",
13-
"temporalio>=1.18.0,<2",
1415
]
1516

1617
[project.optional-dependencies]

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py

Lines changed: 66 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
"""
1717

1818
import os
19+
import uuid
1920

2021
import pytest
2122
import pytest_asyncio
23+
from test_utils.agentic import (
24+
poll_messages,
25+
send_event_and_poll_yielding,
26+
)
2227

2328
from agentex import AsyncAgentex
29+
from agentex.types.task_message import TaskMessage
30+
from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
2431

2532
# Configuration from environment variables
2633
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
@@ -57,80 +64,74 @@ class TestNonStreamingEvents:
5764
@pytest.mark.asyncio
5865
async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
5966
"""Test sending an event and polling for the response."""
60-
# TODO: Create a task for this conversation
61-
# task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
62-
# task = task_response.result
63-
# assert task is not None
64-
65-
# TODO: Poll for the initial task creation message (if your agent sends one)
66-
# async for message in poll_messages(
67-
# client=client,
68-
# task_id=task.id,
69-
# timeout=30,
70-
# sleep_interval=1.0,
71-
# ):
72-
# assert isinstance(message, TaskMessage)
73-
# if message.content and message.content.type == "text" and message.content.author == "agent":
74-
# # Check for your expected initial message
75-
# assert "expected initial text" in message.content.content
76-
# break
77-
78-
# TODO: Send an event and poll for response using the yielding helper function
79-
# user_message = "Your test message here"
80-
# async for message in send_event_and_poll_yielding(
81-
# client=client,
82-
# agent_id=agent_id,
83-
# task_id=task.id,
84-
# user_message=user_message,
85-
# timeout=30,
86-
# sleep_interval=1.0,
87-
# ):
88-
# assert isinstance(message, TaskMessage)
89-
# if message.content and message.content.type == "text" and message.content.author == "agent":
90-
# # Check for your expected response
91-
# assert "expected response text" in message.content.content
92-
# break
67+
task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
68+
task = task_response.result
69+
assert task is not None
70+
71+
# Poll for the initial task creation message
72+
async for message in poll_messages(
73+
client=client,
74+
task_id=task.id,
75+
timeout=30,
76+
sleep_interval=1.0,
77+
):
78+
assert isinstance(message, TaskMessage)
79+
if message.content and message.content.type == "text" and message.content.author == "agent":
80+
# Check for the Haiku Assistant welcome message
81+
assert "Haiku Assistant" in message.content.content
82+
assert "Temporal" in message.content.content
83+
break
84+
85+
# Send event and poll for response with streaming updates
86+
user_message = "Hello how is life?"
87+
print(f"[DEBUG 060 POLL] Sending message: '{user_message}'")
88+
89+
# Use yield_updates=True to get all streaming chunks as they're written
90+
final_message = None
91+
async for message in send_event_and_poll_yielding(
92+
client=client,
93+
agent_id=agent_id,
94+
task_id=task.id,
95+
user_message=user_message,
96+
timeout=30,
97+
sleep_interval=1.0,
98+
yield_updates=True, # Get updates as streaming writes chunks
99+
):
100+
if message.content and message.content.type == "text" and message.content.author == "agent":
101+
print(
102+
f"[DEBUG 060 POLL] Received update - Status: {message.streaming_status}, "
103+
f"Content length: {len(message.content.content)}"
104+
)
105+
final_message = message
106+
107+
# Stop polling once we get a DONE message
108+
if message.streaming_status == "DONE":
109+
print(f"[DEBUG 060 POLL] Streaming complete!")
110+
break
111+
112+
# Verify the final message has content (the haiku)
113+
assert final_message is not None, "Should have received an agent message"
114+
assert final_message.content is not None, "Final message should have content"
115+
assert len(final_message.content.content) > 0, "Final message should have haiku content"
116+
117+
print(f"[DEBUG 060 POLL] ✅ Successfully received haiku response!")
118+
print(f"[DEBUG 060 POLL] Final haiku:\n{final_message.content.content}")
93119
pass
94120

95121

96122
class TestStreamingEvents:
97-
"""Test streaming event sending."""
123+
"""Test streaming event sending (backend verification via polling)."""
98124

99125
@pytest.mark.asyncio
100126
async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
101-
"""Test sending an event and streaming the response."""
102-
# TODO: Create a task for this conversation
103-
# task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
104-
# task = task_response.result
105-
# assert task is not None
127+
"""
128+
Streaming test placeholder.
106129
107-
# user_message = "Your test message here"
108-
109-
# # Collect events from stream
110-
# all_events = []
111-
112-
# async def collect_stream_events():
113-
# async for event in stream_agent_response(
114-
# client=client,
115-
# task_id=task.id,
116-
# timeout=30,
117-
# ):
118-
# all_events.append(event)
119-
120-
# # Start streaming task
121-
# stream_task = asyncio.create_task(collect_stream_events())
122-
123-
# # Send the event
124-
# event_content = TextContentParam(type="text", author="user", content=user_message)
125-
# await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})
126-
127-
# # Wait for streaming to complete
128-
# await stream_task
129-
130-
# # TODO: Add your validation here
131-
# assert len(all_events) > 0, "No events received in streaming response"
130+
NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState).
131+
Backend streaming functionality is verified in test_send_event_and_poll.
132+
"""
132133
pass
133134

134135

135136
if __name__ == "__main__":
136-
pytest.main([__file__, "-v"])
137+
pytest.main([__file__, "-v"])

examples/tutorials/10_agentic/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import os
22
import sys
3-
from datetime import timedelta
43

5-
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
4+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
65

76
# === DEBUG SETUP (AgentEx CLI Debug Support) ===
87
if os.getenv("AGENTEX_DEBUG_ENABLED") == "true":
@@ -11,20 +10,20 @@
1110
debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679"))
1211
debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp")
1312
wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true"
14-
13+
1514
# Configure debugpy
1615
debugpy.configure(subProcess=False)
1716
debugpy.listen(debug_port)
18-
17+
1918
print(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}")
20-
19+
2120
if wait_for_attach:
2221
print(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...")
2322
debugpy.wait_for_client()
2423
print(f"✅ [{debug_type.upper()}] Debugger attached!")
2524
else:
2625
print(f"📡 [{debug_type.upper()}] Ready for debugger attachment")
27-
26+
2827
except ImportError:
2928
print("❌ debugpy not available. Install with: pip install debugpy")
3029
sys.exit(1)
@@ -35,6 +34,13 @@
3534

3635
from agentex.lib.types.fastacp import TemporalACPConfig
3736
from agentex.lib.sdk.fastacp.fastacp import FastACP
37+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
38+
TemporalStreamingModelProvider,
39+
)
40+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
41+
42+
context_interceptor = ContextInterceptor()
43+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
3844

3945
# Create the ACP server
4046
acp = FastACP.create(
@@ -45,11 +51,8 @@
4551
# We are also adding the Open AI Agents SDK plugin to the ACP.
4652
type="temporal",
4753
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
48-
plugins=[OpenAIAgentsPlugin(
49-
model_params=ModelActivityParameters(
50-
start_to_close_timeout=timedelta(days=1)
51-
)
52-
)]
54+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
55+
interceptors=[context_interceptor]
5356
)
5457
)
5558

examples/tutorials/10_agentic/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import asyncio
2-
from datetime import timedelta
32

4-
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
3+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
54

65
from project.workflow import ExampleTutorialWorkflow
76
from project.activities import get_weather, deposit_money, withdraw_money
@@ -10,6 +9,11 @@
109
from agentex.lib.environment_variables import EnvironmentVariables
1110
from agentex.lib.core.temporal.activities import get_all_activities
1211
from agentex.lib.core.temporal.workers.worker import AgentexWorker
12+
from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content
13+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
14+
TemporalStreamingModelProvider,
15+
)
16+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
1317

1418
environment_variables = EnvironmentVariables.refresh()
1519

@@ -19,23 +23,43 @@
1923
async def main():
2024
# Setup debug mode if enabled
2125
setup_debug_if_enabled()
22-
26+
2327
task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
2428
if task_queue_name is None:
2529
raise ValueError("WORKFLOW_TASK_QUEUE is not set")
26-
30+
2731
# Add activities to the worker
28-
all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather] # add your own activities here
29-
32+
all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather, stream_lifecycle_content] # add your own activities here
33+
34+
# ============================================================================
35+
# STREAMING SETUP: Interceptor + Model Provider
36+
# ============================================================================
37+
# This is where the streaming magic is configured! Two key components:
38+
#
39+
# 1. ContextInterceptor (StreamingInterceptor)
40+
# - Threads task_id through activity headers using Temporal's interceptor pattern
41+
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
42+
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
43+
# - This enables runtime context without forking the Temporal plugin!
44+
#
45+
# 2. TemporalStreamingModelProvider
46+
# - Returns StreamingModel instances that read task_id from ContextVar
47+
# - StreamingModel.get_response() streams tokens to Redis in real-time
48+
# - Still returns complete response to Temporal for determinism/replay safety
49+
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
50+
#
51+
# Together, these enable real-time LLM streaming while maintaining Temporal's
52+
# durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin!
53+
context_interceptor = ContextInterceptor()
54+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
55+
3056
# Create a worker with automatic tracing
31-
# We are also adding the Open AI Agents SDK plugin to the worker.
57+
# IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
58+
# No forking needed! The interceptor + model provider handle all streaming logic.
3259
worker = AgentexWorker(
3360
task_queue=task_queue_name,
34-
plugins=[OpenAIAgentsPlugin(
35-
model_params=ModelActivityParameters(
36-
start_to_close_timeout=timedelta(days=1)
37-
)
38-
)],
61+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
62+
interceptors=[context_interceptor],
3963
)
4064

4165
await worker.run(

0 commit comments

Comments
 (0)