Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 185 additions & 0 deletions airbyte/mcp/connector_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import contextlib
from typing import Annotated, Any, Literal

import requests
import yaml
from fastmcp import FastMCP
from pydantic import BaseModel, Field
from typing_extensions import Self

from airbyte._executors.util import DEFAULT_MANIFEST_URL
from airbyte._util.meta import is_docker_installed
Expand Down Expand Up @@ -161,6 +164,188 @@ def get_connector_info(
)


def _manifest_url_for(connector_name: str) -> str:
    """Build the URL where a connector's manifest.yaml file is expected.

    The URL is produced by filling the `DEFAULT_MANIFEST_URL` template with the
    connector name and the fixed version label "latest".

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        The formatted URL of the connector's manifest.yaml file
    """
    template_values = {"source_name": connector_name, "version": "latest"}
    return DEFAULT_MANIFEST_URL.format(**template_values)


def _fetch_manifest_dict(url: str) -> dict[str, Any]:
    """Fetch and parse a manifest.yaml file from a URL.

    Args:
        url: The URL to fetch the manifest from

    Returns:
        The parsed manifest data as a dictionary. An empty dict is returned when the
        manifest is not found (404), when the document is empty, or when the top-level
        YAML node is not a mapping.

    Raises:
        HTTPError: If the request fails with a non-404 status code
    """
    http_not_found = 404

    response = requests.get(url, timeout=10)
    if response.status_code == http_not_found:
        # A missing manifest is treated as "no data" rather than as an error.
        return {}

    response.raise_for_status()
    parsed = yaml.safe_load(response.text)
    # `safe_load` can yield None (empty document) or a non-mapping value such as a
    # list or scalar. Callers use dict methods on the result, so normalize every
    # non-mapping outcome to an empty dict to honor the declared return type.
    if not isinstance(parsed, dict):
        return {}
    return parsed


class ApiDocsUrl(BaseModel):
    """@private Class to hold API documentation URL information."""

    # Human-readable label for the documentation link.
    title: str
    # The documentation link itself.
    url: str
    # Label identifying where this entry was collected from.
    source: str
    # Populated from the "type" key via alias; defaults to "other".
    doc_type: str = Field(default="other", alias="type")
    # Populated from the "requiresLogin" key via alias; defaults to False.
    requires_login: bool = Field(default=False, alias="requiresLogin")

    # Allow construction with the Python field names as well as the aliases.
    model_config = {"populate_by_name": True}

    @classmethod
    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
        """Build ApiDocsUrl objects from a parsed manifest.

        Reads the list stored under `data.externalDocumentationUrls` and converts
        each item into an instance tagged with source "data_external_docs".

        Args:
            manifest_data: The parsed manifest.yaml data as a dictionary

        Returns:
            List of ApiDocsUrl objects extracted from the manifest; empty when the
            `data` section is not a dict or the URL entry is not a list
        """
        data_section = manifest_data.get("data")
        if not isinstance(data_section, dict):
            return []

        external_docs = data_section.get("externalDocumentationUrls")
        if not isinstance(external_docs, list):
            return []

        extracted: list[Self] = []
        for entry in external_docs:
            extracted.append(
                cls(
                    title=entry["title"],
                    url=entry["url"],
                    source="data_external_docs",
                    doc_type=entry.get("type", "other"),
                    requires_login=entry.get("requiresLogin", False),
                )
            )
        return extracted


def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]:
    """Collect external documentation URLs for a connector from the OSS registry.

    Downloads the OSS registry JSON, picks the first entry (sources are searched
    before destinations) whose `dockerRepository` ends with `/<connector_name>`,
    and converts each item of that entry's `externalDocumentationUrls` list into
    an `ApiDocsUrl` tagged with source "registry_external_docs".

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects extracted from the registry; empty when no entry
        matches or the entry carries no `externalDocumentationUrls` list
    """
    response = requests.get(
        "https://connectors.airbyte.com/files/registries/v0/oss_registry.json",
        timeout=10,
    )
    response.raise_for_status()
    registry = response.json()

    repo_suffix = f"/{connector_name}"
    candidates = registry.get("sources", []) + registry.get("destinations", [])
    matched_entry = next(
        (
            candidate
            for candidate in candidates
            if candidate.get("dockerRepository", "").endswith(repo_suffix)
        ),
        None,
    )

    if not matched_entry or "externalDocumentationUrls" not in matched_entry:
        return []

    external_docs = matched_entry["externalDocumentationUrls"]
    if not isinstance(external_docs, list):
        return []

    return [
        ApiDocsUrl(
            title=item["title"],
            url=item["url"],
            source="registry_external_docs",
            doc_type=item.get("type", "other"),
            requires_login=item.get("requiresLogin", False),
        )
        for item in external_docs
    ]


@mcp_tool(
    domain="registry",
    read_only=True,
    idempotent=True,
)
def get_api_docs_urls(
    connector_name: Annotated[
        str,
        Field(
            description=(
                "The canonical connector name "
                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
            )
        ),
    ],
) -> list[ApiDocsUrl] | Literal["Connector not found."]:
    """Get API documentation URLs for a connector.

    This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls)

    Returns:
        List of ApiDocsUrl objects with documentation URLs, or error message if connector not found.
    """
    if connector_name not in get_available_connectors():
        return "Connector not found."

    # Best-effort lookup: any failure while resolving the connector is suppressed
    # so the registry and manifest lookups below still run.
    connector = None
    with contextlib.suppress(Exception):
        connector = get_source(
            connector_name,
            docker_image=is_docker_installed() or False,
            install_if_missing=False,
        )

    collected: list[ApiDocsUrl] = []
    if connector and connector.docs_url:
        airbyte_docs = ApiDocsUrl(
            title="Airbyte Documentation",
            url=connector.docs_url,
            source="registry",
        )
        collected.append(airbyte_docs)

    # Registry entries come before manifest entries, which decides which duplicate wins.
    collected += _extract_docs_from_registry(connector_name)

    manifest_data = _fetch_manifest_dict(_manifest_url_for(connector_name))
    collected += ApiDocsUrl.from_manifest_dict(manifest_data)

    # Keep only the first entry seen for each URL, preserving collection order.
    first_by_url: dict[str, ApiDocsUrl] = {}
    for doc in collected:
        first_by_url.setdefault(doc.url, doc)

    return list(first_by_url.values())


def register_connector_registry_tools(app: FastMCP) -> None:
"""@private Register tools with the FastMCP app.

Expand Down
181 changes: 181 additions & 0 deletions tests/unit_tests/test_mcp_connector_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Unit tests for MCP connector registry tools."""

from __future__ import annotations

from unittest.mock import MagicMock, patch


from airbyte.mcp.connector_registry import (
ApiDocsUrl,
_fetch_manifest_dict,
_manifest_url_for,
get_api_docs_urls,
)


class TestManifestUrlFor:
    """Tests for _manifest_url_for function."""

    def test_manifest_url_for(self) -> None:
        """The URL mentions the connector name, the manifest file, and 'latest'."""
        url = _manifest_url_for("source-example")
        for expected_fragment in ("source-example", "manifest.yaml", "latest"):
            assert expected_fragment in url


class TestFetchManifestDict:
    """Tests for _fetch_manifest_dict function."""

    def test_manifest_not_found(self) -> None:
        """Test handling when manifest.yaml doesn't exist (404)."""
        with patch("airbyte.mcp.connector_registry.requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 404
            mock_get.return_value = mock_response

            manifest_dict = _fetch_manifest_dict("https://example.com/manifest.yaml")
            assert manifest_dict == {}

    def test_fetch_manifest_dict(self) -> None:
        """Test fetching and parsing manifest.yaml."""
        # Spelled out line by line so that `name` is unambiguously nested under
        # `data`; without that nesting `data` parses to None and the final
        # assertion below cannot hold.
        manifest_yaml = (
            "\n"
            "version: 1.0.0\n"
            "type: DeclarativeSource\n"
            "data:\n"
            "  name: Example\n"
        )
        with patch("airbyte.mcp.connector_registry.requests.get") as mock_get:
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.text = manifest_yaml
            mock_get.return_value = mock_response

            manifest_dict = _fetch_manifest_dict("https://example.com/manifest.yaml")
            assert manifest_dict["version"] == "1.0.0"
            assert manifest_dict["type"] == "DeclarativeSource"
            assert manifest_dict["data"]["name"] == "Example"


class TestApiDocsUrlFromManifestDict:
    """Tests for ApiDocsUrl.from_manifest_dict classmethod."""

    def test_manifest_with_external_docs_urls(self) -> None:
        """Entries under data.externalDocumentationUrls become ApiDocsUrl objects."""
        external_docs = [
            {
                "title": "Versioning docs",
                "url": "https://api.example.com/versioning",
                "type": "api_reference",
            },
            {
                "title": "Changelog",
                "url": "https://api.example.com/changelog",
                "type": "api_release_history",
            },
            {
                "title": "Deprecated API calls",
                "url": "https://api.example.com/deprecations",
                "type": "api_deprecations",
                "requiresLogin": True,
            },
        ]
        manifest_dict = {
            "version": "1.0.0",
            "type": "DeclarativeSource",
            "data": {"externalDocumentationUrls": external_docs},
        }

        urls = ApiDocsUrl.from_manifest_dict(manifest_dict)
        assert len(urls) == 3
        versioning, changelog, deprecations = urls

        assert versioning.title == "Versioning docs"
        assert versioning.url == "https://api.example.com/versioning"
        assert versioning.doc_type == "api_reference"
        assert versioning.requires_login is False

        assert changelog.title == "Changelog"
        assert changelog.doc_type == "api_release_history"

        assert deprecations.title == "Deprecated API calls"
        assert deprecations.doc_type == "api_deprecations"
        assert deprecations.requires_login is True

    def test_manifest_with_external_docs_no_type(self) -> None:
        """An entry without a type field falls back to the default doc type."""
        only_entry = {
            "title": "General docs",
            "url": "https://api.example.com/docs",
        }
        manifest_dict = {
            "version": "1.0.0",
            "type": "DeclarativeSource",
            "data": {"externalDocumentationUrls": [only_entry]},
        }

        urls = ApiDocsUrl.from_manifest_dict(manifest_dict)
        assert len(urls) == 1
        (general,) = urls
        assert general.title == "General docs"
        assert general.doc_type == "other"
        assert general.requires_login is False

    def test_empty_manifest(self) -> None:
        """An empty manifest dict yields no URLs."""
        urls = ApiDocsUrl.from_manifest_dict({})
        assert len(urls) == 0


class TestGetApiDocsUrls:
    """Tests for get_api_docs_urls function."""

    def test_connector_not_found(self) -> None:
        """An unknown connector name yields the not-found message."""
        known_connectors = ["source-faker", "source-facebook-marketing"]
        with patch(
            "airbyte.mcp.connector_registry.get_available_connectors",
            return_value=known_connectors,
        ):
            result = get_api_docs_urls("nonexistent-connector")

        assert result == "Connector not found."

    def test_deduplication_of_urls(self) -> None:
        """The same URL reported by two sources appears only once in the result."""
        shared_url = "https://docs.airbyte.com/integrations/sources/example"

        fake_connector = MagicMock()
        fake_connector.docs_url = shared_url

        # The manifest repeats the URL that the connector already reports.
        manifest_with_duplicate = {
            "data": {
                "externalDocumentationUrls": [
                    {
                        "title": "Airbyte Documentation",
                        "url": shared_url,
                    }
                ]
            }
        }

        with (
            patch(
                "airbyte.mcp.connector_registry.get_available_connectors",
                return_value=["source-example", "source-faker"],
            ),
            patch(
                "airbyte.mcp.connector_registry.get_source",
                return_value=fake_connector,
            ),
            patch(
                "airbyte.mcp.connector_registry._fetch_manifest_dict",
                return_value=manifest_with_duplicate,
            ),
            patch(
                "airbyte.mcp.connector_registry._extract_docs_from_registry",
                return_value=[],
            ),
        ):
            result = get_api_docs_urls("source-example")

        assert isinstance(result, list)
        assert len(result) == 1