From 00601137d242b153b90b685e9e9df45dcb7ce494 Mon Sep 17 00:00:00 2001 From: Daniel Miller Date: Thu, 6 Nov 2025 08:51:04 -0500 Subject: [PATCH] Update tests --- .../manifest.yaml | 12 ++--- .../project/run_worker.py | 6 +-- .../project/workflow.py | 4 +- .../manifest.yaml | 6 +-- .../project/run_worker.py | 6 +-- .../project/workflow.py | 10 ++-- .../tests/test_agent.py | 5 +- .../manifest.yaml | 6 +-- .../project/acp.py | 6 +-- .../project/run_worker.py | 6 +-- .../project/workflow.py | 10 ++-- examples/tutorials/run_all_async_tests.sh | 6 +-- examples/tutorials/test_utils/async_utils.py | 52 +++++++++++++++---- 13 files changed, 82 insertions(+), 53 deletions(-) diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml index b5da9121..e515c3b1 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/manifest.yaml @@ -96,16 +96,16 @@ agent: - env_var_name: REDIS_URL secret_name: redis-url-secret secret_key: url - # - env_var_name: OPENAI_API_KEY - # secret_name: openai-api-key - # secret_key: api-key + # - env_var_name: OPENAI_API_KEY + # secret_name: openai-api-key + # secret_key: api-key # Optional: Set Environment variables for running your agent locally as well # as for deployment later on - env: {} - # OPENAI_API_KEY: "" + env: + OPENAI_API_KEY: "" # OPENAI_BASE_URL: "" - # OPENAI_ORG_ID: "" + OPENAI_ORG_ID: "" # Deployment Configuration diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py index 70886a54..df281b58 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/run_worker.py @@ -34,15 +34,15 @@ async def main(): # ============================================================================ # This is where the streaming magic is configured! Two key components: # - # 1. ContextInterceptor (StreamingInterceptor) + # 1. ContextInterceptor # - Threads task_id through activity headers using Temporal's interceptor pattern # - Outbound: Reads _task_id from workflow instance, injects into activity headers # - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar # - This enables runtime context without forking the Temporal plugin! # # 2. TemporalStreamingModelProvider - # - Returns StreamingModel instances that read task_id from ContextVar - # - StreamingModel.get_response() streams tokens to Redis in real-time + # - Returns TemporalStreamingModel instances that read task_id from ContextVar + # - TemporalStreamingModel.get_response() streams tokens to Redis in real-time # - Still returns complete response to Temporal for determinism/replay safety # - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) # diff --git a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py index 650c3674..e01f40ce 100644 --- a/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/060_open_ai_agents_sdk_hello_world/project/workflow.py @@ -176,8 +176,8 @@ async def on_task_event_send(self, params: SendEventParams) -> None: # 5. The conversation state is preserved even if the worker restarts # # STREAMING MAGIC (via Interceptors + Model Provider): - # - The StreamingInterceptor threads task_id through activity headers - # - The StreamingModelProvider returns a model that streams to Redis + # - The ContextInterceptor threads task_id through activity headers + # - The TemporalStreamingModelProvider returns a model that streams to Redis # - The model streams tokens in real-time while maintaining determinism # - Complete response is still returned to Temporal for replay safety # diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml index 286a5c28..265b08f6 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/manifest.yaml @@ -100,10 +100,10 @@ agent: # Optional: Set Environment variables for running your agent locally as well # as for deployment later on - env: {} - # OPENAI_API_KEY: "" + env: + OPENAI_API_KEY: "" # OPENAI_BASE_URL: "" - # OPENAI_ORG_ID: "" + OPENAI_ORG_ID: "" # Deployment Configuration diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py index a5d8db0d..4aa50e18 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/run_worker.py @@ -36,15 +36,15 @@ async def main(): # ============================================================================ # This is where the streaming magic is configured! Two key components: # - # 1. ContextInterceptor (StreamingInterceptor) + # 1. ContextInterceptor # - Threads task_id through activity headers using Temporal's interceptor pattern # - Outbound: Reads _task_id from workflow instance, injects into activity headers # - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar # - This enables runtime context without forking the Temporal plugin! # # 2. TemporalStreamingModelProvider - # - Returns StreamingModel instances that read task_id from ContextVar - # - StreamingModel.get_response() streams tokens to Redis in real-time + # - Returns TemporalStreamingModel instances that read task_id from ContextVar + # - TemporalStreamingModel.get_response() streams tokens to Redis in real-time # - Still returns complete response to Temporal for determinism/replay safety # - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) # diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py index ef4da1df..2204d3a0 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/project/workflow.py @@ -210,17 +210,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None: # ============================================================================ # STREAMING SETUP: Store task_id for the Interceptor # ============================================================================ - # These instance variables are read by StreamingWorkflowOutboundInterceptor + # These instance variables are read by ContextWorkflowOutboundInterceptor # which injects them into activity headers. This enables streaming without # forking the Temporal plugin! # # How streaming works (Interceptor + Model Provider + Hooks): # 1. We store task_id in workflow instance variable (here) - # 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance() + # 2. ContextWorkflowOutboundInterceptor reads it via workflow.instance() # 3. Interceptor injects task_id into activity headers - # 4. StreamingActivityInboundInterceptor extracts from headers + # 4. ContextActivityInboundInterceptor extracts from headers # 5. Sets streaming_task_id ContextVar inside the activity - # 6. StreamingModel reads from ContextVar and streams to Redis + # 6. TemporalStreamingModel reads from ContextVar and streams to Redis # 7. TemporalStreamingHooks creates placeholder messages for tool calls # # This approach uses STANDARD Temporal components - no forked plugin needed! @@ -237,7 +237,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None: # What hooks do: # - on_tool_call_start(): Creates tool_request message with arguments # - on_tool_call_done(): Creates tool_response message with result - # - on_model_stream_part(): Called for each streaming chunk (handled by StreamingModel) + # - on_model_stream_part(): Called for each streaming chunk (handled by TemporalStreamingModel) # - on_run_done(): Marks the final response as complete # # These hooks create the messages you see in the test output: diff --git a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py index 496f3b96..d6fdc6ff 100644 --- a/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py +++ b/examples/tutorials/10_async/10_temporal/070_open_ai_agents_sdk_tools/tests/test_agent.py @@ -99,9 +99,8 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str): task_id=task.id, user_message=user_message, timeout=60, - sleep_interval=1.0, - yield_updates=True, # Get all streaming chunks - ): + sleep_interval=1.0 + ): assert isinstance(message, TaskMessage) print(f"[DEBUG 070 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}") diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml index 9562ec94..d12ffe47 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/manifest.yaml @@ -102,10 +102,10 @@ agent: # Optional: Set Environment variables for running your agent locally as well # as for deployment later on - env: {} - # OPENAI_API_KEY: "" # Set this in your shell environment instead + env: + OPENAI_API_KEY: "" # OPENAI_BASE_URL: "" - # OPENAI_ORG_ID: "" + OPENAI_ORG_ID: "" # Deployment Configuration diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py index fccefd7e..c05effdb 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/acp.py @@ -44,15 +44,15 @@ # ============================================================================ # This is where the streaming magic is configured! Two key components: # -# 1. ContextInterceptor (StreamingInterceptor) +# 1. ContextInterceptor # - Threads task_id through activity headers using Temporal's interceptor pattern # - Outbound: Reads _task_id from workflow instance, injects into activity headers # - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar # - This enables runtime context without forking the Temporal plugin! # # 2. TemporalStreamingModelProvider -# - Returns StreamingModel instances that read task_id from ContextVar -# - StreamingModel.get_response() streams tokens to Redis in real-time +# - Returns TemporalStreamingModel instances that read task_id from ContextVar +# - TemporalStreamingModel.get_response() streams tokens to Redis in real-time # - Still returns complete response to Temporal for determinism/replay safety # - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) # diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py index 187017e4..a07439fd 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/run_worker.py @@ -38,15 +38,15 @@ async def main(): # ============================================================================ # This is where the streaming magic is configured! Two key components: # - # 1. ContextInterceptor (StreamingInterceptor) + # 1. ContextInterceptor # - Threads task_id through activity headers using Temporal's interceptor pattern # - Outbound: Reads _task_id from workflow instance, injects into activity headers # - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar # - This enables runtime context without forking the Temporal plugin! # # 2. TemporalStreamingModelProvider - # - Returns StreamingModel instances that read task_id from ContextVar - # - StreamingModel.get_response() streams tokens to Redis in real-time + # - Returns TemporalStreamingModel instances that read task_id from ContextVar + # - TemporalStreamingModel.get_response() streams tokens to Redis in real-time # - Still returns complete response to Temporal for determinism/replay safety # - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id}) # diff --git a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py index 09d15862..4f11ac4c 100644 --- a/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py +++ b/examples/tutorials/10_async/10_temporal/080_open_ai_agents_sdk_human_in_the_loop/project/workflow.py @@ -134,17 +134,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None: # ============================================================================ # STREAMING SETUP: Store task_id for the Interceptor # ============================================================================ - # These instance variables are read by StreamingWorkflowOutboundInterceptor + # These instance variables are read by ContextWorkflowOutboundInterceptor # which injects them into activity headers. This enables streaming without # forking the Temporal plugin! # # How streaming works (Interceptor + Model Provider + Hooks): # 1. We store task_id in workflow instance variable (here) - # 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance() + # 2. ContextWorkflowOutboundInterceptor reads it via workflow.instance() # 3. Interceptor injects task_id into activity headers - # 4. StreamingActivityInboundInterceptor extracts from headers + # 4. ContextActivityInboundInterceptor extracts from headers # 5. Sets streaming_task_id ContextVar inside the activity - # 6. StreamingModel reads from ContextVar and streams to Redis + # 6. TemporalStreamingModel reads from ContextVar and streams to Redis # 7. TemporalStreamingHooks creates placeholder messages for tool calls # # This approach uses STANDARD Temporal components - no forked plugin needed! @@ -161,7 +161,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None: # What hooks do: # - on_tool_call_start(): Creates tool_request message with arguments # - on_tool_call_done(): Creates tool_response message with result - # - on_model_stream_part(): Called for each streaming chunk (handled by StreamingModel) + # - on_model_stream_part(): Called for each streaming chunk (handled by TemporalStreamingModel) # - on_run_done(): Marks the final response as complete # # For human-in-the-loop workflows, hooks create messages showing: diff --git a/examples/tutorials/run_all_async_tests.sh b/examples/tutorials/run_all_async_tests.sh index ac9c2fac..7d5c82ed 100755 --- a/examples/tutorials/run_all_async_tests.sh +++ b/examples/tutorials/run_all_async_tests.sh @@ -45,9 +45,9 @@ done # Find all async tutorial directories ALL_TUTORIALS=( # sync tutorials - #"00_sync/000_hello_acp" - #"00_sync/010_multiturn" - #"00_sync/020_streaming" + "00_sync/000_hello_acp" + "00_sync/010_multiturn" + "00_sync/020_streaming" # base tutorials # "10_async/00_base/000_hello_acp" # "10_async/00_base/010_multiturn" diff --git a/examples/tutorials/test_utils/async_utils.py b/examples/tutorials/test_utils/async_utils.py index 9c124d24..d3405417 100644 --- a/examples/tutorials/test_utils/async_utils.py +++ b/examples/tutorials/test_utils/async_utils.py @@ -25,6 +25,7 @@ async def send_event_and_poll_yielding( user_message: str, timeout: int = 30, sleep_interval: float = 1.0, + yield_updates: bool = True, ) -> AsyncGenerator[TaskMessage, None]: """ Send an event to an agent and poll for responses, yielding messages as they arrive. @@ -38,6 +39,7 @@ async def send_event_and_poll_yielding( user_message: The message content to send timeout: Maximum seconds to wait for a response (default: 30) sleep_interval: Seconds to sleep between polls (default: 1.0) + yield_updates: If True, yield messages again when their content changes (default: True for streaming) Yields: TaskMessage objects as they are discovered during polling @@ -61,6 +63,7 @@ async def send_event_and_poll_yielding( timeout=timeout, sleep_interval=sleep_interval, messages_created_after=messages_created_after, + yield_updates=yield_updates, ): yield message @@ -71,9 +74,27 @@ async def poll_messages( timeout: int = 30, sleep_interval: float = 1.0, messages_created_after: Optional[float] = None, + yield_updates: bool = False, ) -> AsyncGenerator[TaskMessage, None]: + """ + Poll for messages continuously until timeout. + + Args: + client: AgentEx client instance + task_id: The task ID to poll messages for + timeout: Maximum seconds to poll (default: 30) + sleep_interval: Seconds to sleep between polls (default: 1.0) + messages_created_after: Optional timestamp to filter messages (Unix timestamp) + yield_updates: If True, yield messages again when their content changes (for streaming) + If False, only yield each message ID once (default: False) + + Yields: + TaskMessage objects as they are discovered or updated + """ # Keep track of messages we've already yielded seen_message_ids = set() + # Track message content hashes to detect updates (for streaming) + message_content_hashes = {} start_time = datetime.now() # Poll continuously until timeout @@ -89,10 +110,6 @@ async def poll_messages( new_messages_found = 0 for message in sorted_messages: - # Skip if we've already yielded this message - if message.id in seen_message_ids: - continue - # Check if message passes timestamp filter if messages_created_after and message.created_at: # If message.created_at is timezone-naive, assume it's UTC @@ -103,14 +120,27 @@ async def poll_messages( if msg_timestamp < messages_created_after: continue - # Yield new messages that pass the filter - seen_message_ids.add(message.id) - new_messages_found += 1 - - # This yield should transfer control back to the caller - yield message + # Check if this is a new message or an update to existing message + is_new_message = message.id not in seen_message_ids + + if yield_updates: + # For streaming: track content changes + content_str = message.content.content if message.content and hasattr(message.content, 'content') else "" + content_hash = hash(content_str + str(message.streaming_status)) + is_updated = message.id in message_content_hashes and message_content_hashes[message.id] != content_hash + + if is_new_message or is_updated: + message_content_hashes[message.id] = content_hash + seen_message_ids.add(message.id) + new_messages_found += 1 + yield message + else: + # Original behavior: only yield each message ID once + if is_new_message: + seen_message_ids.add(message.id) + new_messages_found += 1 + yield message - # If we see this print, it means the caller consumed the message and we resumed # Sleep before next poll await asyncio.sleep(sleep_interval)