diff --git a/ag2-agents/payment-approval/.env.example b/ag2-agents/payment-approval/.env.example new file mode 100644 index 0000000..ccea3d7 --- /dev/null +++ b/ag2-agents/payment-approval/.env.example @@ -0,0 +1,8 @@ +OPENAI_API_KEY=your-openai-api-key-here + +# Optional +LLM_MODEL=gpt-4o-mini +OPENAI_BASE_URL=https://api.openai.com/v1 +AGENT_PORT=8009 +A2A_PORT=9998 +AGENT_SEED= diff --git a/ag2-agents/payment-approval/README.md b/ag2-agents/payment-approval/README.md new file mode 100644 index 0000000..4aa5b67 --- /dev/null +++ b/ag2-agents/payment-approval/README.md @@ -0,0 +1,46 @@ +# AG2 Payment Approval Agent + +![ag2](https://img.shields.io/badge/ag2-00ADD8) ![uagents](https://img.shields.io/badge/uagents-4A90E2) ![a2a](https://img.shields.io/badge/a2a-000000) ![innovationlab](https://img.shields.io/badge/innovationlab-3D8BD3) + +A two-agent payment approval workflow using [AG2](https://github.com/ag2ai/ag2) (formerly AutoGen) +integrated with the Fetch.ai uAgents ecosystem via the A2A protocol and payment protocol. 
+ +## Architecture + +``` +Buyer agent / ASI:One / other uAgent + ↓ +SingleA2AAdapter (port 8009) → Agentverse + ↓ +PaymentApprovalExecutor (A2A AgentExecutor) + ↓ +AG2 Two-Agent Chat +├── researcher — investigates recipient, produces risk assessment +└── payment_executor — formats assessment, returns APPROVED/REJECTED verdict + ↓ +Payment Protocol (via adapter) +├── APPROVED → buyer receives RequestPayment → CommitPayment / RejectPayment +└── REJECTED → buyer receives assessment text explaining why +``` + +## Prerequisites + +- **Python 3.10–3.13** (uagents depends on Pydantic v1, which is incompatible with Python 3.14+) + +## Quick Start + +```bash +cd ag2-agents/payment-approval +pip install -r requirements.txt +cp .env.example .env # add OPENAI_API_KEY +python main.py +``` + +## AG2 Features Demonstrated + +- **Two-agent `initiate_chat`** — researcher hands off to executor via the natural + conversation flow; the shared message history carries the assessment +- **`A2A AgentExecutor`** — same integration pattern used by other examples in this repo +- **Payment protocol integration** — `SingleA2AAdapter` bridges the assessment result + into Fetch.ai's `AgentPaymentProtocol`, enabling buyer agents to commit or reject + payments via standard protocol messages diff --git a/ag2-agents/payment-approval/agent_executor.py b/ag2-agents/payment-approval/agent_executor.py new file mode 100644 index 0000000..46864ff --- /dev/null +++ b/ag2-agents/payment-approval/agent_executor.py @@ -0,0 +1,79 @@ +""" +Wraps the AG2 payment assessment workflow as an A2A AgentExecutor +for use with SingleA2AAdapter. 
+""" +import re + +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.types import Part, TextPart +from a2a.utils import new_agent_text_message +from autogen import LLMConfig +from typing_extensions import override + +from workflow import run_payment_assessment + +# Pattern: "50 USDC to alice.fetch — reason: 'research report delivery'" +_PAYMENT_RE = re.compile( + r"(?P[\d.]+)\s+(?P\w+)\s+to\s+(?P\S+)" + r"(?:\s*[—\-]+\s*reason:\s*['\"]?(?P[^'\"]+)['\"]?)?", + re.IGNORECASE, +) + + +def _parse_payment_request(text: str) -> tuple[str, float, str, str]: + """Extract (recipient, amount, reason, currency) from free-text message. + + Falls back to passing the entire message as the reason if parsing fails. + """ + m = _PAYMENT_RE.search(text) + if m: + return ( + m.group("recipient"), + float(m.group("amount")), + m.group("reason") or "not specified", + m.group("currency"), + ) + # Fallback: let the LLM agents figure it out + return ("unknown", 0.0, text, "USDC") + + +class PaymentApprovalExecutor(AgentExecutor): + """A2A-compatible executor wrapping the AG2 payment assessment workflow.""" + + def __init__(self, llm_config: LLMConfig): + self.llm_config = llm_config + + @override + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + message_content = "" + for part in context.message.parts: + if isinstance(part, Part): + if isinstance(part.root, TextPart): + message_content = part.root.text + break + + if not message_content: + await event_queue.enqueue_event( + new_agent_text_message("Error: No message content received.") + ) + return + + try: + recipient, amount, reason, currency = _parse_payment_request(message_content) + result = await run_payment_assessment( + recipient=recipient, + amount=amount, + reason=reason, + llm_config=self.llm_config, + currency=currency, + ) + await event_queue.enqueue_event(new_agent_text_message(result)) + except Exception as e: + await 
event_queue.enqueue_event( + new_agent_text_message(f"Payment assessment failed: {e}") + ) + + @override + async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("Cancel not supported for this agent executor.") diff --git a/ag2-agents/payment-approval/agents.py b/ag2-agents/payment-approval/agents.py new file mode 100644 index 0000000..46a8289 --- /dev/null +++ b/ag2-agents/payment-approval/agents.py @@ -0,0 +1,36 @@ +""" +AG2 payment approval agents. + +researcher — investigates recipient, produces risk assessment +executor — formats assessment, decides proceed/reject (no human stdin in uAgent mode) +""" +from autogen import ConversableAgent, LLMConfig + + +def build_agents(llm_config: LLMConfig) -> tuple[ConversableAgent, ConversableAgent]: + researcher = ConversableAgent( + name="researcher", + system_message=( + "You are a payment risk analyst. Investigate the payment recipient using available " + "tools: check their Fetch.ai address history, reputation, and any known flags. " + "Produce a concise risk assessment with a clear recommendation (proceed / do not proceed). " + "End your assessment with ASSESSMENT COMPLETE." + ), + llm_config=llm_config, + ) + + executor = ConversableAgent( + name="payment_executor", + system_message=( + "You handle payment execution. Present the researcher's risk assessment clearly, " + "state the exact payment details (recipient, amount, reason), then give a final " + "verdict: APPROVED or REJECTED based on the risk assessment.\n\n" + "If approved, end with: PAYMENT APPROVED. TERMINATE\n" + "If rejected, end with: PAYMENT REJECTED. 
TERMINATE" + ), + llm_config=llm_config, + human_input_mode="NEVER", + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + ) + + return researcher, executor diff --git a/ag2-agents/payment-approval/main.py b/ag2-agents/payment-approval/main.py new file mode 100644 index 0000000..b02a253 --- /dev/null +++ b/ag2-agents/payment-approval/main.py @@ -0,0 +1,57 @@ +""" +AG2 two-agent payment approval workflow, exposed via A2A protocol. + +On each request, two AG2 ConversableAgents (researcher → payment_executor) +are created in agents.py and orchestrated by workflow.py to produce a risk +assessment. The result is served through SingleA2AAdapter, making the AG2 +workflow discoverable on Agentverse and callable from ASI:One or other +agents in the ecosystem. + +Requires Python ≤3.13 (uagents depends on Pydantic v1, incompatible with 3.14+). +""" +import sys + +if sys.version_info >= (3, 14): + raise RuntimeError( + "uagents requires Python ≤3.13 (Pydantic v1 is incompatible with 3.14+). " + "Please use Python 3.10–3.13." + ) + +import os +from dotenv import load_dotenv +from uagents_adapter import SingleA2AAdapter +from autogen import LLMConfig + +from agent_executor import PaymentApprovalExecutor + +load_dotenv() + +llm_config = LLMConfig( + { + "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), + "api_key": os.getenv("OPENAI_API_KEY", ""), + "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), + }, + temperature=0.2, + cache_seed=None, +) + +executor = PaymentApprovalExecutor(llm_config=llm_config) + +adapter = SingleA2AAdapter( + agent_executor=executor, + name="AG2 Payment Approval Agent", + description=( + "Two-agent payment approval workflow using AG2 (formerly AutoGen). " + "A researcher investigates the recipient and a payment executor " + "produces a risk assessment with an APPROVED/REJECTED verdict. " + "Supports Fetch.ai payment protocol for commit/reject flows." 
+ ), + port=int(os.getenv("AGENT_PORT", "8009")), + a2a_port=int(os.getenv("A2A_PORT", "9998")), + mailbox=True, + seed=os.getenv("AGENT_SEED"), +) + +if __name__ == "__main__": + adapter.run() diff --git a/ag2-agents/payment-approval/requirements.txt b/ag2-agents/payment-approval/requirements.txt new file mode 100644 index 0000000..7ac080f --- /dev/null +++ b/ag2-agents/payment-approval/requirements.txt @@ -0,0 +1,5 @@ +ag2[openai]>=0.11.0 +a2a-sdk +uagents>=0.20.0 +uagents-adapter>=0.4.0 +python-dotenv>=1.0.0 diff --git a/ag2-agents/payment-approval/tests/conftest.py b/ag2-agents/payment-approval/tests/conftest.py new file mode 100644 index 0000000..96f7b9c --- /dev/null +++ b/ag2-agents/payment-approval/tests/conftest.py @@ -0,0 +1,17 @@ +""" +Ensure payment-approval/ is on sys.path so local modules are importable. +""" +import sys +import os + +parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if parent not in sys.path: + sys.path.insert(0, parent) + +# Evict the installed 'agents' package (OpenAI Agents SDK) from sys.modules +# so 'from agents import build_agents' finds our local agents.py instead. 
+for key in list(sys.modules.keys()): + if key == "agents" or key.startswith("agents."): + mod = sys.modules[key] + if hasattr(mod, "__file__") and mod.__file__ and parent not in (mod.__file__ or ""): + del sys.modules[key] diff --git a/ag2-agents/payment-approval/tests/test_payment.py b/ag2-agents/payment-approval/tests/test_payment.py new file mode 100644 index 0000000..65acfa5 --- /dev/null +++ b/ag2-agents/payment-approval/tests/test_payment.py @@ -0,0 +1,178 @@ +"""Unit tests for payment-approval — no LLM calls, no network, no uAgents runtime.""" +import os +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +os.environ.setdefault("OPENAI_API_KEY", "test-key") + +from autogen import LLMConfig + +TEST_LLM = LLMConfig( + {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"} +) + + +# ── Agent tests ────────────────────────────────────────────────────── + +def test_agents_instantiate(): + from agents import build_agents + researcher, executor = build_agents(TEST_LLM) + assert researcher.name == "researcher" + assert executor.name == "payment_executor" + + +def test_executor_terminates_on_keyword(): + from agents import build_agents + _, executor = build_agents(TEST_LLM) + assert executor._is_termination_msg({"content": "PAYMENT APPROVED. TERMINATE"}) is True + assert executor._is_termination_msg({"content": "PAYMENT REJECTED. 
TERMINATE"}) is True + assert executor._is_termination_msg({"content": "Still investigating."}) is False + + +def test_executor_human_input_mode_is_never(): + """In uAgent mode, human_input_mode is NEVER — payment protocol handles approval.""" + from agents import build_agents + _, executor = build_agents(TEST_LLM) + assert executor.human_input_mode == "NEVER" + + +def test_researcher_no_custom_termination(): + from agents import build_agents + researcher, _ = build_agents(TEST_LLM) + assert researcher._is_termination_msg({"content": "ASSESSMENT COMPLETE"}) is False + + +# ── Parser tests ───────────────────────────────────────────────────── + +def test_parse_payment_request_full(): + from agent_executor import _parse_payment_request + recipient, amount, reason, currency = _parse_payment_request( + "Pay 50.5 USDC to alice.fetch — reason: 'research report delivery'" + ) + assert recipient == "alice.fetch" + assert amount == 50.5 + assert reason == "research report delivery" + assert currency == "USDC" + + +def test_parse_payment_request_no_reason(): + from agent_executor import _parse_payment_request + recipient, amount, reason, currency = _parse_payment_request( + "Send 100 FET to bob.fetch" + ) + assert recipient == "bob.fetch" + assert amount == 100.0 + assert currency == "FET" + + +def test_parse_payment_request_fallback(): + from agent_executor import _parse_payment_request + recipient, amount, reason, currency = _parse_payment_request( + "Please investigate this payment" + ) + assert recipient == "unknown" + assert amount == 0.0 + assert reason == "Please investigate this payment" + + +# ── Executor tests ─────────────────────────────────────────────────── + +def _make_context(text: str): + text_part = MagicMock() + text_part.root = MagicMock() + text_part.root.text = text + part_cls = type(text_part) + text_part_cls = type(text_part.root) + message = MagicMock() + message.parts = [text_part] + ctx = MagicMock() + ctx.message = message + return ctx, part_cls, 
text_part_cls + + +@pytest.mark.asyncio +async def test_execute_calls_run_payment_assessment(): + from agent_executor import PaymentApprovalExecutor + exec_ = PaymentApprovalExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context( + "Pay 50 USDC to alice.fetch — reason: 'report'" + ) + event_queue = AsyncMock() + + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_payment_assessment", new_callable=AsyncMock, return_value="APPROVED") as mock_run, \ + patch("agent_executor.new_agent_text_message", return_value="event") as mock_msg: + await exec_.execute(ctx, event_queue) + + mock_run.assert_called_once_with( + recipient="alice.fetch", amount=50.0, reason="report", + llm_config=TEST_LLM, currency="USDC", + ) + mock_msg.assert_called_once_with("APPROVED") + event_queue.enqueue_event.assert_called_once_with("event") + + +@pytest.mark.asyncio +async def test_execute_empty_message(): + from agent_executor import PaymentApprovalExecutor + exec_ = PaymentApprovalExecutor(llm_config=TEST_LLM) + message = MagicMock() + message.parts = [] + ctx = MagicMock() + ctx.message = message + event_queue = AsyncMock() + + with patch("agent_executor.Part", MagicMock), \ + patch("agent_executor.TextPart", MagicMock), \ + patch("agent_executor.new_agent_text_message", return_value="err") as mock_msg: + await exec_.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Error: No message content received.") + + +@pytest.mark.asyncio +async def test_execute_handles_error(): + from agent_executor import PaymentApprovalExecutor + exec_ = PaymentApprovalExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context("Pay 10 USDC to fail.fetch") + event_queue = AsyncMock() + + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_payment_assessment", new_callable=AsyncMock, side_effect=RuntimeError("LLM 
timeout")), \ + patch("agent_executor.new_agent_text_message", return_value="err") as mock_msg: + await exec_.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Payment assessment failed: LLM timeout") + + +def test_executor_inherits_agent_executor(): + from agent_executor import PaymentApprovalExecutor + from a2a.server.agent_execution import AgentExecutor + assert issubclass(PaymentApprovalExecutor, AgentExecutor) + + +# ── Smoke tests ────────────────────────────────────────────────────── + +def test_all_modules_import(): + """Every module imports without error.""" + import agents + import agent_executor + import workflow + import main + + +def test_adapter_construction(): + """SingleA2AAdapter accepts the params used in main.py.""" + from uagents_adapter import SingleA2AAdapter + adapter = SingleA2AAdapter( + agent_executor=MagicMock(), + name="test", + description="test", + port=8009, + a2a_port=9998, + mailbox=True, + seed="test-seed", + ) + assert adapter.name == "test" diff --git a/ag2-agents/payment-approval/workflow.py b/ag2-agents/payment-approval/workflow.py new file mode 100644 index 0000000..312d414 --- /dev/null +++ b/ag2-agents/payment-approval/workflow.py @@ -0,0 +1,35 @@ +"""Async payment assessment workflow using AG2 two-agent chat.""" +from autogen import LLMConfig + +from agents import build_agents + + +async def run_payment_assessment( + recipient: str, + amount: float, + reason: str, + llm_config: LLMConfig, + currency: str = "USDC", +) -> str: + """Run researcher → executor assessment and return the final verdict. + + Returns the executor's last message containing the risk assessment + summary and APPROVED/REJECTED verdict. + """ + researcher, executor = build_agents(llm_config) + + await researcher.a_initiate_chat( + executor, + message=( + f"Payment request: {amount} {currency} to {recipient} — reason: '{reason}'. " + f"Investigate the recipient and produce a risk assessment." 
+ ), + max_turns=6, + ) + + # Return the executor's final message (contains verdict) + for msg in reversed(executor.chat_messages.get(researcher, [])): + if msg.get("content"): + return msg["content"] + + return "Payment assessment did not complete." diff --git a/ag2-agents/research-synthesis-team/.env.example b/ag2-agents/research-synthesis-team/.env.example new file mode 100644 index 0000000..048eef1 --- /dev/null +++ b/ag2-agents/research-synthesis-team/.env.example @@ -0,0 +1,12 @@ +OPENAI_API_KEY=your-openai-api-key-here + +# Optional +LLM_MODEL=gpt-4o-mini +OPENAI_BASE_URL=https://api.openai.com/v1 +AGENT_PORT=8008 +A2A_PORT=9999 +AGENT_SEED= + +# Leave empty to use DuckDuckGo search (default, no API key needed). +# Set to a Fetch.ai MCP gateway URL to use MCP tools instead. +MCP_SERVER_URL= diff --git a/ag2-agents/research-synthesis-team/Dockerfile b/ag2-agents/research-synthesis-team/Dockerfile new file mode 100644 index 0000000..67da1c8 --- /dev/null +++ b/ag2-agents/research-synthesis-team/Dockerfile @@ -0,0 +1,14 @@ +FROM python:3.11-slim + +WORKDIR /app + +COPY . . 
+ +RUN apt-get update && apt-get install -y gcc \ + && pip install --no-cache-dir --upgrade pip \ + && pip install --no-cache-dir -r requirements.txt \ + && apt-get clean && rm -rf /var/lib/apt/lists/* + +ENV PYTHONUNBUFFERED=1 +EXPOSE 8008 +CMD ["python", "main.py"] diff --git a/ag2-agents/research-synthesis-team/README.md b/ag2-agents/research-synthesis-team/README.md new file mode 100644 index 0000000..0aed46e --- /dev/null +++ b/ag2-agents/research-synthesis-team/README.md @@ -0,0 +1,48 @@ +# AG2 Research Synthesis Team + +![ag2](https://img.shields.io/badge/ag2-00ADD8) ![uagents](https://img.shields.io/badge/uagents-4A90E2) ![a2a](https://img.shields.io/badge/a2a-000000) ![innovationlab](https://img.shields.io/badge/innovationlab-3D8BD3) + +A multi-agent research pipeline using [AG2](https://github.com/ag2ai/ag2) (formerly AutoGen) +integrated with the Fetch.ai uAgents ecosystem via the A2A protocol. + +## Architecture + +Four specialists collaborate under GroupChat with LLM-driven speaker selection, wrapped as +an A2A executor and exposed as a discoverable agent on Agentverse. + +``` +User / ASI:One / other uAgent + ↓ +SingleA2AAdapter (port 8008) → Agentverse + ↓ +AG2ResearchExecutor (A2A AgentExecutor) + ↓ +GroupChat (AG2) +├── web_researcher — DuckDuckGo search, gathers sources +├── financial_analyst — market data, metrics, trends +├── tech_analyst — technical feasibility, risks +└── synthesizer — final structured report +``` + +## Prerequisites + +- **Python 3.10–3.13** (uagents depends on Pydantic v1, which is incompatible with Python 3.14+) + +## Quick Start + +```bash +pip install -r requirements.txt +cp .env.example .env # add OPENAI_API_KEY +python main.py +``` + +No additional API keys needed for search — DuckDuckGo is used by default. + +To use a Fetch.ai MCP gateway instead, set `MCP_SERVER_URL` in `.env`. 
+ +## AG2 Features Demonstrated + +- **`GroupChat` with `speaker_selection_method="auto"`** — LLM-driven dynamic speaker selection +- **`DuckDuckGoSearchTool`** — built-in web search, no API key required +- **Native MCP client** — optional override via `MCP_SERVER_URL` for richer tool access +- **`A2A AgentExecutor`** — same integration pattern used by other examples in this repo diff --git a/ag2-agents/research-synthesis-team/agent_executor.py b/ag2-agents/research-synthesis-team/agent_executor.py new file mode 100644 index 0000000..2a39f79 --- /dev/null +++ b/ag2-agents/research-synthesis-team/agent_executor.py @@ -0,0 +1,47 @@ +""" +Wraps the AG2 GroupChat workflow as an A2A AgentExecutor +for use with SingleA2AAdapter (Pattern B). +""" +from a2a.server.agent_execution import AgentExecutor, RequestContext +from a2a.server.events import EventQueue +from a2a.types import Part, TextPart +from a2a.utils import new_agent_text_message +from autogen import LLMConfig +from typing_extensions import override + +from workflow import run_research + + +class AG2ResearchExecutor(AgentExecutor): + """A2A-compatible executor wrapping the AG2 GroupChat workflow.""" + + def __init__(self, llm_config: LLMConfig, mcp_url: str | None = None): + self.llm_config = llm_config + self.mcp_url = mcp_url + + @override + async def execute(self, context: RequestContext, event_queue: EventQueue) -> None: + message_content = "" + for part in context.message.parts: + if isinstance(part, Part): + if isinstance(part.root, TextPart): + message_content = part.root.text + break + + if not message_content: + await event_queue.enqueue_event( + new_agent_text_message("Error: No message content received.") + ) + return + + try: + result = await run_research(message_content, self.llm_config, self.mcp_url) + await event_queue.enqueue_event(new_agent_text_message(result)) + except Exception as e: + await event_queue.enqueue_event( + new_agent_text_message(f"Research failed: {e}") + ) + + @override + async 
def cancel(self, context: RequestContext, event_queue: EventQueue) -> None: + raise Exception("Cancel not supported for this agent executor.") diff --git a/ag2-agents/research-synthesis-team/agents.py b/ag2-agents/research-synthesis-team/agents.py new file mode 100644 index 0000000..dc097a8 --- /dev/null +++ b/ag2-agents/research-synthesis-team/agents.py @@ -0,0 +1,84 @@ +""" +AG2 (formerly AutoGen) research synthesis team. +Four specialists collaborate under GroupChat with LLM-driven speaker selection. +""" +from autogen import AssistantAgent, LLMConfig + + +def build_agents(llm_config: LLMConfig) -> list[AssistantAgent]: + web_researcher = AssistantAgent( + name="web_researcher", + system_message=( + "You are a web research specialist with access to search tools.\n\n" + "WORKFLOW:\n" + "1. Use the search tool to find relevant sources on the assigned topic.\n" + "2. Run at least 2-3 different search queries to cover the topic broadly.\n" + "3. Compile your findings into a structured response.\n\n" + "OUTPUT FORMAT (mandatory):\n" + "- Provide a markdown table of sources found:\n" + " | # | Title | URL | Key Finding |\n" + "- Minimum 5 sources with direct links.\n" + "- Below the table, write a 200-word summary of the most important findings.\n" + "- Flag any conflicting information between sources." + ), + llm_config=llm_config, + ) + financial_analyst = AssistantAgent( + name="financial_analyst", + system_message=( + "You are a financial analyst. Analyse market data, trends, and economic " + "implications of the research topic using the sources provided by the " + "web researcher.\n\n" + "OUTPUT FORMAT (mandatory):\n" + "- Provide a financial summary table:\n" + " | Metric | Value | Source | Trend |\n" + "- Include: market size, growth rate, key players, funding, and revenue " + " figures where available. 
Use 'N/A' when data is unavailable.\n" + "- Write a 150-word analysis of financial risks and opportunities.\n" + "- Be quantitative — use numbers, percentages, and dollar amounts." + ), + llm_config=llm_config, + ) + tech_analyst = AssistantAgent( + name="tech_analyst", + system_message=( + "You are a technology analyst. Evaluate the technical landscape of the " + "research topic using the sources provided by the web researcher.\n\n" + "OUTPUT FORMAT (mandatory):\n" + "- Provide a technology assessment table:\n" + " | Technology/Component | Maturity | Adoption | Risk Level | Notes |\n" + "- Maturity values: Emerging, Growing, Mature, Declining.\n" + "- Risk Level: Low, Medium, High.\n" + "- Write a 150-word analysis covering: technical feasibility, innovation " + " potential, and key technical challenges.\n" + "- Identify the top 3 technical risks with mitigation strategies." + ), + llm_config=llm_config, + ) + synthesizer = AssistantAgent( + name="synthesizer", + system_message=( + "You are a synthesis expert. Once all specialists have contributed, " + "produce a final structured report combining all perspectives.\n\n" + "MANDATORY REPORT STRUCTURE:\n" + "## Executive Summary\n" + "3-5 bullet points covering the most important findings.\n\n" + "## Research Findings\n" + "Key sources and discoveries from the web researcher.\n\n" + "## Financial Analysis\n" + "Market data, financial metrics, and economic outlook.\n\n" + "## Technical Analysis\n" + "Technology landscape, maturity assessment, and risks.\n\n" + "## Conclusions & Recommendations\n" + "3-5 actionable recommendations ranked by priority.\n\n" + "## Sources\n" + "Consolidated list of all sources cited, as [Title](URL).\n\n" + "RULES:\n" + "- Do NOT repeat raw data from specialists — synthesize and add insight.\n" + "- Total report length: 500-800 words.\n" + "- End your response with TERMINATE." 
+ ), + llm_config=llm_config, + is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""), + ) + return [web_researcher, financial_analyst, tech_analyst, synthesizer] diff --git a/ag2-agents/research-synthesis-team/main.py b/ag2-agents/research-synthesis-team/main.py new file mode 100644 index 0000000..5d7143a --- /dev/null +++ b/ag2-agents/research-synthesis-team/main.py @@ -0,0 +1,59 @@ +""" +AG2 multi-agent research team, exposed via A2A protocol. + +On each request, four AG2 AssistantAgents are created in agents.py and +orchestrated under GroupChat (workflow.py) with LLM-driven speaker selection +to produce structured research reports. The result is served through +SingleA2AAdapter, making the AG2 workflow discoverable on Agentverse and +callable from ASI:One or other agents in the ecosystem. + +Requires Python ≤3.13 (uagents depends on Pydantic v1, incompatible with 3.14+). +""" +import sys + +if sys.version_info >= (3, 14): + raise RuntimeError( + "uagents requires Python ≤3.13 (Pydantic v1 is incompatible with 3.14+). " + "Please use Python 3.10–3.13." + ) + +import os +from dotenv import load_dotenv +from uagents_adapter import SingleA2AAdapter +from autogen import LLMConfig + +from agent_executor import AG2ResearchExecutor + +load_dotenv() + +llm_config = LLMConfig( + { + "model": os.getenv("LLM_MODEL", "gpt-4o-mini"), + "api_key": os.getenv("OPENAI_API_KEY", ""), + "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), + }, + temperature=0.3, + cache_seed=None, +) + +executor = AG2ResearchExecutor( + llm_config=llm_config, + mcp_url=os.getenv("MCP_SERVER_URL"), # optional: Fetch.ai MCP gateway +) + +adapter = SingleA2AAdapter( + agent_executor=executor, + name="AG2 Research Synthesis Team", + description=( + "Multi-agent research team using AG2 (formerly AutoGen). " + "Four specialists (web researcher, financial analyst, tech analyst, synthesizer) " + "collaborate to produce comprehensive research reports on any topic." 
+ ), + port=int(os.getenv("AGENT_PORT", "8008")), + a2a_port=int(os.getenv("A2A_PORT", "9999")), + mailbox=True, + seed=os.getenv("AGENT_SEED"), +) + +if __name__ == "__main__": + adapter.run() diff --git a/ag2-agents/research-synthesis-team/requirements.txt b/ag2-agents/research-synthesis-team/requirements.txt new file mode 100644 index 0000000..8ee46db --- /dev/null +++ b/ag2-agents/research-synthesis-team/requirements.txt @@ -0,0 +1,6 @@ +ag2[openai,mcp,duckduckgo]>=0.11.0 +a2a-sdk +mcp>=1.0.0 +uagents>=0.20.0 +uagents-adapter>=0.4.0 +python-dotenv>=1.0.0 diff --git a/ag2-agents/research-synthesis-team/tests/conftest.py b/ag2-agents/research-synthesis-team/tests/conftest.py new file mode 100644 index 0000000..37c39a6 --- /dev/null +++ b/ag2-agents/research-synthesis-team/tests/conftest.py @@ -0,0 +1,20 @@ +""" +Ensure the project root (research-synthesis-team/) is on sys.path so that +local modules (agents.py, agent_executor.py, workflow.py) are importable. +Also evict the installed 'agents' package (OpenAI Agents SDK) that would +otherwise shadow the local agents.py. +""" +import sys +import os + +parent = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +if parent not in sys.path: + sys.path.insert(0, parent) + +# Evict the installed 'agents' package from sys.modules cache so +# 'from agents import build_agents' finds our local agents.py instead. 
+for key in list(sys.modules.keys()): + if key == "agents" or key.startswith("agents."): + mod = sys.modules[key] + if hasattr(mod, "__file__") and mod.__file__ and parent not in (mod.__file__ or ""): + del sys.modules[key] diff --git a/ag2-agents/research-synthesis-team/tests/test_agent_executor.py b/ag2-agents/research-synthesis-team/tests/test_agent_executor.py new file mode 100644 index 0000000..eef213f --- /dev/null +++ b/ag2-agents/research-synthesis-team/tests/test_agent_executor.py @@ -0,0 +1,84 @@ +"""Unit tests for AG2ResearchExecutor — no LLM calls, no network.""" +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from autogen import LLMConfig + +from agent_executor import AG2ResearchExecutor + +TEST_LLM = LLMConfig( + {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"} +) + + +def _make_context(text: str): + """Build a minimal RequestContext-like object with a text message.""" + text_part = MagicMock() + text_part.root = MagicMock() + text_part.root.text = text + + part_cls = type(text_part) + text_part_cls = type(text_part.root) + + message = MagicMock() + message.parts = [text_part] + ctx = MagicMock() + ctx.message = message + return ctx, part_cls, text_part_cls + + +@pytest.mark.asyncio +async def test_execute_calls_run_research(): + executor = AG2ResearchExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context("quantum computing") + event_queue = AsyncMock() + + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_research", new_callable=AsyncMock, return_value="Mock report") as mock_run, \ + patch("agent_executor.new_agent_text_message", return_value="event") as mock_msg: + await executor.execute(ctx, event_queue) + + mock_run.assert_called_once_with("quantum computing", TEST_LLM, None) + mock_msg.assert_called_once_with("Mock report") + event_queue.enqueue_event.assert_called_once_with("event") + + 
+@pytest.mark.asyncio +async def test_execute_empty_message(): + executor = AG2ResearchExecutor(llm_config=TEST_LLM) + message = MagicMock() + message.parts = [] + ctx = MagicMock() + ctx.message = message + event_queue = AsyncMock() + + with patch("agent_executor.Part", MagicMock), \ + patch("agent_executor.TextPart", MagicMock), \ + patch("agent_executor.new_agent_text_message", return_value="err_event") as mock_msg: + await executor.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Error: No message content received.") + event_queue.enqueue_event.assert_called_once_with("err_event") + + +@pytest.mark.asyncio +async def test_execute_handles_research_error(): + executor = AG2ResearchExecutor(llm_config=TEST_LLM) + ctx, part_cls, text_part_cls = _make_context("bad topic") + event_queue = AsyncMock() + + failing_research = AsyncMock(side_effect=RuntimeError("LLM timeout")) + with patch("agent_executor.Part", part_cls), \ + patch("agent_executor.TextPart", text_part_cls), \ + patch("agent_executor.run_research", failing_research), \ + patch("agent_executor.new_agent_text_message", return_value="err") as mock_msg: + await executor.execute(ctx, event_queue) + + mock_msg.assert_called_once_with("Research failed: LLM timeout") + event_queue.enqueue_event.assert_called_once_with("err") + + +def test_executor_inherits_agent_executor(): + """AG2ResearchExecutor must inherit from the A2A AgentExecutor base class.""" + from a2a.server.agent_execution import AgentExecutor + assert issubclass(AG2ResearchExecutor, AgentExecutor) diff --git a/ag2-agents/research-synthesis-team/tests/test_agents.py b/ag2-agents/research-synthesis-team/tests/test_agents.py new file mode 100644 index 0000000..d239c72 --- /dev/null +++ b/ag2-agents/research-synthesis-team/tests/test_agents.py @@ -0,0 +1,83 @@ +"""Unit tests — no LLM calls, no network, no uAgents runtime.""" +import pytest +import os +os.environ.setdefault("OPENAI_API_KEY", "test-key") +from autogen import LLMConfig + 
# Throwaway credentials: nothing in this module contacts the API.
TEST_LLM_CONFIG = LLMConfig(
    {"model": "gpt-4o-mini", "api_key": "test", "base_url": "https://api.openai.com/v1"}
)


def test_agents_instantiate():
    """build_agents returns the four named team members."""
    from agents import build_agents
    team = build_agents(TEST_LLM_CONFIG)
    assert len(team) == 4
    names = [member.name for member in team]
    for expected in ("web_researcher", "financial_analyst", "tech_analyst", "synthesizer"):
        assert expected in names


def test_synthesizer_termination():
    """The synthesizer terminates only on messages carrying the TERMINATE marker."""
    from agents import build_agents
    team = build_agents(TEST_LLM_CONFIG)
    synthesizer = next(member for member in team if member.name == "synthesizer")
    assert synthesizer._is_termination_msg({"content": "Report done. TERMINATE"}) is True
    assert synthesizer._is_termination_msg({"content": "Still analysing..."}) is False


def test_executor_instantiates():
    """A tool-executing UserProxyAgent can be built with the workflow's parameters."""
    from autogen import UserProxyAgent
    executor = UserProxyAgent(
        name="executor",
        human_input_mode="NEVER",
        code_execution_config=False,
        is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""),
    )
    assert executor.name == "executor"


def test_llmconfig_positional_dict_construction():
    """Validate the positional-dict LLMConfig used in main.py works with AG2 0.11+."""
    from autogen import AssistantAgent
    cfg = LLMConfig(
        {"model": "gpt-4o-mini", "api_key": "test-key", "base_url": "https://api.openai.com/v1"},
        temperature=0.3,
        cache_seed=None,
    )
    agent = AssistantAgent(name="llm_config_test", system_message="test", llm_config=cfg)
    assert agent.name == "llm_config_test"


def test_all_modules_import():
    """Smoke test: every module imports without error."""
    import agents
    import agent_executor
    import workflow
    import main


def test_duckduckgo_search_tool_instantiates():
    """DuckDuckGoSearchTool must be constructable with no arguments."""
    from autogen.tools.experimental import DuckDuckGoSearchTool
    tool = DuckDuckGoSearchTool()
    assert hasattr(tool, "register_for_llm")
    assert hasattr(tool, "register_for_execution")


def test_adapter_construction():
    """SingleA2AAdapter accepts the params used in main.py."""
    from unittest.mock import MagicMock
    from uagents_adapter import SingleA2AAdapter
    adapter = SingleA2AAdapter(
        agent_executor=MagicMock(),
        name="test",
        description="test",
        port=8008,
        a2a_port=9999,
        mailbox=True,
        seed="test-seed",
    )
    assert adapter.name == "test"


# --- new file: ag2-agents/research-synthesis-team/workflow.py ---
"""GroupChat orchestration with DuckDuckGo search (default) or MCP tools."""
from autogen import GroupChat, GroupChatManager, LLMConfig, UserProxyAgent
from autogen.tools.experimental import DuckDuckGoSearchTool

from agents import build_agents


async def run_research(topic: str, llm_config: LLMConfig, mcp_url: str | None = None) -> str:
    """Run the multi-agent research GroupChat and return the final report.

    topic      — subject to research
    llm_config — LLMConfig shared by every agent and the chat manager
    mcp_url    — optional SSE endpoint; when given, tools from that MCP
                 server replace the default DuckDuckGo search
    """
    agents = build_agents(llm_config)
    executor = UserProxyAgent(
        name="executor",
        human_input_mode="NEVER",
        code_execution_config=False,  # tools use register_for_execution, not code exec
        is_termination_msg=lambda m: "TERMINATE" in (m.get("content") or ""),
        default_auto_reply="",
    )

    if mcp_url:
        # Override: use MCP server tools (e.g. Fetch.ai's MCP gateway).
        from autogen.mcp import create_toolkit
        from mcp import ClientSession
        from mcp.client.sse import sse_client

        async with (
            sse_client(mcp_url, timeout=60) as (read, write),
            ClientSession(read, write) as session,
        ):
            await session.initialize()
            toolkit = await create_toolkit(session=session)
            toolkit.register_for_llm(agents[0])
            toolkit.register_for_execution(executor)
            # The chat must run while the MCP session is still open.
            return await _run_groupchat(agents, executor, llm_config, topic)

    # Default: DuckDuckGo search — no API key required.
    search = DuckDuckGoSearchTool()
    search.register_for_llm(agents[0])       # web_researcher can request searches
    search.register_for_execution(executor)  # executor runs them
    return await _run_groupchat(agents, executor, llm_config, topic)


async def _run_groupchat(agents, executor, llm_config, topic):
    """Drive the GroupChat to completion; return the synthesizer's last report."""
    chat = GroupChat(
        agents=[executor] + agents,
        messages=[],
        max_round=16,
        speaker_selection_method="auto",
    )
    manager = GroupChatManager(groupchat=chat, llm_config=llm_config)
    await executor.a_initiate_chat(
        manager,
        message=f"Research and synthesise a comprehensive report on: {topic}",
    )
    reports = [
        m["content"]
        for m in chat.messages
        if m.get("name") == "synthesizer" and m.get("content")
    ]
    return reports[-1] if reports else "Research did not complete."