
How to send custom stream events from an MCP Client to an app connected to an agent, similar to get_stream_writer (langgraph) with custom mode? #398

@tdkhang305-glitch

Description


Environment

langchain-mcp-adapters: 0.2.1

โ“ Question: Forwarding tool progress events from MCP Client to an agent-connected application

Currently, I am able to receive tool execution progress via on_progress callbacks on the MCP client side.

However, I am not sure how to forward these progress events back to the application that is connected to the agent, so that the UI can react dynamically (e.g. display real-time tool progress, loading states, or intermediate steps).


async def on_progress(
    progress: float,
    total: float | None,
    message: str | None,
    context: CallbackContext,
):
    """Handle progress updates from MCP servers."""
    percent = (progress / total * 100) if total else progress
    tool_info = f" ({context.tool_name})" if context.tool_name else ""
    print(f"[{context.server_name}{tool_info}] Progress: {percent:.1f}% - {message}")


async def on_logging_message(
    params: LoggingMessageNotificationParams,
    context: CallbackContext,
):
    """Handle log messages from MCP servers."""
    print(f"[{context.server_name}] {params.level}: {params.data}")

    
async def runtime_interceptor(
    request: MCPToolCallRequest, handler
) -> CallToolResult:
    # Attach an auth header to every outgoing MCP tool call.
    request.headers = {"Authorization": "Bearer 123456789"}

    return await handler(request)


async def make_graph():   
    client = MultiServerMCPClient(
        {
            "math": {
                "transport": "streamable_http",  # HTTP-based remote server
                # Ensure the math server is running on port 8000
                "url": "http://localhost:8000/mcp",
            },
        },
        tool_interceptors=[runtime_interceptor],
        callbacks=Callbacks(on_logging_message=on_logging_message, on_progress=on_progress),
    )
    tools = await client.get_tools()
    tools.extend([get_current_timestamp, get_all_jobs, get_ownerships, get_activity_logs, create_activity_log])
    
    agent = create_agent(
        "gpt-4.1",
        tools,
        middleware=[system_prompt, AmpAgentMiddleware()],
        context_schema=Context,
    )

    return agent

๐Ÿ” What I can do today

  • Tools executed via MCP can emit progress updates.
  • These progress updates are available through on_progress callbacks on the MCP client.

🎯 What I want to achieve

I want to stream custom progress events from the MCP Client → Agent → Application, similar to how this works when using LangGraph tools with:

  • get_stream_writer
  • mode="custom"

In LangGraph, this makes it straightforward to:

  • Emit custom events from tools
  • Stream them to the application
  • Update the UI in real time
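
For intuition, the mechanism behind `get_stream_writer` can be sketched with nothing but `contextvars`: a context variable holds the currently active writer, so code nested arbitrarily deep (a tool body, for instance) can emit events without threading a callback through every call. This is an illustrative stdlib-only emulation of the idea, not LangGraph's actual implementation; all names here are made up:

```python
import contextvars
from typing import Any, Callable

# Context variable holding the active event writer for the current execution.
_writer: contextvars.ContextVar[Callable[[Any], None]] = contextvars.ContextVar("writer")


def get_stream_writer() -> Callable[[Any], None]:
    """Return the active writer, or a no-op when no stream is open."""
    return _writer.get(lambda _event: None)


def run_with_custom_stream(fn: Callable[[], Any]) -> tuple[Any, list]:
    """Run fn, collecting every event it emits via get_stream_writer()."""
    events: list = []
    token = _writer.set(events.append)
    try:
        result = fn()
    finally:
        _writer.reset(token)
    return result, events


def my_tool() -> str:
    """A hypothetical tool body that emits progress events."""
    writer = get_stream_writer()
    for step in range(3):
        writer({"progress": step + 1, "total": 3})
    return "done"
```

Calling `run_with_custom_stream(my_tool)` yields the tool's return value plus the three progress events, while calling `my_tool()` outside that context silently drops them, which mirrors how LangGraph-managed tools only stream when the graph is run with `stream_mode="custom"`.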

โ“ Question

Is there an equivalent or recommended approach when using langchain_mcp_adapters to:

  • Emit custom stream events from tools
  • Forward those events through the agent runtime
  • Consume them in the application layer (e.g. UI)

If this is not currently supported:

  • Is there a recommended workaround?
  • Or is this considered a feature gap compared to LangGraph's custom streaming mode?

📎 Additional Context

The main goal is to make the UI more flexible and responsive by displaying tool-level progress updates while the agent is running.

Any guidance, examples, or design recommendations would be greatly appreciated.
