diff --git a/airbyte/mcp/connector_registry.py b/airbyte/mcp/connector_registry.py
index 6686cbe65..7dbedb341 100644
--- a/airbyte/mcp/connector_registry.py
+++ b/airbyte/mcp/connector_registry.py
@@ -12,15 +12,17 @@
 from pydantic import BaseModel, Field
 
 from airbyte import exceptions as exc
-from airbyte._executors.util import DEFAULT_MANIFEST_URL
 from airbyte._util.meta import is_docker_installed
 from airbyte.mcp._tool_utils import mcp_tool, register_tools
 from airbyte.mcp._util import resolve_list_of_strings
 from airbyte.registry import (
+    _DEFAULT_MANIFEST_URL,
+    ApiDocsUrl,
     ConnectorMetadata,
     ConnectorVersionInfo,
     InstallType,
     get_available_connectors,
+    get_connector_api_docs_urls,
     get_connector_metadata,
 )
 from airbyte.registry import get_connector_version_history as _get_connector_version_history
@@ -159,7 +161,7 @@ def get_connector_info(
         connector.install()
         config_spec_jsonschema = connector.config_spec
 
-    manifest_url = DEFAULT_MANIFEST_URL.format(
+    manifest_url = _DEFAULT_MANIFEST_URL.format(
         source_name=connector_name,
         version="latest",
     )
@@ -173,6 +175,34 @@ def get_connector_info(
     )
 
 
+@mcp_tool(
+    domain="registry",
+    read_only=True,
+    idempotent=True,
+)
+def get_api_docs_urls(
+    connector_name: Annotated[
+        str,
+        Field(
+            description=(
+                "The canonical connector name "
+                "(e.g., 'source-facebook-marketing', 'destination-snowflake')"
+            )
+        ),
+    ],
+) -> list[ApiDocsUrl] | Literal["Connector not found."]:
+    """Get API documentation URLs for a connector.
+
+    This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
+    - Registry metadata (documentationUrl, externalDocumentationUrls)
+    - Connector manifest.yaml file (data.externalDocumentationUrls)
+    """
+    try:
+        return get_connector_api_docs_urls(connector_name)
+    except exc.AirbyteConnectorNotRegisteredError:
+        return "Connector not found."  # plain-string reply instead of raising
+
+
 @mcp_tool(
     domain="registry",
     read_only=True,
diff --git a/airbyte/registry.py b/airbyte/registry.py
index 776f4107c..adff22755 100644
--- a/airbyte/registry.py
+++ b/airbyte/registry.py
@@ -10,10 +10,12 @@
 from copy import copy
 from enum import Enum
 from pathlib import Path
-from typing import cast
+from typing import Any, cast
 
 import requests
+import yaml
 from pydantic import BaseModel, Field
+from typing_extensions import Self
 
 from airbyte import exceptions as exc
 from airbyte._registry_utils import fetch_registry_version_date, parse_changelog_html
@@ -38,6 +40,10 @@
 _PYTHON_LANGUAGE_TAG = f"language:{_PYTHON_LANGUAGE}"
 _MANIFEST_ONLY_TAG = f"language:{_MANIFEST_ONLY_LANGUAGE}"
 
+_DEFAULT_MANIFEST_URL = (
+    "https://connectors.airbyte.com/files/metadata/airbyte/{source_name}/{version}/manifest.yaml"
+)
+
 
 class InstallType(str, Enum):
     """The type of installation for a connector."""
@@ -294,6 +300,207 @@ class ConnectorVersionInfo(BaseModel):
     parsing_errors: list[str] = Field(default_factory=list)
 
 
+class ApiDocsUrl(BaseModel):
+    """API documentation URL information.
+
+    `doc_type` and `requires_login` also accept the aliases `type` and
+    `requiresLogin`; `populate_by_name` lets the field names be used directly.
+    """
+
+    title: str
+    url: str
+    source: str
+    doc_type: str = Field(default="other", alias="type")
+    requires_login: bool = Field(default=False, alias="requiresLogin")
+
+    model_config = {"populate_by_name": True}
+
+    @classmethod
+    def from_external_docs(cls, entries: object, *, source: str) -> list[Self]:
+        """Build models from an `externalDocumentationUrls`-style list.
+
+        Args:
+            entries: The raw value of an `externalDocumentationUrls` field
+            source: Label stored on each result to record where it came from
+
+        Returns:
+            One ApiDocsUrl per usable entry. A non-list value yields an empty list, and
+            entries that are not mappings or lack a string `title` or `url` are skipped
+            so that one malformed entry does not discard the remaining links.
+        """
+        if not isinstance(entries, list):
+            return []
+
+        return [
+            cls(
+                title=entry["title"],
+                url=entry["url"],
+                source=source,
+                # A null `type` falls back to the same default as a missing one.
+                doc_type=entry.get("type") or "other",
+                requires_login=entry.get("requiresLogin", False),
+            )
+            for entry in entries
+            if isinstance(entry, dict)
+            and isinstance(entry.get("title"), str)
+            and isinstance(entry.get("url"), str)
+        ]
+
+    @classmethod
+    def from_manifest_dict(cls, manifest_data: dict[str, Any]) -> list[Self]:
+        """Extract documentation URLs from parsed manifest data.
+
+        Args:
+            manifest_data: The parsed manifest.yaml data as a dictionary
+
+        Returns:
+            List of ApiDocsUrl objects read from `data.externalDocumentationUrls`;
+            empty when that section is missing or not shaped as expected
+        """
+        data_section = manifest_data.get("data")
+        if not isinstance(data_section, dict):
+            return []
+
+        return cls.from_external_docs(
+            data_section.get("externalDocumentationUrls"),
+            source="data_external_docs",
+        )
+
+
+def _manifest_url_for(connector_name: str) -> str:
+    """Get the expected URL of the manifest.yaml file for a connector.
+
+    Args:
+        connector_name: The canonical connector name (e.g., "source-facebook-marketing")
+
+    Returns:
+        The URL to the connector's manifest.yaml file
+    """
+    return _DEFAULT_MANIFEST_URL.format(
+        source_name=connector_name,
+        version="latest",
+    )
+
+
+def _fetch_manifest_dict(url: str) -> dict[str, Any]:
+    """Fetch and parse a manifest.yaml file from a URL.
+
+    Args:
+        url: The URL to fetch the manifest from
+
+    Returns:
+        The parsed manifest data as a dictionary, or empty dict if the manifest is not
+        found (404), is an empty document, or does not parse to a mapping
+
+    Raises:
+        HTTPError: If the request fails with a non-404 status code
+    """
+    http_not_found = 404
+
+    response = requests.get(url, timeout=10)
+    if response.status_code == http_not_found:
+        return {}
+
+    response.raise_for_status()
+    manifest = yaml.safe_load(response.text)
+    # `safe_load` gives None for an empty document and may give a list or scalar;
+    # only a mapping matches the declared return type.
+    return manifest if isinstance(manifest, dict) else {}
+
+
+def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]:
+    """Extract documentation URLs from connector registry metadata.
+
+    Args:
+        connector_name: The canonical connector name (e.g., "source-facebook-marketing")
+
+    Returns:
+        List of ApiDocsUrl objects extracted from the registry; empty when no registry
+        entry has a `dockerRepository` ending in `/<connector_name>`
+
+    Raises:
+        HTTPError: If the registry request fails
+    """
+    # NOTE(review): assumes `_get_registry_url()` is always an HTTP(S) URL - confirm.
+    response = requests.get(_get_registry_url(), timeout=10)
+    response.raise_for_status()
+    registry_data = response.json()
+
+    connector_list = registry_data.get("sources", []) + registry_data.get("destinations", [])
+    repo_suffix = f"/{connector_name}"
+    connector_entry = next(
+        (
+            entry
+            for entry in connector_list
+            # `or ""` also covers an explicit null `dockerRepository`.
+            if (entry.get("dockerRepository") or "").endswith(repo_suffix)
+        ),
+        None,
+    )
+    if connector_entry is None:
+        return []
+
+    docs_urls: list[ApiDocsUrl] = []
+
+    documentation_url = connector_entry.get("documentationUrl")
+    if isinstance(documentation_url, str):
+        docs_urls.append(
+            ApiDocsUrl(
+                title="Airbyte Documentation",
+                url=documentation_url,
+                source="registry",
+            )
+        )
+
+    docs_urls.extend(
+        ApiDocsUrl.from_external_docs(
+            connector_entry.get("externalDocumentationUrls"),
+            source="registry_external_docs",
+        )
+    )
+    return docs_urls
+
+
+def get_connector_api_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
+    """Get API documentation URLs for a connector.
+
+    This function retrieves documentation URLs for a connector's upstream API from multiple sources:
+    - Registry metadata (documentationUrl, externalDocumentationUrls)
+    - Connector manifest.yaml file (data.externalDocumentationUrls)
+
+    Args:
+        connector_name: The canonical connector name (e.g., "source-facebook-marketing")
+
+    Returns:
+        List of ApiDocsUrl objects with documentation URLs, deduplicated by URL.
+        When a URL appears more than once, the first occurrence is kept, so registry
+        entries take precedence over manifest entries.
+
+    Raises:
+        AirbyteConnectorNotRegisteredError: If the connector is not found in the registry.
+    """
+    # Look the list up once; it is reused for the error context below.
+    available_connectors = get_available_connectors(InstallType.DOCKER)
+    if connector_name not in available_connectors:
+        raise exc.AirbyteConnectorNotRegisteredError(
+            connector_name=connector_name,
+            context={
+                "registry_url": _get_registry_url(),
+                "available_connectors": available_connectors,
+            },
+        )
+
+    registry_docs = _extract_docs_from_registry(connector_name)
+    manifest_data = _fetch_manifest_dict(_manifest_url_for(connector_name))
+    manifest_docs = ApiDocsUrl.from_manifest_dict(manifest_data)
+
+    # dicts preserve insertion order, so `setdefault` keeps the first entry per URL.
+    unique_by_url: dict[str, ApiDocsUrl] = {}
+    for doc_url in (*registry_docs, *manifest_docs):
+        unique_by_url.setdefault(doc_url.url, doc_url)
+    return list(unique_by_url.values())
+
+
 def get_connector_version_history(
     connector_name: str,
     *,
diff --git a/tests/unit_tests/test_mcp_connector_registry.py b/tests/unit_tests/test_mcp_connector_registry.py
new file mode 100644
index 000000000..5f70ea167
--- /dev/null
+++ b/tests/unit_tests/test_mcp_connector_registry.py
@@ -0,0 +1,163 @@
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+"""Unit tests for MCP connector registry tools."""
+
+from __future__ import annotations
+
+from unittest.mock import MagicMock, patch
+
+from airbyte import exceptions as exc
+from airbyte.mcp.connector_registry import get_api_docs_urls
+from airbyte.registry import (
+    ApiDocsUrl,
+    _fetch_manifest_dict,
+    _manifest_url_for,
+)
+
+
+class TestManifestUrlFor:
+    """Tests for _manifest_url_for function."""
+
+    def test_manifest_url_for(self) -> None:
+        """Test generating manifest URL for a connector."""
+        url = _manifest_url_for("source-example")
+        assert "source-example" in url
+        assert "manifest.yaml" in url
+        assert "latest" in url
+
+
+class TestFetchManifestDict:
+    """Tests for _fetch_manifest_dict function."""
+
+    def test_manifest_not_found(self) -> None:
+        """Test handling when manifest.yaml doesn't exist (404)."""
+        with patch("airbyte.registry.requests.get") as mock_get:
+            mock_response = MagicMock()
+            mock_response.status_code = 404
+            mock_get.return_value = mock_response
+
+            manifest_dict = _fetch_manifest_dict("https://example.com/manifest.yaml")
+            assert manifest_dict == {}
+
+    def test_fetch_manifest_dict(self) -> None:
+        """Test fetching and parsing manifest.yaml."""
+        manifest_yaml = """
+version: 1.0.0
+type: DeclarativeSource
+data:
+  name: Example
+"""
+        with patch("airbyte.registry.requests.get") as mock_get:
+            mock_response = MagicMock()
+            mock_response.status_code = 200
+            mock_response.text = manifest_yaml
+            mock_get.return_value = mock_response
+
+            manifest_dict = _fetch_manifest_dict("https://example.com/manifest.yaml")
+            assert manifest_dict["version"] == "1.0.0"
+            assert manifest_dict["type"] == "DeclarativeSource"
+            assert manifest_dict["data"]["name"] == "Example"
+
+
+class TestApiDocsUrlFromManifestDict:
+    """Tests for ApiDocsUrl.from_manifest_dict classmethod."""
+
+    def test_manifest_with_external_docs_urls(self) -> None:
+        """Test extracting URLs from data.externalDocumentationUrls field."""
+        manifest_dict = {
+            "version": "1.0.0",
+            "type": "DeclarativeSource",
+            "data": {
+                "externalDocumentationUrls": [
+                    {
+                        "title": "Versioning docs",
+                        "url": "https://api.example.com/versioning",
+                        "type": "api_reference",
+                    },
+                    {
+                        "title": "Changelog",
+                        "url": "https://api.example.com/changelog",
+                        "type": "api_release_history",
+                    },
+                    {
+                        "title": "Deprecated API calls",
+                        "url": "https://api.example.com/deprecations",
+                        "type": "api_deprecations",
+                        "requiresLogin": True,
+                    },
+                ]
+            },
+        }
+
+        urls = ApiDocsUrl.from_manifest_dict(manifest_dict)
+        assert len(urls) == 3
+        assert urls[0].title == "Versioning docs"
+        assert urls[0].url == "https://api.example.com/versioning"
+        assert urls[0].doc_type == "api_reference"
+        assert urls[0].requires_login is False
+        assert urls[1].title == "Changelog"
+        assert urls[1].doc_type == "api_release_history"
+        assert urls[2].title == "Deprecated API calls"
+        assert urls[2].doc_type == "api_deprecations"
+        assert urls[2].requires_login is True
+
+    def test_manifest_with_external_docs_no_type(self) -> None:
+        """Test extracting URLs from data.externalDocumentationUrls without type field."""
+        manifest_dict = {
+            "version": "1.0.0",
+            "type": "DeclarativeSource",
+            "data": {
+                "externalDocumentationUrls": [
+                    {
+                        "title": "General docs",
+                        "url": "https://api.example.com/docs",
+                    }
+                ]
+            },
+        }
+
+        urls = ApiDocsUrl.from_manifest_dict(manifest_dict)
+        assert len(urls) == 1
+        assert urls[0].title == "General docs"
+        assert urls[0].doc_type == "other"
+        assert urls[0].requires_login is False
+
+    def test_empty_manifest(self) -> None:
+        """Test handling empty manifest dict."""
+        urls = ApiDocsUrl.from_manifest_dict({})
+        assert len(urls) == 0
+
+
+class TestGetApiDocsUrls:
+    """Tests for get_api_docs_urls function."""
+
+    def test_connector_not_found(self) -> None:
+        """Test handling when connector is not found."""
+        with patch(
+            "airbyte.mcp.connector_registry.get_connector_api_docs_urls"
+        ) as mock_get_docs:
+            mock_get_docs.side_effect = exc.AirbyteConnectorNotRegisteredError(
+                connector_name="nonexistent-connector",
+                context={},
+            )
+
+            result = get_api_docs_urls("nonexistent-connector")
+            assert result == "Connector not found."
+
+    def test_deduplication_of_urls(self) -> None:
+        """Test that a URL listed by both registry and manifest is returned once."""
+        shared_url = "https://docs.airbyte.com/integrations/sources/example"
+        registry_doc = ApiDocsUrl(title="Airbyte Documentation", url=shared_url, source="registry")
+        manifest = {"data": {"externalDocumentationUrls": [{"title": "Copy", "url": shared_url}]}}
+        # Patch only the I/O helpers inside airbyte.registry so the real merge logic runs.
+        with patch.multiple(
+            "airbyte.registry",
+            get_available_connectors=MagicMock(return_value=["source-example"]),
+            _extract_docs_from_registry=MagicMock(return_value=[registry_doc]),
+            _fetch_manifest_dict=MagicMock(return_value=manifest),
+        ):
+            result = get_api_docs_urls("source-example")
+
+        assert isinstance(result, list)
+        assert len(result) == 1
+        assert result[0].title == "Airbyte Documentation"
+        assert result[0].source == "registry"