Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 31 additions & 6 deletions apps/agentstack-sdk-py/examples/mcp_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

from agentstack_sdk.a2a.extensions.auth.oauth import OAuthExtensionServer, OAuthExtensionSpec
from agentstack_sdk.a2a.extensions.services.mcp import MCPServiceExtensionServer, MCPServiceExtensionSpec
from agentstack_sdk.a2a.extensions.tools.call import (
ToolCallExtensionParams,
ToolCallExtensionServer,
ToolCallExtensionSpec,
ToolCallRequest,
)
from agentstack_sdk.a2a.types import RunYield
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext
Expand All @@ -21,24 +27,43 @@ async def mcp_agent(
message: Message,
context: RunContext,
oauth: Annotated[OAuthExtensionServer, OAuthExtensionSpec.single_demand()],
mcp: Annotated[
mcp_tool_call: Annotated[ToolCallExtensionServer, ToolCallExtensionSpec(params=ToolCallExtensionParams())],
mcp_service: Annotated[
MCPServiceExtensionServer,
MCPServiceExtensionSpec.single_demand(),
],
) -> AsyncGenerator[RunYield, Message]:
"""Lists tools"""

if not mcp:
if not mcp_service:
yield "MCP extension hasn't been activated, no tools are available"
return

async with mcp.create_client() as (read, write), ClientSession(read, write) as session:
await session.initialize()
if not mcp_tool_call:
yield "MCP Tool Call extension hasn't been activated, no approval requests will be issued"

tools = await session.list_tools()
async with (
mcp_service.create_client() as (read, write),
ClientSession(read, write) as session,
):
session_init_result = await session.initialize()

result = await session.list_tools()

yield "Available tools: \n"
yield "\n".join([t.name for t in tools.tools])
yield "\n".join([t.name for t in result.tools])

if result.tools:
tool = result.tools[0]
input = {}
yield f"Requesting approval for tool {tool.name}"
if mcp_tool_call:
await mcp_tool_call.request_tool_call_approval(
ToolCallRequest.from_mcp_tool(tool, input, server=session_init_result.serverInfo), context=context
)
yield f"Calling tool {tool.name}"
await session.call_tool(tool.name, input)
yield "Tool call finished"


if __name__ == "__main__":
Expand Down
65 changes: 41 additions & 24 deletions apps/agentstack-sdk-py/examples/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from pydantic import AnyHttpUrl, AnyUrl

import agentstack_sdk.a2a.extensions
from agentstack_sdk.a2a.extensions.tools.call import ToolCallResponse


class OAuthHandler:
Expand Down Expand Up @@ -67,67 +68,83 @@ async def handler(request: web.Request) -> web.Response:
async def run(base_url: str = "http://127.0.0.1:10000"):
async with httpx.AsyncClient(timeout=30) as httpx_client:
card = await a2a.client.A2ACardResolver(httpx_client, base_url=base_url).get_agent_card()
mcp_spec = agentstack_sdk.a2a.extensions.MCPServiceExtensionSpec.from_agent_card(card)
mcp_service_spec = agentstack_sdk.a2a.extensions.MCPServiceExtensionSpec.from_agent_card(card)
oauth_spec = agentstack_sdk.a2a.extensions.OAuthExtensionSpec.from_agent_card(card)
tool_call_spec = agentstack_sdk.a2a.extensions.ToolCallExtensionSpec.from_agent_card(card)

if not mcp_spec:
if not mcp_service_spec:
raise ValueError(f"Agent at {base_url} does not support MCP service injection")
if not oauth_spec:
raise ValueError(f"Agent at {base_url} does not support oAuth")
if not tool_call_spec:
raise ValueError(f"Agent at {base_url} does not support MCP")

mcp_extension_client = agentstack_sdk.a2a.extensions.MCPServiceExtensionClient(mcp_spec)
mcp_service_extension_client = agentstack_sdk.a2a.extensions.MCPServiceExtensionClient(mcp_service_spec)
oauth_extension_client = agentstack_sdk.a2a.extensions.OAuthExtensionClient(oauth_spec)
tool_call_extension_client = agentstack_sdk.a2a.extensions.ToolCallExtensionClient(tool_call_spec)

oauth = OAuthHandler()
message = a2a.types.Message(
message_id=str(uuid.uuid4()),
role=a2a.types.Role.user,
parts=[a2a.types.Part(root=a2a.types.TextPart(text="Howdy!"))],
metadata=mcp_extension_client.fulfillment_metadata(
metadata=mcp_service_extension_client.fulfillment_metadata(
mcp_fulfillments={
key: agentstack_sdk.a2a.extensions.services.mcp.MCPFulfillment(
transport=agentstack_sdk.a2a.extensions.services.mcp.StreamableHTTPTransport(
url=AnyHttpUrl("https://mcp.stripe.com")
),
)
for key in mcp_spec.params.mcp_demands
for key in mcp_service_spec.params.mcp_demands
}
)
| oauth_extension_client.fulfillment_metadata(
oauth_fulfillments={
key: agentstack_sdk.a2a.extensions.OAuthFulfillment(redirect_uri=AnyUrl(oauth.redirect_uri))
for key in oauth_spec.params.oauth_demands
}
),
)
| tool_call_extension_client.metadata(),
)

client = a2a.client.ClientFactory(a2a.client.ClientConfig(httpx_client=httpx_client, polling=True)).create(
card=card
)

task = None
async for event in client.send_message(message):
if isinstance(event, a2a.types.Message):
print(event)
return
task, _update = event
while True:
async for event in client.send_message(message):
if isinstance(event, a2a.types.Message):
print(event)
return
task, _update = event

if task and task.status.state == a2a.types.TaskState.auth_required:
if not task.status.message:
raise RuntimeError("Missing message")
if task and task.status.state == a2a.types.TaskState.auth_required:
if not task.status.message:
raise RuntimeError("Missing message")

auth_request = oauth_extension_client.parse_auth_request(message=task.status.message)
auth_request = oauth_extension_client.parse_auth_request(message=task.status.message)

print("Agent has requested authorization")
oauth.open_browser(str(auth_request.authorization_endpoint_url))
request = await oauth.handle_redirect()
print("Agent has requested authorization")
oauth.open_browser(str(auth_request.authorization_endpoint_url))
request = await oauth.handle_redirect()

async for event in client.send_message(
oauth_extension_client.create_auth_response(task_id=task.id, redirect_uri=AnyUrl(str(request.url)))
):
if isinstance(event, a2a.types.Message):
raise RuntimeError("Agent responded with message to a task")
task, _update = event
message = oauth_extension_client.create_auth_response(
task_id=task.id, redirect_uri=AnyUrl(str(request.url))
)
elif task and task.status.state == a2a.types.TaskState.input_required:
if not task.status.message:
raise RuntimeError("Missing message")

approval_request = tool_call_extension_client.parse_request(message=task.status.message)

print("Agent has requested a tool call")
print(approval_request)
choice = input("Approve (Y/n): ")
response = ToolCallResponse(action="accept" if choice.lower() == "y" else "reject")
message = tool_call_extension_client.create_response_message(task_id=task.id, response=response)
else:
break

print(task)

Expand Down
60 changes: 60 additions & 0 deletions apps/agentstack-sdk-py/examples/tool_call_approval_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from typing import Annotated

from a2a.types import Message
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import TextContent

from agentstack_sdk.a2a.extensions.tools.call import (
ToolCallExtensionParams,
ToolCallExtensionServer,
ToolCallExtensionSpec,
ToolCallRequest,
)
from agentstack_sdk.a2a.extensions.tools.exceptions import ToolCallRejectionError
from agentstack_sdk.server import Server
from agentstack_sdk.server.context import RunContext

server = Server()


@server.agent()
async def tool_call_approval_agent(
message: Message,
context: RunContext,
mcp_tool_call: Annotated[ToolCallExtensionServer, ToolCallExtensionSpec(params=ToolCallExtensionParams())],
):
async with (
streamablehttp_client(url="https://hf.co/mcp") as (read, write, _),
ClientSession(read, write) as session,
):
session_init_result = await session.initialize()

list_tools_result = await session.list_tools()
tools = {tool.name: tool for tool in list_tools_result.tools}

whoami_tool = tools.get("hf_whoami")
if not whoami_tool:
raise RuntimeError("Could not find whoami_tool on the server")

arguments = {}
try:
await mcp_tool_call.request_tool_call_approval(
ToolCallRequest.from_mcp_tool(whoami_tool, arguments, server=session_init_result.serverInfo),
context=context,
)
result = await session.call_tool("hf_whoami", arguments)
content = result.content[0]
if isinstance(content, TextContent):
yield content.text
else:
yield "Tool call succeeded"
except ToolCallRejectionError:
yield "Tool call has been rejected by the client"


if __name__ == "__main__":
server.run()
62 changes: 62 additions & 0 deletions apps/agentstack-sdk-py/examples/tool_call_approval_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

import asyncio
import uuid

import a2a.client
import a2a.types
import httpx

import agentstack_sdk.a2a.extensions
from agentstack_sdk.a2a.extensions.tools.call import ToolCallResponse


async def run(base_url: str = "http://127.0.0.1:10000"):
async with httpx.AsyncClient(timeout=30) as httpx_client:
card = await a2a.client.A2ACardResolver(httpx_client, base_url=base_url).get_agent_card()
tool_call_spec = agentstack_sdk.a2a.extensions.ToolCallExtensionSpec.from_agent_card(card)

if not tool_call_spec:
raise ValueError(f"Agent at {base_url} does not support MCP Tool Call extension")

tool_call_extension_client = agentstack_sdk.a2a.extensions.ToolCallExtensionClient(tool_call_spec)

message = a2a.types.Message(
message_id=str(uuid.uuid4()),
role=a2a.types.Role.user,
parts=[a2a.types.Part(root=a2a.types.TextPart(text="Howdy!"))],
metadata=tool_call_extension_client.metadata(),
)

client = a2a.client.ClientFactory(a2a.client.ClientConfig(httpx_client=httpx_client, polling=True)).create(
card=card
)

task = None
while True:
async for event in client.send_message(message):
if isinstance(event, a2a.types.Message):
print(event)
return
task, _update = event

if task and task.status.state == a2a.types.TaskState.input_required:
if not task.status.message:
raise RuntimeError("Missing message")

approval_request = tool_call_extension_client.parse_request(message=task.status.message)

print("Agent has requested a tool call")
print(approval_request)
choice = input("Approve (Y/n): ")
response = ToolCallResponse(action="accept" if choice.lower() == "y" else "reject")
message = tool_call_extension_client.create_response_message(task_id=task.id, response=response)
else:
break

print(task)


if __name__ == "__main__":
asyncio.run(run())
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@

from .auth import *
from .services import *
from .tools import *
from .ui import *
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Copyright 2025 © BeeAI a Series of LF Projects, LLC
# SPDX-License-Identifier: Apache-2.0

from .call import *
from .exceptions import *
Loading