Skip to content

Conversation

@danielmillerp
Copy link
Contributor

@danielmillerp danielmillerp commented Oct 28, 2025

Add SyncStreamingProvider with Tracing Support

Context & Motivation

Why Not Use run_agent_streamed_auto_send?

For sync agents (FastACP), run_agent_streamed_auto_send doesn't work correctly because:

  1. Redis streaming incompatibility: Sync agents yield TaskMessageUpdate chunks directly to the frontend via async generators.
    They don't subscribe to Redis server-side events like temporal agents do.

  2. Duplicate messages: run_agent_streamed_auto_send streams to Redis AND saves messages to the database. For sync agents, this
    causes:

    • Messages appearing twice (once from yield, once from DB fetch)
    • Messages appearing out of order (tool calls show up after the stream completes instead of being interleaved)
  3. Wrong abstraction: The auto_send methods were designed for temporal workflows, not for sync agents that directly yield to
    the frontend.

The Solution

This PR provides SyncStreamingProvider which:

  • Uses Runner.run_streamed directly (the correct way for sync agents)
  • Converts OpenAI events to AgentEx TaskMessageUpdate events in real-time
  • Yields messages in the correct order as they happen
  • Adds distributed tracing support for observability
  • Eliminates duplicate messages and ordering issues

Core Concepts

SyncStreamingProvider

A provider wrapper that:

  • Wraps the OpenAI provider with tracing instrumentation
  • Automatically creates trace spans for run_agent (non-streaming) and run_agent_streamed (streaming)
  • Captures input parameters and output for observability
  • Works seamlessly with the existing adk.tracing system

convert_openai_to_agentex_events()

A helper function that converts OpenAI streaming events to AgentEx TaskMessageUpdate events, enabling compatibility with the
FastACP streaming protocol.

Usage

Basic Setup with Streaming

from agents import Runner, Agent, RunConfig
from agentex.lib.adk.providers._modules.sync_provider import (
    SyncStreamingProvider,
    convert_openai_to_agentex_events
)
from agentex.lib import adk

# Create provider with tracing
async with adk.tracing.span(
    trace_id=trace_id,
    name="Agent Turn"
) as span:
    provider = SyncStreamingProvider(
        trace_id=trace_id,
        parent_span_id=span.id
    )

    # Run agent with streaming
    agent = Agent(name="assistant", instructions="You are helpful", model="gpt-4o")
    result = Runner.run_streamed(
        agent,
        input_list,
        run_config=RunConfig(model_provider=provider)
    )

    # Convert and yield AgentEx events
    async for agentex_event in convert_openai_to_agentex_events(result.stream_events()):
        yield agentex_event

Non-Streaming with Tracing

# Works with non-streaming too!
provider = SyncStreamingProvider(
    trace_id=trace_id,
    parent_span_id=span.id
)

result = Runner.run(
    agent,
    input_list,
    run_config=RunConfig(model_provider=provider)
)

# The run will be traced automatically

Without Tracing (Optional)

# Tracing is optional - omit trace_id to disable
provider = SyncStreamingProvider()  # No tracing overhead

Benefits

- Correct streaming behavior: Messages appear in the correct order without duplicates
- Full observability: Track agent executions across distributed systems
- Flexible: Works with both streaming and non-streaming
- Opt-in: Zero overhead when tracing is disabled
- Compatible: Integrates seamlessly with existing AgentEx tracing infrastructure

@felixs8696
Copy link
Contributor

felixs8696 commented Oct 28, 2025

Can you fix the lint before merging?

@felixs8696
Copy link
Contributor

Also can you update the tutorials to use this?

@felixs8696
Copy link
Contributor

Also i tested locally by copying the code and it works :) thanks for this!

@danielmillerp
Copy link
Contributor Author

Also can you update the tutorials to use this?

yep, once this is merged, will update tutorials!

@danielmillerp danielmillerp merged commit 6380da6 into main Oct 28, 2025
10 checks passed
@danielmillerp danielmillerp deleted the dm/add-sync-provider branch October 28, 2025 18:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants