From 8108f989613a58f32403bfc86a168f4c25302cac Mon Sep 17 00:00:00 2001 From: Vasiliy Radostev Date: Wed, 11 Mar 2026 23:13:23 -0700 Subject: [PATCH 1/4] feat(ag2): add research-synthesis-team and payment-approval AG2 agent examples Co-Authored-By: Claude Sonnet 4.6 --- ag2-agents/payment-approval/.env.example | 5 ++ ag2-agents/payment-approval/README.md | 26 +++++++ ag2-agents/payment-approval/main.py | 70 +++++++++++++++++++ ag2-agents/payment-approval/requirements.txt | 2 + ag2-agents/payment-approval/tests/conftest.py | 9 +++ .../payment-approval/tests/test_payment.py | 23 ++++++ .../research-synthesis-team/.env.example | 9 +++ ag2-agents/research-synthesis-team/Dockerfile | 14 ++++ ag2-agents/research-synthesis-team/README.md | 35 ++++++++++ .../research-synthesis-team/agent_executor.py | 20 ++++++ ag2-agents/research-synthesis-team/agents.py | 43 ++++++++++++ ag2-agents/research-synthesis-team/main.py | 43 ++++++++++++ .../research-synthesis-team/requirements.txt | 5 ++ .../research-synthesis-team/tests/conftest.py | 20 ++++++ .../tests/test_agent_executor.py | 14 ++++ .../tests/test_agents.py | 35 ++++++++++ .../research-synthesis-team/workflow.py | 50 +++++++++++++ 17 files changed, 423 insertions(+) create mode 100644 ag2-agents/payment-approval/.env.example create mode 100644 ag2-agents/payment-approval/README.md create mode 100644 ag2-agents/payment-approval/main.py create mode 100644 ag2-agents/payment-approval/requirements.txt create mode 100644 ag2-agents/payment-approval/tests/conftest.py create mode 100644 ag2-agents/payment-approval/tests/test_payment.py create mode 100644 ag2-agents/research-synthesis-team/.env.example create mode 100644 ag2-agents/research-synthesis-team/Dockerfile create mode 100644 ag2-agents/research-synthesis-team/README.md create mode 100644 ag2-agents/research-synthesis-team/agent_executor.py create mode 100644 ag2-agents/research-synthesis-team/agents.py create mode 100644 ag2-agents/research-synthesis-team/main.py create mode 100644 ag2-agents/research-synthesis-team/requirements.txt create mode 100644 ag2-agents/research-synthesis-team/tests/conftest.py create mode 100644 ag2-agents/research-synthesis-team/tests/test_agent_executor.py create mode 100644 ag2-agents/research-synthesis-team/tests/test_agents.py create mode 100644 ag2-agents/research-synthesis-team/workflow.py diff --git a/ag2-agents/payment-approval/.env.example b/ag2-agents/payment-approval/.env.example new file mode 100644 index 0000000..3325196 --- /dev/null +++ b/ag2-agents/payment-approval/.env.example @@ -0,0 +1,5 @@ +OPENAI_API_KEY=your-openai-api-key-here + +# Optional +LLM_MODEL=gpt-4o-mini +OPENAI_BASE_URL=https://api.openai.com/v1 diff --git a/ag2-agents/payment-approval/README.md b/ag2-agents/payment-approval/README.md new file mode 100644 index 0000000..916b2bc --- /dev/null +++ b/ag2-agents/payment-approval/README.md @@ -0,0 +1,26 @@ +# AG2 Two-Agent Payment Approval + +Demonstrates AG2's `human_input_mode="ALWAYS"` pattern as an approval gate +before triggering a Skyfire payment — the first example in this repo requiring +explicit user confirmation before a financial action. + +Two agents collaborate: +- **researcher** — investigates the recipient and produces a risk assessment +- **payment_executor** — presents the assessment, pauses for human confirmation, + then executes or aborts based on the response + +## Key AG2 Features + +- **`human_input_mode="ALWAYS"`** — executor pauses before every response; human + types "yes" to proceed or "no" to abort — no custom routing logic needed +- **Two-agent `initiate_chat`** — researcher hands off to executor via the + natural conversation flow; the shared message history carries the assessment + +## Quick Start + +```bash +cd ag2-agents/payment-approval +pip install -r requirements.txt +cp .env.example .env +python main.py +``` diff --git a/ag2-agents/payment-approval/main.py b/ag2-agents/payment-approval/main.py new file mode 100644 index 0000000..73a0ceb --- /dev/null +++ b/ag2-agents/payment-approval/main.py @@ -0,0 +1,70 @@ +""" +AG2 two-agent payment approval with human-in-the-loop gate. + +researcher — investigates the recipient and produces a risk assessment +executor — presents the assessment and waits for explicit human confirmation + before executing the Skyfire payment + +human_input_mode="ALWAYS" on executor is the approval gate: the agent +pauses before every response so the human can type "yes" to proceed or +"no" to abort. No custom routing logic required. +""" +import os +from dotenv import load_dotenv +from autogen import ConversableAgent + +load_dotenv() + +llm_config = { + "config_list": [{ + "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), + "api_key": os.environ["OPENAI_API_KEY"], + "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), + }], + "temperature": 0.2, + "cache_seed": None, +} + +researcher = ConversableAgent( + name="researcher", + system_message=( + "You are a payment risk analyst. Investigate the payment recipient using available " + "tools: check their Fetch.ai address history, reputation, and any known flags. " + "Produce a concise risk assessment with a clear recommendation (proceed / do not proceed). " + "End your assessment with ASSESSMENT COMPLETE." + ), + llm_config=llm_config, + is_termination_msg=lambda m: "ASSESSMENT COMPLETE" in (m.get("content") or ""), +) + +executor = ConversableAgent( + name="payment_executor", + system_message=( + "You handle payment execution. Present the researcher's risk assessment clearly, " + "state the exact payment details (recipient, amount, reason), then ask the human " + "to confirm. If the human approves, call the skyfire_send tool to execute the payment. " + "If the human declines, acknowledge and terminate. End with TERMINATE." + ), + llm_config=llm_config, + human_input_mode="ALWAYS", # pauses before every response — the human types yes/no + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), +) + + +def run_payment_approval(recipient: str, amount: float, reason: str) -> None: + researcher.initiate_chat( + executor, + message=( + f"Payment request: {amount} USDC to {recipient} — reason: '{reason}'. " + f"Investigate the recipient and produce a risk assessment." + ), + max_turns=6, + ) + + +if __name__ == "__main__": + run_payment_approval( + recipient="alice.fetch", + amount=50.0, + reason="research report delivery", + ) diff --git a/ag2-agents/payment-approval/requirements.txt b/ag2-agents/payment-approval/requirements.txt new file mode 100644 index 0000000..3a9e8f6 --- /dev/null +++ b/ag2-agents/payment-approval/requirements.txt @@ -0,0 +1,2 @@ +ag2[openai]>=0.11.0 +python-dotenv>=1.0.0 diff --git a/ag2-agents/payment-approval/tests/conftest.py b/ag2-agents/payment-approval/tests/conftest.py new file mode 100644 index 0000000..c6e2cd2 --- /dev/null +++ b/ag2-agents/payment-approval/tests/conftest.py @@ -0,0 +1,9 @@ +""" +Ensure payment-approval/ is on sys.path so 'import main' works in tests. +""" +import sys +import os + +parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if parent not in sys.path: + sys.path.insert(0, parent) diff --git a/ag2-agents/payment-approval/tests/test_payment.py b/ag2-agents/payment-approval/tests/test_payment.py new file mode 100644 index 0000000..24d35e4 --- /dev/null +++ b/ag2-agents/payment-approval/tests/test_payment.py @@ -0,0 +1,23 @@ +import os +os.environ.setdefault("OPENAI_API_KEY", "test-key") + +def test_agents_instantiate(): + """Verify agent setup without initiating a chat.""" + import main as m + assert m.researcher.name == "researcher" + assert m.executor.name == "payment_executor" + +def test_executor_human_input_mode(): + """Executor must have human_input_mode=ALWAYS — the approval gate.""" + import main as m + assert m.executor.human_input_mode == "ALWAYS" + +def test_researcher_termination_condition(): + import main as m + assert m.researcher._is_termination_msg({"content": "Risk: low. ASSESSMENT COMPLETE"}) is True + assert m.researcher._is_termination_msg({"content": "Still investigating..."}) is False + +def test_executor_termination_condition(): + import main as m + assert m.executor._is_termination_msg({"content": "Payment aborted. TERMINATE"}) is True + assert m.executor._is_termination_msg({"content": "Please confirm."}) is False diff --git a/ag2-agents/research-synthesis-team/.env.example b/ag2-agents/research-synthesis-team/.env.example new file mode 100644 index 0000000..80b5151 --- /dev/null +++ b/ag2-agents/research-synthesis-team/.env.example @@ -0,0 +1,9 @@ +OPENAI_API_KEY=your-openai-api-key-here +AGENTVERSE_API_KEY=your-agentverse-api-key-here + +# Optional +LLM_MODEL=gpt-4o-mini +OPENAI_BASE_URL=https://api.openai.com/v1 +AGENT_PORT=8008 +AGENTVERSE_URL=https://agentverse.ai +MCP_SERVER_URL= diff --git a/ag2-agents/research-synthesis-team/Dockerfile b/ag2-agents/research-synthesis-team/Dockerfile new file mode 100644 index 0000000..67da1c8 --- /dev/null +++ b/ag2-agents/research-synthesis-team/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY . . + +RUN apt-get update && apt-get install -y gcc \ + && pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -r requirements.txt \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8008 +CMD ["python", "main.py"] diff --git a/ag2-agents/research-synthesis-team/README.md b/ag2-agents/research-synthesis-team/README.md new file mode 100644 index 0000000..fe89bbd --- /dev/null +++ b/ag2-agents/research-synthesis-team/README.md @@ -0,0 +1,35 @@ +# AG2 Research Synthesis Team + +A multi-agent research pipeline using [AG2](https://github.com/ag2ai/ag2) (formerly AutoGen) +integrated with the Fetch.ai uAgents ecosystem via the A2A protocol. + +## Architecture + +Four specialists collaborate under GroupChat with LLM-driven speaker selection, wrapped as +an A2A executor and exposed as a discoverable agent on Agentverse. + +``` +GroupChat (AG2) +├── web_researcher — searches and gathers information +├── financial_analyst — analyses market and economic aspects +├── tech_analyst — evaluates technical feasibility +└── synthesizer — produces the final report + ↓ +SingleA2AAdapter (Fetch.ai uagents-adapter) + ↓ +Agentverse (discoverable at port 8008) +``` + +## Quick Start + +```bash +pip install -r requirements.txt +cp .env.example .env # add OPENAI_API_KEY and AGENTVERSE_API_KEY +python main.py +``` + +## AG2 Features Demonstrated + +- **`GroupChat` with `speaker_selection_method="auto"`** — LLM-driven dynamic speaker selection +- **Native MCP client** — optional connection to Fetch.ai's MCP gateway for web search tools +- **Pattern B (A2A Outbound)** — same integration pattern as LangChain and Google ADK examples diff --git a/ag2-agents/research-synthesis-team/agent_executor.py b/ag2-agents/research-synthesis-team/agent_executor.py new file mode 100644 index 0000000..2c8c135 --- /dev/null +++ b/ag2-agents/research-synthesis-team/agent_executor.py @@ -0,0 +1,20 @@ +""" +Wraps the AG2 GroupChat workflow as a LangChain-compatible AgentExecutor +for use with SingleA2AAdapter (Pattern B — matches LangChain/ADK examples). +""" +import asyncio +from autogen import LLMConfig +from workflow import run_research + + +class AG2ResearchExecutor: + """Drop-in AgentExecutor interface for SingleA2AAdapter.""" + + def __init__(self, llm_config: LLMConfig, mcp_url: str | None = None): + self.llm_config = llm_config + self.mcp_url = mcp_url + + def invoke(self, inputs: dict) -> dict: + topic = inputs.get("input", "") + result = asyncio.run(run_research(topic, self.llm_config, self.mcp_url)) + return {"output": result} diff --git a/ag2-agents/research-synthesis-team/agents.py b/ag2-agents/research-synthesis-team/agents.py new file mode 100644 index 0000000..3160b3c --- /dev/null +++ b/ag2-agents/research-synthesis-team/agents.py @@ -0,0 +1,43 @@ +""" +AG2 (formerly AutoGen) research synthesis team. +Four specialists collaborate under GroupChat with LLM-driven speaker selection. +""" +from autogen import AssistantAgent, LLMConfig + +def build_agents(llm_config: LLMConfig) -> list[AssistantAgent]: + web_researcher = AssistantAgent( + name="web_researcher", + system_message=( + "You are a research specialist. Search and gather comprehensive information " + "on the assigned topic using available tools. Cite sources clearly." + ), + llm_config=llm_config, + ) + financial_analyst = AssistantAgent( + name="financial_analyst", + system_message=( + "You are a financial analyst. Analyse market data, trends, and economic " + "implications of the research topic. Be quantitative when possible." + ), + llm_config=llm_config, + ) + tech_analyst = AssistantAgent( + name="tech_analyst", + system_message=( + "You are a technology analyst. Evaluate technical aspects, feasibility, " + "and innovation potential of the research topic." + ), + llm_config=llm_config, + ) + synthesizer = AssistantAgent( + name="synthesizer", + system_message=( + "You are a synthesis expert. Once all specialists have contributed, " + "produce a final structured report combining all perspectives. " + "Format as Markdown with sections: Summary, Financial Analysis, " + "Technical Analysis, Conclusions. End with TERMINATE." + ), + llm_config=llm_config, + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + ) + return [web_researcher, financial_analyst, tech_analyst, synthesizer] diff --git a/ag2-agents/research-synthesis-team/main.py b/ag2-agents/research-synthesis-team/main.py new file mode 100644 index 0000000..93d0709 --- /dev/null +++ b/ag2-agents/research-synthesis-team/main.py @@ -0,0 +1,43 @@ +""" +Fetch.ai uAgent exposing the AG2 research team via A2A protocol (Pattern B). +Discoverable on Agentverse; callable from ASI:One or other uAgents. +""" +import os +from dotenv import load_dotenv +from uagents_adapter import SingleA2AAdapter +from autogen import LLMConfig + +from agent_executor import AG2ResearchExecutor + +load_dotenv() + +llm_config = LLMConfig( + config_list=[{ + "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), + "api_key": os.getenv("OPENAI_API_KEY", ""), + "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), + }], + temperature=0.3, + cache_seed=None, +) + +executor = AG2ResearchExecutor( + llm_config=llm_config, + mcp_url=os.getenv("MCP_SERVER_URL"), # optional: Fetch.ai MCP gateway +) + +adapter = SingleA2AAdapter( + agent_executor=executor, + name="AG2 Research Synthesis Team", + description=( + "Multi-agent research team using AG2 (formerly AutoGen). " + "Four specialists (web researcher, financial analyst, tech analyst, synthesizer) " + "collaborate to produce comprehensive research reports on any topic." + ), + port=int(os.getenv("AGENT_PORT", "8008")), + agentverse_url=os.getenv("AGENTVERSE_URL", "https://agentverse.ai"), + mailbox_api_key=os.getenv("AGENTVERSE_API_KEY", ""), +) + +if __name__ == "__main__": + adapter.run() diff --git a/ag2-agents/research-synthesis-team/requirements.txt b/ag2-agents/research-synthesis-team/requirements.txt new file mode 100644 index 0000000..3d4c1c8 --- /dev/null +++ b/ag2-agents/research-synthesis-team/requirements.txt @@ -0,0 +1,5 @@ +ag2[openai,mcp]>=0.11.0 +mcp>=1.0.0 +uagents>=0.20.0 +uagents-adapter>=0.4.0 +python-dotenv>=1.0.0 diff --git a/ag2-agents/research-synthesis-team/tests/conftest.py b/ag2-agents/research-synthesis-team/tests/conftest.py new file mode 100644 index 0000000..37c39a6 --- /dev/null +++ b/ag2-agents/research-synthesis-team/tests/conftest.py @@ -0,0 +1,20 @@ +""" +Ensure the project root (research-synthesis-team/) is on sys.path so that +local modules (agents.py, agent_executor.py, workflow.py) are importable. +Also evict the installed 'agents' package (OpenAI Agents SDK) that would +otherwise shadow the local agents.py. +""" +import sys +import os + +parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if parent not in sys.path: + sys.path.insert(0, parent) + +# Evict the installed 'agents' package from sys.modules cache so +# 'from agents import build_agents' finds our local agents.py instead. +for key in list(sys.modules.keys()): + if key == "agents" or key.startswith("agents."): + mod = sys.modules[key] + if hasattr(mod, "__file__") and mod.__file__ and parent not in (mod.__file__ or ""): + del sys.modules[key] diff --git a/ag2-agents/research-synthesis-team/tests/test_agent_executor.py b/ag2-agents/research-synthesis-team/tests/test_agent_executor.py new file mode 100644 index 0000000..1036cb7 --- /dev/null +++ b/ag2-agents/research-synthesis-team/tests/test_agent_executor.py @@ -0,0 +1,14 @@ +from unittest.mock import patch, MagicMock +from autogen import LLMConfig + +TEST_LLM = LLMConfig( + {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"} +) + +def test_executor_invoke_calls_run_research(): + from agent_executor import AG2ResearchExecutor + executor = AG2ResearchExecutor(llm_config=TEST_LLM) + with patch("agent_executor.asyncio.run", return_value="Mock research report") as mock_run: + result = executor.invoke({"input": "quantum computing"}) + mock_run.assert_called_once() + assert result == {"output": "Mock research report"} diff --git a/ag2-agents/research-synthesis-team/tests/test_agents.py b/ag2-agents/research-synthesis-team/tests/test_agents.py new file mode 100644 index 0000000..7aff81f --- /dev/null +++ b/ag2-agents/research-synthesis-team/tests/test_agents.py @@ -0,0 +1,35 @@ +"""Unit tests — no LLM calls, no network, no uAgents runtime.""" +import pytest +import os +os.environ.setdefault("OPENAI_API_KEY", "test-key") +from autogen import LLMConfig + +TEST_LLM_CONFIG = LLMConfig( + {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"} +) + +def test_agents_instantiate(): + from agents import build_agents + agents = build_agents(TEST_LLM_CONFIG) + assert len(agents) == 4 + names = [a.name for a in agents] + assert "web_researcher" in names + assert "financial_analyst" in names + assert "tech_analyst" in names + assert "synthesizer" in names + +def test_synthesizer_termination(): + from agents import build_agents + agents = build_agents(TEST_LLM_CONFIG) + synthesizer = next(a for a in agents if a.name == "synthesizer") + assert synthesizer._is_termination_msg({"content": "Report done. TERMINATE"}) is True + assert synthesizer._is_termination_msg({"content": "Still analysing..."}) is False + +def test_executor_instantiates(): + from autogen import UserProxyAgent + executor = UserProxyAgent( + name="executor", human_input_mode="NEVER", + code_execution_config=False, + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + ) + assert executor.name == "executor" diff --git a/ag2-agents/research-synthesis-team/workflow.py b/ag2-agents/research-synthesis-team/workflow.py new file mode 100644 index 0000000..1429bbe --- /dev/null +++ b/ag2-agents/research-synthesis-team/workflow.py @@ -0,0 +1,50 @@ +"""GroupChat orchestration with AG2 native MCP client for tool access.""" +from autogen import GroupChat, GroupChatManager, LLMConfig, UserProxyAgent +from autogen.mcp import create_toolkit +from mcp import ClientSession +from mcp.client.sse import sse_client + +from agents import build_agents + + +async def run_research(topic: str, llm_config: LLMConfig, mcp_url: str | None = None) -> str: + agents = build_agents(llm_config) + executor = UserProxyAgent( + name="executor", + human_input_mode="NEVER", + code_execution_config=False, + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + default_auto_reply="", + ) + + # Optionally connect to an MCP server (e.g. Fetch.ai's Brave/DuckDuckGo MCP) + if mcp_url: + async with ( + sse_client(mcp_url, timeout=60) as (read, write), + ClientSession(read, write) as session, + ): + await session.initialize() + toolkit = await create_toolkit(session=session) + toolkit.register_for_llm(agents[0]) # web_researcher gets the tools + toolkit.register_for_execution(agents[0]) + result = await _run_groupchat(agents, executor, llm_config, topic) + else: + result = await _run_groupchat(agents, executor, llm_config, topic) + + return result + + +async def _run_groupchat(agents, executor, llm_config, topic): + gc = GroupChat( + agents=[executor] + agents, + messages=[], + max_round=16, + speaker_selection_method="auto", + ) + manager = GroupChatManager(groupchat=gc, llm_config=llm_config) + await executor.a_initiate_chat( + manager, + message=f"Research and synthesise a comprehensive report on: {topic}", + ) + reports = [m["content"] for m in gc.messages if m.get("name") == "synthesizer" and m.get("content")] + return reports[-1] if reports else "Research did not complete." From 4d7f38dfd5c3cc58d0daa19cf71d91988e5925c9 Mon Sep 17 00:00:00 2001 From: Vasiliy Radostev Date: Sat, 14 Mar 2026 10:28:06 -0700 Subject: [PATCH 2/4] Addressed feedback --- ag2-agents/payment-approval/main.py | 14 ++-- .../research-synthesis-team/.env.example | 3 + ag2-agents/research-synthesis-team/README.md | 27 ++++-- .../research-synthesis-team/agent_executor.py | 45 ++++++++-- ag2-agents/research-synthesis-team/agents.py | 57 +++++++++++-- ag2-agents/research-synthesis-team/main.py | 4 +- .../research-synthesis-team/requirements.txt | 3 +- .../tests/test_agent_executor.py | 84 +++++++++++++++++-- .../tests/test_agents.py | 16 ++++ .../research-synthesis-team/workflow.py | 22 +++-- 10 files changed, 224 insertions(+), 51 deletions(-) diff --git a/ag2-agents/payment-approval/main.py b/ag2-agents/payment-approval/main.py index 73a0ceb..84dca01 100644 --- a/ag2-agents/payment-approval/main.py +++ b/ag2-agents/payment-approval/main.py @@ -11,19 +11,19 @@ """ import os from dotenv import load_dotenv -from autogen import ConversableAgent +from autogen import ConversableAgent, LLMConfig load_dotenv() -llm_config = { - "config_list": [{ +llm_config = LLMConfig( + { "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), "api_key": os.environ["OPENAI_API_KEY"], "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), - }], - "temperature": 0.2, - "cache_seed": None, -} + }, + temperature=0.2, + cache_seed=None, +) researcher = ConversableAgent( name="researcher", diff --git a/ag2-agents/research-synthesis-team/.env.example b/ag2-agents/research-synthesis-team/.env.example index 80b5151..e976385 100644 --- a/ag2-agents/research-synthesis-team/.env.example +++ b/ag2-agents/research-synthesis-team/.env.example @@ -6,4 +6,7 @@ LLM_MODEL=gpt-4o-mini OPENAI_BASE_URL=https://api.openai.com/v1 AGENT_PORT=8008 AGENTVERSE_URL=https://agentverse.ai + +# Leave empty to use DuckDuckGo search (default, no API key needed). +# Set to a Fetch.ai MCP gateway URL to use MCP tools instead. MCP_SERVER_URL= diff --git a/ag2-agents/research-synthesis-team/README.md b/ag2-agents/research-synthesis-team/README.md index fe89bbd..7b4ec73 100644 --- a/ag2-agents/research-synthesis-team/README.md +++ b/ag2-agents/research-synthesis-team/README.md @@ -1,5 +1,7 @@ # AG2 Research Synthesis Team +![ag2](https://img.shields.io/badge/ag2-00ADD8) ![uagents](https://img.shields.io/badge/uagents-4A90E2) ![a2a](https://img.shields.io/badge/a2a-000000) ![innovationlab](https://img.shields.io/badge/innovationlab-3D8BD3) + A multi-agent research pipeline using [AG2](https://github.com/ag2ai/ag2) (formerly AutoGen) integrated with the Fetch.ai uAgents ecosystem via the A2A protocol. @@ -9,15 +11,17 @@ Four specialists collaborate under GroupChat with LLM-driven speaker selection, an A2A executor and exposed as a discoverable agent on Agentverse. ``` -GroupChat (AG2) -├── web_researcher — searches and gathers information -├── financial_analyst — analyses market and economic aspects -├── tech_analyst — evaluates technical feasibility -└── synthesizer — produces the final report +User / ASI:One / other uAgent + ↓ +SingleA2AAdapter (port 8008) → Agentverse ↓ -SingleA2AAdapter (Fetch.ai uagents-adapter) +AG2ResearchExecutor (A2A AgentExecutor) ↓ -Agentverse (discoverable at port 8008) +GroupChat (AG2) +├── web_researcher — DuckDuckGo search, gathers sources +├── financial_analyst — market data, metrics, trends +├── tech_analyst — technical feasibility, risks +└── synthesizer — final structured report ``` ## Quick Start @@ -28,8 +32,13 @@ cp .env.example .env # add OPENAI_API_KEY and AGENTVERSE_API_KEY python main.py ``` +No additional API keys needed for search — DuckDuckGo is used by default. + +To use a Fetch.ai MCP gateway instead, set `MCP_SERVER_URL` in `.env`. + ## AG2 Features Demonstrated - **`GroupChat` with `speaker_selection_method="auto"`** — LLM-driven dynamic speaker selection -- **Native MCP client** — optional connection to Fetch.ai's MCP gateway for web search tools -- **Pattern B (A2A Outbound)** — same integration pattern as LangChain and Google ADK examples +- **`DuckDuckGoSearchTool`** — built-in web search, no API key required +- **Native MCP client** — optional override via `MCP_SERVER_URL` for richer tool access +- **`A2A AgentExecutor`** — same integration pattern used by other examples in this repo diff --git a/ag2-agents/research-synthesis-team/agent_executor.py b/ag2-agents/research-synthesis-team/agent_executor.py index 2c8c135..2a39f79 100644 --- a/ag2-agents/research-synthesis-team/agent_executor.py +++ b/ag2-agents/research-synthesis-team/agent_executor.py @@ -1,20 +1,47 @@ """ -Wraps the AG2 GroupChat workflow as a LangChain-compatible AgentExecutor -for use with SingleA2AAdapter (Pattern B — matches LangChain/ADK examples). +Wraps the AG2 GroupChat workflow as an A2A AgentExecutor +for use with SingleA2AAdapter (Pattern B). """ -import asyncio +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.types import Part, TextPart +from a2a.utils import new_agent_text_message from autogen import LLMConfig +from typing_extensions import override + from workflow import run_research -class AG2ResearchExecutor: - """Drop-in AgentExecutor interface for SingleA2AAdapter.""" +class AG2ResearchExecutor(AgentExecutor): + """A2A-compatible executor wrapping the AG2 GroupChat workflow.""" def __init__(self, llm_config: LLMConfig, mcp_url: str | None = None): self.llm_config = llm_config self.mcp_url = mcp_url - def invoke(self, inputs: dict) -> dict: - topic = inputs.get("input", "") - result = asyncio.run(run_research(topic, self.llm_config, self.mcp_url)) - return {"output": result} + @override + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + message_content = "" + for part in context.message.parts: + if isinstance(part, Part): + if isinstance(part.root, TextPart): + message_content = part.root.text + break + + if not message_content: + await event_queue.enqueue_event( + new_agent_text_message("Error: No message content received.") + ) + return + + try: + result = await run_research(message_content, self.llm_config, self.mcp_url) + await event_queue.enqueue_event(new_agent_text_message(result)) + except Exception as e: + await event_queue.enqueue_event( + new_agent_text_message(f"Research failed: {e}") + ) + + @override + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("Cancel not supported for this agent executor.") diff --git a/ag2-agents/research-synthesis-team/agents.py b/ag2-agents/research-synthesis-team/agents.py index 3160b3c..dc097a8 100644 --- a/ag2-agents/research-synthesis-team/agents.py +++ b/ag2-agents/research-synthesis-team/agents.py @@ -4,12 +4,22 @@ """ from autogen import AssistantAgent, LLMConfig + def build_agents(llm_config: LLMConfig) -> list[AssistantAgent]: web_researcher = AssistantAgent( name="web_researcher", system_message=( - "You are a research specialist. Search and gather comprehensive information " - "on the assigned topic using available tools. Cite sources clearly." + "You are a web research specialist with access to search tools.\n\n" + "WORKFLOW:\n" + "1. Use the search tool to find relevant sources on the assigned topic.\n" + "2. Run at least 2-3 different search queries to cover the topic broadly.\n" + "3. Compile your findings into a structured response.\n\n" + "OUTPUT FORMAT (mandatory):\n" + "- Provide a markdown table of sources found:\n" + " | # | Title | URL | Key Finding |\n" + "- Minimum 5 sources with direct links.\n" + "- Below the table, write a 200-word summary of the most important findings.\n" + "- Flag any conflicting information between sources." ), llm_config=llm_config, ) @@ -17,15 +27,31 @@ def build_agents(llm_config: LLMConfig) -> list[AssistantAgent]: name="financial_analyst", system_message=( "You are a financial analyst. Analyse market data, trends, and economic " - "implications of the research topic. Be quantitative when possible." + "implications of the research topic using the sources provided by the " + "web researcher.\n\n" + "OUTPUT FORMAT (mandatory):\n" + "- Provide a financial summary table:\n" + " | Metric | Value | Source | Trend |\n" + "- Include: market size, growth rate, key players, funding, and revenue " + " figures where available. Use 'N/A' when data is unavailable.\n" + "- Write a 150-word analysis of financial risks and opportunities.\n" + "- Be quantitative — use numbers, percentages, and dollar amounts." ), llm_config=llm_config, ) tech_analyst = AssistantAgent( name="tech_analyst", system_message=( - "You are a technology analyst. Evaluate technical aspects, feasibility, " - "and innovation potential of the research topic." + "You are a technology analyst. Evaluate the technical landscape of the " + "research topic using the sources provided by the web researcher.\n\n" + "OUTPUT FORMAT (mandatory):\n" + "- Provide a technology assessment table:\n" + " | Technology/Component | Maturity | Adoption | Risk Level | Notes |\n" + "- Maturity values: Emerging, Growing, Mature, Declining.\n" + "- Risk Level: Low, Medium, High.\n" + "- Write a 150-word analysis covering: technical feasibility, innovation " + " potential, and key technical challenges.\n" + "- Identify the top 3 technical risks with mitigation strategies." ), llm_config=llm_config, ) @@ -33,9 +59,24 @@ def build_agents(llm_config: LLMConfig) -> list[AssistantAgent]: name="synthesizer", system_message=( "You are a synthesis expert. Once all specialists have contributed, " - "produce a final structured report combining all perspectives. " - "Format as Markdown with sections: Summary, Financial Analysis, " - "Technical Analysis, Conclusions. End with TERMINATE." + "produce a final structured report combining all perspectives.\n\n" + "MANDATORY REPORT STRUCTURE:\n" + "## Executive Summary\n" + "3-5 bullet points covering the most important findings.\n\n" + "## Research Findings\n" + "Key sources and discoveries from the web researcher.\n\n" + "## Financial Analysis\n" + "Market data, financial metrics, and economic outlook.\n\n" + "## Technical Analysis\n" + "Technology landscape, maturity assessment, and risks.\n\n" + "## Conclusions & Recommendations\n" + "3-5 actionable recommendations ranked by priority.\n\n" + "## Sources\n" + "Consolidated list of all sources cited, as [Title](URL).\n\n" + "RULES:\n" + "- Do NOT repeat raw data from specialists — synthesize and add insight.\n" + "- Total report length: 500-800 words.\n" + "- End your response with TERMINATE." ), llm_config=llm_config, is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), diff --git a/ag2-agents/research-synthesis-team/main.py b/ag2-agents/research-synthesis-team/main.py index 93d0709..2122bf7 100644 --- a/ag2-agents/research-synthesis-team/main.py +++ b/ag2-agents/research-synthesis-team/main.py @@ -12,11 +12,11 @@ load_dotenv() llm_config = LLMConfig( - config_list=[{ + { "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), "api_key": os.getenv("OPENAI_API_KEY", ""), "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), - }], + }, temperature=0.3, cache_seed=None, ) diff --git a/ag2-agents/research-synthesis-team/requirements.txt b/ag2-agents/research-synthesis-team/requirements.txt index 3d4c1c8..8ee46db 100644 --- a/ag2-agents/research-synthesis-team/requirements.txt +++ b/ag2-agents/research-synthesis-team/requirements.txt @@ -1,4 +1,5 @@ -ag2[openai,mcp]>=0.11.0 +ag2[openai,mcp,duckduckgo]>=0.11.0 +a2a-sdk mcp>=1.0.0 uagents>=0.20.0 uagents-adapter>=0.4.0 diff --git a/ag2-agents/research-synthesis-team/tests/test_agent_executor.py b/ag2-agents/research-synthesis-team/tests/test_agent_executor.py index 1036cb7..eef213f 100644 --- a/ag2-agents/research-synthesis-team/tests/test_agent_executor.py +++ b/ag2-agents/research-synthesis-team/tests/test_agent_executor.py @@ -1,14 +1,84 @@ -from unittest.mock import patch, MagicMock +"""Unit tests for AG2ResearchExecutor — no LLM calls, no network.""" +import pytest +from unittest.mock import AsyncMock, MagicMock, patch from autogen import LLMConfig +from agent_executor import AG2ResearchExecutor + TEST_LLM = LLMConfig( {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"} ) -def test_executor_invoke_calls_run_research(): - from agent_executor import AG2ResearchExecutor + +def _make_context(text: str): + """Build a minimal RequestContext-like object with a text message.""" + text_part = MagicMock() + text_part.root = MagicMock() + text_part.root.text = text + + part_cls = type(text_part) + text_part_cls = type(text_part.root) + + message = MagicMock() + message.parts = [text_part] + ctx = MagicMock() + ctx.message = message + return ctx, part_cls, text_part_cls + + +@pytest.mark.asyncio +async def test_execute_calls_run_research(): + executor = AG2ResearchExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context("quantum computing") + event_queue = AsyncMock() + + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_research", new_callable=AsyncMock, return_value="Mock report") as mock_run, \ + patch("agent_executor.new_agent_text_message", return_value="event") as mock_msg: + await executor.execute(ctx, event_queue) + + mock_run.assert_called_once_with("quantum computing", TEST_LLM, None) + mock_msg.assert_called_once_with("Mock report") + event_queue.enqueue_event.assert_called_once_with("event") + + +@pytest.mark.asyncio +async def test_execute_empty_message(): + executor = AG2ResearchExecutor(llm_config=TEST_LLM) + message = MagicMock() + message.parts = [] + ctx = MagicMock() + ctx.message = message + event_queue = AsyncMock() + + with patch("agent_executor.Part", MagicMock), \ + patch("agent_executor.TextPart", MagicMock), \ + patch("agent_executor.new_agent_text_message", return_value="err_event") as mock_msg: + await executor.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Error: No message content received.") + event_queue.enqueue_event.assert_called_once_with("err_event") + + +@pytest.mark.asyncio +async def test_execute_handles_research_error(): executor = AG2ResearchExecutor(llm_config=TEST_LLM) - with patch("agent_executor.asyncio.run", return_value="Mock research report") as mock_run: - result = executor.invoke({"input": "quantum computing"}) - mock_run.assert_called_once() - assert result == {"output": "Mock research report"} + ctx, part_cls, text_part_cls = _make_context("bad topic") + event_queue = AsyncMock() + + failing_research = AsyncMock(side_effect=RuntimeError("LLM timeout")) + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_research", failing_research), \ + patch("agent_executor.new_agent_text_message", return_value="err") as mock_msg: + await executor.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Research failed: LLM timeout") + event_queue.enqueue_event.assert_called_once_with("err") + + +def test_executor_inherits_agent_executor(): + """AG2ResearchExecutor must inherit from the A2A AgentExecutor base class.""" + from a2a.server.agent_execution import AgentExecutor + assert issubclass(AG2ResearchExecutor, AgentExecutor) diff --git a/ag2-agents/research-synthesis-team/tests/test_agents.py b/ag2-agents/research-synthesis-team/tests/test_agents.py index 7aff81f..d08da60 100644 --- a/ag2-agents/research-synthesis-team/tests/test_agents.py +++ b/ag2-agents/research-synthesis-team/tests/test_agents.py @@ -33,3 +33,19 @@ def test_executor_instantiates(): is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), ) assert executor.name == "executor" + + +def test_llmconfig_positional_dict_construction(): + """Validate the positional-dict LLMConfig used in main.py works with AG2 0.11+.""" + cfg = LLMConfig( + {"model": "gpt-4o-mini", "api_key": "test-key", "base_url": "https://api.openai.com/v1"}, + temperature=0.3, + cache_seed=None, + ) + from autogen import AssistantAgent + agent = AssistantAgent( + name="llm_config_test", + system_message="test", + llm_config=cfg, + ) + assert agent.name == "llm_config_test" diff --git a/ag2-agents/research-synthesis-team/workflow.py b/ag2-agents/research-synthesis-team/workflow.py index 1429bbe..392b8ec 100644 --- a/ag2-agents/research-synthesis-team/workflow.py +++ b/ag2-agents/research-synthesis-team/workflow.py @@ -1,8 +1,6 @@ -"""GroupChat orchestration with AG2 native MCP client for tool access.""" +"""GroupChat orchestration with DuckDuckGo search (default) or MCP tools.""" from autogen import GroupChat, GroupChatManager, LLMConfig, UserProxyAgent -from autogen.mcp import create_toolkit -from mcp import ClientSession -from mcp.client.sse import sse_client +from autogen.tools.experimental import DuckDuckGoSearchTool from agents import build_agents @@ -12,23 +10,31 @@ async def run_research(topic: str, llm_config: LLMConfig, mcp_url: str | None = executor = UserProxyAgent( name="executor", human_input_mode="NEVER", - code_execution_config=False, + code_execution_config=False, # tools use register_for_execution, not code exec is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), default_auto_reply="", ) - # Optionally connect to an MCP server (e.g. Fetch.ai's Brave/DuckDuckGo MCP) if mcp_url: + # Override: use MCP server tools (e.g. Fetch.ai's MCP gateway) + from autogen.mcp import create_toolkit + from mcp import ClientSession + from mcp.client.sse import sse_client + async with ( sse_client(mcp_url, timeout=60) as (read, write), ClientSession(read, write) as session, ): await session.initialize() toolkit = await create_toolkit(session=session) - toolkit.register_for_llm(agents[0]) # web_researcher gets the tools - toolkit.register_for_execution(agents[0]) + toolkit.register_for_llm(agents[0]) + toolkit.register_for_execution(executor) result = await _run_groupchat(agents, executor, llm_config, topic) else: + # Default: DuckDuckGo search — no API key required + search = DuckDuckGoSearchTool(num_results=5) + search.register_for_llm(agents[0]) # web_researcher can request searches + search.register_for_execution(executor) # executor runs them result = await _run_groupchat(agents, executor, llm_config, topic) return result From 57a5f63e0b7c4e83edee01a47a41eb33e8ce728a Mon Sep 17 00:00:00 2001 From: Vasiliy Radostev Date: Sat, 14 Mar 2026 10:42:25 -0700 Subject: [PATCH 3/4] Addressed feedback --- ag2-agents/payment-approval/main.py | 2 +- ag2-agents/payment-approval/tests/test_payment.py | 9 ++++++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/ag2-agents/payment-approval/main.py b/ag2-agents/payment-approval/main.py index 84dca01..d5e9d84 100644 --- a/ag2-agents/payment-approval/main.py +++ b/ag2-agents/payment-approval/main.py @@ -34,7 +34,7 @@ "End your assessment with ASSESSMENT COMPLETE." ), llm_config=llm_config, - is_termination_msg=lambda m: "ASSESSMENT COMPLETE" in (m.get("content") or ""), + # Conversation ends via max_turns or executor's TERMINATE check. ) executor = ConversableAgent( diff --git a/ag2-agents/payment-approval/tests/test_payment.py b/ag2-agents/payment-approval/tests/test_payment.py index 24d35e4..8b6afab 100644 --- a/ag2-agents/payment-approval/tests/test_payment.py +++ b/ag2-agents/payment-approval/tests/test_payment.py @@ -12,10 +12,13 @@ def test_executor_human_input_mode(): import main as m assert m.executor.human_input_mode == "ALWAYS" -def test_researcher_termination_condition(): +def test_researcher_no_custom_termination(): + """Researcher has no custom is_termination_msg. It produces 'ASSESSMENT COMPLETE' + but is_termination_msg evaluates *received* messages only — a self-termination + check would be dead code. Conversation ends via max_turns or executor's TERMINATE.""" import main as m - assert m.researcher._is_termination_msg({"content": "Risk: low. ASSESSMENT COMPLETE"}) is True - assert m.researcher._is_termination_msg({"content": "Still investigating..."}) is False + # Default termination returns False for everything + assert m.researcher._is_termination_msg({"content": "ASSESSMENT COMPLETE"}) is False def test_executor_termination_condition(): import main as m From 2542d831971f863e05a10d49e3a161f4057ac275 Mon Sep 17 00:00:00 2001 From: Vasiliy Radostev Date: Mon, 16 Mar 2026 16:44:23 -0700 Subject: [PATCH 4/4] Address reviewer feedback and integrate payment protocol --- ag2-agents/payment-approval/.env.example | 3 + ag2-agents/payment-approval/README.md | 48 +++-- ag2-agents/payment-approval/agent_executor.py | 79 ++++++++ ag2-agents/payment-approval/agents.py | 36 ++++ ag2-agents/payment-approval/main.py | 81 ++++---- ag2-agents/payment-approval/requirements.txt | 3 + ag2-agents/payment-approval/tests/conftest.py | 10 +- .../payment-approval/tests/test_payment.py | 190 ++++++++++++++++-- ag2-agents/payment-approval/workflow.py | 35 ++++ .../research-synthesis-team/.env.example | 4 +- ag2-agents/research-synthesis-team/README.md | 6 +- ag2-agents/research-synthesis-team/main.py | 24 ++- .../tests/test_agents.py | 32 +++ .../research-synthesis-team/workflow.py | 2 +- 14 files changed, 464 insertions(+), 89 deletions(-) create mode 100644 ag2-agents/payment-approval/agent_executor.py create mode 100644 ag2-agents/payment-approval/agents.py create mode 100644 ag2-agents/payment-approval/workflow.py diff --git a/ag2-agents/payment-approval/.env.example b/ag2-agents/payment-approval/.env.example index 3325196..ccea3d7 100644 --- a/ag2-agents/payment-approval/.env.example +++ b/ag2-agents/payment-approval/.env.example @@ -3,3 +3,6 @@ OPENAI_API_KEY=your-openai-api-key-here # Optional LLM_MODEL=gpt-4o-mini OPENAI_BASE_URL=https://api.openai.com/v1 +AGENT_PORT=8009 +A2A_PORT=9998 +AGENT_SEED= diff --git a/ag2-agents/payment-approval/README.md b/ag2-agents/payment-approval/README.md index 916b2bc..4aa5b67 100644 --- a/ag2-agents/payment-approval/README.md +++ b/ag2-agents/payment-approval/README.md @@ -1,26 +1,46 @@ -# AG2 Two-Agent Payment Approval +# AG2 Payment Approval Agent -Demonstrates AG2's `human_input_mode="ALWAYS"` pattern as an approval gate -before triggering a Skyfire payment — the first example in this repo requiring -explicit user confirmation before a financial action. +![ag2](https://img.shields.io/badge/ag2-00ADD8) ![uagents](https://img.shields.io/badge/uagents-4A90E2) ![a2a](https://img.shields.io/badge/a2a-000000) ![innovationlab](https://img.shields.io/badge/innovationlab-3D8BD3) -Two agents collaborate: -- **researcher** — investigates the recipient and produces a risk assessment -- **payment_executor** — presents the assessment, pauses for human confirmation, - then executes or aborts based on the response +A two-agent payment approval workflow using [AG2](https://github.com/ag2ai/ag2) (formerly AutoGen) +integrated with the Fetch.ai uAgents ecosystem via the A2A protocol and payment protocol. -## Key AG2 Features +## Architecture -- **`human_input_mode="ALWAYS"`** — executor pauses before every response; human - types "yes" to proceed or "no" to abort — no custom routing logic needed -- **Two-agent `initiate_chat`** — researcher hands off to executor via the - natural conversation flow; the shared message history carries the assessment +``` +Buyer agent / ASI:One / other uAgent + ↓ +SingleA2AAdapter (port 8009) → Agentverse + ↓ +PaymentApprovalExecutor (A2A AgentExecutor) + ↓ +AG2 Two-Agent Chat +├── researcher — investigates recipient, produces risk assessment +└── payment_executor — formats assessment, returns APPROVED/REJECTED verdict + ↓ +Payment Protocol (via adapter) +├── APPROVED → buyer receives RequestPayment → CommitPayment / RejectPayment +└── REJECTED → buyer receives assessment text explaining why +``` + +## Prerequisites + +- **Python 3.10–3.13** (uagents depends on Pydantic v1, which is incompatible with Python 3.14+) ## Quick Start ```bash cd ag2-agents/payment-approval pip install -r requirements.txt -cp .env.example .env +cp .env.example .env # add OPENAI_API_KEY python main.py ``` + +## AG2 Features Demonstrated + +- **Two-agent `initiate_chat`** — researcher hands off to executor via the natural + conversation flow; the shared message history carries the assessment +- **`A2A AgentExecutor`** — same integration pattern used by other examples in this repo +- **Payment protocol integration** — `SingleA2AAdapter` bridges the assessment result + into Fetch.ai's `AgentPaymentProtocol`, enabling buyer agents to commit or reject + payments via standard protocol messages diff --git a/ag2-agents/payment-approval/agent_executor.py b/ag2-agents/payment-approval/agent_executor.py new file mode 100644 index 0000000..46864ff --- /dev/null +++ b/ag2-agents/payment-approval/agent_executor.py @@ -0,0 +1,79 @@ +""" +Wraps the AG2 payment assessment workflow as an A2A AgentExecutor +for use with SingleA2AAdapter. +""" +import re + +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.types import Part, TextPart +from a2a.utils import new_agent_text_message +from autogen import LLMConfig +from typing_extensions import override + +from workflow import run_payment_assessment + +# Pattern: "50 USDC to alice.fetch — reason: 'research report delivery'" +_PAYMENT_RE = re.compile( + r"(?P[\d.]+)\s+(?P\w+)\s+to\s+(?P\S+)" + r"(?:\s*[—\-]+\s*reason:\s*['\"]?(?P[^'\"]+)['\"]?)?", + re.IGNORECASE, +) + + +def _parse_payment_request(text: str) -> tuple[str, float, str, str]: + """Extract (recipient, amount, reason, currency) from free-text message. + + Falls back to passing the entire message as the reason if parsing fails. + """ + m = _PAYMENT_RE.search(text) + if m: + return ( + m.group("recipient"), + float(m.group("amount")), + m.group("reason") or "not specified", + m.group("currency"), + ) + # Fallback: let the LLM agents figure it out + return ("unknown", 0.0, text, "USDC") + + +class PaymentApprovalExecutor(AgentExecutor): + """A2A-compatible executor wrapping the AG2 payment assessment workflow.""" + + def __init__(self, llm_config: LLMConfig): + self.llm_config = llm_config + + @override + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + message_content = "" + for part in context.message.parts: + if isinstance(part, Part): + if isinstance(part.root, TextPart): + message_content = part.root.text + break + + if not message_content: + await event_queue.enqueue_event( + new_agent_text_message("Error: No message content received.") + ) + return + + try: + recipient, amount, reason, currency = _parse_payment_request(message_content) + result = await run_payment_assessment( + recipient=recipient, + amount=amount, + reason=reason, + llm_config=self.llm_config, + currency=currency, + ) + await event_queue.enqueue_event(new_agent_text_message(result)) + except Exception as e: + await event_queue.enqueue_event( + new_agent_text_message(f"Payment assessment failed: {e}") + ) + + @override + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("Cancel not supported for this agent executor.") diff --git a/ag2-agents/payment-approval/agents.py b/ag2-agents/payment-approval/agents.py new file mode 100644 index 0000000..46a8289 --- /dev/null +++ b/ag2-agents/payment-approval/agents.py @@ -0,0 +1,36 @@ +""" +AG2 payment approval agents. + +researcher — investigates recipient, produces risk assessment +executor — formats assessment, decides proceed/reject (no human stdin in uAgent mode) +""" +from autogen import ConversableAgent, LLMConfig + + +def build_agents(llm_config: LLMConfig) -> tuple[ConversableAgent, ConversableAgent]: + researcher = ConversableAgent( + name="researcher", + system_message=( + "You are a payment risk analyst. Investigate the payment recipient using available " + "tools: check their Fetch.ai address history, reputation, and any known flags. " + "Produce a concise risk assessment with a clear recommendation (proceed / do not proceed). " + "End your assessment with ASSESSMENT COMPLETE." + ), + llm_config=llm_config, + ) + + executor = ConversableAgent( + name="payment_executor", + system_message=( + "You handle payment execution. Present the researcher's risk assessment clearly, " + "state the exact payment details (recipient, amount, reason), then give a final " + "verdict: APPROVED or REJECTED based on the risk assessment.\n\n" + "If approved, end with: PAYMENT APPROVED. TERMINATE\n" + "If rejected, end with: PAYMENT REJECTED. TERMINATE" + ), + llm_config=llm_config, + human_input_mode="NEVER", + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + ) + + return researcher, executor diff --git a/ag2-agents/payment-approval/main.py b/ag2-agents/payment-approval/main.py index d5e9d84..b02a253 100644 --- a/ag2-agents/payment-approval/main.py +++ b/ag2-agents/payment-approval/main.py @@ -1,70 +1,57 @@ """ -AG2 two-agent payment approval with human-in-the-loop gate. +AG2 two-agent payment approval workflow, exposed via A2A protocol. -researcher — investigates the recipient and produces a risk assessment -executor — presents the assessment and waits for explicit human confirmation - before executing the Skyfire payment +On each request, two AG2 ConversableAgents (researcher → payment_executor) +are created in agents.py and orchestrated by workflow.py to produce a risk +assessment. The result is served through SingleA2AAdapter, making the AG2 +workflow discoverable on Agentverse and callable from ASI:One or other +agents in the ecosystem. -human_input_mode="ALWAYS" on executor is the approval gate: the agent -pauses before every response so the human can type "yes" to proceed or -"no" to abort. No custom routing logic required. +Requires Python ≤3.13 (uagents depends on Pydantic v1, incompatible with 3.14+). """ +import sys + +if sys.version_info >= (3, 14): + raise RuntimeError( + "uagents requires Python ≤3.13 (Pydantic v1 is incompatible with 3.14+). " + "Please use Python 3.10–3.13." + ) + import os from dotenv import load_dotenv -from autogen import ConversableAgent, LLMConfig +from uagents_adapter import SingleA2AAdapter +from autogen import LLMConfig + +from agent_executor import PaymentApprovalExecutor load_dotenv() llm_config = LLMConfig( { "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), - "api_key": os.environ["OPENAI_API_KEY"], + "api_key": os.getenv("OPENAI_API_KEY", ""), "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), }, temperature=0.2, cache_seed=None, ) -researcher = ConversableAgent( - name="researcher", - system_message=( - "You are a payment risk analyst. Investigate the payment recipient using available " - "tools: check their Fetch.ai address history, reputation, and any known flags. " - "Produce a concise risk assessment with a clear recommendation (proceed / do not proceed). " - "End your assessment with ASSESSMENT COMPLETE." - ), - llm_config=llm_config, - # Conversation ends via max_turns or executor's TERMINATE check. -) +executor = PaymentApprovalExecutor(llm_config=llm_config) -executor = ConversableAgent( - name="payment_executor", - system_message=( - "You handle payment execution. Present the researcher's risk assessment clearly, " - "state the exact payment details (recipient, amount, reason), then ask the human " - "to confirm. If the human approves, call the skyfire_send tool to execute the payment. " - "If the human declines, acknowledge and terminate. End with TERMINATE." +adapter = SingleA2AAdapter( + agent_executor=executor, + name="AG2 Payment Approval Agent", + description=( + "Two-agent payment approval workflow using AG2 (formerly AutoGen). " + "A researcher investigates the recipient and a payment executor " + "produces a risk assessment with an APPROVED/REJECTED verdict. " + "Supports Fetch.ai payment protocol for commit/reject flows." ), - llm_config=llm_config, - human_input_mode="ALWAYS", # pauses before every response — the human types yes/no - is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + port=int(os.getenv("AGENT_PORT", "8009")), + a2a_port=int(os.getenv("A2A_PORT", "9998")), + mailbox=True, + seed=os.getenv("AGENT_SEED"), ) - -def run_payment_approval(recipient: str, amount: float, reason: str) -> None: - researcher.initiate_chat( - executor, - message=( - f"Payment request: {amount} USDC to {recipient} — reason: '{reason}'. " - f"Investigate the recipient and produce a risk assessment." - ), - max_turns=6, - ) - - if __name__ == "__main__": - run_payment_approval( - recipient="alice.fetch", - amount=50.0, - reason="research report delivery", - ) + adapter.run() diff --git a/ag2-agents/payment-approval/requirements.txt b/ag2-agents/payment-approval/requirements.txt index 3a9e8f6..7ac080f 100644 --- a/ag2-agents/payment-approval/requirements.txt +++ b/ag2-agents/payment-approval/requirements.txt @@ -1,2 +1,5 @@ ag2[openai]>=0.11.0 +a2a-sdk +uagents>=0.20.0 +uagents-adapter>=0.4.0 python-dotenv>=1.0.0 diff --git a/ag2-agents/payment-approval/tests/conftest.py b/ag2-agents/payment-approval/tests/conftest.py index c6e2cd2..96f7b9c 100644 --- a/ag2-agents/payment-approval/tests/conftest.py +++ b/ag2-agents/payment-approval/tests/conftest.py @@ -1,5 +1,5 @@ """ -Ensure payment-approval/ is on sys.path so 'import main' works in tests. +Ensure payment-approval/ is on sys.path so local modules are importable. """ import sys import os @@ -7,3 +7,11 @@ parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if parent not in sys.path: sys.path.insert(0, parent) + +# Evict the installed 'agents' package (OpenAI Agents SDK) from sys.modules +# so 'from agents import build_agents' finds our local agents.py instead. +for key in list(sys.modules.keys()): + if key == "agents" or key.startswith("agents."): + mod = sys.modules[key] + if hasattr(mod, "__file__") and mod.__file__ and parent not in (mod.__file__ or ""): + del sys.modules[key] diff --git a/ag2-agents/payment-approval/tests/test_payment.py b/ag2-agents/payment-approval/tests/test_payment.py index 8b6afab..65acfa5 100644 --- a/ag2-agents/payment-approval/tests/test_payment.py +++ b/ag2-agents/payment-approval/tests/test_payment.py @@ -1,26 +1,178 @@ +"""Unit tests for payment-approval — no LLM calls, no network, no uAgents runtime.""" import os +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + os.environ.setdefault("OPENAI_API_KEY", "test-key") +from autogen import LLMConfig + +TEST_LLM = LLMConfig( + {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"} +) + + +# ── Agent tests ────────────────────────────────────────────────────── + def test_agents_instantiate(): - """Verify agent setup without initiating a chat.""" - import main as m - assert m.researcher.name == "researcher" - assert m.executor.name == "payment_executor" + from agents import build_agents + researcher, executor = build_agents(TEST_LLM) + assert researcher.name == "researcher" + assert executor.name == "payment_executor" + + +def test_executor_terminates_on_keyword(): + from agents import build_agents + _, executor = build_agents(TEST_LLM) + assert executor._is_termination_msg({"content": "PAYMENT APPROVED. TERMINATE"}) is True + assert executor._is_termination_msg({"content": "PAYMENT REJECTED. TERMINATE"}) is True + assert executor._is_termination_msg({"content": "Still investigating."}) is False + + +def test_executor_human_input_mode_is_never(): + """In uAgent mode, human_input_mode is NEVER — payment protocol handles approval.""" + from agents import build_agents + _, executor = build_agents(TEST_LLM) + assert executor.human_input_mode == "NEVER" -def test_executor_human_input_mode(): - """Executor must have human_input_mode=ALWAYS — the approval gate.""" - import main as m - assert m.executor.human_input_mode == "ALWAYS" def test_researcher_no_custom_termination(): - """Researcher has no custom is_termination_msg. It produces 'ASSESSMENT COMPLETE' - but is_termination_msg evaluates *received* messages only — a self-termination - check would be dead code. Conversation ends via max_turns or executor's TERMINATE.""" - import main as m - # Default termination returns False for everything - assert m.researcher._is_termination_msg({"content": "ASSESSMENT COMPLETE"}) is False - -def test_executor_termination_condition(): - import main as m - assert m.executor._is_termination_msg({"content": "Payment aborted. TERMINATE"}) is True - assert m.executor._is_termination_msg({"content": "Please confirm."}) is False + from agents import build_agents + researcher, _ = build_agents(TEST_LLM) + assert researcher._is_termination_msg({"content": "ASSESSMENT COMPLETE"}) is False + + +# ── Parser tests ───────────────────────────────────────────────────── + +def test_parse_payment_request_full(): + from agent_executor import _parse_payment_request + recipient, amount, reason, currency = _parse_payment_request( + "Pay 50.5 USDC to alice.fetch — reason: 'research report delivery'" + ) + assert recipient == "alice.fetch" + assert amount == 50.5 + assert reason == "research report delivery" + assert currency == "USDC" + + +def test_parse_payment_request_no_reason(): + from agent_executor import _parse_payment_request + recipient, amount, reason, currency = _parse_payment_request( + "Send 100 FET to bob.fetch" + ) + assert recipient == "bob.fetch" + assert amount == 100.0 + assert currency == "FET" + + +def test_parse_payment_request_fallback(): + from agent_executor import _parse_payment_request + recipient, amount, reason, currency = _parse_payment_request( + "Please investigate this payment" + ) + assert recipient == "unknown" + assert amount == 0.0 + assert reason == "Please investigate this payment" + + +# ── Executor tests ─────────────────────────────────────────────────── + +def _make_context(text: str): + text_part = MagicMock() + text_part.root = MagicMock() + text_part.root.text = text + part_cls = type(text_part) + text_part_cls = type(text_part.root) + message = MagicMock() + message.parts = [text_part] + ctx = MagicMock() + ctx.message = message + return ctx, part_cls, text_part_cls + + +@pytest.mark.asyncio +async def test_execute_calls_run_payment_assessment(): + from agent_executor import PaymentApprovalExecutor + exec_ = PaymentApprovalExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context( + "Pay 50 USDC to alice.fetch — reason: 'report'" + ) + event_queue = AsyncMock() + + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_payment_assessment", new_callable=AsyncMock, return_value="APPROVED") as mock_run, \ + patch("agent_executor.new_agent_text_message", return_value="event") as mock_msg: + await exec_.execute(ctx, event_queue) + + mock_run.assert_called_once_with( + recipient="alice.fetch", amount=50.0, reason="report", + llm_config=TEST_LLM, currency="USDC", + ) + mock_msg.assert_called_once_with("APPROVED") + event_queue.enqueue_event.assert_called_once_with("event") + + +@pytest.mark.asyncio +async def test_execute_empty_message(): + from agent_executor import PaymentApprovalExecutor + exec_ = PaymentApprovalExecutor(llm_config=TEST_LLM) + message = MagicMock() + message.parts = [] + ctx = MagicMock() + ctx.message = message + event_queue = AsyncMock() + + with patch("agent_executor.Part", MagicMock), \ + patch("agent_executor.TextPart", MagicMock), \ + patch("agent_executor.new_agent_text_message", return_value="err") as mock_msg: + await exec_.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Error: No message content received.") + + +@pytest.mark.asyncio +async def test_execute_handles_error(): + from agent_executor import PaymentApprovalExecutor + exec_ = PaymentApprovalExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context("Pay 10 USDC to fail.fetch") + event_queue = AsyncMock() + + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_payment_assessment", new_callable=AsyncMock, side_effect=RuntimeError("LLM timeout")), \ + patch("agent_executor.new_agent_text_message", return_value="err") as mock_msg: + await exec_.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Payment assessment failed: LLM timeout") + + +def test_executor_inherits_agent_executor(): + from agent_executor import PaymentApprovalExecutor + from a2a.server.agent_execution import AgentExecutor + assert issubclass(PaymentApprovalExecutor, AgentExecutor) + + +# ── Smoke tests ────────────────────────────────────────────────────── + +def test_all_modules_import(): + """Every module imports without error.""" + import agents + import agent_executor + import workflow + import main + + +def test_adapter_construction(): + """SingleA2AAdapter accepts the params used in main.py.""" + from uagents_adapter import SingleA2AAdapter + adapter = SingleA2AAdapter( + agent_executor=MagicMock(), + name="test", + description="test", + port=8009, + a2a_port=9998, + mailbox=True, + seed="test-seed", + ) + assert adapter.name == "test" diff --git a/ag2-agents/payment-approval/workflow.py b/ag2-agents/payment-approval/workflow.py new file mode 100644 index 0000000..312d414 --- /dev/null +++ b/ag2-agents/payment-approval/workflow.py @@ -0,0 +1,35 @@ +"""Async payment assessment workflow using AG2 two-agent chat.""" +from autogen import LLMConfig + +from agents import build_agents + + +async def run_payment_assessment( + recipient: str, + amount: float, + reason: str, + llm_config: LLMConfig, + currency: str = "USDC", +) -> str: + """Run researcher → executor assessment and return the final verdict. + + Returns the executor's last message containing the risk assessment + summary and APPROVED/REJECTED verdict. + """ + researcher, executor = build_agents(llm_config) + + await researcher.a_initiate_chat( + executor, + message=( + f"Payment request: {amount} {currency} to {recipient} — reason: '{reason}'. " + f"Investigate the recipient and produce a risk assessment." + ), + max_turns=6, + ) + + # Return the executor's final message (contains verdict) + for msg in reversed(executor.chat_messages.get(researcher, [])): + if msg.get("content"): + return msg["content"] + + return "Payment assessment did not complete." diff --git a/ag2-agents/research-synthesis-team/.env.example b/ag2-agents/research-synthesis-team/.env.example index e976385..048eef1 100644 --- a/ag2-agents/research-synthesis-team/.env.example +++ b/ag2-agents/research-synthesis-team/.env.example @@ -1,11 +1,11 @@ OPENAI_API_KEY=your-openai-api-key-here -AGENTVERSE_API_KEY=your-agentverse-api-key-here # Optional LLM_MODEL=gpt-4o-mini OPENAI_BASE_URL=https://api.openai.com/v1 AGENT_PORT=8008 -AGENTVERSE_URL=https://agentverse.ai +A2A_PORT=9999 +AGENT_SEED= # Leave empty to use DuckDuckGo search (default, no API key needed). # Set to a Fetch.ai MCP gateway URL to use MCP tools instead. diff --git a/ag2-agents/research-synthesis-team/README.md b/ag2-agents/research-synthesis-team/README.md index 7b4ec73..0aed46e 100644 --- a/ag2-agents/research-synthesis-team/README.md +++ b/ag2-agents/research-synthesis-team/README.md @@ -24,11 +24,15 @@ GroupChat (AG2) └── synthesizer — final structured report ``` +## Prerequisites + +- **Python 3.10–3.13** (uagents depends on Pydantic v1, which is incompatible with Python 3.14+) + ## Quick Start ```bash pip install -r requirements.txt -cp .env.example .env # add OPENAI_API_KEY and AGENTVERSE_API_KEY +cp .env.example .env # add OPENAI_API_KEY python main.py ``` diff --git a/ag2-agents/research-synthesis-team/main.py b/ag2-agents/research-synthesis-team/main.py index 2122bf7..5d7143a 100644 --- a/ag2-agents/research-synthesis-team/main.py +++ b/ag2-agents/research-synthesis-team/main.py @@ -1,7 +1,22 @@ """ -Fetch.ai uAgent exposing the AG2 research team via A2A protocol (Pattern B). -Discoverable on Agentverse; callable from ASI:One or other uAgents. +AG2 multi-agent research team, exposed via A2A protocol. + +On each request, four AG2 AssistantAgents are created in agents.py and +orchestrated under GroupChat (workflow.py) with LLM-driven speaker selection +to produce structured research reports. The result is served through +SingleA2AAdapter, making the AG2 workflow discoverable on Agentverse and +callable from ASI:One or other agents in the ecosystem. + +Requires Python ≤3.13 (uagents depends on Pydantic v1, incompatible with 3.14+). """ +import sys + +if sys.version_info >= (3, 14): + raise RuntimeError( + "uagents requires Python ≤3.13 (Pydantic v1 is incompatible with 3.14+). " + "Please use Python 3.10–3.13." + ) + import os from dotenv import load_dotenv from uagents_adapter import SingleA2AAdapter @@ -35,8 +50,9 @@ "collaborate to produce comprehensive research reports on any topic." ), port=int(os.getenv("AGENT_PORT", "8008")), - agentverse_url=os.getenv("AGENTVERSE_URL", "https://agentverse.ai"), - mailbox_api_key=os.getenv("AGENTVERSE_API_KEY", ""), + a2a_port=int(os.getenv("A2A_PORT", "9999")), + mailbox=True, + seed=os.getenv("AGENT_SEED"), ) if __name__ == "__main__": diff --git a/ag2-agents/research-synthesis-team/tests/test_agents.py b/ag2-agents/research-synthesis-team/tests/test_agents.py index d08da60..d239c72 100644 --- a/ag2-agents/research-synthesis-team/tests/test_agents.py +++ b/ag2-agents/research-synthesis-team/tests/test_agents.py @@ -49,3 +49,35 @@ def test_llmconfig_positional_dict_construction(): llm_config=cfg, ) assert agent.name == "llm_config_test" + + +def test_all_modules_import(): + """Smoke test: every module imports without error.""" + import agents + import agent_executor + import workflow + import main + + +def test_duckduckgo_search_tool_instantiates(): + """DuckDuckGoSearchTool must be constructable with no arguments.""" + from autogen.tools.experimental import DuckDuckGoSearchTool + tool = DuckDuckGoSearchTool() + assert hasattr(tool, "register_for_llm") + assert hasattr(tool, "register_for_execution") + + +def test_adapter_construction(): + """SingleA2AAdapter accepts the params used in main.py.""" + from unittest.mock import MagicMock + from uagents_adapter import SingleA2AAdapter + adapter = SingleA2AAdapter( + agent_executor=MagicMock(), + name="test", + description="test", + port=8008, + a2a_port=9999, + mailbox=True, + seed="test-seed", + ) + assert adapter.name == "test" diff --git a/ag2-agents/research-synthesis-team/workflow.py b/ag2-agents/research-synthesis-team/workflow.py index 392b8ec..221d425 100644 --- a/ag2-agents/research-synthesis-team/workflow.py +++ b/ag2-agents/research-synthesis-team/workflow.py @@ -32,7 +32,7 @@ async def run_research(topic: str, llm_config: LLMConfig, mcp_url: str | None = result = await _run_groupchat(agents, executor, llm_config, topic) else: # Default: DuckDuckGo search — no API key required - search = DuckDuckGoSearchTool(num_results=5) + search = DuckDuckGoSearchTool() search.register_for_llm(agents[0]) # web_researcher can request searches search.register_for_execution(executor) # executor runs them result = await _run_groupchat(agents, executor, llm_config, topic)