diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py index 7f3d1593..fcdbba15 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/acp.py @@ -34,6 +34,13 @@ from agentex.lib.types.fastacp import TemporalACPConfig from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor + +context_interceptor = ContextInterceptor() +temporal_streaming_model_provider = TemporalStreamingModelProvider() # Create the ACP server acp = FastACP.create( @@ -44,7 +51,8 @@ # We are also adding the Open AI Agents SDK plugin to the ACP. type="temporal", temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), - plugins=[OpenAIAgentsPlugin()] + plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)], + interceptors=[context_interceptor] ) ) diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py index 04a44c49..7e1702f5 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py @@ -8,6 +8,10 @@ from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.activities import get_all_activities from agentex.lib.core.temporal.workers.worker import AgentexWorker +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor environment_variables = EnvironmentVariables.refresh() @@ -24,12 +28,36 @@ async def main(): # Add activities to the worker all_activities = get_all_activities() + [] # add your own activities here - + + # ============================================================================ + # STREAMING SETUP: Interceptor + Model Provider + # ============================================================================ + # This is where the streaming magic is configured! Two key components: + # + # 1. ContextInterceptor (StreamingInterceptor) + # - Threads task_id through activity headers using Temporal's interceptor pattern + # - Outbound: Reads _task_id from workflow instance, injects into activity headers + # - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar + # - This enables runtime context without forking the Temporal plugin! + # + # 2. TemporalStreamingModelProvider + # - Returns StreamingModel instances that read task_id from ContextVar + # - StreamingModel.get_response() streams tokens to Redis in real-time + # - Still returns complete response to Temporal for determinism/replay safety + # - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) + # + # Together, these enable real-time LLM streaming while maintaining Temporal's + # durability guarantees. 
No forked components - uses STANDARD OpenAIAgentsPlugin! + context_interceptor = ContextInterceptor() + temporal_streaming_model_provider = TemporalStreamingModelProvider() + # Create a worker with automatic tracing - # We are also adding the Open AI Agents SDK plugin to the worker. + # IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin + # No forking needed! The interceptor + model provider handle all streaming logic. worker = AgentexWorker( task_queue=task_queue_name, - plugins=[OpenAIAgentsPlugin()] + plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)], + interceptors=[context_interceptor] ) await worker.run( diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py index 1c529e05..8b5e9de9 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py @@ -18,18 +18,33 @@ This is the foundation before moving to more advanced patterns with tools and activities. """ +import os import json +from typing import Any, Dict, List from agents import Agent, Runner from temporalio import workflow from agentex.lib import adk from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.types.text_content import TextContent +from agentex.lib.utils.model_utils import BaseModel from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.types.workflow import SignalName from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.tracing.tracing_processor_manager import ( + add_tracing_processor_config, +) + +# Configure tracing processor (optional - only if you have SGP credentials) +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + ) +) environment_variables = EnvironmentVariables.refresh() @@ -39,46 +54,84 @@ if environment_variables.AGENT_NAME is None: raise ValueError("Environment variable AGENT_NAME is not set") +# Validate OpenAI API key is set +if not os.environ.get("OPENAI_API_KEY"): + raise ValueError( + "OPENAI_API_KEY environment variable is not set. " + "This tutorial requires an OpenAI API key to run the OpenAI Agents SDK. " + "Please set OPENAI_API_KEY in your environment or manifest.yaml file." + ) + logger = make_logger(__name__) + +class StateModel(BaseModel): + """ + State model for preserving conversation history across turns. + + This allows the agent to maintain context throughout the conversation, + making it possible to reference previous messages and build on the discussion. + """ + + input_list: List[Dict[str, Any]] + turn_number: int + + @workflow.defn(name=environment_variables.WORKFLOW_NAME) class ExampleTutorialWorkflow(BaseWorkflow): """ Hello World Temporal Workflow with OpenAI Agents SDK Integration - + This workflow demonstrates the basic pattern for integrating OpenAI Agents SDK with Temporal workflows. It shows how agent conversations become durable and observable through Temporal's workflow engine. 
-    
+
     KEY FEATURES:
     - Durable agent conversations that survive process restarts
     - Automatic activity creation for LLM calls (visible in Temporal UI)
     - Long-running workflows that can handle multiple user interactions
     - Full observability and monitoring through Temporal dashboard
     """
+
     def __init__(self):
         super().__init__(display_name=environment_variables.AGENT_NAME)
         self._complete_task = False
+        self._state: StateModel | None = None
+        self._task_id = None
+        self._trace_id = None
+        self._parent_span_id = None
 
     @workflow.signal(name=SignalName.RECEIVE_EVENT)
     async def on_task_event_send(self, params: SendEventParams) -> None:
         """
         Handle incoming user messages and respond using OpenAI Agents SDK
-        
+
         This signal handler demonstrates the basic integration pattern:
         1. Receive user message through Temporal signal
         2. Echo message back to UI for visibility
         3. Create and run OpenAI agent (automatically becomes a Temporal activity)
         4. Return agent's response to user
-        
+
         TEMPORAL INTEGRATION MAGIC:
-        - When Runner.run() executes, it automatically creates a "invoke_model_activity" 
+        - When Runner.run() executes, it automatically creates an "invoke_model_activity"
         - This activity is visible in Temporal UI with full observability
         - If the LLM call fails, Temporal automatically retries it
         - The entire conversation is durable and survives process restarts
         """
         logger.info(f"Received task message instruction: {params}")
-        
+
+        if self._state is None:
+            raise ValueError("State is not initialized")
+
+        # Increment turn number for tracing
+        self._state.turn_number += 1
+
+        self._task_id = params.task.id
+        self._trace_id = params.task.id
+
+        # Add the user message to conversation history
+        self._state.input_list.append({"role": "user", "content": params.event.content.content})
+
         # ============================================================================
         # STEP 1: Echo User Message
         # ============================================================================
@@ -87,58 +140,81 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
         await adk.messages.create(task_id=params.task.id, content=params.event.content)
 
         # ============================================================================
-        # STEP 2: Create OpenAI Agent
+        # STEP 2: Wrap execution in tracing span
         # ============================================================================
-        # Create a simple agent using OpenAI Agents SDK. This agent will respond in haikus
-        # to demonstrate the basic functionality. No tools needed for this hello world example.
-        #
-        # IMPORTANT: The OpenAI Agents SDK plugin (configured in acp.py and run_worker.py)
-        # automatically converts agent interactions into Temporal activities for durability.
-
-
-        agent = Agent(
-            name="Haiku Assistant",
-            instructions="You are a friendly assistant who always responds in the form of a haiku. "
-            "Each response should be exactly 3 lines following the 5-7-5 syllable pattern.",
-        )
+        # Create a span to track this turn of the conversation
+        async with adk.tracing.span(
+            trace_id=params.task.id,
+            name=f"Turn {self._state.turn_number}",
+            input=self._state.model_dump(),
+        ) as span:
+            self._parent_span_id = span.id if span else None
 
-        # ============================================================================
-        # STEP 3: Run Agent with Temporal Durability
-        # ============================================================================
-        # This is where the magic happens! When Runner.run() executes:
-        # 1. The OpenAI Agents SDK makes LLM calls to generate responses
-        # 2. The plugin automatically wraps these calls as Temporal activities
-        # 3. You'll see "invoke_model_activity" appear in the Temporal UI
-        # 4. If the LLM call fails, Temporal retries it automatically
-        # 5. The conversation state is preserved even if the worker restarts
-
-        # IMPORTANT NOTE ABOUT AGENT RUN CALLS:
-        # =====================================
-        # Notice that we don't need to wrap the Runner.run() call in an activity!
-        # This might feel weird for anyone who has used Temporal before, as typically
-        # non-deterministic operations like LLM calls would need to be wrapped in activities.
-        # However, the OpenAI Agents SDK plugin is handling all of this automatically
-        # behind the scenes.
-        #
-        # Another benefit of this approach is that we don't have to serialize the arguments,
-        # which would typically be the case with Temporal activities - the plugin handles
-        # all of this for us, making the developer experience much smoother.
-
-        # Pass the text content directly to Runner.run (it accepts strings)
-        result = await Runner.run(agent, params.event.content.content)
+            # ============================================================================
+            # STEP 3: Create OpenAI Agent
+            # ============================================================================
+            # Create a simple agent using OpenAI Agents SDK. This agent will respond in haikus
+            # to demonstrate the basic functionality. No tools needed for this hello world example.
+            #
+            # IMPORTANT: The OpenAI Agents SDK plugin (configured in acp.py and run_worker.py)
+            # automatically converts agent interactions into Temporal activities for durability.
 
-        # ============================================================================
-        # STEP 4: Send Response Back to User
-        # ============================================================================
-        # Send the agent's response back to the user interface
-        # The agent's haiku response will be displayed in the chat
-        await adk.messages.create(
-            task_id=params.task.id,
-            content=TextContent(
-                author="agent",
-                content=result.final_output,
-            ),
-        )
+            agent = Agent(
+                name="Haiku Assistant",
+                instructions="You are a friendly assistant who always responds in the form of a haiku. "
+                "Each response should be exactly 3 lines following the 5-7-5 syllable pattern.",
+            )
+
+            # ============================================================================
+            # STEP 4: Run Agent with Temporal Durability + Streaming + Conversation History
+            # ============================================================================
+            # This is where the magic happens! When Runner.run() executes:
+            # 1. The OpenAI Agents SDK makes LLM calls to generate responses
+            # 2. The plugin automatically wraps these calls as Temporal activities
+            # 3. You'll see "invoke_model_activity" appear in the Temporal UI
+            # 4. If the LLM call fails, Temporal retries it automatically
+            # 5. The conversation state is preserved even if the worker restarts
+            #
+            # STREAMING MAGIC (via Interceptors + Model Provider):
+            # - The StreamingInterceptor threads task_id through activity headers
+            # - The StreamingModelProvider returns a model that streams to Redis
+            # - The model streams tokens in real-time while maintaining determinism
+            # - Complete response is still returned to Temporal for replay safety
+            #
+            # CONVERSATION HISTORY:
+            # - We pass self._state.input_list which contains the full conversation history
+            # - This allows the agent to maintain context across multiple turns
+            # - The agent can reference previous messages and build on the discussion
+
+            # IMPORTANT NOTE ABOUT AGENT RUN CALLS:
+            # =====================================
+            # Notice that we don't need to wrap the Runner.run() call in an activity!
+            # This might feel weird for anyone who has used Temporal before, as typically
+            # non-deterministic operations like LLM calls would need to be wrapped in activities.
+            # However, the OpenAI Agents SDK plugin is handling all of this automatically
+            # behind the scenes.
+            #
+            # Another benefit of this approach is that we don't have to serialize the arguments,
+            # which would typically be the case with Temporal activities - the plugin handles
+            # all of this for us, making the developer experience much smoother.
+
+            # Pass the conversation history to Runner.run to maintain context
+            # The input_list contains all previous messages in OpenAI format
+            result = await Runner.run(agent, self._state.input_list)
+
+            # Update the state for the next turn. RunResult.to_input_list() returns
+            # the original input plus every item generated on this run (including
+            # the assistant's reply) in OpenAI input format, ready to be fed back
+            # in as the next turn's input.
+            self._state.input_list = result.to_input_list()
+
+            # Set span output for tracing - include full state
+            if span is not None:
+                span.output = self._state.model_dump()
 
         # ============================================================================
         # WHAT YOU'LL SEE IN TEMPORAL UI:
@@ -159,18 +235,18 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
     async def on_task_create(self, params: CreateTaskParams) -> str:
         """
         Temporal Workflow Entry Point - Long-Running Agent Conversation
-        
+
         This method runs when the workflow starts and keeps the agent conversation alive.
         It demonstrates Temporal's ability to run workflows for extended periods
         (minutes, hours, days, or even years) while maintaining full durability.
-        
+
         TEMPORAL WORKFLOW LIFECYCLE:
         1. Workflow starts when a task is created
         2. Sends initial acknowledgment message to user
         3. Waits indefinitely for user messages (handled by on_task_event_send signal)
         4. Each user message triggers the signal handler which runs the OpenAI agent
         5. 
Workflow continues running until explicitly completed or canceled - + DURABILITY BENEFITS: - Workflow survives worker restarts, deployments, infrastructure failures - All agent conversation history is preserved in Temporal's event store @@ -179,6 +255,16 @@ async def on_task_create(self, params: CreateTaskParams) -> str: """ logger.info(f"Received task create params: {params}") + # ============================================================================ + # WORKFLOW INITIALIZATION: Initialize State + # ============================================================================ + # Initialize the conversation state with an empty history + # This will be populated as the conversation progresses + self._state = StateModel( + input_list=[], + turn_number=0, + ) + # ============================================================================ # WORKFLOW INITIALIZATION: Send Welcome Message # ============================================================================ @@ -189,10 +275,10 @@ async def on_task_create(self, params: CreateTaskParams) -> str: content=TextContent( author="agent", content=f"🌸 Hello! I'm your Haiku Assistant, powered by OpenAI Agents SDK + Temporal! 🌸\n\n" - f"I'll respond to all your messages in beautiful haiku form. " - f"This conversation is now durable - even if I restart, our chat continues!\n\n" - f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n" - f"Send me a message and I'll respond with a haiku! 🎋", + f"I'll respond to all your messages in beautiful haiku form. " + f"This conversation is now durable - even if I restart, our chat continues!\n\n" + f"Task created with params:\n{json.dumps(params.params, indent=2)}\n\n" + f"Send me a message and I'll respond with a haiku! 🎋", ), ) @@ -218,10 +304,10 @@ async def on_task_create(self, params: CreateTaskParams) -> str: async def complete_task_signal(self) -> None: """ Signal to gracefully complete the agent conversation workflow - + This signal can be sent to end the workflow cleanly. In a real application, you might trigger this when a user ends the conversation or after a period of inactivity. 
""" logger.info("Received signal to complete the agent conversation") - self._complete_task = True \ No newline at end of file + self._complete_task = True diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/pyproject.toml b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/pyproject.toml index a5216dd7..57c46347 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/pyproject.toml +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/pyproject.toml @@ -8,9 +8,10 @@ version = "0.1.0" description = "An AgentEx agent" requires-python = ">=3.12" dependencies = [ - "agentex-sdk>=0.4.18", + "agentex-sdk==0.6.0", + "openai-agents-sdk==0.4.2", + "temporalio==1.18.2", "scale-gp", - "temporalio>=1.18.0,<2", ] [project.optional-dependencies] diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py index 8cdcac93..46a0a9a7 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/tests/test_agent.py @@ -16,11 +16,18 @@ """ import os +import uuid import pytest import pytest_asyncio +from test_utils.agentic import ( + poll_messages, + send_event_and_poll_yielding, +) from agentex import AsyncAgentex +from agentex.types.task_message import TaskMessage +from agentex.types.agent_rpc_params import ParamsCreateTaskRequest # Configuration from environment variables AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003") @@ -57,80 +64,74 @@ class TestNonStreamingEvents: @pytest.mark.asyncio async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): """Test sending an event and polling for the response.""" - # TODO: Create a task for this conversation - # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) - # task = task_response.result - # assert task is not None - - # TODO: Poll for the initial task creation message (if your agent sends one) - # async for message in poll_messages( - # client=client, - # task_id=task.id, - # timeout=30, - # sleep_interval=1.0, - # ): - # assert isinstance(message, TaskMessage) - # if message.content and message.content.type == "text" and message.content.author == "agent": - # # Check for your expected initial message - # assert "expected initial text" in message.content.content - # break - - # TODO: Send an event and poll for response using the yielding helper function - # user_message = "Your test message here" - # async for message in send_event_and_poll_yielding( - # client=client, - # agent_id=agent_id, - # task_id=task.id, - # user_message=user_message, - # timeout=30, - # sleep_interval=1.0, - # ): - # assert isinstance(message, TaskMessage) - # if message.content and message.content.type == "text" and message.content.author == "agent": - # # Check for your expected response - # assert "expected response text" in message.content.content - # break + task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) + task = task_response.result + assert task is not None + + # Poll for the initial task creation message + async for message in poll_messages( + client=client, + task_id=task.id, + timeout=30, + 
sleep_interval=1.0, + ): + assert isinstance(message, TaskMessage) + if message.content and message.content.type == "text" and message.content.author == "agent": + # Check for the Haiku Assistant welcome message + assert "Haiku Assistant" in message.content.content + assert "Temporal" in message.content.content + break + + # Send event and poll for response with streaming updates + user_message = "Hello how is life?" + print(f"[DEBUG 060 POLL] Sending message: '{user_message}'") + + # Use yield_updates=True to get all streaming chunks as they're written + final_message = None + async for message in send_event_and_poll_yielding( + client=client, + agent_id=agent_id, + task_id=task.id, + user_message=user_message, + timeout=30, + sleep_interval=1.0, + yield_updates=True, # Get updates as streaming writes chunks + ): + if message.content and message.content.type == "text" and message.content.author == "agent": + print( + f"[DEBUG 060 POLL] Received update - Status: {message.streaming_status}, " + f"Content length: {len(message.content.content)}" + ) + final_message = message + + # Stop polling once we get a DONE message + if message.streaming_status == "DONE": + print(f"[DEBUG 060 POLL] Streaming complete!") + break + + # Verify the final message has content (the haiku) + assert final_message is not None, "Should have received an agent message" + assert final_message.content is not None, "Final message should have content" + assert len(final_message.content.content) > 0, "Final message should have haiku content" + + print(f"[DEBUG 060 POLL] ✅ Successfully received haiku response!") + print(f"[DEBUG 060 POLL] Final haiku:\n{final_message.content.content}") pass class TestStreamingEvents: - """Test streaming event sending.""" + """Test streaming event sending (backend verification via polling).""" @pytest.mark.asyncio async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): - """Test sending an event and streaming the response.""" - # TODO: Create a task for this conversation - # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) - # task = task_response.result - # assert task is not None + """ + Streaming test placeholder. - # user_message = "Your test message here" - - # # Collect events from stream - # all_events = [] - - # async def collect_stream_events(): - # async for event in stream_agent_response( - # client=client, - # task_id=task.id, - # timeout=30, - # ): - # all_events.append(event) - - # # Start streaming task - # stream_task = asyncio.create_task(collect_stream_events()) - - # # Send the event - # event_content = TextContentParam(type="text", author="user", content=user_message) - # await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # # Wait for streaming to complete - # await stream_task - - # # TODO: Add your validation here - # assert len(all_events) > 0, "No events received in streaming response" + NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState). + Backend streaming functionality is verified in test_send_event_and_poll. 
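+
+        A rough sketch of a direct SSE test, reusing the stream_agent_response
+        helper from the template this file replaced (untested here - treat it
+        as pseudocode, not a verified part of this suite):
+
+            all_events = []
+            async for event in stream_agent_response(client=client, task_id=task.id, timeout=30):
+                all_events.append(event)
+            assert len(all_events) > 0, "No events received in streaming response"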
+ """ pass if __name__ == "__main__": - pytest.main([__file__, "-v"]) \ No newline at end of file + pytest.main([__file__, "-v"]) diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py index 6f6d625a..eb7f99a5 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/acp.py @@ -1,8 +1,7 @@ import os import sys -from datetime import timedelta -from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin # === DEBUG SETUP (AgentEx CLI Debug Support) === if os.getenv("AGENTEX_DEBUG_ENABLED") == "true": @@ -11,20 +10,20 @@ debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679")) debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp") wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true" - + # Configure debugpy debugpy.configure(subProcess=False) debugpy.listen(debug_port) - + print(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}") - + if wait_for_attach: print(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...") debugpy.wait_for_client() print(f"✅ [{debug_type.upper()}] Debugger attached!") else: print(f"📡 [{debug_type.upper()}] Ready for debugger attachment") - + except ImportError: print("❌ debugpy not available. Install with: pip install debugpy") sys.exit(1) @@ -35,6 +34,13 @@ from agentex.lib.types.fastacp import TemporalACPConfig from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor + +context_interceptor = ContextInterceptor() +temporal_streaming_model_provider = TemporalStreamingModelProvider() # Create the ACP server acp = FastACP.create( @@ -45,11 +51,8 @@ # We are also adding the Open AI Agents SDK plugin to the ACP. 
type="temporal", temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"), - plugins=[OpenAIAgentsPlugin( - model_params=ModelActivityParameters( - start_to_close_timeout=timedelta(days=1) - ) - )] + plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)], + interceptors=[context_interceptor] ) ) diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py index 49395f15..5c34a2f1 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py @@ -1,7 +1,6 @@ import asyncio -from datetime import timedelta -from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin from project.workflow import ExampleTutorialWorkflow from project.activities import get_weather, deposit_money, withdraw_money @@ -10,6 +9,11 @@ from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.activities import get_all_activities from agentex.lib.core.temporal.workers.worker import AgentexWorker +from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor environment_variables = EnvironmentVariables.refresh() @@ -19,23 +23,43 @@ async def main(): # Setup debug mode if enabled setup_debug_if_enabled() - + task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE if task_queue_name is None: raise ValueError("WORKFLOW_TASK_QUEUE is not set") - + # Add activities to the worker - all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather] # add your own activities here - + all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather, stream_lifecycle_content] # add your own activities here + + # ============================================================================ + # STREAMING SETUP: Interceptor + Model Provider + # ============================================================================ + # This is where the streaming magic is configured! Two key components: + # + # 1. ContextInterceptor (StreamingInterceptor) + # - Threads task_id through activity headers using Temporal's interceptor pattern + # - Outbound: Reads _task_id from workflow instance, injects into activity headers + # - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar + # - This enables runtime context without forking the Temporal plugin! + # + # 2. TemporalStreamingModelProvider + # - Returns StreamingModel instances that read task_id from ContextVar + # - StreamingModel.get_response() streams tokens to Redis in real-time + # - Still returns complete response to Temporal for determinism/replay safety + # - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) + # + # Together, these enable real-time LLM streaming while maintaining Temporal's + # durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin! 
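+    #
+    # A rough sketch of the inbound half of that interceptor (names are
+    # illustrative; the real implementation ships with the AgentEx SDK):
+    #
+    #     streaming_task_id: ContextVar[str | None] = ContextVar("streaming_task_id", default=None)
+    #
+    #     class StreamingActivityInboundInterceptor(ActivityInboundInterceptor):
+    #         async def execute_activity(self, input: ExecuteActivityInput):
+    #             payload = input.headers.get("task_id")
+    #             if payload is not None:
+    #                 # decode with the worker's payload converter, then:
+    #                 streaming_task_id.set(decoded_task_id)
+    #             return await self.next.execute_activity(input)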
+ context_interceptor = ContextInterceptor() + temporal_streaming_model_provider = TemporalStreamingModelProvider() + # Create a worker with automatic tracing - # We are also adding the Open AI Agents SDK plugin to the worker. + # IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin + # No forking needed! The interceptor + model provider handle all streaming logic. worker = AgentexWorker( task_queue=task_queue_name, - plugins=[OpenAIAgentsPlugin( - model_params=ModelActivityParameters( - start_to_close_timeout=timedelta(days=1) - ) - )], + plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)], + interceptors=[context_interceptor], ) await worker.run( diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py index 49d7b9ea..99647497 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py @@ -5,7 +5,7 @@ PATTERN 1: Simple External Tools as Activities (activity_as_tool) - Convert individual Temporal activities directly into agent tools -- 1:1 mapping between tool calls and activities +- 1:1 mapping between tool calls and activities - Best for: single non-deterministic operations (API calls, DB queries) - Example: get_weather activity → weather tool @@ -19,48 +19,65 @@ WHY THIS APPROACH IS GAME-CHANGING: =================================== -There's a crucial meta-point that should be coming through here: **why is this different?** -This approach is truly transactional because of how the `await` works in Temporal workflows. -Consider a "move money" example - if the operation fails between the withdraw and deposit, -Temporal will resume exactly where it left off - the agent gets real-world flexibility even +There's a crucial meta-point that should be coming through here: **why is this different?** +This approach is truly transactional because of how the `await` works in Temporal workflows. +Consider a "move money" example - if the operation fails between the withdraw and deposit, +Temporal will resume exactly where it left off - the agent gets real-world flexibility even if systems die. -**Why even use Temporal? Why are we adding complexity?** The gain is enormous when you +**Why even use Temporal? Why are we adding complexity?** The gain is enormous when you consider what happens without it: -In a traditional approach without Temporal, if you withdraw money but then the system crashes -before depositing, you're stuck in a broken state. The money has been withdrawn, but never -deposited. In a banking scenario, you can't just "withdraw again" - the money is already gone +In a traditional approach without Temporal, if you withdraw money but then the system crashes +before depositing, you're stuck in a broken state. The money has been withdrawn, but never +deposited. In a banking scenario, you can't just "withdraw again" - the money is already gone from the source account, and your agent has no way to recover or know what state it was in. -This is why you can't build very complicated agents without this confidence in transactional +This is why you can't build very complicated agents without this confidence in transactional behavior. 
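+
+A minimal sketch of that move-money tool under this pattern (signatures are
+illustrative; the real withdraw_money/deposit_money activities live in
+activities.py):
+
+    @function_tool
+    async def move_money(amount: int) -> str:
+        await workflow.execute_activity(withdraw_money, amount, start_to_close_timeout=timedelta(seconds=10))
+        # A crash right here is safe: on replay, Temporal sees the completed
+        # withdraw and resumes at the deposit instead of withdrawing twice.
+        await workflow.execute_activity(deposit_money, amount, start_to_close_timeout=timedelta(seconds=10))
+        return f"Moved {amount}"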
Temporal gives us: - **Guaranteed execution**: If the workflow starts, it will complete, even through failures - **Exact resumption**: Pick up exactly where we left off, not start over -- **Transactional integrity**: Either both operations complete, or the workflow can be designed +- **Transactional integrity**: Either both operations complete, or the workflow can be designed to handle partial completion - **Production reliability**: Build agents that can handle real-world complexity and failures -Without this foundation, agents remain fragile toys. With Temporal, they become production-ready +Without this foundation, agents remain fragile toys. With Temporal, they become production-ready systems that can handle the complexities of the real world. """ +import os import json import asyncio +from typing import Any, Dict, List from datetime import timedelta -from agents import Agent, Runner, activity_as_tool +from agents import Agent, Runner from temporalio import workflow +from temporalio.contrib import openai_agents from agentex.lib import adk from project.activities import get_weather from agentex.lib.types.acp import SendEventParams, CreateTaskParams +from agentex.lib.types.tracing import SGPTracingProcessorConfig from agentex.lib.utils.logging import make_logger from agentex.types.text_content import TextContent +from agentex.lib.utils.model_utils import BaseModel from agentex.lib.environment_variables import EnvironmentVariables from agentex.lib.core.temporal.types.workflow import SignalName from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow +from agentex.lib.core.tracing.tracing_processor_manager import ( + add_tracing_processor_config, +) +from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks + +# Configure tracing processor (optional - only if you have SGP credentials) +add_tracing_processor_config( + SGPTracingProcessorConfig( + sgp_api_key=os.environ.get("SGP_API_KEY", ""), + sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""), + ) +) environment_variables = EnvironmentVariables.refresh() @@ -70,50 +87,88 @@ if environment_variables.AGENT_NAME is None: raise ValueError("Environment variable AGENT_NAME is not set") +# Validate OpenAI API key is set +if not os.environ.get("OPENAI_API_KEY"): + raise ValueError( + "OPENAI_API_KEY environment variable is not set. " + "This tutorial requires an OpenAI API key to run the OpenAI Agents SDK. " + "Please set OPENAI_API_KEY in your environment or manifest.yaml file." + ) + logger = make_logger(__name__) + +class StateModel(BaseModel): + """ + State model for preserving conversation history across turns. + + This allows the agent to maintain context throughout the conversation, + making it possible to reference previous messages and build on the discussion. + """ + + input_list: List[Dict[str, Any]] + turn_number: int + + @workflow.defn(name=environment_variables.WORKFLOW_NAME) class ExampleTutorialWorkflow(BaseWorkflow): """ Minimal async workflow template for AgentEx Temporal agents. 
""" + def __init__(self): super().__init__(display_name=environment_variables.AGENT_NAME) self._complete_task = False + self._state: StateModel | None = None self._pending_confirmation: asyncio.Queue[str] = asyncio.Queue() + self._task_id = None + self._trace_id = None + self._parent_span_id = None @workflow.signal(name=SignalName.RECEIVE_EVENT) async def on_task_event_send(self, params: SendEventParams) -> None: logger.info(f"Received task message instruction: {params}") - - # Echo back the client's message to show it in the UI. This is not done by default + + if self._state is None: + raise ValueError("State is not initialized") + + # Increment turn number for tracing + self._state.turn_number += 1 + + self._task_id = params.task.id + self._trace_id = params.task.id + + # Add the user message to conversation history + self._state.input_list.append({"role": "user", "content": params.event.content.content}) + + # Echo back the client's message to show it in the UI. This is not done by default # so the agent developer has full control over what is shown to the user. await adk.messages.create(task_id=params.task.id, content=params.event.content) # ============================================================================ # OpenAI Agents SDK + Temporal Integration: Two Patterns for Tool Creation # ============================================================================ - + # #### When to Use Activities for Tools # # You'll want to use the activity pattern for tools in the following scenarios: # - # - **API calls within the tool**: Whenever your tool makes an API call (external - # service, database, etc.), you must wrap it as an activity since these are + # - **API calls within the tool**: Whenever your tool makes an API call (external + # service, database, etc.), you must wrap it as an activity since these are # non-deterministic operations that could fail or return different results - # - **Idempotent single operations**: When the tool performs an already idempotent - # single call that you want to ensure gets executed reliably with Temporal's retry + # - **Idempotent single operations**: When the tool performs an already idempotent + # single call that you want to ensure gets executed reliably with Temporal's retry # guarantees # - # Let's start with the case where it is non-deterministic. If this is the case, we - # want this tool to be an activity to guarantee that it will be executed. The way to - # do this is to add some syntax to make the tool call an activity. Let's create a tool - # that gives us the weather and create a weather agent. For this example, we will just - # return a hard-coded string but we can easily imagine this being an API call to a - # weather service which would make it non-deterministic. First we will create a new - # file called `activities.py`. Here we will create a function to get the weather and + # Let's start with the case where it is non-deterministic. If this is the case, we + # want this tool to be an activity to guarantee that it will be executed. The way to + # do this is to add some syntax to make the tool call an activity. Let's create a tool + # that gives us the weather and create a weather agent. For this example, we will just + # return a hard-coded string but we can easily imagine this being an API call to a + # weather service which would make it non-deterministic. First we will create a new + # file called `activities.py`. Here we will create a function to get the weather and # simply add an activity annotation on top. 
- + # There are TWO key patterns for integrating tools with the OpenAI Agents SDK in Temporal: # # PATTERN 1: Simple External Tools as Activities @@ -145,18 +200,80 @@ async def on_task_event_send(self, params: SendEventParams) -> None: tools=[ # activity_as_tool() converts a Temporal activity into an agent tool # The get_weather activity will be executed with durability guarantees - activity_as_tool( + openai_agents.workflow.activity_as_tool( get_weather, # This is defined in activities.py as @activity.defn - start_to_close_timeout=timedelta(seconds=10) + start_to_close_timeout=timedelta(seconds=10), ), ], ) + # ============================================================================ + # STREAMING SETUP: Store task_id for the Interceptor + # ============================================================================ + # These instance variables are read by StreamingWorkflowOutboundInterceptor + # which injects them into activity headers. This enables streaming without + # forking the Temporal plugin! + # + # How streaming works (Interceptor + Model Provider + Hooks): + # 1. We store task_id in workflow instance variable (here) + # 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance() + # 3. Interceptor injects task_id into activity headers + # 4. StreamingActivityInboundInterceptor extracts from headers + # 5. Sets streaming_task_id ContextVar inside the activity + # 6. StreamingModel reads from ContextVar and streams to Redis + # 7. TemporalStreamingHooks creates placeholder messages for tool calls + # + # This approach uses STANDARD Temporal components - no forked plugin needed! + self._task_id = params.task.id + self._trace_id = params.task.id + self._parent_span_id = params.task.id + + # ============================================================================ + # HOOKS: Create Streaming Lifecycle Messages + # ============================================================================ + # TemporalStreamingHooks integrates with OpenAI Agents SDK lifecycle events + # to create messages in the database for tool calls, reasoning, etc. + # + # What hooks do: + # - on_tool_call_start(): Creates tool_request message with arguments + # - on_tool_call_done(): Creates tool_response message with result + # - on_model_stream_part(): Called for each streaming chunk (handled by StreamingModel) + # - on_run_done(): Marks the final response as complete + # + # These hooks create the messages you see in the test output: + # - Type: tool_request - Agent deciding to call get_weather + # - Type: tool_response - Result from get_weather activity + # - Type: text - Final agent response with weather info + # + # The hooks work alongside the interceptor/model streaming to provide + # a complete view of the agent's execution in the UI. 
+        hooks = TemporalStreamingHooks(task_id=params.task.id)
+
         # Run the agent - when it calls the weather tool, it will create a get_weather activity
-        result = await Runner.run(weather_agent, params.event.content.content)
+        # Hooks will create messages for tool calls, interceptor enables token streaming
+        # Wrap in tracing span to track this turn
+        async with adk.tracing.span(
+            trace_id=params.task.id,
+            name=f"Turn {self._state.turn_number}",
+            input=self._state.model_dump(),
+        ) as span:
+            self._parent_span_id = span.id if span else None
+            # Pass the conversation history to Runner.run to maintain context
+            result = await Runner.run(weather_agent, self._state.input_list, hooks=hooks)
+
+            # Update the state for the next turn. RunResult.to_input_list() returns
+            # the original input plus every item generated on this run (tool calls,
+            # tool results, and the assistant's reply) in OpenAI input format.
+            self._state.input_list = result.to_input_list()
+
+            # Set span output for tracing - include full state
+            if span is not None:
+                span.output = self._state.model_dump()
 
         # ============================================================================
-        # PATTERN 2: Multiple Activities Within Tools 
+        # PATTERN 2: Multiple Activities Within Tools
         # ============================================================================
         # Use this pattern when:
         # - You need multiple sequential non-deterministic operations within one tool
@@ -171,7 +288,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
         #
         # BENEFITS:
         # - Guaranteed execution order (withdraw THEN deposit)
-        # - Each step is durable and retryable individually 
+        # - Each step is durable and retryable individually
         # - Atomic operations from the agent's perspective
         # - Better than having LLM make multiple separate tool calls
 
@@ -186,7 +303,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
         #     move_money,
         #   ],
         # )
-        
+
         # # Run the agent - when it calls move_money tool, it will create TWO activities:
         # # 1. withdraw_money activity
         # # 2. deposit_money activity (only after withdraw succeeds)
         # result = await Runner.run(money_agent, params.event.content.content)
 
         # ============================================================================
         # PATTERN COMPARISON SUMMARY:
         # ============================================================================
-        # 
+        #
         # Pattern 1 (activity_as_tool):     | Pattern 2 (function_tool with activities):
         # - Single activity per tool call   | - Multiple activities per tool call
-        # - 1:1 tool to activity mapping    | - 1:many tool to activity mapping 
+        # - 1:1 tool to activity mapping    | - 1:many tool to activity mapping
         # - Simple non-deterministic ops    | - Complex multi-step operations
         # - Let LLM sequence multiple tools | - Code controls activity sequencing
         # - Example: get_weather, db_lookup | - Example: money_transfer, multi_step_workflow
         #
         # BOTH patterns provide:
         # - Automatic retries and failure recovery
-        # - Full observability in Temporal UI 
+        # - Full observability in Temporal UI
         # - Durable execution guarantees
         # - Seamless integration with OpenAI Agents SDK
         # ============================================================================
 
-        # Send the agent's response back to the user
-        await adk.messages.create(
-            task_id=params.task.id,
-            content=TextContent(
-                author="agent",
-                content=result.final_output,
-            ),
-        )
-
     @workflow.run
     async def on_task_create(self, params: CreateTaskParams) -> str:
         logger.info(f"Received task create params: {params}")
 
+        # Initialize the conversation state with an empty history
+        self._state = StateModel(
+            input_list=[],
+            turn_number=0,
+        )
+
         # 1. Acknowledge that the task has been created.
         await adk.messages.create(
             task_id=params.task.id,
@@ -234,11 +348,11 @@ async def on_task_create(self, params: CreateTaskParams) -> str:
 
         await workflow.wait_condition(
             lambda: self._complete_task,
-            timeout=None,  # Set a timeout if you want to prevent the task from running indefinitely. Generally this is not needed. Temporal can run hundreds of millions of workflows in parallel and more. Only do this if you have a specific reason to do so.
+            timeout=None,  # Set a timeout if you want to prevent the task from running indefinitely. Generally this is unnecessary - Temporal can run hundreds of millions of workflows in parallel - so only set one if you have a specific reason to.
         )
 
         return "Task completed"
 
     @workflow.signal
     async def fulfill_order_signal(self, success: bool) -> None:
         if success == True:
-            await self._pending_confirmation.put(True)
\ No newline at end of file
+            await self._pending_confirmation.put(True)
diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/pyproject.toml b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/pyproject.toml
index b95b19bf..57c46347 100644
--- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/pyproject.toml
+++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/pyproject.toml
@@ -8,9 +8,10 @@ version = "0.1.0"
 description = "An AgentEx agent"
 requires-python = ">=3.12"
 dependencies = [
-    "agentex-sdk",
+    "agentex-sdk==0.6.0",
+    "openai-agents==0.4.2",  # PyPI name for the OpenAI Agents SDK (imported as `agents`)
+    "temporalio==1.18.2",
     "scale-gp",
-    "temporalio>=1.18.0,<2",
 ]
 
 [project.optional-dependencies]
diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py
index 8cdcac93..85691b6d 100644
--- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py
+++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py
@@ -16,11 +16,18 @@
 """
 
 import os
+import uuid
 
 import pytest
 import pytest_asyncio
+from test_utils.agentic import (
+    poll_messages,
+    send_event_and_poll_yielding,
+)
 
 from agentex import AsyncAgentex
+from agentex.types.task_message import TaskMessage
+from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
 
 # Configuration from environment variables
 AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
@@ -57,78 +64,91 @@ class TestNonStreamingEvents:
 
     @pytest.mark.asyncio
     async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
         """Test sending an event and polling for the response."""
-        # TODO: Create a task for this conversation
-        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
-        # task = task_response.result
-        # assert task is not None
-
-        # TODO: Poll for the initial task creation message (if your agent sends one)
-        # async for message in poll_messages(
-        #     client=client,
-        #     task_id=task.id,
-        #     timeout=30,
-        #     sleep_interval=1.0,
-        # ):
-        #     assert isinstance(message, TaskMessage)
-        #     if message.content and message.content.type == "text" and message.content.author == "agent":
-        #         # Check for your expected initial message
-        #         assert "expected initial text" in message.content.content
-        #         break
-
-        # TODO: Send an event and poll for response using the yielding helper function
-        # user_message = "Your test message here"
-        # async for message in send_event_and_poll_yielding(
-        #     client=client,
-        #     agent_id=agent_id,
-        #     task_id=task.id,
-        #     user_message=user_message,
-        #     timeout=30,
-        #     sleep_interval=1.0,
-        # ):
-        #     assert isinstance(message, TaskMessage)
-        #     if message.content and message.content.type == "text" and message.content.author == "agent":
-        #         # Check for your expected response
-        #         assert "expected response text" in message.content.content
-        #         break
-        pass
+        # Create a task for this conversation
+        task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
+        task = task_response.result
+        assert task is not None
+
+        # Poll for the initial task creation message
+        print(f"[DEBUG 070 POLL] 
Polling for initial task creation message...") + async for message in poll_messages( + client=client, + task_id=task.id, + timeout=30, + sleep_interval=1.0, + ): + assert isinstance(message, TaskMessage) + if message.content and message.content.type == "text" and message.content.author == "agent": + # Check for the initial acknowledgment message + print(f"[DEBUG 070 POLL] Initial message: {message.content.content[:100]}") + assert "task" in message.content.content.lower() or "received" in message.content.content.lower() + break + + # Send an event asking about the weather in NYC and poll for response with streaming + user_message = "What is the weather in New York City?" + print(f"[DEBUG 070 POLL] Sending message: '{user_message}'") + + # Track what we've seen to ensure tool calls happened + seen_tool_request = False + seen_tool_response = False + final_message = None + + async for message in send_event_and_poll_yielding( + client=client, + agent_id=agent_id, + task_id=task.id, + user_message=user_message, + timeout=60, + sleep_interval=1.0, + yield_updates=True, # Get all streaming chunks + ): + assert isinstance(message, TaskMessage) + print(f"[DEBUG 070 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}") + + # Track tool_request messages (agent calling get_weather) + if message.content and message.content.type == "tool_request": + print(f"[DEBUG 070 POLL] ✅ Saw tool_request - agent is calling get_weather tool") + seen_tool_request = True + + # Track tool_response messages (get_weather result) + if message.content and message.content.type == "tool_response": + print(f"[DEBUG 070 POLL] ✅ Saw tool_response - get_weather returned result") + seen_tool_response = True + + # Track agent text messages and their streaming updates + if message.content and message.content.type == "text" and message.content.author == "agent": + content_length = len(message.content.content) if message.content.content else 0 + print(f"[DEBUG 070 POLL] Agent text update - Status: {message.streaming_status}, Length: {content_length}") + final_message = message + + # Stop when we get DONE status + if message.streaming_status == "DONE" and content_length > 0: + print(f"[DEBUG 070 POLL] ✅ Streaming complete!") + break + + # Verify we got all the expected pieces + assert seen_tool_request, "Expected to see tool_request message (agent calling get_weather)" + assert seen_tool_response, "Expected to see tool_response message (get_weather result)" + assert final_message is not None, "Expected to see final agent text message" + assert final_message.content is not None and len(final_message.content.content) > 0, "Final message should have content" + + # Check that the response contains the temperature (22 degrees) + # The get_weather activity returns "The weather in New York City is 22 degrees Celsius" + print(f"[DEBUG 070 POLL] Final response: {final_message.content.content}") + assert "22" in final_message.content.content, "Expected weather response to contain temperature (22 degrees)" class TestStreamingEvents: - """Test streaming event sending.""" + """Test streaming event sending (backend verification via polling).""" @pytest.mark.asyncio async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str): - """Test sending an event and streaming the response.""" - # TODO: Create a task for this conversation - # task_response = await client.agents.create_task(agent_id, 
params=ParamsCreateTaskRequest(name=uuid.uuid1().hex)) - # task = task_response.result - # assert task is not None - - # user_message = "Your test message here" - - # # Collect events from stream - # all_events = [] - - # async def collect_stream_events(): - # async for event in stream_agent_response( - # client=client, - # task_id=task.id, - # timeout=30, - # ): - # all_events.append(event) - - # # Start streaming task - # stream_task = asyncio.create_task(collect_stream_events()) - - # # Send the event - # event_content = TextContentParam(type="text", author="user", content=user_message) - # await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content}) - - # # Wait for streaming to complete - # await stream_task + """ + Streaming test placeholder. - # # TODO: Add your validation here - # assert len(all_events) > 0, "No events received in streaming response" + NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState). + Backend streaming functionality is verified in test_send_event_and_poll. + """ pass diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py index 16523ce4..fccefd7e 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py @@ -1,8 +1,7 @@ import os import sys -from datetime import timedelta -from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters +from temporalio.contrib.openai_agents import OpenAIAgentsPlugin # === DEBUG SETUP (AgentEx CLI Debug Support) === if os.getenv("AGENTEX_DEBUG_ENABLED") == "true": @@ -35,21 +34,48 @@ from agentex.lib.types.fastacp import TemporalACPConfig from agentex.lib.sdk.fastacp.fastacp import FastACP +from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import ( + TemporalStreamingModelProvider, +) +from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor + +# ============================================================================ +# STREAMING SETUP: Interceptor + Model Provider +# ============================================================================ +# This is where the streaming magic is configured! Two key components: +# +# 1. ContextInterceptor (StreamingInterceptor) +# - Threads task_id through activity headers using Temporal's interceptor pattern +# - Outbound: Reads _task_id from workflow instance, injects into activity headers +# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar +# - This enables runtime context without forking the Temporal plugin! +# +# 2. TemporalStreamingModelProvider +# - Returns StreamingModel instances that read task_id from ContextVar +# - StreamingModel.get_response() streams tokens to Redis in real-time +# - Still returns complete response to Temporal for determinism/replay safety +# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) +# +# Together, these enable real-time LLM streaming while maintaining Temporal's +# durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin! 
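+#
+# A rough sketch of the outbound half of that interceptor (names are
+# illustrative; the real implementation ships with the AgentEx SDK):
+#
+#     class StreamingWorkflowOutboundInterceptor(WorkflowOutboundInterceptor):
+#         def start_activity(self, input: StartActivityInput):
+#             task_id = getattr(workflow.instance(), "_task_id", None)
+#             if task_id is not None:
+#                 payload = workflow.payload_converter().to_payloads([task_id])[0]
+#                 input.headers = {**(input.headers or {}), "task_id": payload}
+#             return self.next.start_activity(input)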
+context_interceptor = ContextInterceptor()
+temporal_streaming_model_provider = TemporalStreamingModelProvider()
 
 # Create the ACP server
+# IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
+# No forking needed! The interceptor + model provider handle all streaming logic.
+#
+# Note: earlier revisions passed ModelActivityParameters with a day-long
+# start_to_close_timeout here. That is no longer needed: the durable waiting for
+# human input happens in the child workflow, not in the model activity, so the
+# default model activity timeout is sufficient.
 acp = FastACP.create(
     acp_type="async",
     config=TemporalACPConfig(
         # When deployed to the cluster, the Temporal address will automatically be set to the cluster address
         # For local development, we set the address manually to talk to the local Temporal service set up via docker compose
-        # We are also adding the Open AI Agents SDK plugin to the ACP.
         type="temporal",
         temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
-        plugins=[OpenAIAgentsPlugin(
-            model_params=ModelActivityParameters(
-                start_to_close_timeout=timedelta(days=1)
-            )
-        )]
+        plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
+        interceptors=[context_interceptor],
     )
 )
diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py
index 67aed618..53b245f3 100644
--- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py
+++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py
@@ -1,15 +1,20 @@
 import asyncio
-from datetime import timedelta
 
-from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
+from temporalio.contrib.openai_agents import OpenAIAgentsPlugin
 
-from project.workflow import ChildWorkflow, ExampleTutorialWorkflow
+from project.workflow import ExampleTutorialWorkflow
 from project.activities import confirm_order, deposit_money, withdraw_money
+from project.child_workflow import ChildWorkflow
 
 from agentex.lib.utils.debug import setup_debug_if_enabled
 from agentex.lib.utils.logging import make_logger
 from agentex.lib.environment_variables import EnvironmentVariables
 from agentex.lib.core.temporal.activities import get_all_activities
 from agentex.lib.core.temporal.workers.worker import AgentexWorker
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content
+from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
+    TemporalStreamingModelProvider,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor
 
 environment_variables = EnvironmentVariables.refresh()
 
@@ -19,23 +24,44 @@
 
 async def main():
     # Setup debug mode if enabled
     setup_debug_if_enabled()
-    
+
     task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
     if task_queue_name is None:
         raise ValueError("WORKFLOW_TASK_QUEUE is not set")
-    
+
     # Add activities to the worker
-    all_activities = get_all_activities() + [withdraw_money, deposit_money, confirm_order]  # add your own activities here
-    
+    # stream_lifecycle_content is required for hooks to work (creates tool_request/tool_response messages)
+    all_activities = get_all_activities() + [withdraw_money, deposit_money, confirm_order, stream_lifecycle_content]  # add your own activities here
+
+    # ============================================================================
+    # STREAMING SETUP: Interceptor + Model Provider
+    # ============================================================================
+    # This is where the streaming magic is configured! Two key components:
+    #
+    # 1. ContextInterceptor (StreamingInterceptor)
+    #    - Threads task_id through activity headers using Temporal's interceptor pattern
+    #    - Outbound: Reads _task_id from workflow instance, injects into activity headers
+    #    - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
+    #    - This enables runtime context without forking the Temporal plugin!
+    #
+    # 2. TemporalStreamingModelProvider
+    #    - Returns StreamingModel instances that read task_id from ContextVar
+    #    - StreamingModel.get_response() streams tokens to Redis in real-time
+    #    - Still returns complete response to Temporal for determinism/replay safety
+    #    - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
+    #
+    # Together, these enable real-time LLM streaming while maintaining Temporal's
+    # durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin!
+    context_interceptor = ContextInterceptor()
+    temporal_streaming_model_provider = TemporalStreamingModelProvider()
+
     # Create a worker with automatic tracing
-    # We are also adding the Open AI Agents SDK plugin to the worker.
+    # IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
+    # No forking needed! The interceptor + model provider handle all streaming logic.
     worker = AgentexWorker(
         task_queue=task_queue_name,
-        plugins=[OpenAIAgentsPlugin(
-            model_params=ModelActivityParameters(
-                start_to_close_timeout=timedelta(days=1)
-            )
-        )],
+        plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
+        interceptors=[context_interceptor],
     )
 
     await worker.run(
diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py
index c19e097a..37e5dbc5 100644
--- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py
+++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py
@@ -10,21 +10,23 @@
 - Durable waiting: Agents can wait indefinitely for human input without losing state
 
 WHY THIS MATTERS:
-Without Temporal, if your system crashes while waiting for human approval, you lose 
-all context. With Temporal, the agent resumes exactly where it left off after 
+Without Temporal, if your system crashes while waiting for human approval, you lose
+all context. With Temporal, the agent resumes exactly where it left off after
 system failures, making human-in-the-loop workflows production-ready.
 
 PATTERN:
 1. Agent calls wait_for_confirmation tool
-2. Tool spawns child workflow that waits for signal 
+2. Tool spawns child workflow that waits for signal
 3. Human approves via CLI/web app
 4. Child workflow completes, agent continues
 
 Usage: `temporal workflow signal --workflow-id="child-workflow-id" --name="fulfill_order_signal" --input=true`
 """
 
+import os
 import json
 import asyncio
+from typing import Any, Dict, List
 
 from agents import Agent, Runner
 from temporalio import workflow
@@ -32,11 +34,25 @@
 from agentex.lib import adk
 from project.tools import wait_for_confirmation
 from agentex.lib.types.acp import SendEventParams, CreateTaskParams
+from agentex.lib.types.tracing import SGPTracingProcessorConfig
 from agentex.lib.utils.logging import make_logger
 from agentex.types.text_content import TextContent
+from agentex.lib.utils.model_utils import BaseModel
 from agentex.lib.environment_variables import EnvironmentVariables
 from agentex.lib.core.temporal.types.workflow import SignalName
 from agentex.lib.core.temporal.workflows.workflow import BaseWorkflow
+from agentex.lib.core.tracing.tracing_processor_manager import (
+    add_tracing_processor_config,
+)
+from agentex.lib.core.temporal.plugins.openai_agents.hooks.hooks import TemporalStreamingHooks
+
+# Configure tracing processor (optional - only if you have SGP credentials)
+add_tracing_processor_config(
+    SGPTracingProcessorConfig(
+        sgp_api_key=os.environ.get("SGP_API_KEY", ""),
+        sgp_account_id=os.environ.get("SGP_ACCOUNT_ID", ""),
+    )
+)
 
 environment_variables = EnvironmentVariables.refresh()
 
@@ -46,37 +62,117 @@
 if environment_variables.AGENT_NAME is None:
     raise ValueError("Environment variable AGENT_NAME is not set")
 
+# Validate OpenAI API key is set
+if not os.environ.get("OPENAI_API_KEY"):
+    raise ValueError(
+        "OPENAI_API_KEY environment variable is not set. "
+        "This tutorial requires an OpenAI API key to run the OpenAI Agents SDK. "
+        "Please set OPENAI_API_KEY in your environment or manifest.yaml file."
+    )
+
 logger = make_logger(__name__)
 
+
+class StateModel(BaseModel):
+    """
+    State model for preserving conversation history across turns.
+
+    This allows the agent to maintain context throughout the conversation,
+    making it possible to reference previous messages and build on the discussion.
+    """
+
+    input_list: List[Dict[str, Any]]
+    turn_number: int
+
+
 @workflow.defn(name=environment_variables.WORKFLOW_NAME)
 class ExampleTutorialWorkflow(BaseWorkflow):
     """
     Human-in-the-Loop Temporal Workflow
-    
+
     Demonstrates agents that can pause execution and wait for human approval.
     When approval is needed, the agent spawns a child workflow that waits
     for external signals (human input) before continuing.
-    
+
     Benefits: Durable waiting, survives system failures, scalable to millions of workflows.
     """
+
     def __init__(self):
         super().__init__(display_name=environment_variables.AGENT_NAME)
         self._complete_task = False
+        self._state: StateModel | None = None
         self._pending_confirmation: asyncio.Queue[str] = asyncio.Queue()
+        self._task_id = None
+        self._trace_id = None
+        self._parent_span_id = None
 
     @workflow.signal(name=SignalName.RECEIVE_EVENT)
     async def on_task_event_send(self, params: SendEventParams) -> None:
         """
         Handle user messages with human-in-the-loop approval capability.
-        
+
         When the agent needs human approval, it calls wait_for_confirmation
         which spawns a child workflow that waits for external signals before continuing.
""" logger.info(f"Received task message instruction: {params}") - + + if self._state is None: + raise ValueError("State is not initialized") + + # Increment turn number for tracing + self._state.turn_number += 1 + + self._task_id = params.task.id + self._trace_id = params.task.id + + # Add the user message to conversation history + self._state.input_list.append({"role": "user", "content": params.event.content.content}) + # Echo user message back to UI await adk.messages.create(task_id=params.task.id, content=params.event.content) + # ============================================================================ + # STREAMING SETUP: Store task_id for the Interceptor + # ============================================================================ + # These instance variables are read by StreamingWorkflowOutboundInterceptor + # which injects them into activity headers. This enables streaming without + # forking the Temporal plugin! + # + # How streaming works (Interceptor + Model Provider + Hooks): + # 1. We store task_id in workflow instance variable (here) + # 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance() + # 3. Interceptor injects task_id into activity headers + # 4. StreamingActivityInboundInterceptor extracts from headers + # 5. Sets streaming_task_id ContextVar inside the activity + # 6. StreamingModel reads from ContextVar and streams to Redis + # 7. TemporalStreamingHooks creates placeholder messages for tool calls + # + # This approach uses STANDARD Temporal components - no forked plugin needed! + self._task_id = params.task.id + self._trace_id = params.task.id + self._parent_span_id = params.task.id + + # ============================================================================ + # HOOKS: Create Streaming Lifecycle Messages + # ============================================================================ + # TemporalStreamingHooks integrates with OpenAI Agents SDK lifecycle events + # to create messages in the database for tool calls, reasoning, etc. + # + # What hooks do: + # - on_tool_call_start(): Creates tool_request message with arguments + # - on_tool_call_done(): Creates tool_response message with result + # - on_model_stream_part(): Called for each streaming chunk (handled by StreamingModel) + # - on_run_done(): Marks the final response as complete + # + # For human-in-the-loop workflows, hooks create messages showing: + # - Type: tool_request - Agent deciding to call wait_for_confirmation + # - Type: tool_response - Result after human approval (child workflow completion) + # - Type: text - Final agent response after approval received + # + # The hooks work alongside the interceptor/model streaming to provide + # a complete view of the agent's execution in the UI. 
+        hooks = TemporalStreamingHooks(task_id=params.task.id)
+
         # Create agent with human-in-the-loop capability
         # The wait_for_confirmation tool spawns a child workflow that waits for external signals
         confirm_order_agent = Agent(
@@ -88,27 +184,44 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
         )
 
         # Run agent - when human approval is needed, it will spawn child workflow and wait
-        result = await Runner.run(confirm_order_agent, params.event.content.content)
-
-        # Send response back to user (includes result of any human approval process)
-        await adk.messages.create(
-            task_id=params.task.id,
-            content=TextContent(
-                author="agent",
-                content=result.final_output,
-            ),
-        )
+        # Hooks will create messages for tool calls, interceptor enables token streaming
+        # Wrap in tracing span to track this turn
+        async with adk.tracing.span(
+            trace_id=params.task.id,
+            name=f"Turn {self._state.turn_number}",
+            input=self._state.model_dump(),
+        ) as span:
+            self._parent_span_id = span.id if span else None
+            # Pass the conversation history to Runner.run to maintain context
+            result = await Runner.run(confirm_order_agent, self._state.input_list, hooks=hooks)
+
+            # Update the state with the assistant's response for the next turn.
+            # RunResult.to_input_list() returns the original input plus all newly
+            # generated items, ready to be passed back in on the next turn.
+            self._state.input_list = result.to_input_list()
+
+            # Set span output for tracing - include full state
+            if span is not None:
+                span.output = self._state.model_dump()
 
     @workflow.run
     async def on_task_create(self, params: CreateTaskParams) -> str:
         """
         Workflow entry point - starts the long-running human-in-the-loop agent.
-        
+
         Handles both automated decisions and human approval workflows durably.
         To approve waiting actions: temporal workflow signal --workflow-id="child-workflow-id" --name="fulfill_order_signal" --input=true
         """
         logger.info(f"Received task create params: {params}")
 
+        # Initialize the conversation state with an empty history
+        self._state = StateModel(
+            input_list=[],
+            turn_number=0,
+        )
+
         # Send welcome message when task is created
         await adk.messages.create(
             task_id=params.task.id,
@@ -130,6 +243,6 @@
     # - Main workflow shows agent activities + ChildWorkflow activity when approval needed
     # - Child workflow appears as separate "child-workflow-id" that waits for signal
     # - Timeline: invoke_model_activity → ChildWorkflow (waiting) → invoke_model_activity (after approval)
-    # 
+    #
     # To approve: temporal workflow signal --workflow-id="child-workflow-id" --name="fulfill_order_signal" --input=true
     # Production: Replace CLI with web dashboards/APIs that send signals programmatically
diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/pyproject.toml b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/pyproject.toml
index b95b19bf..57c46347 100644
--- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/pyproject.toml
+++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/pyproject.toml
@@ -8,9 +8,10 @@
 version = "0.1.0"
 description = "An AgentEx agent"
 requires-python = ">=3.12"
 dependencies = [
-    "agentex-sdk",
+    "agentex-sdk==0.6.0",
+    "openai-agents==0.4.2",
+    "temporalio==1.18.2",
     "scale-gp",
-    "temporalio>=1.18.0,<2",
 ]
 
 [project.optional-dependencies]
diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py
index 8cdcac93..c229aa18 100644
--- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py
+++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/tests/test_agent.py
@@ -1,30 +1,46 @@
 """
-Sample tests for AgentEx ACP agent.
+Sample tests for AgentEx ACP agent with Human-in-the-Loop workflow.
 
-This test suite demonstrates how to test the main AgentEx API functions:
+This test suite demonstrates how to test human-in-the-loop workflows:
 - Non-streaming event sending and polling
-- Streaming event sending
+- Detecting when the workflow is waiting for human approval
+- Sending Temporal signals to approve/reject
+- Verifying the workflow completes after approval
 
 To run these tests:
 1. Make sure the agent is running (via docker-compose or `agentex agents run`)
-2. Set the AGENTEX_API_BASE_URL environment variable if not using default
-3. Run: pytest test_agent.py -v
+2. Make sure Temporal is running (localhost:7233)
+3. Set the AGENTEX_API_BASE_URL environment variable if not using default
+4. Run: pytest test_agent.py -v
 
 Configuration:
 - AGENTEX_API_BASE_URL: Base URL for the AgentEx server (default: http://localhost:5003)
 - AGENT_NAME: Name of the agent to test (default: example-tutorial)
+- TEMPORAL_ADDRESS: Temporal server address (default: localhost:7233)
 """
 
 import os
+import uuid
+import asyncio
 
 import pytest
 import pytest_asyncio
 
+# Temporal imports for signaling child workflows
+from temporalio.client import Client as TemporalClient
+from test_utils.agentic import (
+    poll_messages,
+    send_event_and_poll_yielding,
+)
+
 from agentex import AsyncAgentex
+from agentex.types.task_message import TaskMessage
+from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
 
 # Configuration from environment variables
 AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
 AGENT_NAME = os.environ.get("AGENT_NAME", "example-tutorial")
+TEMPORAL_ADDRESS = os.environ.get("TEMPORAL_ADDRESS", "localhost:7233")
 
 
 @pytest_asyncio.fixture
@@ -35,6 +51,14 @@ async def client():
     await client.close()
 
 
+@pytest_asyncio.fixture
+async def temporal_client():
+    """Create a Temporal client for sending signals to workflows."""
+    client = await TemporalClient.connect(TEMPORAL_ADDRESS)
+    yield client
+    # Temporal client doesn't need explicit close in recent versions
+
+
 @pytest.fixture
 def agent_name():
     """Return the agent name for testing."""
@@ -52,85 +76,126 @@ async def agent_id(client, agent_name):
 
 
 class TestNonStreamingEvents:
-    """Test non-streaming event sending and polling."""
+    """Test non-streaming event sending and polling with human-in-the-loop."""
 
     @pytest.mark.asyncio
-    async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
-        """Test sending an event and polling for the response."""
-        # TODO: Create a task for this conversation
-        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
-        # task = task_response.result
-        # assert task is not None
-
-        # TODO: Poll for the initial task creation message (if your agent sends one)
-        # async for message in poll_messages(
-        #     client=client,
-        #     task_id=task.id,
-        #     timeout=30,
-        #     sleep_interval=1.0,
-        # ):
-        #     assert isinstance(message, TaskMessage)
-        #     if message.content and message.content.type == "text" and message.content.author == "agent":
-        #         # Check for your expected initial message
-        #         assert "expected initial text" in message.content.content
-        #         break
-
-        # TODO: Send an event and poll for response using the yielding helper function
-        # user_message = "Your test message here"
-        # async for message in send_event_and_poll_yielding(
-        #     client=client,
-        #     agent_id=agent_id,
-        #     task_id=task.id,
-        #     user_message=user_message,
-        #     timeout=30,
-        #     sleep_interval=1.0,
-        # ):
-        #     assert isinstance(message, TaskMessage)
-        #     if message.content and message.content.type == "text" and message.content.author == "agent":
-        #         # Check for your expected response
-        #         assert "expected response text" in message.content.content
-        #         break
-        pass
+    async def test_send_event_and_poll_with_human_approval(self, client: AsyncAgentex, agent_id: str, temporal_client: TemporalClient):
+        """Test sending an event that triggers human approval workflow."""
+        # Create a task for this conversation
+        task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
+        task = task_response.result
+        assert task is not None
+
+        # Poll for the initial task creation message
+        print(f"[DEBUG 080 POLL] Polling for initial task creation message...")
+        async for message in poll_messages(
+            client=client,
+            task_id=task.id,
+            timeout=30,
+            sleep_interval=1.0,
+        ):
+            assert isinstance(message, TaskMessage)
+            if message.content and message.content.type == "text" and message.content.author == "agent":
+                # Check for the initial acknowledgment message
+                print(f"[DEBUG 080 POLL] Initial message: {message.content.content[:100]}")
+                assert "task" in message.content.content.lower() or "received" in message.content.content.lower()
+                break
+
+        # Send an event asking to confirm an order (triggers human-in-the-loop)
+        user_message = "Please confirm my order"
+        print(f"[DEBUG 080 POLL] Sending message: '{user_message}'")
+
+        # Track what we've seen to ensure human-in-the-loop flow happened
+        seen_tool_request = False
+        seen_tool_response = False
+        found_final_response = False
+        child_workflow_detected = False
+
+        # Start polling for messages in the background
+        async def poll_and_detect():
+            nonlocal seen_tool_request, seen_tool_response, found_final_response, child_workflow_detected
+
+            async for message in send_event_and_poll_yielding(
+                client=client,
+                agent_id=agent_id,
+                task_id=task.id,
+                user_message=user_message,
+                timeout=120,  # Longer timeout for human-in-the-loop
+                sleep_interval=1.0,
+                yield_updates=True,  # Get all streaming chunks
+            ):
+                assert isinstance(message, TaskMessage)
+                print(f"[DEBUG 080 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}")
+
+                # Track tool_request messages (agent calling wait_for_confirmation)
+                if message.content and message.content.type == "tool_request":
+                    print(f"[DEBUG 080 POLL] ✅ Saw tool_request - agent is calling wait_for_confirmation tool")
+                    print(f"[DEBUG 080 POLL] 🔔 Child workflow should be spawned - will signal it to approve")
+                    seen_tool_request = True
+                    child_workflow_detected = True
+
+                # Track tool_response messages (child workflow completion)
+                if message.content and message.content.type == "tool_response":
+                    print(f"[DEBUG 080 POLL] ✅ Saw tool_response - child workflow completed after approval")
+                    seen_tool_response = True
+
+                # Track agent text messages and their streaming updates
+                if message.content and message.content.type == "text" and message.content.author == "agent":
+                    content_length = len(message.content.content) if message.content.content else 0
+                    print(f"[DEBUG 080 POLL] Agent text update - Status: {message.streaming_status}, Length: {content_length}")
+
+                    # Stop when we get DONE status with actual content
+                    if message.streaming_status == "DONE" and content_length > 0:
+                        print(f"[DEBUG 080 POLL] ✅ Streaming complete!")
+                        found_final_response = True
+                        break
+
+        # Start polling task
+        polling_task = asyncio.create_task(poll_and_detect())
+
+        # Wait a bit for the child workflow to be created
+        print(f"[DEBUG 080 POLL] Waiting for child workflow to spawn...")
+        await asyncio.sleep(5)
+
+        # Send signal to child workflow to approve the order
+        # The child workflow ID is fixed as "child-workflow-id" (see tools.py)
+        try:
+            print(f"[DEBUG 080 POLL] Sending approval signal to child workflow...")
+            handle = temporal_client.get_workflow_handle("child-workflow-id")
+            await handle.signal("fulfill_order_signal", True)
+            print(f"[DEBUG 080 POLL] ✅ Approval signal sent successfully!")
+        except Exception as e:
+            print(f"[DEBUG 080 POLL] ⚠️ Warning: Could not send signal to child workflow: {e}")
+            print(f"[DEBUG 080 POLL] This may be expected if workflow completed before signal could be sent")
+
+        # Wait for polling to complete
+        try:
+            await asyncio.wait_for(polling_task, timeout=60)
+        except asyncio.TimeoutError:
+            print(f"[DEBUG 080 POLL] ⚠️ Polling timed out - workflow may still be waiting")
+            polling_task.cancel()
+
+        # Verify that we saw the complete flow: tool_request -> human approval -> tool_response -> final answer
+        assert seen_tool_request, "Expected to see tool_request message (agent calling wait_for_confirmation)"
+        assert seen_tool_response, "Expected to see tool_response message (child workflow completion after approval)"
+        assert found_final_response, "Expected to see final text response after human approval"
+
+        print(f"[DEBUG 080 POLL] ✅ Human-in-the-loop workflow completed successfully!")
 
 
 class TestStreamingEvents:
-    """Test streaming event sending."""
+    """Test streaming event sending (backend verification via polling)."""
 
     @pytest.mark.asyncio
     async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
-        """Test sending an event and streaming the response."""
-        # TODO: Create a task for this conversation
-        # task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
-        # task = task_response.result
-        # assert task is not None
-
-        # user_message = "Your test message here"
-
-        # # Collect events from stream
-        # all_events = []
-
-        # async def collect_stream_events():
-        #     async for event in stream_agent_response(
-        #         client=client,
-        #         task_id=task.id,
-        #         timeout=30,
-        #     ):
-        #         all_events.append(event)
-
-        # # Start streaming task
-        # stream_task = asyncio.create_task(collect_stream_events())
-
-        # # Send the event
-        # event_content = TextContentParam(type="text", author="user", content=user_message)
-        # await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})
-
-        # # Wait for streaming to complete
-        # await stream_task
+        """
+        Streaming test placeholder.
 
-        # # TODO: Add your validation here
-        # assert len(all_events) > 0, "No events received in streaming response"
+        NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState).
+        Backend streaming functionality is verified in test_send_event_and_poll_with_human_approval.
+        """
         pass
 
 
 if __name__ == "__main__":
-    pytest.main([__file__, "-v"])
\ No newline at end of file
+    pytest.main([__file__, "-v"])
diff --git a/examples/tutorials/run_all_async_tests.sh b/examples/tutorials/run_all_async_tests.sh
index 9b261ab4..b175edc3 100755
--- a/examples/tutorials/run_all_async_tests.sh
+++ b/examples/tutorials/run_all_async_tests.sh
@@ -60,6 +60,9 @@
 ALL_TUTORIALS=(
     "10_async/10_temporal/000_hello_acp"
     "10_async/10_temporal/010_agent_chat"
     "10_async/10_temporal/020_state_machine"
+    "10_async/10_temporal/060_open_ai_agents_sdk_hello_world"
+    "10_async/10_temporal/070_open_ai_agents_sdk_tools"
+    "10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop"
 )
 
 PASSED=0
diff --git a/src/agentex/lib/adk/providers/_modules/openai.py b/src/agentex/lib/adk/providers/_modules/openai.py
index ac9f7907..5d1e7ecc 100644
--- a/src/agentex/lib/adk/providers/_modules/openai.py
+++ b/src/agentex/lib/adk/providers/_modules/openai.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import sys
 from typing import Any, Literal
 from datetime import timedelta
 
@@ -12,6 +13,12 @@
 from agents.agent_output import AgentOutputSchemaBase
 from agents.model_settings import ModelSettings
 
+# Use warnings.deprecated in Python 3.13+, typing_extensions.deprecated for older versions
+if sys.version_info >= (3, 13):
+    from warnings import deprecated
+else:
+    from typing_extensions import deprecated
+
 from agentex.lib.utils.logging import make_logger
 from agentex.lib.utils.temporal import in_temporal_workflow
 from agentex.lib.core.tracing.tracer import AsyncTracer
@@ -383,6 +390,10 @@ async def run_agent_streamed(
             previous_response_id=previous_response_id,
         )
 
+    @deprecated(
+        "Use the OpenAI Agents SDK integration with Temporal instead. "
+        "See examples in tutorials/10_async/10_temporal/ for migration guidance."
+    )
     async def run_agent_streamed_auto_send(
         self,
         task_id: str,
@@ -413,6 +424,10 @@
         """
         Run an agent with streaming enabled and automatic TaskMessage creation.
 
+        .. deprecated::
+            Use the OpenAI Agents SDK integration with Temporal instead.
+            See examples in tutorials/10_async/10_temporal/ for migration guidance.
+
         Args:
             task_id: The ID of the task to run the agent for.
             input_list: List of input data for the agent.
@@ -494,4 +509,4 @@
             output_guardrails=output_guardrails,
             max_turns=max_turns,
             previous_response_id=previous_response_id,
-        )
+        )
\ No newline at end of file
diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
index cde08606..94ea4876 100644
--- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
+++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
@@ -62,6 +62,43 @@
 # Create logger for this module
 logger = logging.getLogger("agentex.temporal.streaming")
 
+
+def _serialize_item(item: Any) -> dict[str, Any]:
+    """
+    Universal serializer for any item type from OpenAI Agents SDK.
+
+    Uses model_dump() for Pydantic models, otherwise extracts attributes manually.
+    Filters out internal Pydantic fields that can't be serialized.
+    """
+    if hasattr(item, 'model_dump'):
+        # Pydantic model - use model_dump for proper serialization
+        try:
+            return item.model_dump(mode='json', exclude_unset=True)
+        except Exception:
+            # Fallback to dict conversion
+            return dict(item) if hasattr(item, '__iter__') else {}
+    else:
+        # Not a Pydantic model - extract attributes manually
+        item_dict = {}
+        for attr_name in dir(item):
+            if not attr_name.startswith('_') and attr_name not in ('model_fields', 'model_config', 'model_computed_fields'):
+                try:
+                    attr_value = getattr(item, attr_name, None)
+                    # Skip methods and None values
+                    if attr_value is not None and not callable(attr_value):
+                        # Convert to JSON-serializable format
+                        if hasattr(attr_value, 'model_dump'):
+                            item_dict[attr_name] = attr_value.model_dump()
+                        elif isinstance(attr_value, (str, int, float, bool, list, dict)):
+                            item_dict[attr_name] = attr_value
+                        else:
+                            item_dict[attr_name] = str(attr_value)
+                except Exception:
+                    # Skip attributes that can't be accessed
+                    pass
+        return item_dict
+
+
 class TemporalStreamingModel(Model):
     """Custom model implementation with streaming support."""
@@ -739,6 +776,35 @@
             output_tokens_details=OutputTokensDetails(reasoning_tokens=len(''.join(reasoning_contents)) // 4),  # Approximate
         )
 
+        # Serialize response output items for span tracing
+        new_items = []
+        final_output = None
+
+        for item in response_output:
+            try:
+                item_dict = _serialize_item(item)
+                if item_dict:
+                    new_items.append(item_dict)
+
+                    # Extract final_output from message type if available
+                    if item_dict.get('type') == 'message' and not final_output:
+                        content = item_dict.get('content', [])
+                        if content and isinstance(content, list):
+                            for content_part in content:
+                                if isinstance(content_part, dict) and 'text' in content_part:
+                                    final_output = content_part['text']
+                                    break
+            except Exception as e:
+                logger.warning(f"Failed to serialize item in temporal_streaming_model: {e}")
+                continue
+
+        # Set span output with structured data
+        if span:
+            span.output = {
+                "new_items": new_items,
+                "final_output": final_output,
+            }
+
         # Return the response
         return ModelResponse(
             output=response_output,
diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_tracing_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_tracing_model.py
index ae0b79e5..c74a816b 100644
--- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_tracing_model.py
+++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_tracing_model.py
@@ -7,9 +7,10 @@
 The key innovation is that these are thin wrappers around the standard OpenAI models,
 avoiding code duplication while adding tracing capabilities.
 """
+from __future__ import annotations
 
 import logging
-from typing import List, Union, Optional, override
+from typing import Any, List, Union, Optional, override
 
 from agents import (
     Tool,
@@ -41,6 +42,42 @@
 logger = logging.getLogger("agentex.temporal.tracing")
 
 
+def _serialize_item(item: Any) -> dict[str, Any]:
+    """
+    Universal serializer for any item type from OpenAI Agents SDK.
+
+    Uses model_dump() for Pydantic models, otherwise extracts attributes manually.
+    Filters out internal Pydantic fields that can't be serialized.
+    """
+    if hasattr(item, 'model_dump'):
+        # Pydantic model - use model_dump for proper serialization
+        try:
+            return item.model_dump(mode='json', exclude_unset=True)
+        except Exception:
+            # Fallback to dict conversion
+            return dict(item) if hasattr(item, '__iter__') else {}
+    else:
+        # Not a Pydantic model - extract attributes manually
+        item_dict = {}
+        for attr_name in dir(item):
+            if not attr_name.startswith('_') and attr_name not in ('model_fields', 'model_config', 'model_computed_fields'):
+                try:
+                    attr_value = getattr(item, attr_name, None)
+                    # Skip methods and None values
+                    if attr_value is not None and not callable(attr_value):
+                        # Convert to JSON-serializable format
+                        if hasattr(attr_value, 'model_dump'):
+                            item_dict[attr_name] = attr_value.model_dump()
+                        elif isinstance(attr_value, (str, int, float, bool, list, dict)):
+                            item_dict[attr_name] = attr_value
+                        else:
+                            item_dict[attr_name] = str(attr_value)
+                except Exception:
+                    # Skip attributes that can't be accessed
+                    pass
+        return item_dict
+
+
 class TemporalTracingModelProvider(OpenAIProvider):
     """Model provider that returns OpenAI models wrapped with AgentEx tracing.
@@ -171,15 +208,35 @@
             **kwargs,
         )
 
-        # Add response info to span output
+        # Serialize response output items for span tracing
+        new_items = []
+        final_output = None
+
+        if hasattr(response, 'output') and response.output:
+            response_output = response.output if isinstance(response.output, list) else [response.output]
+
+            for item in response_output:
+                try:
+                    item_dict = _serialize_item(item)
+                    if item_dict:
+                        new_items.append(item_dict)
+
+                        # Extract final_output from message type if available
+                        if item_dict.get('type') == 'message' and not final_output:
+                            content = item_dict.get('content', [])
+                            if content and isinstance(content, list):
+                                for content_part in content:
+                                    if isinstance(content_part, dict) and 'text' in content_part:
+                                        final_output = content_part['text']
+                                        break
+                except Exception as e:
+                    logger.warning(f"Failed to serialize item in temporal tracing model: {e}")
+                    continue
+
+        # Set span output with structured data
         span.output = {  # type: ignore[attr-defined]
-            "response_id": getattr(response, "id", None),
-            "model_used": getattr(response, "model", None),
-            "usage": {
-                "input_tokens": response.usage.input_tokens if response.usage else None,
-                "output_tokens": response.usage.output_tokens if response.usage else None,
-                "total_tokens": response.usage.total_tokens if response.usage else None,
-            } if response.usage else None,
+            "new_items": new_items,
+            "final_output": final_output,
         }
 
         return response
@@ -284,15 +341,35 @@
             **kwargs,
         )
 
-        # Add response info to span output
+        # Serialize response output items for span tracing
+        new_items = []
+        final_output = None
+
+        if hasattr(response, 'output') and response.output:
+            response_output = response.output if isinstance(response.output, list) else [response.output]
+
+            for item in response_output:
+                try:
+                    item_dict = _serialize_item(item)
+                    if item_dict:
+                        new_items.append(item_dict)
+
+                        # Extract final_output from message type if available
+                        if item_dict.get('type') == 'message' and not final_output:
+                            content = item_dict.get('content', [])
+                            if content and isinstance(content, list):
+                                for content_part in content:
+                                    if isinstance(content_part, dict) and 'text' in content_part:
+                                        final_output = content_part['text']
+                                        break
+                except Exception as e:
+                    logger.warning(f"Failed to serialize item in temporal tracing model: {e}")
+                    continue
+
+        # Set span output with structured data
         span.output = {  # type: ignore[attr-defined]
-            "response_id": getattr(response, "id", None),
-            "model_used": getattr(response, "model", None),
-            "usage": {
-                "input_tokens": response.usage.input_tokens if response.usage else None,
-                "output_tokens": response.usage.output_tokens if response.usage else None,
-                "total_tokens": response.usage.total_tokens if response.usage else None,
-            } if response.usage else None,
+            "new_items": new_items,
+            "final_output": final_output,
         }
 
         return response