Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,16 @@ agent:
- env_var_name: REDIS_URL
secret_name: redis-url-secret
secret_key: url
# - env_var_name: OPENAI_API_KEY
# secret_name: openai-api-key
# secret_key: api-key
# - env_var_name: OPENAI_API_KEY
# secret_name: openai-api-key
# secret_key: api-key

# Optional: Set Environment variables for running your agent locally as well
# as for deployment later on
env: {}
# OPENAI_API_KEY: ""
env:
OPENAI_API_KEY: ""
# OPENAI_BASE_URL: "<YOUR_OPENAI_BASE_URL_HERE>"
# OPENAI_ORG_ID: ""
OPENAI_ORG_ID: ""


# Deployment Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ async def main():
# ============================================================================
# This is where the streaming magic is configured! Two key components:
#
# 1. ContextInterceptor (StreamingInterceptor)
# 1. ContextInterceptor
# - Threads task_id through activity headers using Temporal's interceptor pattern
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
# - This enables runtime context without forking the Temporal plugin!
#
# 2. TemporalStreamingModelProvider
# - Returns StreamingModel instances that read task_id from ContextVar
# - StreamingModel.get_response() streams tokens to Redis in real-time
# - Returns TemporalStreamingModel instances that read task_id from ContextVar
# - TemporalStreamingModel.get_response() streams tokens to Redis in real-time
# - Still returns complete response to Temporal for determinism/replay safety
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
# 5. The conversation state is preserved even if the worker restarts
#
# STREAMING MAGIC (via Interceptors + Model Provider):
# - The StreamingInterceptor threads task_id through activity headers
# - The StreamingModelProvider returns a model that streams to Redis
# - The ContextInterceptor threads task_id through activity headers
# - The TemporalStreamingModelProvider returns a model that streams to Redis
# - The model streams tokens in real-time while maintaining determinism
# - Complete response is still returned to Temporal for replay safety
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ agent:

# Optional: Set Environment variables for running your agent locally as well
# as for deployment later on
env: {}
# OPENAI_API_KEY: ""
env:
OPENAI_API_KEY: ""
# OPENAI_BASE_URL: "<YOUR_OPENAI_BASE_URL_HERE>"
# OPENAI_ORG_ID: ""
OPENAI_ORG_ID: ""


# Deployment Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ async def main():
# ============================================================================
# This is where the streaming magic is configured! Two key components:
#
# 1. ContextInterceptor (StreamingInterceptor)
# 1. ContextInterceptor
# - Threads task_id through activity headers using Temporal's interceptor pattern
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
# - This enables runtime context without forking the Temporal plugin!
#
# 2. TemporalStreamingModelProvider
# - Returns StreamingModel instances that read task_id from ContextVar
# - StreamingModel.get_response() streams tokens to Redis in real-time
# - Returns TemporalStreamingModel instances that read task_id from ContextVar
# - TemporalStreamingModel.get_response() streams tokens to Redis in real-time
# - Still returns complete response to Temporal for determinism/replay safety
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,17 +210,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
# ============================================================================
# STREAMING SETUP: Store task_id for the Interceptor
# ============================================================================
# These instance variables are read by StreamingWorkflowOutboundInterceptor
# These instance variables are read by ContextWorkflowOutboundInterceptor
# which injects them into activity headers. This enables streaming without
# forking the Temporal plugin!
#
# How streaming works (Interceptor + Model Provider + Hooks):
# 1. We store task_id in workflow instance variable (here)
# 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance()
# 2. ContextWorkflowOutboundInterceptor reads it via workflow.instance()
# 3. Interceptor injects task_id into activity headers
# 4. StreamingActivityInboundInterceptor extracts from headers
# 4. ContextActivityInboundInterceptor extracts from headers
# 5. Sets streaming_task_id ContextVar inside the activity
# 6. StreamingModel reads from ContextVar and streams to Redis
# 6. TemporalStreamingModel reads from ContextVar and streams to Redis
# 7. TemporalStreamingHooks creates placeholder messages for tool calls
#
# This approach uses STANDARD Temporal components - no forked plugin needed!
Expand All @@ -237,7 +237,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
# What hooks do:
# - on_tool_call_start(): Creates tool_request message with arguments
# - on_tool_call_done(): Creates tool_response message with result
# - on_model_stream_part(): Called for each streaming chunk (handled by StreamingModel)
# - on_model_stream_part(): Called for each streaming chunk (handled by TemporalStreamingModel)
# - on_run_done(): Marks the final response as complete
#
# These hooks create the messages you see in the test output:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
task_id=task.id,
user_message=user_message,
timeout=60,
sleep_interval=1.0,
yield_updates=True, # Get all streaming chunks
):
sleep_interval=1.0
):
assert isinstance(message, TaskMessage)
print(f"[DEBUG 070 POLL] Received message - Type: {message.content.type if message.content else 'None'}, Author: {message.content.author if message.content else 'None'}, Status: {message.streaming_status}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ agent:

# Optional: Set Environment variables for running your agent locally as well
# as for deployment later on
env: {}
# OPENAI_API_KEY: "" # Set this in your shell environment instead
env:
OPENAI_API_KEY: ""
# OPENAI_BASE_URL: "<YOUR_OPENAI_BASE_URL_HERE>"
# OPENAI_ORG_ID: ""
OPENAI_ORG_ID: ""


# Deployment Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
# ============================================================================
# This is where the streaming magic is configured! Two key components:
#
# 1. ContextInterceptor (StreamingInterceptor)
# 1. ContextInterceptor
# - Threads task_id through activity headers using Temporal's interceptor pattern
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
# - This enables runtime context without forking the Temporal plugin!
#
# 2. TemporalStreamingModelProvider
# - Returns StreamingModel instances that read task_id from ContextVar
# - StreamingModel.get_response() streams tokens to Redis in real-time
# - Returns TemporalStreamingModel instances that read task_id from ContextVar
# - TemporalStreamingModel.get_response() streams tokens to Redis in real-time
# - Still returns complete response to Temporal for determinism/replay safety
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ async def main():
# ============================================================================
# This is where the streaming magic is configured! Two key components:
#
# 1. ContextInterceptor (StreamingInterceptor)
# 1. ContextInterceptor
# - Threads task_id through activity headers using Temporal's interceptor pattern
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
# - This enables runtime context without forking the Temporal plugin!
#
# 2. TemporalStreamingModelProvider
# - Returns StreamingModel instances that read task_id from ContextVar
# - StreamingModel.get_response() streams tokens to Redis in real-time
# - Returns TemporalStreamingModel instances that read task_id from ContextVar
# - TemporalStreamingModel.get_response() streams tokens to Redis in real-time
# - Still returns complete response to Temporal for determinism/replay safety
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
#
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,17 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
# ============================================================================
# STREAMING SETUP: Store task_id for the Interceptor
# ============================================================================
# These instance variables are read by StreamingWorkflowOutboundInterceptor
# These instance variables are read by ContextWorkflowOutboundInterceptor
# which injects them into activity headers. This enables streaming without
# forking the Temporal plugin!
#
# How streaming works (Interceptor + Model Provider + Hooks):
# 1. We store task_id in workflow instance variable (here)
# 2. StreamingWorkflowOutboundInterceptor reads it via workflow.instance()
# 2. ContextWorkflowOutboundInterceptor reads it via workflow.instance()
# 3. Interceptor injects task_id into activity headers
# 4. StreamingActivityInboundInterceptor extracts from headers
# 4. ContextActivityInboundInterceptor extracts from headers
# 5. Sets streaming_task_id ContextVar inside the activity
# 6. StreamingModel reads from ContextVar and streams to Redis
# 6. TemporalStreamingModel reads from ContextVar and streams to Redis
# 7. TemporalStreamingHooks creates placeholder messages for tool calls
#
# This approach uses STANDARD Temporal components - no forked plugin needed!
Expand All @@ -161,7 +161,7 @@ async def on_task_event_send(self, params: SendEventParams) -> None:
# What hooks do:
# - on_tool_call_start(): Creates tool_request message with arguments
# - on_tool_call_done(): Creates tool_response message with result
# - on_model_stream_part(): Called for each streaming chunk (handled by StreamingModel)
# - on_model_stream_part(): Called for each streaming chunk (handled by TemporalStreamingModel)
# - on_run_done(): Marks the final response as complete
#
# For human-in-the-loop workflows, hooks create messages showing:
Expand Down
6 changes: 3 additions & 3 deletions examples/tutorials/run_all_async_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ done
# Find all async tutorial directories
ALL_TUTORIALS=(
# sync tutorials
#"00_sync/000_hello_acp"
#"00_sync/010_multiturn"
#"00_sync/020_streaming"
"00_sync/000_hello_acp"
"00_sync/010_multiturn"
"00_sync/020_streaming"
# base tutorials
# "10_async/00_base/000_hello_acp"
# "10_async/00_base/010_multiturn"
Expand Down
52 changes: 41 additions & 11 deletions examples/tutorials/test_utils/async_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async def send_event_and_poll_yielding(
user_message: str,
timeout: int = 30,
sleep_interval: float = 1.0,
yield_updates: bool = True,
) -> AsyncGenerator[TaskMessage, None]:
"""
Send an event to an agent and poll for responses, yielding messages as they arrive.
Expand All @@ -38,6 +39,7 @@ async def send_event_and_poll_yielding(
user_message: The message content to send
timeout: Maximum seconds to wait for a response (default: 30)
sleep_interval: Seconds to sleep between polls (default: 1.0)
yield_updates: If True, yield messages again when their content changes (default: True for streaming)

Yields:
TaskMessage objects as they are discovered during polling
Expand All @@ -61,6 +63,7 @@ async def send_event_and_poll_yielding(
timeout=timeout,
sleep_interval=sleep_interval,
messages_created_after=messages_created_after,
yield_updates=yield_updates,
):
yield message

Expand All @@ -71,9 +74,27 @@ async def poll_messages(
timeout: int = 30,
sleep_interval: float = 1.0,
messages_created_after: Optional[float] = None,
yield_updates: bool = False,
) -> AsyncGenerator[TaskMessage, None]:
"""
Poll for messages continuously until timeout.

Args:
client: AgentEx client instance
task_id: The task ID to poll messages for
timeout: Maximum seconds to poll (default: 30)
sleep_interval: Seconds to sleep between polls (default: 1.0)
messages_created_after: Optional timestamp to filter messages (Unix timestamp)
yield_updates: If True, yield messages again when their content changes (for streaming)
If False, only yield each message ID once (default: False)

Yields:
TaskMessage objects as they are discovered or updated
"""
# Keep track of messages we've already yielded
seen_message_ids = set()
# Track message content hashes to detect updates (for streaming)
message_content_hashes = {}
start_time = datetime.now()

# Poll continuously until timeout
Expand All @@ -89,10 +110,6 @@ async def poll_messages(

new_messages_found = 0
for message in sorted_messages:
# Skip if we've already yielded this message
if message.id in seen_message_ids:
continue

# Check if message passes timestamp filter
if messages_created_after and message.created_at:
# If message.created_at is timezone-naive, assume it's UTC
Expand All @@ -103,14 +120,27 @@ async def poll_messages(
if msg_timestamp < messages_created_after:
continue

# Yield new messages that pass the filter
seen_message_ids.add(message.id)
new_messages_found += 1

# This yield should transfer control back to the caller
yield message
# Check if this is a new message or an update to existing message
is_new_message = message.id not in seen_message_ids

if yield_updates:
# For streaming: track content changes
content_str = message.content.content if message.content and hasattr(message.content, 'content') else ""
content_hash = hash(content_str + str(message.streaming_status))
is_updated = message.id in message_content_hashes and message_content_hashes[message.id] != content_hash

if is_new_message or is_updated:
message_content_hashes[message.id] = content_hash
seen_message_ids.add(message.id)
new_messages_found += 1
yield message
else:
# Original behavior: only yield each message ID once
if is_new_message:
seen_message_ids.add(message.id)
new_messages_found += 1
yield message

# If we see this print, it means the caller consumed the message and we resumed
# Sleep before next poll
await asyncio.sleep(sleep_interval)

Expand Down