Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1562,3 +1562,113 @@ def get_connector_builder_project_for_definition_id(
client_secret=client_secret,
)
return json_result.get("builderProjectId")


# Organization and workspace listing


def list_organizations_for_user(
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> list[models.OrganizationResponse]:
"""List all organizations accessible to the current user.

Uses the public API endpoint: GET /organizations

Args:
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
List of OrganizationResponse objects containing organization_id, organization_name, email
"""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)
response = airbyte_instance.organizations.list_organizations_for_user()

if status_ok(response.status_code) and response.organizations_response:
return response.organizations_response.data

raise AirbyteError(
message="Failed to list organizations for user.",
context={
"status_code": response.status_code,
"response": response,
},
)


def list_workspaces_in_organization(
organization_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
name_contains: str | None = None,
max_items_limit: int | None = None,
) -> list[dict[str, Any]]:
"""List workspaces within a specific organization.

Uses the Config API endpoint: POST /v1/workspaces/list_by_organization_id

Args:
organization_id: The organization ID to list workspaces for
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret
name_contains: Optional substring filter for workspace names (server-side)
max_items_limit: Optional maximum number of workspaces to return

Returns:
List of workspace dictionaries containing workspaceId, organizationId, name, slug, etc.
"""
result: list[dict[str, Any]] = []
page_size = 100

# Build base payload
payload: dict[str, Any] = {
"organizationId": organization_id,
"pagination": {
"pageSize": page_size,
"rowOffset": 0,
},
}
if name_contains:
payload["nameContains"] = name_contains

# Fetch pages until we have all results or reach the limit
while True:
json_result = _make_config_api_request(
path="/workspaces/list_by_organization_id",
json=payload,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

workspaces = json_result.get("workspaces", [])

# If no results returned, we've exhausted all pages
if not workspaces:
break

result.extend(workspaces)

# Check if we've reached the limit
if max_items_limit is not None and len(result) >= max_items_limit:
return result[:max_items_limit]

# If we got fewer results than page_size, this was the last page
if len(workspaces) < page_size:
break

# Bump offset for next iteration
payload["pagination"]["rowOffset"] += page_size

return result
246 changes: 245 additions & 1 deletion airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import BaseModel, Field

from airbyte import cloud, get_destination, get_source
from airbyte._util.api_util import PyAirbyteInputError
from airbyte._util import api_util
from airbyte.cloud.auth import (
resolve_cloud_api_url,
resolve_cloud_client_id,
Expand All @@ -18,13 +18,15 @@
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
from airbyte.mcp._tool_utils import (
check_guid_created_in_session,
mcp_tool,
register_guid_created_in_session,
register_tools,
)
from airbyte.mcp._util import resolve_config, resolve_list_of_strings
from airbyte.secrets import SecretString


CLOUD_AUTH_TIP_TEXT = (
Expand Down Expand Up @@ -121,6 +123,28 @@ class CloudConnectionDetails(BaseModel):
"""Table prefix applied when syncing to the destination."""


class CloudOrganizationResult(BaseModel):
"""Information about an organization in Airbyte Cloud."""

id: str
"""The organization ID."""
name: str
"""Display name of the organization."""
email: str
"""Email associated with the organization."""


class CloudWorkspaceResult(BaseModel):
"""Information about a workspace in Airbyte Cloud."""

id: str
"""The workspace ID."""
name: str
"""Display name of the workspace."""
organization_id: str
"""ID of the organization this workspace belongs to."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
"""Get an authenticated CloudWorkspace.

Expand Down Expand Up @@ -852,6 +876,226 @@ def list_deployed_cloud_connections(
]


def _resolve_organization(
organization_id: str | None,
organization_name: str | None,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> api_util.models.OrganizationResponse:
"""Resolve organization from either ID or exact name match.

Args:
organization_id: The organization ID (if provided directly)
organization_name: The organization name (exact match required)
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
The resolved OrganizationResponse object

Raises:
PyAirbyteInputError: If neither or both parameters are provided,
or if no organization matches the exact name
AirbyteMissingResourceError: If the organization is not found
"""
if organization_id and organization_name:
raise PyAirbyteInputError(
message="Provide either 'organization_id' or 'organization_name', not both."
)
if not organization_id and not organization_name:
raise PyAirbyteInputError(
message="Either 'organization_id' or 'organization_name' must be provided."
)

# Get all organizations for the user
orgs = api_util.list_organizations_for_user(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

if organization_id:
# Find by ID
matching_orgs = [org for org in orgs if org.organization_id == organization_id]
if not matching_orgs:
raise AirbyteMissingResourceError(
resource_type="organization",
context={
"organization_id": organization_id,
"message": f"No organization found with ID '{organization_id}' "
"for the current user.",
},
)
return matching_orgs[0]

# Find by exact name match (case-sensitive)
matching_orgs = [org for org in orgs if org.organization_name == organization_name]

if not matching_orgs:
raise AirbyteMissingResourceError(
resource_type="organization",
context={
"organization_name": organization_name,
"message": f"No organization found with exact name '{organization_name}' "
"for the current user.",
},
)

if len(matching_orgs) > 1:
raise PyAirbyteInputError(
message=f"Multiple organizations found with name '{organization_name}'. "
"Please use 'organization_id' instead to specify the exact organization."
)

return matching_orgs[0]


def _resolve_organization_id(
organization_id: str | None,
organization_name: str | None,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> str:
"""Resolve organization ID from either ID or exact name match.

This is a convenience wrapper around _resolve_organization that returns just the ID.
"""
org = _resolve_organization(
organization_id=organization_id,
organization_name=organization_name,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)
return org.organization_id


@mcp_tool(
domain="cloud",
read_only=True,
idempotent=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
*,
organization_id: Annotated[
str | None,
Field(
description="Organization ID. Required if organization_name is not provided.",
default=None,
),
],
organization_name: Annotated[
str | None,
Field(
description=(
"Organization name (exact match). " "Required if organization_id is not provided."
),
default=None,
),
],
name_contains: Annotated[
str | None,
"Optional substring to filter workspaces by name (server-side filtering)",
] = None,
max_items_limit: Annotated[
int | None,
"Optional maximum number of items to return (default: no limit)",
] = None,
) -> list[CloudWorkspaceResult]:
"""List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.
"""
api_root = resolve_cloud_api_url()
client_id = resolve_cloud_client_id()
client_secret = resolve_cloud_client_secret()

resolved_org_id = _resolve_organization_id(
organization_id=organization_id,
organization_name=organization_name,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

workspaces = api_util.list_workspaces_in_organization(
organization_id=resolved_org_id,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
name_contains=name_contains,
max_items_limit=max_items_limit,
)

return [
CloudWorkspaceResult(
id=ws.get("workspaceId", ""),
name=ws.get("name", ""),
organization_id=ws.get("organizationId", ""),
)
for ws in workspaces
]


@mcp_tool(
domain="cloud",
read_only=True,
idempotent=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
*,
organization_id: Annotated[
str | None,
Field(
description="Organization ID. Required if organization_name is not provided.",
default=None,
),
],
organization_name: Annotated[
str | None,
Field(
description=(
"Organization name (exact match). " "Required if organization_id is not provided."
),
default=None,
),
],
) -> CloudOrganizationResult:
"""Get details about a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.
"""
api_root = resolve_cloud_api_url()
client_id = resolve_cloud_client_id()
client_secret = resolve_cloud_client_secret()

org = _resolve_organization(
organization_id=organization_id,
organization_name=organization_name,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

return CloudOrganizationResult(
id=org.organization_id,
name=org.organization_name,
email=org.email,
)


def _get_custom_source_definition_description(
custom_source: CustomCloudSourceDefinition,
) -> str:
Expand Down
Loading