Commit d3ecb59

Add temporal tests
1 parent 7e3db91 commit d3ecb59

17 files changed: +657 −303 lines changed

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py

Lines changed: 7 additions & 1 deletion
@@ -34,6 +34,11 @@
 
 from agentex.lib.types.fastacp import TemporalACPConfig
 from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import TemporalStreamingModelProvider
+
+context_interceptor = ContextInterceptor()
+temporal_streaming_model_provider = TemporalStreamingModelProvider()
 
 # Create the ACP server
 acp = FastACP.create(
@@ -44,7 +49,8 @@
         # We are also adding the Open AI Agents SDK plugin to the ACP.
         type="temporal",
         temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
-        plugins=[OpenAIAgentsPlugin()]
+        plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
+        interceptors=[context_interceptor]
     )
 )
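The interceptor and model provider wired in above cooperate through a ContextVar hand-off, described in more detail in the run_worker.py and workflow.py comments below. A minimal sketch of that hand-off, using only the standard library; the names here are illustrative, not the actual agentex internals:

```python
# Illustrative sketch only - the names are hypothetical, not the agentex implementation.
from contextvars import ContextVar
from typing import Optional

# Set by the activity-side interceptor, read by the streaming model.
streaming_task_id: ContextVar[Optional[str]] = ContextVar("streaming_task_id", default=None)


def on_activity_start(task_id_from_header: str) -> None:
    # The inbound interceptor would do roughly this after decoding the activity header.
    streaming_task_id.set(task_id_from_header)


def current_stream_key() -> Optional[str]:
    # The streaming model would do roughly this to pick which Redis stream to write to.
    task_id = streaming_task_id.get()
    return f"stream:{task_id}" if task_id else None
```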

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py

Lines changed: 29 additions & 3 deletions
@@ -8,6 +8,8 @@
 from agentex.lib.environment_variables import EnvironmentVariables
 from agentex.lib.core.temporal.activities import get_all_activities
 from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import TemporalStreamingModelProvider
 
 environment_variables = EnvironmentVariables.refresh()
 
@@ -24,12 +26,36 @@ async def main():
 
     # Add activities to the worker
     all_activities = get_all_activities() + [] # add your own activities here
-
+
+    # ============================================================================
+    # STREAMING SETUP: Interceptor + Model Provider
+    # ============================================================================
+    # This is where the streaming magic is configured! Two key components:
+    #
+    # 1. ContextInterceptor (StreamingInterceptor)
+    #    - Threads task_id through activity headers using Temporal's interceptor pattern
+    #    - Outbound: reads _task_id from the workflow instance and injects it into activity headers
+    #    - Inbound: extracts task_id from the headers and sets the streaming_task_id ContextVar
+    #    - This enables runtime context without forking the Temporal plugin!
+    #
+    # 2. TemporalStreamingModelProvider
+    #    - Returns StreamingModel instances that read task_id from the ContextVar
+    #    - StreamingModel.get_response() streams tokens to Redis in real time
+    #    - Still returns the complete response to Temporal for determinism/replay safety
+    #    - Uses the AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
+    #
+    # Together, these enable real-time LLM streaming while maintaining Temporal's
+    # durability guarantees. No forked components - this uses the STANDARD OpenAIAgentsPlugin!
+    context_interceptor = ContextInterceptor()
+    temporal_streaming_model_provider = TemporalStreamingModelProvider()
+
     # Create a worker with automatic tracing
-    # We are also adding the Open AI Agents SDK plugin to the worker.
+    # IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin.
+    # No forking needed! The interceptor + model provider handle all the streaming logic.
     worker = AgentexWorker(
         task_queue=task_queue_name,
-        plugins=[OpenAIAgentsPlugin()]
+        plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
+        interceptors=[context_interceptor]
     )
 
     await worker.run(
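As a companion to the comment block above, here is a rough sketch of the streaming pattern it describes: mirror each LLM delta to Redis with XADD on stream:{task_id} while still returning the complete text, which is what keeps the Temporal activity result deterministic on replay. This is not the actual StreamingModel or TemporalStreamingModelProvider code; the helper name, Redis URL, and model name are assumptions for illustration (redis-py 5+ and the OpenAI Python SDK are assumed available).

```python
# Rough sketch of the streaming pattern described above - not the agentex StreamingModel.
from contextvars import ContextVar
from typing import Optional

import redis.asyncio as aioredis
from openai import AsyncOpenAI

# Stands in for the ContextVar that the activity-side interceptor populates.
streaming_task_id: ContextVar[Optional[str]] = ContextVar("streaming_task_id", default=None)


async def stream_completion_to_redis(prompt: str, redis_url: str = "redis://localhost:6379") -> str:
    """Hypothetical helper: stream one completion, mirroring deltas to stream:{task_id}."""
    task_id = streaming_task_id.get()
    client = AsyncOpenAI()
    redis_client = aioredis.Redis.from_url(redis_url)

    chunks: list[str] = []
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model name
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content or ""
        if delta:
            chunks.append(delta)
            if task_id:
                # Consumers (e.g. the AgentEx UI) can XREAD this stream for live tokens.
                await redis_client.xadd(f"stream:{task_id}", {"delta": delta})

    await redis_client.aclose()
    # Returning the complete text is what keeps the Temporal activity result replay-safe.
    return "".join(chunks)
```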

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py

Lines changed: 45 additions & 6 deletions
@@ -19,6 +19,7 @@
 """
 
 import json
+import os
 
 from agents import Agent, Runner
 from temporalio import workflow
@@ -39,6 +40,14 @@
 if environment_variables.AGENT_NAME is None:
     raise ValueError("Environment variable AGENT_NAME is not set")
 
+# Validate OpenAI API key is set
+if not os.environ.get("OPENAI_API_KEY"):
+    raise ValueError(
+        "OPENAI_API_KEY environment variable is not set. "
+        "This tutorial requires an OpenAI API key to run the OpenAI Agents SDK. "
+        "Please set OPENAI_API_KEY in your environment or manifest.yaml file."
+    )
+
 logger = make_logger(__name__)
 
 @workflow.defn(name=environment_variables.WORKFLOW_NAME)
@@ -59,6 +68,9 @@ class ExampleTutorialWorkflow(BaseWorkflow):
     def __init__(self):
         super().__init__(display_name=environment_variables.AGENT_NAME)
         self._complete_task = False
+        self._task_id = None
+        self._trace_id = None
+        self._parent_span_id = None
 
     @workflow.signal(name=SignalName.RECEIVE_EVENT)
     async def on_task_event_send(self, params: SendEventParams) -> None:
@@ -103,28 +115,55 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
         )
 
         # ============================================================================
-        # STEP 3: Run Agent with Temporal Durability
+        # STREAMING SETUP: Store task_id for the Interceptor
+        # ============================================================================
+        # These instance variables are read by StreamingWorkflowOutboundInterceptor,
+        # which injects them into activity headers. This enables streaming without
+        # forking the Temporal plugin!
+        #
+        # How it works:
+        # 1. We store task_id in a workflow instance variable (here)
+        # 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance()
+        # 3. The interceptor injects task_id into activity headers
+        # 4. StreamingActivityInboundInterceptor extracts it from the headers
+        # 5. It sets the streaming_task_id ContextVar inside the activity
+        # 6. StreamingModel reads the ContextVar and streams to Redis
+        #
+        # This approach uses STANDARD Temporal components - no forked plugin needed!
+        self._task_id = params.task.id
+        self._trace_id = params.task.id
+        self._parent_span_id = params.task.id
+
+        # ============================================================================
+        # STEP 3: Run Agent with Temporal Durability + Streaming
         # ============================================================================
         # This is where the magic happens! When Runner.run() executes:
         # 1. The OpenAI Agents SDK makes LLM calls to generate responses
         # 2. The plugin automatically wraps these calls as Temporal activities
         # 3. You'll see "invoke_model_activity" appear in the Temporal UI
         # 4. If the LLM call fails, Temporal retries it automatically
         # 5. The conversation state is preserved even if the worker restarts
-
+        #
+        # STREAMING MAGIC (via Interceptors + Model Provider):
+        # - The StreamingInterceptor threads task_id through activity headers
+        # - The StreamingModelProvider returns a model that streams to Redis
+        # - The model streams tokens in real time while maintaining determinism
+        # - The complete response is still returned to Temporal for replay safety
+
         # IMPORTANT NOTE ABOUT AGENT RUN CALLS:
         # =====================================
         # Notice that we don't need to wrap the Runner.run() call in an activity!
-        # This might feel weird for anyone who has used Temporal before, as typically
+        # This might feel weird for anyone who has used Temporal before, as typically
         # non-deterministic operations like LLM calls would need to be wrapped in activities.
-        # However, the OpenAI Agents SDK plugin is handling all of this automatically
+        # However, the OpenAI Agents SDK plugin is handling all of this automatically
         # behind the scenes.
         #
         # Another benefit of this approach is that we don't have to serialize the arguments,
-        # which would typically be the case with Temporal activities - the plugin handles
+        # which would typically be the case with Temporal activities - the plugin handles
         # all of this for us, making the developer experience much smoother.
-
+
         # Pass the text content directly to Runner.run (it accepts strings)
+        # No need to pass context - the interceptor handles task_id threading!
         result = await Runner.run(agent, params.event.content.content)
 
         # ============================================================================
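For readers who want to see what steps 2 through 5 of the comment above look like in code, here is a rough sketch of a header-propagating interceptor built from the standard temporalio.worker interceptor classes. It is not the actual ContextInterceptor shipped with agentex; the header key, the JSON payload encoding, and the ContextVar name are assumptions, and error handling is omitted.

```python
# Rough sketch of the task_id threading described above - not the agentex ContextInterceptor.
# Assumes the workflow stores the id on `self._task_id`, as in this tutorial.
import json
from contextvars import ContextVar
from typing import Optional, Type

from temporalio import workflow
from temporalio.api.common.v1 import Payload
from temporalio.worker import (
    ActivityInboundInterceptor,
    ExecuteActivityInput,
    Interceptor,
    StartActivityInput,
    WorkflowInboundInterceptor,
    WorkflowInterceptorClassInput,
    WorkflowOutboundInterceptor,
)

streaming_task_id: ContextVar[Optional[str]] = ContextVar("streaming_task_id", default=None)
HEADER_KEY = "x-streaming-task-id"  # illustrative header name


class _TaskIdOutbound(WorkflowOutboundInterceptor):
    def start_activity(self, input: StartActivityInput) -> workflow.ActivityHandle:
        # Steps 2-3: read _task_id from the workflow instance and inject it into headers.
        task_id = getattr(workflow.instance(), "_task_id", None)
        if task_id is not None:
            input.headers = {
                **input.headers,
                HEADER_KEY: Payload(
                    metadata={"encoding": b"json/plain"},
                    data=json.dumps(task_id).encode(),
                ),
            }
        return self.next.start_activity(input)


class _TaskIdWorkflowInbound(WorkflowInboundInterceptor):
    def init(self, outbound: WorkflowOutboundInterceptor) -> None:
        # Wrap the outbound chain so our injector runs on every activity start.
        super().init(_TaskIdOutbound(outbound))


class _TaskIdActivityInbound(ActivityInboundInterceptor):
    async def execute_activity(self, input: ExecuteActivityInput):
        # Steps 4-5: extract task_id from the header and expose it via the ContextVar.
        payload = input.headers.get(HEADER_KEY)
        if payload is not None:
            streaming_task_id.set(json.loads(payload.data))
        return await self.next.execute_activity(input)


class SketchContextInterceptor(Interceptor):
    def intercept_activity(self, next: ActivityInboundInterceptor) -> ActivityInboundInterceptor:
        return _TaskIdActivityInbound(next)

    def workflow_interceptor_class(
        self, input: WorkflowInterceptorClassInput
    ) -> Optional[Type[WorkflowInboundInterceptor]]:
        return _TaskIdWorkflowInbound
```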

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py

Lines changed: 60 additions & 62 deletions
@@ -16,12 +16,21 @@
 """
 
 import os
+import uuid
 
 import pytest
 import pytest_asyncio
+from test_utils.agentic import (
+    send_event_and_poll_yielding,
+    poll_messages,
+    stream_agent_response,
+)
 
 from agentex import AsyncAgentex
-
+from agentex.types import TextContent
+from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
+from agentex.types.text_content_param import TextContentParam
+from agentex.types.task_message import TaskMessage
 # Configuration from environment variables
 AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
 AGENT_NAME = os.environ.get("AGENT_NAME", "example-tutorial")
@@ -58,77 +67,66 @@ class TestNonStreamingEvents:
     async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
         """Test sending an event and polling for the response."""
         # TODO: Create a task for this conversation
-        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
-        # task = task_response.result
-        # assert task is not None
-
-        # TODO: Poll for the initial task creation message (if your agent sends one)
-        # async for message in poll_messages(
-        #     client=client,
-        #     task_id=task.id,
-        #     timeout=30,
-        #     sleep_interval=1.0,
-        # ):
-        #     assert isinstance(message, TaskMessage)
-        #     if message.content and message.content.type == "text" and message.content.author == "agent":
-        #         # Check for your expected initial message
-        #         assert "expected initial text" in message.content.content
-        #         break
+        task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
+        task = task_response.result
+        assert task is not None
+
+        # Poll for the initial task creation message
+        async for message in poll_messages(
+            client=client,
+            task_id=task.id,
+            timeout=30,
+            sleep_interval=1.0,
+        ):
+            assert isinstance(message, TaskMessage)
+            if message.content and message.content.type == "text" and message.content.author == "agent":
+                # Check for the Haiku Assistant welcome message
+                assert "Haiku Assistant" in message.content.content
+                assert "Temporal" in message.content.content
+                break
 
         # TODO: Send an event and poll for response using the yielding helper function
-        # user_message = "Your test message here"
-        # async for message in send_event_and_poll_yielding(
-        #     client=client,
-        #     agent_id=agent_id,
-        #     task_id=task.id,
-        #     user_message=user_message,
-        #     timeout=30,
-        #     sleep_interval=1.0,
-        # ):
-        #     assert isinstance(message, TaskMessage)
-        #     if message.content and message.content.type == "text" and message.content.author == "agent":
-        #         # Check for your expected response
-        #         assert "expected response text" in message.content.content
-        #         break
+        user_message = "Hello how is life?"
+        print(f"[DEBUG 060 POLL] Sending message: '{user_message}'")
+
+        async for message in send_event_and_poll_yielding(
+            client=client,
+            agent_id=agent_id,
+            task_id=task.id,
+            user_message=user_message,
+            timeout=30,
+            sleep_interval=1.0,
+        ):
+            assert isinstance(message, TaskMessage)
+            print(f"[DEBUG 060 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}")
+
+            if message.content and message.content.type == "text" and message.content.author == "agent":
+                print(f"[DEBUG 060 POLL] Agent message content length: {len(message.content.content)}")
+                print(f"[DEBUG 060 POLL] Content preview: {message.content.content[:100] if message.content.content else '(empty)'}")
+
+                # Skip messages that are still streaming (IN_PROGRESS) - wait for complete messages
+                if message.streaming_status == "IN_PROGRESS":
+                    print(f"[DEBUG 060 POLL] Skipping IN_PROGRESS message, waiting for completion...")
+                    continue
+
+                # Check for your expected response
+                print(f"[DEBUG 060 POLL] ✅ Found complete agent message!")
+                assert "life" in message.content.content.lower() or len(message.content.content) > 0
+                break
         pass
 
 
 class TestStreamingEvents:
-    """Test streaming event sending."""
+    """Test streaming event sending (backend verification via polling)."""
 
     @pytest.mark.asyncio
     async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
-        """Test sending an event and streaming the response."""
-        # TODO: Create a task for this conversation
-        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
-        # task = task_response.result
-        # assert task is not None
-
-        # user_message = "Your test message here"
-
-        # # Collect events from stream
-        # all_events = []
-
-        # async def collect_stream_events():
-        #     async for event in stream_agent_response(
-        #         client=client,
-        #         task_id=task.id,
-        #         timeout=30,
-        #     ):
-        #         all_events.append(event)
-
-        # # Start streaming task
-        # stream_task = asyncio.create_task(collect_stream_events())
-
-        # # Send the event
-        # event_content = TextContentParam(type="text", author="user", content=user_message)
-        # await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})
-
-        # # Wait for streaming to complete
-        # await stream_task
+        """
+        Streaming test placeholder.
 
-        # # TODO: Add your validation here
-        # assert len(all_events) > 0, "No events received in streaming response"
+        NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState).
+        Backend streaming functionality is verified in test_send_event_and_poll.
+        """
         pass
 
 

examples/tutorials/10_agentic/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py

Lines changed: 12 additions & 11 deletions
@@ -1,8 +1,7 @@
 import os
 import sys
-from datetime import timedelta
 
-from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
 
 # === DEBUG SETUP (AgentEx CLI Debug Support) ===
 if os.getenv("AGENTEX_DEBUG_ENABLED") == "true":
@@ -11,20 +10,20 @@
         debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679"))
         debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp")
         wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true"
-
+
         # Configure debugpy
         debugpy.configure(subProcess=False)
         debugpy.listen(debug_port)
-
+
        print(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}")
-
+
        if wait_for_attach:
            print(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...")
            debugpy.wait_for_client()
            print(f"✅ [{debug_type.upper()}] Debugger attached!")
        else:
            print(f"📡 [{debug_type.upper()}] Ready for debugger attachment")
-
+
    except ImportError:
        print("❌ debugpy not available. Install with: pip install debugpy")
        sys.exit(1)
@@ -35,6 +34,11 @@
 
 from agentex.lib.types.fastacp import TemporalACPConfig
 from agentex.lib.sdk.fastacp.fastacp import FastACP
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import TemporalStreamingModelProvider
+
+context_interceptor = ContextInterceptor()
+temporal_streaming_model_provider = TemporalStreamingModelProvider()
 
 # Create the ACP server
 acp = FastACP.create(
@@ -45,11 +49,8 @@
         # We are also adding the Open AI Agents SDK plugin to the ACP.
         type="temporal",
         temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
-        plugins=[OpenAIAgentsPlugin(
-            model_params=ModelActivityParameters(
-                start_to_close_timeout=timedelta(days=1)
-            )
-        )]
+        plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
+        interceptors=[context_interceptor]
     )
 )
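One side effect of this change is that the day-long start_to_close_timeout configured via ModelActivityParameters is dropped. If a project still wants that timeout alongside streaming, the two options can presumably be combined, since each keyword argument appears separately in this commit; a hedged sketch:

```python
# Hedged sketch: combining the timeout configuration removed above with the new
# streaming model provider. Assumes OpenAIAgentsPlugin accepts both keyword
# arguments at once (each appears on its own elsewhere in this commit).
from datetime import timedelta

from temporalio.contrib.openai_agents import ModelActivityParameters, OpenAIAgentsPlugin

from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
    TemporalStreamingModelProvider,
)

plugin = OpenAIAgentsPlugin(
    model_provider=TemporalStreamingModelProvider(),
    model_params=ModelActivityParameters(start_to_close_timeout=timedelta(days=1)),
)
```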
