Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def destination_id(self) -> str:
if not self._connection_info:
self._connection_info = self._fetch_connection_info()

self._destination_id = self._connection_info.source_id
self._destination_id = self._connection_info.destination_id

return self._destination_id

Expand Down
83 changes: 74 additions & 9 deletions airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Annotated, Any

from fastmcp import FastMCP
from pydantic import Field
from pydantic import BaseModel, Field

from airbyte import cloud, get_destination, get_source
from airbyte.cloud.auth import (
Expand All @@ -14,8 +14,7 @@
resolve_cloud_client_secret,
resolve_cloud_workspace_id,
)
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import CloudDestination, CloudSource, CustomCloudSourceDefinition
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.mcp._tool_utils import (
Expand All @@ -27,6 +26,43 @@
from airbyte.mcp._util import resolve_config, resolve_list_of_strings


class CloudSourceResult(BaseModel):
"""Information about a deployed source connector in Airbyte Cloud."""

id: str
"""The source ID."""
name: str | None
"""Display name of the source, if set."""
url: str | None
"""Web URL for managing this source in Airbyte Cloud."""


class CloudDestinationResult(BaseModel):
"""Information about a deployed destination connector in Airbyte Cloud."""

id: str
"""The destination ID."""
name: str | None
"""Display name of the destination, if set."""
url: str | None
"""Web URL for managing this destination in Airbyte Cloud."""


class CloudConnectionResult(BaseModel):
"""Information about a deployed connection in Airbyte Cloud."""

id: str
"""The connection ID."""
name: str | None
"""Display name of the connection, if set."""
url: str | None
"""Web URL for managing this connection in Airbyte Cloud."""
source_id: str
"""ID of the source used by this connection."""
destination_id: str
"""ID of the destination used by this connection."""


def _get_cloud_workspace() -> CloudWorkspace:
"""Get an authenticated CloudWorkspace using environment variables."""
return CloudWorkspace(
Expand Down Expand Up @@ -444,15 +480,24 @@ def get_cloud_sync_status(
idempotent=True,
open_world=True,
)
def list_deployed_cloud_source_connectors() -> list[CloudSource]:
def list_deployed_cloud_source_connectors() -> list[CloudSourceResult]:
"""List all deployed source connectors in the Airbyte Cloud workspace.

By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
Airbyte Cloud API.
"""
workspace: CloudWorkspace = _get_cloud_workspace()
return workspace.list_sources()
sources = workspace.list_sources()

return [
CloudSourceResult(
id=source.source_id,
name=source.name,
url=source.connector_url,
)
for source in sources
]


@mcp_tool(
Expand All @@ -461,15 +506,24 @@ def list_deployed_cloud_source_connectors() -> list[CloudSource]:
idempotent=True,
open_world=True,
)
def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
def list_deployed_cloud_destination_connectors() -> list[CloudDestinationResult]:
"""List all deployed destination connectors in the Airbyte Cloud workspace.

By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
Airbyte Cloud API.
"""
workspace: CloudWorkspace = _get_cloud_workspace()
return workspace.list_destinations()
destinations = workspace.list_destinations()

return [
CloudDestinationResult(
id=destination.destination_id,
name=destination.name,
url=destination.connector_url,
)
for destination in destinations
]


@mcp_tool(
Expand Down Expand Up @@ -546,15 +600,26 @@ def get_cloud_sync_logs(
idempotent=True,
open_world=True,
)
def list_deployed_cloud_connections() -> list[CloudConnection]:
def list_deployed_cloud_connections() -> list[CloudConnectionResult]:
"""List all deployed connections in the Airbyte Cloud workspace.

By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
Airbyte Cloud API.
"""
workspace: CloudWorkspace = _get_cloud_workspace()
return workspace.list_connections()
connections = workspace.list_connections()

return [
CloudConnectionResult(
id=connection.connection_id,
name=connection.name,
url=connection.connection_url,
source_id=connection.source_id,
destination_id=connection.destination_id,
)
for connection in connections
]


def _get_custom_source_definition_description(
Expand Down
Loading