@sanzog03
Collaborator

Summary
This PR enhances the OpenAIBaseAgent (and CMRCareAgent) to support full streaming capabilities, including token streaming, reasoning events, and tool execution feedback. Additionally, it introduces Human-in-the-Loop (HITL) support, allowing agents to pause execution for user clarification and seamlessly resume with new context.

What it does

  • Streaming Support: Implements _stream_llm_response to stream tokens, thinking blocks, and partial outputs in real time from the OpenAI SDK; callers consume the stream via astream.
  • Human-in-the-Loop: Enables the agent to request human input via HumanTool. Execution halts and emits a HumanInputRequiredEvent that includes the current RunContext. Execution resumes when the user's response is sent, along with the previous RunContext, in a new agent run.
  • Stateful Orchestration: Updates CMRCareAgent to manage conversation state across multiple turns, orchestrating a search agent and a formatter agent.
  • Standardized Events: Converts OpenAI raw events into AKD-standard StreamEvent types (StreamingTokenEvent, ThinkingEvent, ToolCallingEvent).
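
The event standardization described above can be sketched as a small mapping function. This is an illustrative sketch only, not the code from this PR: the dataclasses are hypothetical stand-ins for the AKD event types, raw events are modeled as plain dicts, and the `reasoning.delta` and `tool.called` type strings are invented for the example. Only `response.output_text.delta` is named in this description.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Union


# Hypothetical stand-ins for the AKD StreamEvent types (not the real classes).
@dataclass
class StreamingTokenEvent:
    token: str


@dataclass
class ThinkingEvent:
    content: str


@dataclass
class ToolCallingEvent:
    tool_name: str


StreamEvent = Union[StreamingTokenEvent, ThinkingEvent, ToolCallingEvent]


def to_stream_events(raw_events: Iterable[dict]) -> Iterator[StreamEvent]:
    """Translate raw, dict-shaped events into standardized events."""
    for raw in raw_events:
        kind = raw.get("type")
        if kind == "response.output_text.delta":
            yield StreamingTokenEvent(token=raw["delta"])
        elif kind == "reasoning.delta":  # assumed name for reasoning content
            yield ThinkingEvent(content=raw["delta"])
        elif kind == "tool.called":  # assumed name for a tool invocation
            yield ToolCallingEvent(tool_name=raw["name"])
        # Unknown event types are skipped rather than raising.


raw = [
    {"type": "reasoning.delta", "delta": "Need a date range."},
    {"type": "tool.called", "name": "ask_human"},
    {"type": "response.output_text.delta", "delta": "Sea ice "},
    {"type": "response.output_text.delta", "delta": "datasets"},
    {"type": "unrelated.event"},
]
events = list(to_stream_events(raw))
text = "".join(e.token for e in events if isinstance(e, StreamingTokenEvent))
print(text)
```

Keeping the translation in one function means consumers only ever branch on the standardized types, so a change in the raw event vocabulary would touch a single place.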

How it does it

  • OpenAIBaseAgent:
    • Leverages Runner.run_streamed from the OpenAI SDK to process events.
    • Maps response.output_text.delta to StreamingTokenEvent.
    • Maps reasoning content to ThinkingEvent.
    • Detects ask_human tool calls to trigger HumanInputRequiredEvent.
    • Injects HumanResponse into conversation history upon resumption via RunContext.
  • CMRCareAgent:
    • Orchestrates a stateful inner _search_agent and a stateless _formatter_agent.
    • Forwards streaming events from sub-agents while maintaining the parent run context.
    • Handles partial outputs and intermediate tool calls correctly.
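
The orchestration pattern above, where sub-agent events are forwarded but the search agent's completion is held back until the formatter finishes, might look roughly like the following. It is a minimal sketch under stated assumptions: `Event`, `search_agent`, `formatter_agent`, and `orchestrate` are hypothetical names, and the real agents carry run context and richer event payloads that are omitted here.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional


# Hypothetical event type; the real AKD events carry more fields.
@dataclass
class Event:
    event_type: str  # "token" or "completed" in this sketch
    data: Any


async def search_agent(query: str) -> AsyncIterator[Event]:
    """Stand-in for the stateful search sub-agent."""
    for token in ("searching ", "CMR"):
        yield Event("token", token)
    yield Event("completed", {"hits": [f"dataset for {query}"]})


async def formatter_agent(search_output: dict) -> AsyncIterator[Event]:
    """Stand-in for the stateless formatter sub-agent."""
    yield Event("token", "formatting")
    yield Event("completed", {"report": ", ".join(search_output["hits"])})


async def orchestrate(query: str) -> AsyncIterator[Event]:
    """Forward sub-agent events, but emit only one final completed event."""
    search_output: Optional[dict] = None
    async for event in search_agent(query):
        if event.event_type == "completed":
            # Store the intermediate result instead of yielding it.
            search_output = event.data
        else:
            yield event
    if search_output is None:
        raise RuntimeError("search agent finished without an output")
    async for event in formatter_agent(search_output):
        yield event  # the formatter's completed event ends the pipeline


async def collect(query: str) -> list:
    return [event async for event in orchestrate(query)]


events = asyncio.run(collect("sea ice"))
print([e.event_type for e in events])
```

Holding back the intermediate completion means a consumer never sees a completed event whose payload fails to match the orchestrator's own output schema.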

Files changed

  • akd_ext/agents/_base.py — Core streaming logic and event handling for OpenAI agents.
  • akd_ext/agents/cmr_care.py — Updated CMR agent structure for stateful orchestration and HITL.
  • docs/specs/HUMAN-IN-THE-LOOP.md — Documentation for Human-in-the-Loop feature.
  • akd_ext/_types.py — Type definitions update.
  • CONTRIBUTING.md — Agent configuration guidelines update.

Testing
To verify streaming and HITL functionality:

import asyncio
from akd_ext.agents import CMRCareAgent, CMRCareConfig
from akd.tools import HumanTool
from akd_ext.agents.cmr_care import get_default_cmr_tools, CMRCareAgentInputSchema
from akd._base import StreamEventType, RunContext, HumanResponse

async def main():
    # 1. Initialize agent with HumanTool
    config = CMRCareConfig(
        tools=get_default_cmr_tools() + [HumanTool()]
    )
    agent = CMRCareAgent(config=config)

    # 2. Start streaming
    query = "Find data about sea ice."
    params = CMRCareAgentInputSchema(query=query)
    
    saved_event = None
    
    print("--- Start Streaming ---")
    async for event in agent.astream(params):
        if event.event_type == StreamEventType.STREAMING_TOKEN:
            print(event.data.token, end="", flush=True)
        elif event.event_type == StreamEventType.HUMAN_INPUT_REQUIRED:
            print(f"\n\n[Human Input Required]: {event.data.human_input.question}")
            saved_event = event
            break 
            
    # 3. Resume with human feedback if needed
    if saved_event:
        print("\n--- Resuming with Human Input ---")
        user_response = "I am interested in data from 2022."
        
        run_context = RunContext(
            human_response=HumanResponse(
                tool_call_id=saved_event.data.tool_call_id,
                content={"response": user_response}
            ),
            messages=saved_event.run_context.messages
        )
        
        async for event in agent.astream(params, run_context=run_context):
            if event.event_type == StreamEventType.COMPLETED:
                print(f"\n\n[Final Output]:\n{event.data.output.report}")

if __name__ == "__main__":
    asyncio.run(main())

rohit-sahoo and others added 30 commits January 29, 2026 09:50
- Use the final text, not the delta
- Also bugfix tool output issue
- Fix token accumulation problem and output for free-form response
- Improve tool calling output using "tool_output" stream event
- Finalize CMRCareAgent as orchestrator
- Avoid emitting the completed event until pipeline is complete
- This is to make sure that search agent completion event data won't be
  validated with CMRCareAgent.output_schema
- Instead of yielding completed event immediately, just store final
  output
- Also bugfix token batch size param mismatch to _astream
- The orchestrator itself carries the message tracking and just forwards
  accordingly to the sub agent depending on search agent or output agent
- The output agent is stateless
- The output formatter is stateless no matter what
NISH1001 and others added 10 commits February 10, 2026 11:18
- Convert akd-core compatible message list (completion format) to
  openai response format for agents sdk
- Properly flush and track tool calls in message history
- Apparently, hostedmcptool doesn't have 2 separate events. It's a single
  event with both input and output
- Add `akd_ext._types` module to introduce custom types for easy import
- Validate `OpenAIBaseAgentConfig.tools` with either openai tool or akd
  tool
- Basically Any already encompasses everything
- Also add docs/specs/ hitl document
3 participants