Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 284 additions & 0 deletions airbyte/mcp/connector_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
# Note: Deferred type evaluation must be avoided due to FastMCP/Pydantic needing
# types to be available at import time for tool registration.
import contextlib
import re
from typing import Annotated, Any, Literal

import requests
import yaml
from fastmcp import FastMCP
from pydantic import BaseModel, Field

Expand Down Expand Up @@ -161,6 +164,287 @@ def get_connector_info(
)


class ApiDocsUrl(BaseModel):
    """@private Class to hold API documentation URL information.

    Each instance describes a single documentation link together with a label
    and an identifier of where the link was discovered.
    """

    # Human-readable label for the link.
    title: str
    # The documentation URL itself.
    url: str
    # Identifier of where the link was found. Values used in this module include
    # "registry", "registry_external_docs", "data_external_docs",
    # "manifest_description", "manifest_assist" and "manifest_api_docs".
    source: str
    # Category of the document. Can be supplied under the alias "type";
    # defaults to "other" when not given.
    doc_type: str = Field(default="other", alias="type")
    # Can be supplied under the alias "requiresLogin"; defaults to False.
    # Presumably indicates that the linked page needs an account to view.
    requires_login: bool = Field(default=False, alias="requiresLogin")

    # Allow fields to be populated by their Python names (doc_type, requires_login)
    # as well as by the aliases declared above.
    model_config = {"populate_by_name": True}


class ApiDocsUrlsResult(BaseModel):
    """@private Class to hold API docs URLs result.

    Groups the documentation links found for one connector.
    """

    # Canonical connector name the links belong to (e.g. "source-facebook-marketing").
    connector_name: str
    # Optional name of the upstream API; None when not known.
    api_name: str | None = None
    # Documentation links collected for the connector.
    docs_urls: list[ApiDocsUrl]


def _resolve_connector_name(connector_identifier: str) -> str | None:
    """Resolve a connector identifier to a canonical connector name.

    Resolution order:

    1. The identifier is returned unchanged when it is already a known
       connector name.
    2. Otherwise the identifier is trimmed, lower-cased, stripped of a trailing
       " api" / " rest api" suffix and has its hyphens turned into spaces. It is
       then compared with every connector name after that name's leading
       "source-" / "destination-" prefix is removed and its hyphens are turned
       into spaces. An exact match wins.
    3. Failing that, the first connector whose cleaned name contains the search
       term (or is contained in it) is returned.

    Args:
        connector_identifier: Either a canonical connector name (e.g., "source-facebook-marketing")
            or an API name (e.g., "Facebook Marketing API" or "Facebook Marketing")

    Returns:
        Canonical connector name if found, None otherwise (including when the
        identifier is blank).
    """
    available_connectors = get_available_connectors()

    if connector_identifier in available_connectors:
        return connector_identifier

    # The input is lower-cased first, so the suffix pattern needs no IGNORECASE flag.
    search_term = (
        re.sub(r"\s+(api|rest api)$", "", connector_identifier.strip().lower())
        .replace("-", " ")
        .strip()
    )

    # An empty string is a substring of every name; without this guard a blank
    # identifier would resolve to whichever connector happens to be listed first.
    if not search_term:
        return None

    first_partial_match: str | None = None
    for connector_name in available_connectors:
        # Only strip the type prefix at the start of the name; a plain replace()
        # would also mangle names that contain "source-" further in.
        connector_name_clean = (
            connector_name.removeprefix("source-").removeprefix("destination-").replace("-", " ")
        )
        if not connector_name_clean:
            continue

        if connector_name_clean == search_term:
            return connector_name

        # Remember the first loose match but keep scanning for an exact one.
        if first_partial_match is None and (
            search_term in connector_name_clean or connector_name_clean in search_term
        ):
            first_partial_match = connector_name

    return first_partial_match


def _strip_trailing_url_punctuation(url: str) -> str:
    """Drop sentence punctuation and unbalanced closing brackets from the end of *url*.

    URLs found in free text often carry the punctuation that follows them
    ("see https://example.com/docs." or "(https://example.com)"). A closing
    bracket is only removed when the URL has no matching opening bracket, so
    links such as ".../wiki/Foo_(bar)" are left intact.
    """
    openers = {")": "(", "]": "[", "}": "{", ">": "<"}
    while url:
        last_char = url[-1]
        if last_char in ".,;:!?'\"`":
            url = url[:-1]
        elif last_char in openers and url.count(last_char) > url.count(openers[last_char]):
            url = url[:-1]
        else:
            break
    return url


def _extract_urls_from_manifest_description(description: str) -> list[ApiDocsUrl]:
    """Extract URLs from manifest description field.

    Two passes are made over the text:

    1. Labelled links of the form "<label>: <url>", where the label is one of
       "API Reference", "Documentation", "Docs", "API" or "Reference"
       (case-insensitive). The label becomes part of the title.
    2. Any remaining http(s) URL, given a generic title.

    Trailing punctuation is removed from each URL and every URL is reported at
    most once, labelled matches taking precedence.

    Args:
        description: Free-text description taken from a connector manifest.

    Returns:
        List of ApiDocsUrl objects with source "manifest_description"; empty
        when the description is not a non-empty string.
    """
    # Manifest data comes from parsed YAML, so the value may not be a string.
    if not isinstance(description, str) or not description:
        return []

    urls: list[ApiDocsUrl] = []
    seen_urls: set[str] = set()

    def add_url(title: str, raw_url: str) -> None:
        url = _strip_trailing_url_punctuation(raw_url)
        # Skip duplicates and matches that were nothing but a scheme.
        if url.endswith("://") or url in seen_urls:
            return
        seen_urls.add(url)
        urls.append(ApiDocsUrl(title=title, url=url, source="manifest_description"))

    labelled_url_pattern = r"(API Reference|Documentation|Docs|API|Reference):\s*(https?://\S+)"
    for match in re.finditer(labelled_url_pattern, description, re.IGNORECASE):
        add_url(f"{match.group(1)} (from manifest description)", match.group(2))

    for match in re.finditer(r"https?://\S+", description):
        add_url("API Documentation (from manifest)", match.group(0))

    return urls


def _extract_docs_from_manifest(manifest_data: dict) -> list[ApiDocsUrl]:
    """Extract documentation URLs from parsed manifest data.

    The following locations are read, in this order:

    - ``description``: URLs embedded in the free-text description.
    - ``data.externalDocumentationUrls``: list of ``{title, url, type?, requiresLogin?}``.
    - ``metadata.assist.docsUrl``: a single URL.
    - ``metadata.apiDocs``: list of ``{title, url}``.

    Entries whose values fail model validation (for example a null title or
    URL) are skipped individually, so one malformed entry does not discard
    the valid ones.

    Args:
        manifest_data: The parsed manifest. Anything other than a dict yields
            an empty list.

    Returns:
        List of ApiDocsUrl objects found in the manifest.
    """
    # An empty YAML document parses to None, and other shapes are possible too.
    if not isinstance(manifest_data, dict):
        return []

    docs_urls: list[ApiDocsUrl] = []

    description = manifest_data.get("description")
    if isinstance(description, str) and description:
        docs_urls.extend(_extract_urls_from_manifest_description(description))

    # Keyword arguments for each candidate entry; validated one at a time below.
    candidates: list[dict[str, Any]] = []

    data_section = manifest_data.get("data")
    if isinstance(data_section, dict):
        external_docs = data_section.get("externalDocumentationUrls")
        if isinstance(external_docs, list):
            candidates.extend(
                {
                    "title": doc["title"],
                    "url": doc["url"],
                    "source": "data_external_docs",
                    # `or` also covers keys that are present but null.
                    "doc_type": doc.get("type") or "other",
                    "requires_login": doc.get("requiresLogin") or False,
                }
                for doc in external_docs
                if isinstance(doc, dict) and "title" in doc and "url" in doc
            )

    metadata = manifest_data.get("metadata")
    if isinstance(metadata, dict):
        assist = metadata.get("assist")
        if isinstance(assist, dict) and "docsUrl" in assist:
            candidates.append(
                {
                    "title": "API Documentation (assist)",
                    "url": assist["docsUrl"],
                    "source": "manifest_assist",
                }
            )

        api_docs = metadata.get("apiDocs")
        if isinstance(api_docs, list):
            candidates.extend(
                {"title": doc["title"], "url": doc["url"], "source": "manifest_api_docs"}
                for doc in api_docs
                if isinstance(doc, dict) and "title" in doc and "url" in doc
            )

    for fields in candidates:
        try:
            docs_urls.append(ApiDocsUrl(**fields))
        except ValueError:
            # Pydantic's ValidationError is a ValueError subclass; drop only the
            # offending entry and keep going.
            continue

    return docs_urls


def _fetch_manifest_docs_urls(connector_name: str) -> list[ApiDocsUrl]:
    """Fetch documentation URLs from connector manifest.yaml file.

    The manifest is requested with version "latest". This is a best-effort
    lookup: a 404 response, any request or HTTP failure, and any error raised
    while parsing or reading the manifest all produce an empty list.

    Args:
        connector_name: Name substituted into the manifest URL template.

    Returns:
        Documentation URLs found in the manifest, or an empty list.
    """
    not_found_status = 404
    url = DEFAULT_MANIFEST_URL.format(source_name=connector_name, version="latest")

    # Everything from the request onwards is allowed to fail silently.
    with contextlib.suppress(Exception):
        reply = requests.get(url, timeout=10)
        if reply.status_code != not_found_status:
            reply.raise_for_status()
            return _extract_docs_from_manifest(yaml.safe_load(reply.text))

    return []


def _extract_docs_from_registry(connector_name: str) -> list[ApiDocsUrl]:
    """Extract documentation URLs from connector registry metadata.

    Downloads the OSS registry JSON, finds the first source or destination
    entry whose ``dockerRepository`` ends with ``/<connector_name>`` and reads
    its ``externalDocumentationUrls`` list.

    This is a best-effort lookup: request failures, HTTP error statuses,
    invalid JSON and unexpected registry shapes all yield an empty list.
    Individual documentation entries that fail model validation are skipped
    without affecting the others.

    Args:
        connector_name: The canonical connector name (e.g., "source-facebook-marketing")

    Returns:
        List of ApiDocsUrl objects extracted from the registry
    """
    registry_url = "https://connectors.airbyte.com/files/registries/v0/oss_registry.json"

    # Only the fetch-and-parse step can legitimately fail; keep the handler
    # scoped to it so programming errors further down are not hidden.
    try:
        response = requests.get(registry_url, timeout=10)
        response.raise_for_status()
        registry_data = response.json()
    except (requests.RequestException, ValueError):
        # ValueError covers JSON decoding errors.
        return []

    if not isinstance(registry_data, dict):
        return []

    repository_suffix = f"/{connector_name}"
    connector_entry = None
    for section in ("sources", "destinations"):
        entries = registry_data.get(section)
        if not isinstance(entries, list):
            continue
        connector_entry = next(
            (
                entry
                for entry in entries
                if isinstance(entry, dict)
                # A missing or null repository must not abort the whole search.
                and isinstance(entry.get("dockerRepository"), str)
                and entry["dockerRepository"].endswith(repository_suffix)
            ),
            None,
        )
        if connector_entry is not None:
            break

    if connector_entry is None:
        return []

    external_docs = connector_entry.get("externalDocumentationUrls")
    if not isinstance(external_docs, list):
        return []

    docs_urls: list[ApiDocsUrl] = []
    for doc in external_docs:
        if not (isinstance(doc, dict) and "title" in doc and "url" in doc):
            continue
        try:
            docs_urls.append(
                ApiDocsUrl(
                    title=doc["title"],
                    url=doc["url"],
                    source="registry_external_docs",
                    # `or` also covers keys that are present but null.
                    doc_type=doc.get("type") or "other",
                    requires_login=doc.get("requiresLogin") or False,
                )
            )
        except ValueError:
            # Pydantic's ValidationError is a ValueError subclass; skip just
            # this malformed entry.
            continue

    return docs_urls


@mcp_tool(
    domain="registry",
    read_only=True,
    idempotent=True,
)
def get_api_docs_urls(
    connector_identifier: Annotated[
        str,
        Field(
            description=(
                "The connector identifier. Can be either:\n"
                "- A canonical connector name (e.g., 'source-facebook-marketing')\n"
                "- An API name (e.g., 'Facebook Marketing API' or 'Facebook Marketing')"
            )
        ),
    ],
) -> ApiDocsUrlsResult | Literal["Connector not found."]:
    """Get API documentation URLs for a connector.

    This tool retrieves documentation URLs for a connector's upstream API from multiple sources:
    - Registry metadata (documentationUrl, externalDocumentationUrls)
    - Connector manifest.yaml file (data.externalDocumentationUrls, metadata.assist.docsUrl,
      metadata.apiDocs)

    The tool accepts either a canonical connector ID (e.g., "source-facebook-marketing") or
    an API name (e.g., "Facebook Marketing API" or "Facebook Marketing").

    Returns:
        ApiDocsUrlsResult with connector name and list of documentation URLs, or error message.
    """
    resolved_name = _resolve_connector_name(connector_identifier)
    if not resolved_name:
        return "Connector not found."

    # Instantiating the connector is optional; any failure here simply means
    # the Airbyte documentation link is left out.
    try:
        source_connector = get_source(
            resolved_name,
            docker_image=is_docker_installed() or False,
            install_if_missing=False,
        )
    except Exception:
        source_connector = None

    collected: list[ApiDocsUrl] = []
    if source_connector and source_connector.docs_url:
        collected.append(
            ApiDocsUrl(
                title="Airbyte Documentation",
                url=source_connector.docs_url,
                source="registry",
            )
        )

    # Registry links come before manifest links so they win de-duplication.
    collected += _extract_docs_from_registry(resolved_name)
    collected += _fetch_manifest_docs_urls(resolved_name)

    # Dicts keep insertion order, so this retains the first entry seen per URL.
    first_entry_by_url: dict[str, ApiDocsUrl] = {}
    for entry in collected:
        first_entry_by_url.setdefault(entry.url, entry)

    return ApiDocsUrlsResult(
        connector_name=resolved_name,
        api_name=None,
        docs_urls=list(first_entry_by_url.values()),
    )


def register_connector_registry_tools(app: FastMCP) -> None:
"""@private Register tools with the FastMCP app.

Expand Down
Loading