
Conversation

@rohit-sahoo
Collaborator

Summary

Adds streaming support to `OpenAIBaseAgent` in akd-ext, enabling real-time token streaming and tool-call events following the akd-core streaming spec.

What it does

Enables the `agent.astream()` method for agents built on the OpenAI Agents SDK, with support for the `STARTING`, `RUNNING`, `STREAMING`, `THINKING`, `TOOL_CALLING`, `TOOL_RESULT`, `COMPLETED`, and `FAILED` event types.
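For orientation, a single `astream()` call for a run that invokes one tool would typically emit events in the order sketched below. This is a minimal illustration: the enum values and the stand-in `StreamEventType` class here are assumptions for demonstration, not copied from akd-core.

```python
from enum import Enum


class StreamEventType(str, Enum):
    # Illustrative stand-in for akd-core's StreamEventType; actual values may differ.
    STARTING = "starting"
    RUNNING = "running"
    STREAMING = "streaming"
    THINKING = "thinking"
    TOOL_CALLING = "tool_calling"
    TOOL_RESULT = "tool_result"
    COMPLETED = "completed"
    FAILED = "failed"


# A run that calls one tool typically emits events in this order:
TYPICAL_ORDER = [
    StreamEventType.STARTING,
    StreamEventType.RUNNING,
    StreamEventType.TOOL_CALLING,
    StreamEventType.TOOL_RESULT,
    StreamEventType.STREAMING,   # repeated once per token batch
    StreamEventType.COMPLETED,   # or FAILED on error
]

print([e.value for e in TYPICAL_ORDER])
```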

How it does it

  1. Added `StreamingMixin` to the `OpenAIBaseAgent` class

    • Provides the public `astream()` method with input/output validation
  2. Implemented the `_stream_llm_response()` method

    • Uses `Runner.run_streamed()` instead of `Runner.run()`
    • Handles `RawResponsesStreamEvent` for text tokens and thinking
    • Handles `RunItemStreamEvent` for tool calls (via `event.item.raw_item`)
    • Batches tokens for efficiency (default `batch_size=10`)
  3. Implemented the `_astream()` method

    • Converts the internal stream format to akd `StreamEvent` objects
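The token batching mentioned in step 2 can be sketched as a small async generator. This is a minimal illustration under assumed names (`batch_tokens` is hypothetical, not the actual helper in this PR), and the real `_stream_llm_response()` also interleaves thinking and tool events:

```python
import asyncio
from typing import AsyncIterator


async def batch_tokens(
    tokens: AsyncIterator[str], batch_size: int = 10
) -> AsyncIterator[str]:
    """Join incoming tokens into batches of `batch_size` to reduce event volume."""
    buffer: list[str] = []
    async for token in tokens:
        buffer.append(token)
        if len(buffer) >= batch_size:
            yield "".join(buffer)
            buffer.clear()
    if buffer:  # flush any remaining tokens at end of stream
        yield "".join(buffer)


async def demo() -> list[str]:
    async def fake_tokens():
        for t in ["Hel", "lo", ", ", "wor", "ld"]:
            yield t

    return [chunk async for chunk in batch_tokens(fake_tokens(), batch_size=2)]


print(asyncio.run(demo()))  # → ['Hello', ', wor', 'ld']
```

Batching trades a little latency for far fewer `STREAMING` events, which matters when each event crosses a process or network boundary.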

Tested streaming with MCP Tools

```python
from agents import HostedMCPTool
from pydantic import Field

from akd._base import InputSchema, OutputSchema
from akd._base.streaming import StreamEventType
from akd_ext.agents._base import OpenAIBaseAgent, OpenAIBaseAgentConfig


class QIn(InputSchema):
    """Input."""
    q: str = Field(..., description="Question")


class QOut(OutputSchema):
    """Output."""
    a: str = Field(..., description="Answer")


# Auto-approve MCP tool calls
async def auto_approve(request) -> dict:
    """Automatically approve all MCP tool calls."""
    print(f"Auto-approving: {request}")
    return {"approve": True}


# MCP tool setup
mcp_tool = HostedMCPTool(
    tool_config={
        "type": "mcp",
        "server_url": "YOUR_MCP_SERVER_URL",
        "server_label": "your_label",
        "headers": {
            "Authorization": "Bearer YOUR_TOKEN",
        },
    },
    on_approval_request=auto_approve,
)


class MCPTestAgent(OpenAIBaseAgent[QIn, QOut]):
    input_schema = QIn
    output_schema = QOut


config = OpenAIBaseAgentConfig(
    model_name="gpt-4o-mini",
    system_prompt="You are a helpful assistant. Use tools when needed.",
    tools=[mcp_tool],
)

agent = MCPTestAgent(config=config)

print("Agent created with MCP tools!")
print(f"Tools: {config.tools}")

# Test with tool calling
question = "Use the dummy tool with input 'hello world'"

async for event in agent.astream(QIn(q=question)):
    print(f"[{event.event_type.upper()}]", end=" ")
    if event.event_type == StreamEventType.TOOL_CALLING:
        print(f"TOOL: {event.data.get('tool_name')}")
        print(f"INPUT: {event.data.get('tool_input')}")
    elif event.event_type == StreamEventType.TOOL_RESULT:
        print(f"RESULT: {event.data.get('tool_output')}")
    elif event.event_type == StreamEventType.STREAMING:
        print(f"Token: '{event.token}'")
    elif event.event_type == StreamEventType.COMPLETED:
        print(f"Final: {event.output}")
    else:
        print()
```

@rohit-sahoo rohit-sahoo self-assigned this Jan 29, 2026
@rohit-sahoo
Collaborator Author

Reference Links:

  1. Running agents - OpenAI SDK
  2. Streaming - OpenAI SDK

