Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import pytest_asyncio
from test_utils.agentic import (
poll_messages,
stream_agent_response,
send_event_and_poll_yielding,
)

from agentex import AsyncAgentex
from agentex.types import TaskMessage
from agentex.types.agent_rpc_params import ParamsCreateTaskRequest
from agentex.types.text_content_param import TextContentParam

# Configuration from environment variables
AGENTEX_API_BASE_URL = os.environ.get("AGENTEX_API_BASE_URL", "http://localhost:5003")
Expand Down Expand Up @@ -83,7 +81,7 @@ async def test_send_event_and_poll(self, client: AsyncAgentex, agent_id: str):
if message.content and message.content.type == "text" and message.content.author == "agent":
assert "Hello! I've received your task" in message.content.content
break

await asyncio.sleep(1.5)
# Send an event and poll for response
user_message = "Hello, this is a test message!"
Expand All @@ -105,67 +103,8 @@ class TestStreamingEvents:

@pytest.mark.asyncio
async def test_send_event_and_stream(self, client: AsyncAgentex, agent_id: str):
"""Test sending an event and streaming the response."""
task_response = await client.agents.create_task(agent_id, params=ParamsCreateTaskRequest(name=uuid.uuid1().hex))
task = task_response.result
assert task is not None

user_message = "Hello, this is a test message!"
pass

# Collect events from stream
all_events = []

# Flags to track what we've received
task_creation_found = False
user_echo_found = False
agent_response_found = False

async def collect_stream_events(): #noqa: ANN101
nonlocal task_creation_found, user_echo_found, agent_response_found

async for event in stream_agent_response(
client=client,
task_id=task.id,
timeout=30,
):
# Check events as they arrive
event_type = event.get("type")
if event_type == "full":
content = event.get("content", {})
if content.get("content") is None:
continue # Skip empty content
if content.get("type") == "text" and content.get("author") == "agent":
# Check for initial task creation message
if "Hello! I've received your task" in content.get("content", ""):
task_creation_found = True
# Check for agent response to user message
elif "Hello! I've received your message" in content.get("content", ""):
# Agent response should come after user echo
assert user_echo_found, "Agent response arrived before user message echo (incorrect order)"
agent_response_found = True
elif content.get("type") == "text" and content.get("author") == "user":
# Check for user message echo
if content.get("content") == user_message:
user_echo_found = True

# Exit early if we've found all expected messages
if task_creation_found and user_echo_found and agent_response_found:
break

assert task_creation_found, "Task creation message not found in stream"
assert user_echo_found, "User message echo not found in stream"
assert agent_response_found, "Agent response not found in stream"


# Start streaming task
stream_task = asyncio.create_task(collect_stream_events())

# Send the event
event_content = TextContentParam(type="text", author="user", content=user_message)
await client.agents.send_event(agent_id=agent_id, params={"task_id": task.id, "content": event_content})

# Wait for streaming to complete
await stream_task

if __name__ == "__main__":
pytest.main([__file__, "-v"])
pytest.main([__file__, "-v"])
2,447 changes: 0 additions & 2,447 deletions examples/tutorials/10_agentic/10_temporal/010_agent_chat/uv.lock

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ Or set in `.env` file: `OPENAI_API_KEY=your-key-here`
## Quick Start

```bash
cd examples/tutorials/10_agentic/10_temporal/060_open_ai_agents_sdk_hello_world
cd examples/tutorials/10_agentic/10_temporal/010_open_ai_agents_sdk_hello_world
uv run agentex agents run --manifest manifest.yaml
```

Expand Down Expand Up @@ -102,4 +102,4 @@ The magic happens behind the scenes - no manual activity wrapping needed. The co
- Want automatic activity creation without manual wrapping
- Leveraging OpenAI's agent patterns with Temporal's durability

**Next:** [070_open_ai_agents_sdk_tools](../070_open_ai_agents_sdk_tools/) - Add durable tools to your agents
**Next:** [020_open_ai_agents_sdk_tools](../020_open_ai_agents_sdk_tools/) - Add durable tools to your agents
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ build:
# - Your agent's directory (your custom agent code)
# These paths are collected and sent to the Docker daemon for building
include_paths:
- 060_open_ai_agents_sdk_hello_world
- 010_open_ai_agents_sdk_hello_world

# Path to your agent's Dockerfile
# This defines how your agent's image is built from the context
# Relative to the root directory
dockerfile: 060_open_ai_agents_sdk_hello_world/Dockerfile
dockerfile: 010_open_ai_agents_sdk_hello_world/Dockerfile

# Path to your agent's .dockerignore
# Filters unnecessary files from the build context
# Helps keep build context small and builds fast
dockerignore: 060_open_ai_agents_sdk_hello_world/.dockerignore
dockerignore: 010_open_ai_agents_sdk_hello_world/.dockerignore


# Local Development Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@

from agentex.lib.types.fastacp import TemporalACPConfig
from agentex.lib.sdk.fastacp.fastacp import FastACP
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
TemporalStreamingModelProvider,
)
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor

context_interceptor = ContextInterceptor()
temporal_streaming_model_provider = TemporalStreamingModelProvider()

# Create the ACP server
acp = FastACP.create(
Expand All @@ -44,7 +51,8 @@
# We are also adding the Open AI Agents SDK plugin to the ACP.
type="temporal",
temporal_address=os.getenv("TEMPORAL_ADDRESS", "localhost:7233"),
plugins=[OpenAIAgentsPlugin()]
plugins=[OpenAIAgentsPlugin(model_provider=temporal_streaming_model_provider)],
interceptors=[context_interceptor]
)
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import asyncio

from temporalio.contrib.openai_agents import OpenAIAgentsPlugin

from project.workflow import ExampleTutorialWorkflow
from agentex.lib.utils.debug import setup_debug_if_enabled
from agentex.lib.utils.logging import make_logger
from agentex.lib.environment_variables import EnvironmentVariables
from agentex.lib.core.temporal.activities import get_all_activities
from agentex.lib.core.temporal.workers.worker import AgentexWorker
from agentex.lib.core.temporal.plugins.openai_agents.models.temporal_streaming_model import (
TemporalStreamingModelProvider,
)
from agentex.lib.core.temporal.plugins.openai_agents.interceptors.context_interceptor import ContextInterceptor

environment_variables = EnvironmentVariables.refresh()

logger = make_logger(__name__)


async def main():
    """Entry point: configure and run the Temporal worker with streaming enabled."""
    # Enable remote debugging when the corresponding env flag is set.
    setup_debug_if_enabled()

    queue_name = environment_variables.WORKFLOW_TASK_QUEUE
    if queue_name is None:
        raise ValueError("WORKFLOW_TASK_QUEUE is not set")

    # Start from the built-in activity set; append custom activities here.
    activities = get_all_activities() + []

    # --- Streaming wiring -------------------------------------------------
    # Two cooperating pieces make real-time LLM streaming work on top of the
    # stock OpenAIAgentsPlugin (no forked plugin required):
    #
    #   * ContextInterceptor threads the task_id through Temporal activity
    #     headers: on the outbound side it reads _task_id from the workflow
    #     instance and injects it into the headers; on the inbound side it
    #     restores it into the streaming_task_id ContextVar.
    #   * TemporalStreamingModelProvider hands out StreamingModel instances
    #     that pick the task_id up from that ContextVar and stream tokens to
    #     Redis (stream:{task_id}) in real time, while still returning the
    #     complete response to Temporal — preserving determinism and replay
    #     safety.
    interceptor = ContextInterceptor()
    model_provider = TemporalStreamingModelProvider()

    # NOTE: this uses the STANDARD temporalio.contrib.openai_agents plugin;
    # the interceptor + model provider carry all of the streaming logic.
    worker = AgentexWorker(
        task_queue=queue_name,
        plugins=[OpenAIAgentsPlugin(model_provider=model_provider)],
        interceptors=[interceptor],
    )

    await worker.run(
        activities=activities,
        workflow=ExampleTutorialWorkflow,
    )


if __name__ == "__main__":
    asyncio.run(main())
Loading
Loading