@@ -34,6 +34,13 @@

from agentex.lib.types.fastacp import TemporalACPConfig
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
TemporalStreamingModelProvider,
)
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor

context_interceptor = ContextInterceptor()
temporal_streaming_model_provider = TemporalStreamingModelProvider()

# Create the ACP server
acp = FastACP.create(
@@ -44,7 +51,8 @@
# We are also adding the OpenAI Agents SDK plugin to the ACP.
type="temporal",
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
plugins=[OpenAIAgentsPlugin()]
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
interceptors=[context_interceptor]
)
)

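For orientation, the model_provider= seam used above comes from the OpenAI Agents SDK's ModelProvider interface, which the Temporal plugin consults whenever an agent turn needs a model. A minimal sketch of that seam, assuming only the public openai-agents API (the class name and fallback model below are illustrative, not AgentEx's implementation):

from agents import Model, ModelProvider, OpenAIChatCompletionsModel
from openai import AsyncOpenAI

class MyStreamingModelProvider(ModelProvider):
    # Hypothetical stand-in: TemporalStreamingModelProvider presumably
    # implements this same interface, returning a StreamingModel that
    # mirrors tokens to Redis before returning the full response.
    def get_model(self, model_name: str | None) -> Model:
        return OpenAIChatCompletionsModel(
            model=model_name or "gpt-4o-mini",
            openai_client=AsyncOpenAI(),
        )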
@@ -8,6 +8,10 @@
from agentex.lib.environment_variables import EnvironmentVariables
from agentex.lib.core.temporal.activities import get_all_activities
from agentex.lib.core.temporal.workers.worker import AgentexWorker
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
TemporalStreamingModelProvider,
)
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor

environment_variables = EnvironmentVariables.refresh()

@@ -24,12 +28,36 @@ async def main():

# Add activities to the worker
all_activities = get_all_activities() + [] # add your own activities here


# ============================================================================
# STREAMING SETUP: Interceptor + Model Provider
# ============================================================================
# This is where the streaming magic is configured! Two key components:
#
# 1. ContextInterceptor (StreamingInterceptor)
# - Threads task_id through activity headers using Temporal's interceptor pattern
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
# - This enables runtime context without forking the Temporal plugin!
#
# 2. TemporalStreamingModelProvider
# - Returns StreamingModel instances that read task_id from ContextVar
# - StreamingModel.get_response() streams tokens to Redis in real-time
# - Still returns complete response to Temporal for determinism/replay safety
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
#
# Together, these enable real-time LLM streaming while maintaining Temporal's
# durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin!
context_interceptor = ContextInterceptor()
temporal_streaming_model_provider = TemporalStreamingModelProvider()

# Create a worker with automatic tracing
# We are also adding the OpenAI Agents SDK plugin to the worker.
# IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
# No forking needed! The interceptor + model provider handle all streaming logic.
worker = AgentexWorker(
task_queue=task_queue_name,
plugins=[OpenAIAgentsPlugin()]
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
interceptors=[context_interceptor]
)

await worker.run(
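The ContextVar handoff described in the comment block above is plain stdlib machinery. A rough sketch of the pattern, with streaming_task_id named as in the comments and the surrounding helper purely hypothetical:

from contextvars import ContextVar

streaming_task_id: ContextVar[str | None] = ContextVar("streaming_task_id", default=None)

async def run_with_task_id(headers: dict[str, str]) -> None:
    # Inbound side: pull the propagated task_id out of the activity headers
    # and expose it to everything running in this async context.
    token = streaming_task_id.set(headers.get("task_id"))
    try:
        ...  # model calls made here can read streaming_task_id.get()
    finally:
        streaming_task_id.reset(token)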

Large diffs are not rendered by default.

@@ -8,9 +8,10 @@ version = "0.1.0"
description = "An AgentEx agent"
requires-python = ">=3.12"
dependencies = [
"agentex-sdk>=0.4.18",
"agentex-sdk==0.6.0",
"openai-agents-sdk==0.4.2",
"temporalio==1.18.2",
"scale-gp",
"temporalio>=1.18.0,<2",
]

[project.optional-dependencies]
@@ -16,11 +16,18 @@
"""

import os
import uuid

import pytest
import pytest_asyncio
from test_utils.agentic import (
poll_messages,
send_event_and_poll_yielding,
)

from agentex import AsyncAgentex
from agentex.types.task_message import TaskMessage
from agentex.types.agent_rpc_params import ParamsCreateTaskRequest

# Configuration from environment variables
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
@@ -57,80 +64,74 @@ class TestNonStreamingEvents:
@pytest.mark.asyncio
async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
"""Test sending an event and polling for the response."""
# TODO: Create a task for this conversation
# task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
# task = task_response.result
# assert task is not None

# TODO: Poll for the initial task creation message (if your agent sends one)
# async for message in poll_messages(
# client=client,
# task_id=task.id,
# timeout=30,
# sleep_interval=1.0,
# ):
# assert isinstance(message, TaskMessage)
# if message.content and message.content.type == "text" and message.content.author == "agent":
# # Check for your expected initial message
# assert "expected initial text" in message.content.content
# break

# TODO: Send an event and poll for response using the yielding helper function
# user_message = "Your test message here"
# async for message in send_event_and_poll_yielding(
# client=client,
# agent_id=agent_id,
# task_id=task.id,
# user_message=user_message,
# timeout=30,
# sleep_interval=1.0,
# ):
# assert isinstance(message, TaskMessage)
# if message.content and message.content.type == "text" and message.content.author == "agent":
# # Check for your expected response
# assert "expected response text" in message.content.content
# break
task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
task = task_response.result
assert task is not None

# Poll for the initial task creation message
async for message in poll_messages(
client=client,
task_id=task.id,
timeout=30,
sleep_interval=1.0,
):
assert isinstance(message, TaskMessage)
if message.content and message.content.type == "text" and message.content.author == "agent":
# Check for the Haiku Assistant welcome message
assert "Haiku Assistant" in message.content.content
assert "Temporal" in message.content.content
break

# Send event and poll for response with streaming updates
user_message = "Hello how is life?"
print(f"[DEBUG 060 POLL] Sending message: '{user_message}'")

# Use yield_updates=True to get all streaming chunks as they're written
final_message = None
async for message in send_event_and_poll_yielding(
client=client,
agent_id=agent_id,
task_id=task.id,
user_message=user_message,
timeout=30,
sleep_interval=1.0,
yield_updates=True, # Get updates as streaming writes chunks
):
if message.content and message.content.type == "text" and message.content.author == "agent":
print(
f"[DEBUG 060 POLL] Received update - Status: {message.streaming_status}, "
f"Content length: {len(message.content.content)}"
)
final_message = message

# Stop polling once we get a DONE message
if message.streaming_status == "DONE":
print(f"[DEBUG 060 POLL] Streaming complete!")
break

# Verify the final message has content (the haiku)
assert final_message is not None, "Should have received an agent message"
assert final_message.content is not None, "Final message should have content"
assert len(final_message.content.content) > 0, "Final message should have haiku content"

print(f"[DEBUG 060 POLL] ✅ Successfully received haiku response!")
print(f"[DEBUG 060 POLL] Final haiku:\n{final_message.content.content}")
pass


class TestStreamingEvents:
"""Test streaming event sending."""
"""Test streaming event sending (backend verification via polling)."""

@pytest.mark.asyncio
async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
"""Test sending an event and streaming the response."""
# TODO: Create a task for this conversation
# task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
# task = task_response.result
# assert task is not None
"""
Streaming test placeholder.

# user_message = "Your test message here"

# # Collect events from stream
# all_events = []

# async def collect_stream_events():
# async for event in stream_agent_response(
# client=client,
# task_id=task.id,
# timeout=30,
# ):
# all_events.append(event)

# # Start streaming task
# stream_task = asyncio.create_task(collect_stream_events())

# # Send the event
# event_content = TextContentParam(type="text", author="user", content=user_message)
# await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})

# # Wait for streaming to complete
# await stream_task

# # TODO: Add your validation here
# assert len(all_events) > 0, "No events received in streaming response"
NOTE: SSE streaming is tested via the UI (agentex-ui subscribeTaskState).
Backend streaming functionality is verified in test_send_event_and_poll.
"""
pass


if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])
@@ -1,8 +1,7 @@
import os
import sys
from datetime import timedelta

from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

# === DEBUG SETUP (AgentEx CLI Debug Support) ===
if os.getenv("AGENTEX_DEBUG_ENABLED") == "true":
@@ -11,20 +10,20 @@
debug_port = int(os.getenv("AGENTEX_DEBUG_PORT", "5679"))
debug_type = os.getenv("AGENTEX_DEBUG_TYPE", "acp")
wait_for_attach = os.getenv("AGENTEX_DEBUG_WAIT_FOR_ATTACH", "false").lower() == "true"

# Configure debugpy
debugpy.configure(subProcess=False)
debugpy.listen(debug_port)

print(f"🐛 [{debug_type.upper()}] Debug server listening on port {debug_port}")

if wait_for_attach:
print(f"⏳ [{debug_type.upper()}] Waiting for debugger to attach...")
debugpy.wait_for_client()
print(f"✅ [{debug_type.upper()}] Debugger attached!")
else:
print(f"📡 [{debug_type.upper()}] Ready for debugger attachment")

except ImportError:
print("❌ debugpy not available. Install with: pip install debugpy")
sys.exit(1)
@@ -35,6 +34,13 @@

from agentex.lib.types.fastacp import TemporalACPConfig
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
TemporalStreamingModelProvider,
)
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor

context_interceptor = ContextInterceptor()
temporal_streaming_model_provider = TemporalStreamingModelProvider()

# Create the ACP server
acp = FastACP.create(
@@ -45,11 +51,8 @@
# We are also adding the OpenAI Agents SDK plugin to the ACP.
type="temporal",
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
plugins=[OpenAIAgentsPlugin(
model_params=ModelActivityParameters(
start_to_close_timeout=timedelta(days=1)
)
)]
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
interceptors=[context_interceptor]
)
)

@@ -1,7 +1,6 @@
import asyncio
from datetime import timedelta

from temporalio.contrib.openai_agents import OpenAIAgentsPlugin, ModelActivityParameters
from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

from project.workflow import ExampleTutorialWorkflow
from project.activities import get_weather, deposit_money, withdraw_money
@@ -10,6 +9,11 @@
from agentex.lib.environment_variables import EnvironmentVariables
from agentex.lib.core.temporal.activities import get_all_activities
from agentex.lib.core.temporal.workers.worker import AgentexWorker
from agentex.lib.core.temporal.plugins.openai_agents.hooks.activities import stream_lifecycle_content
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
TemporalStreamingModelProvider,
)
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor

environment_variables = EnvironmentVariables.refresh()

@@ -19,23 +23,43 @@
async def main():
# Setup debug mode if enabled
setup_debug_if_enabled()

task_queue_name = environment_variables.WORKFLOW_TASK_QUEUE
if task_queue_name is None:
raise ValueError("WORKFLOW_TASK_QUEUE is not set")

# Add activities to the worker
all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather] # add your own activities here

all_activities = get_all_activities() + [withdraw_money, deposit_money, get_weather, stream_lifecycle_content] # add your own activities here

# ============================================================================
# STREAMING SETUP: Interceptor + Model Provider
# ============================================================================
# This is where the streaming magic is configured! Two key components:
#
# 1. ContextInterceptor (StreamingInterceptor)
# - Threads task_id through activity headers using Temporal's interceptor pattern
# - Outbound: Reads _task_id from workflow instance, injects into activity headers
# - Inbound: Extracts task_id from headers, sets streaming_task_id ContextVar
# - This enables runtime context without forking the Temporal plugin!
#
# 2. TemporalStreamingModelProvider
# - Returns StreamingModel instances that read task_id from ContextVar
# - StreamingModel.get_response() streams tokens to Redis in real-time
# - Still returns complete response to Temporal for determinism/replay safety
# - Uses AgentEx ADK streaming infrastructure (Redis XADD to stream:{task_id})
#
# Together, these enable real-time LLM streaming while maintaining Temporal's
# durability guarantees. No forked components - uses STANDARD OpenAIAgentsPlugin!
context_interceptor = ContextInterceptor()
temporal_streaming_model_provider = TemporalStreamingModelProvider()

# Create a worker with automatic tracing
# We are also adding the OpenAI Agents SDK plugin to the worker.
# IMPORTANT: We use the STANDARD temporalio.contrib.openai_agents.OpenAIAgentsPlugin
# No forking needed! The interceptor + model provider handle all streaming logic.
worker = AgentexWorker(
task_queue=task_queue_name,
plugins=[OpenAIAgentsPlugin(
model_params=ModelActivityParameters(
start_to_close_timeout=timedelta(days=1)
)
)],
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
interceptors=[context_interceptor],
)

await worker.run(
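One closing note on the transport: the worker comments above say the streaming model performs a Redis XADD to stream:{task_id}. A minimal sketch of that publish step with redis-py's asyncio client; the key shape and field name are assumptions, not the ADK's actual schema:

import redis.asyncio as redis

async def publish_token(client: redis.Redis, task_id: str, delta: str) -> None:
    # Append one token to the task's stream; a subscriber (e.g. the UI's SSE
    # relay) can tail it with XREAD while Temporal still gets the full response.
    await client.xadd(f"stream:{task_id}", {"delta": delta})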