Skip to content

Commit 91907f4

Browse files
committed
Add temporal tests
1 parent 7e3db91 commit 91907f4

File tree

17 files changed

+662
-302
lines changed

17 files changed

+662
-302
lines changed

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@
3434

3535
from agentex.lib.types.fastacp import TemporalACPConfig
3636
from agentex.lib.sdk.fastacp.fastacp import FastACP
37+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
38+
TemporalStreamingModelProvider,
39+
)
40+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
41+
42+
context_interceptor = ContextInterceptor()
43+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
3744

3845
# Create the ACP server
3946
acp = FastACP.create(
@@ -44,7 +51,8 @@
4451
# We are also adding the Open AI Agents SDK plugin to the ACP.
4552
type="temporal",
4653
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
47-
plugins=[OpenAIAgentsPlugin()]
54+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
55+
interceptors=[context_interceptor]
4856
)
4957
)
5058

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
from agentex.lib.environment_variables import EnvironmentVariables
99
from agentex.lib.core.temporal.activities import get_all_activities
1010
from agentex.lib.core.temporal.workers.worker import AgentexWorker
11+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
12+
TemporalStreamingModelProvider,
13+
)
14+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
1115

1216
environment_variables = EnvironmentVariables.refresh()
1317

@@ -24,12 +28,36 @@ async def main():
2428

2529
# Add activities to the worker
2630
all_activities = get_all_activities() + [] # add your own activities here
27-
31+
32+
# ============================================================================
33+
# STREAMING SETUP: Interceptor + Model Provider
34+
# ============================================================================
35+
# This is where the streaming magic is configured! Two key components:
36+
#
37+
# 1. ContextInterceptor (StreamingInterceptor)
38+
# - Threads task_id through activity headers using Temporal's interceptor pattern
39+
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
40+
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
41+
# - This enables runtime context without forking the Temporal plugin!
42+
#
43+
# 2. TemporalStreamingModelProvider
44+
# - Returns StreamingModel instances that read task_id from ContextVar
45+
# - StreamingModel.get_response() streams tokens to Redis in real-time
46+
# - Still returns complete response to Temporal for determinism/replay safety
47+
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
48+
#
49+
# Together, these enable real-time LLM streaming while maintaining Temporal's
50+
# durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin!
51+
context_interceptor = ContextInterceptor()
52+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
53+
2854
# Create a worker with automatic tracing
29-
# We are also adding the Open AI Agents SDK plugin to the worker.
55+
# IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
56+
# No forking needed! The interceptor + model provider handle all streaming logic.
3057
worker = AgentexWorker(
3158
task_queue=task_queue_name,
32-
plugins=[OpenAIAgentsPlugin()]
59+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
60+
interceptors=[context_interceptor]
3361
)
3462

3563
await worker.run(

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
This is the foundation before moving to more advanced patterns with tools and activities.
1919
"""
2020

21+
import os
2122
import json
2223

2324
from agents import Agent, Runner
@@ -39,6 +40,14 @@
3940
if environment_variables.AGENT_NAME is None:
4041
raise ValueError("Environment variable AGENT_NAME is not set")
4142

43+
# Validate OpenAI API key is set
44+
if not os.environ.get("OPENAI_API_KEY"):
45+
raise ValueError(
46+
"OPENAI_API_KEY environment variable is not set. "
47+
"This tutorial requires an OpenAI API key to run the OpenAI Agents SDK. "
48+
"Please set OPENAI_API_KEY in your environment or manifest.yaml file."
49+
)
50+
4251
logger = make_logger(__name__)
4352

4453
@workflow.defn(name=environment_variables.WORKFLOW_NAME)
@@ -59,6 +68,9 @@ class ExampleTutorialWorkflow(BaseWorkflow):
5968
def __init__(self):
6069
super().__init__(display_name=environment_variables.AGENT_NAME)
6170
self._complete_task = False
71+
self._task_id = None
72+
self._trace_id = None
73+
self._parent_span_id = None
6274

6375
@workflow.signal(name=SignalName.RECEIVE_EVENT)
6476
async def on_task_event_send(self, params: SendEventParams) -> None:
@@ -103,28 +115,55 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
103115
)
104116

105117
# ============================================================================
106-
# STEP 3: Run Agent with Temporal Durability
118+
# STREAMING SETUP: Store task_id for the Interceptor
119+
# ============================================================================
120+
# These instance variables are read by StreamingWorkflowOutboundInterceptor
121+
# which injects them into activity headers. This enables streaming without
122+
# forking the Temporal plugin!
123+
#
124+
# How it works:
125+
# 1. We store task_id in workflow instance variable (here)
126+
# 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance()
127+
# 3. Interceptor injects task_id into activity headers
128+
# 4. StreamingActivityInboundInterceptor extracts from headers
129+
# 5. Sets streaming_task_id ContextVar inside the activity
130+
# 6. StreamingModel reads from ContextVar and streams to Redis
131+
#
132+
# This approach uses STANDARD Temporal components - no forked plugin needed!
133+
self._task_id = params.task.id
134+
self._trace_id = params.task.id
135+
self._parent_span_id = params.task.id
136+
137+
# ============================================================================
138+
# STEP 3: Run Agent with Temporal Durability + Streaming
107139
# ============================================================================
108140
# This is where the magic happens! When Runner.run() executes:
109141
# 1. The OpenAI Agents SDK makes LLM calls to generate responses
110142
# 2. The plugin automatically wraps these calls as Temporal activities
111143
# 3. You'll see "invoke_model_activity" appear in the Temporal UI
112144
# 4. If the LLM call fails, Temporal retries it automatically
113145
# 5. The conversation state is preserved even if the worker restarts
114-
146+
#
147+
# STREAMING MAGIC (via Interceptors + Model Provider):
148+
# - The StreamingInterceptor threads task_id through activity headers
149+
# - The StreamingModelProvider returns a model that streams to Redis
150+
# - The model streams tokens in real-time while maintaining determinism
151+
# - Complete response is still returned to Temporal for replay safety
152+
115153
# IMPORTANT NOTE ABOUT AGENT RUN CALLS:
116154
# =====================================
117155
# Notice that we don't need to wrap the Runner.run() call in an activity!
118-
# This might feel weird for anyone who has used Temporal before, as typically
156+
# This might feel weird for anyone who has used Temporal before, as typically
119157
# non-deterministic operations like LLM calls would need to be wrapped in activities.
120-
# However, the OpenAI Agents SDK plugin is handling all of this automatically
158+
# However, the OpenAI Agents SDK plugin is handling all of this automatically
121159
# behind the scenes.
122160
#
123161
# Another benefit of this approach is that we don't have to serialize the arguments,
124-
# which would typically be the case with Temporal activities - the plugin handles
162+
# which would typically be the case with Temporal activities - the plugin handles
125163
# all of this for us, making the developer experience much smoother.
126-
164+
127165
# Pass the text content directly to Runner.run (it accepts strings)
166+
# No need to pass context - the interceptor handles task_id threading!
128167
result = await Runner.run(agent, params.event.content.content)
129168

130169
# ============================================================================

examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py

Lines changed: 57 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
"""
1717

1818
import os
19+
import uuid
1920

2021
import pytest
2122
import pytest_asyncio
23+
from test_utils.agentic import (
24+
poll_messages,
25+
send_event_and_poll_yielding,
26+
)
2227

2328
from agentex import AsyncAgentex
29+
from agentex.types.task_message import TaskMessage
30+
from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
2431

2532
# Configuration from environment variables
2633
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
@@ -58,77 +65,66 @@ class TestNonStreamingEvents:
5865
async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
5966
"""Test sending an event and polling for the response."""
6067
# TODO: Create a task for this conversation
61-
# task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
62-
# task = task_response.result
63-
# assert task is not None
64-
65-
# TODO: Poll for the initial task creation message (if your agent sends one)
66-
# async for message in poll_messages(
67-
# client=client,
68-
# task_id=task.id,
69-
# timeout=30,
70-
# sleep_interval=1.0,
71-
# ):
72-
# assert isinstance(message, TaskMessage)
73-
# if message.content and message.content.type == "text" and message.content.author == "agent":
74-
# # Check for your expected initial message
75-
# assert "expected initial text" in message.content.content
76-
# break
68+
task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
69+
task = task_response.result
70+
assert task is not None
71+
72+
# Poll for the initial task creation message
73+
async for message in poll_messages(
74+
client=client,
75+
task_id=task.id,
76+
timeout=30,
77+
sleep_interval=1.0,
78+
):
79+
assert isinstance(message, TaskMessage)
80+
if message.content and message.content.type == "text" and message.content.author == "agent":
81+
# Check for the Haiku Assistant welcome message
82+
assert "Haiku Assistant" in message.content.content
83+
assert "Temporal" in message.content.content
84+
break
7785

7886
# TODO: Send an event and poll for response using the yielding helper function
79-
# user_message = "Your test message here"
80-
# async for message in send_event_and_poll_yielding(
81-
# client=client,
82-
# agent_id=agent_id,
83-
# task_id=task.id,
84-
# user_message=user_message,
85-
# timeout=30,
86-
# sleep_interval=1.0,
87-
# ):
88-
# assert isinstance(message, TaskMessage)
89-
# if message.content and message.content.type == "text" and message.content.author == "agent":
90-
# # Check for your expected response
91-
# assert "expected response text" in message.content.content
92-
# break
87+
user_message = "Hello how is life?"
88+
print(f"[DEBUG 060 POLL] Sending message: '{user_message}'")
89+
90+
async for message in send_event_and_poll_yielding(
91+
client=client,
92+
agent_id=agent_id,
93+
task_id=task.id,
94+
user_message=user_message,
95+
timeout=30,
96+
sleep_interval=1.0,
97+
):
98+
assert isinstance(message, TaskMessage)
99+
print(f"[DEBUG 060 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}")
100+
101+
if message.content and message.content.type == "text" and message.content.author == "agent":
102+
print(f"[DEBUG 060 POLL] Agent message content length: {len(message.content.content)}")
103+
print(f"[DEBUG 060 POLL] Content preview: {message.content.content[:100] if message.content.content else '(empty)'}")
104+
105+
# Skip messages that are still streaming (IN_PROGRESS) - wait for complete messages
106+
if message.streaming_status == "IN_PROGRESS":
107+
print(f"[DEBUG 060 POLL] Skipping IN_PROGRESS message, waiting for completion...")
108+
continue
109+
110+
# Check for your expected response
111+
print(f"[DEBUG 060 POLL] ✅ Found complete agent message!")
112+
assert "life" in message.content.content.lower() or len(message.content.content) > 0
113+
break
93114
pass
94115

95116

96117
class TestStreamingEvents:
97-
"""Test streaming event sending."""
118+
"""Test streaming event sending (backend verification via polling)."""
98119

99120
@pytest.mark.asyncio
100121
async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
101-
"""Test sending an event and streaming the response."""
102-
# TODO: Create a task for this conversation
103-
# task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
104-
# task = task_response.result
105-
# assert task is not None
106-
107-
# user_message = "Your test message here"
108-
109-
# # Collect events from stream
110-
# all_events = []
111-
112-
# async def collect_stream_events():
113-
# async for event in stream_agent_response(
114-
# client=client,
115-
# task_id=task.id,
116-
# timeout=30,
117-
# ):
118-
# all_events.append(event)
119-
120-
# # Start streaming task
121-
# stream_task = asyncio.create_task(collect_stream_events())
122-
123-
# # Send the event
124-
# event_content = TextContentParam(type="text", author="user", content=user_message)
125-
# await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})
126-
127-
# # Wait for streaming to complete
128-
# await stream_task
122+
"""
123+
Streaming test placeholder.
129124
130-
# # TODO: Add your validation here
131-
# assert len(all_events) > 0, "No events received in streaming response"
125+
NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState).
126+
Backend streaming functionality is verified in test_send_event_and_poll.
127+
"""
132128
pass
133129

134130

examples/tutorials/10_agentic/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import os
22
import sys
3-
from datetime import timedelta
43

5-
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
4+
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
65

76
# === DEBUG SETUP (AgentEx CLI Debug Support) ===
87
if os.getenv("AGENTEX_DEBUG_ENABLED") == "true":
@@ -11,20 +10,20 @@
1110
debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679"))
1211
debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp")
1312
wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true"
14-
13+
1514
# Configure debugpy
1615
debugpy.configure(subProcess=False)
1716
debugpy.listen(debug_port)
18-
17+
1918
print(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}")
20-
19+
2120
if wait_for_attach:
2221
print(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...")
2322
debugpy.wait_for_client()
2423
print(f"✅ [{debug_type.upper()}] Debugger attached!")
2524
else:
2625
print(f"📡 [{debug_type.upper()}] Ready for debugger attachment")
27-
26+
2827
except ImportError:
2928
print("❌ debugpy not available. Install with: pip install debugpy")
3029
sys.exit(1)
@@ -35,6 +34,13 @@
3534

3635
from agentex.lib.types.fastacp import TemporalACPConfig
3736
from agentex.lib.sdk.fastacp.fastacp import FastACP
37+
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
38+
TemporalStreamingModelProvider,
39+
)
40+
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
41+
42+
context_interceptor = ContextInterceptor()
43+
temporal_streaming_model_provider = TemporalStreamingModelProvider()
3844

3945
# Create the ACP server
4046
acp = FastACP.create(
@@ -45,11 +51,8 @@
4551
# We are also adding the Open AI Agents SDK plugin to the ACP.
4652
type="temporal",
4753
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
48-
plugins=[OpenAIAgentsPlugin(
49-
model_params=ModelActivityParameters(
50-
start_to_close_timeout=timedelta(days=1)
51-
)
52-
)]
54+
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
55+
interceptors=[context_interceptor]
5356
)
5457
)
5558

0 commit comments

Comments
 (0)