Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -1562,3 +1562,107 @@ def get_connector_builder_project_for_definition_id(
client_secret=client_secret,
)
return json_result.get("builderProjectId")


# Organization and workspace listing


def list_organizations_for_user(
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
) -> list[models.OrganizationResponse]:
"""List all organizations accessible to the current user.

Uses the public API endpoint: GET /organizations

Args:
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
List of OrganizationResponse objects containing organization_id, organization_name, email
"""
airbyte_instance = get_airbyte_server_instance(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)
response = airbyte_instance.organizations.list_organizations_for_user()

if status_ok(response.status_code) and response.organizations_response:
return response.organizations_response.data

raise AirbyteError(
message="Failed to list organizations for user.",
context={
"status_code": response.status_code,
"response": response,
},
)


def list_workspaces_in_organization(
organization_id: str,
*,
api_root: str,
client_id: SecretString,
client_secret: SecretString,
name_contains: str | None = None,
max_items_limit: int | None = None,
) -> list[dict[str, Any]]:
"""List workspaces within a specific organization.

Uses the Config API endpoint: POST /v1/workspaces/list_by_organization_id

Args:
organization_id: The organization ID to list workspaces for
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret
name_contains: Optional substring filter for workspace names (server-side)
max_items_limit: Optional maximum number of workspaces to return

Returns:
List of workspace dictionaries containing workspaceId, organizationId, name, slug, etc.
"""
result: list[dict[str, Any]] = []
page_size = 100
offset = 0

while True:
payload: dict[str, Any] = {
"organizationId": organization_id,
"pagination": {
"pageSize": page_size,
"rowOffset": offset,
},
}
if name_contains:
payload["nameContains"] = name_contains

json_result = _make_config_api_request(
path="/workspaces/list_by_organization_id",
json=payload,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

workspaces = json_result.get("workspaces", [])
result.extend(workspaces)

# Check if we've reached the limit or no more results
if max_items_limit is not None and len(result) >= max_items_limit:
result = result[:max_items_limit]
break

if len(workspaces) < page_size:
# No more pages
break

offset += page_size

return result
246 changes: 245 additions & 1 deletion airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from pydantic import BaseModel, Field

from airbyte import cloud, get_destination, get_source
from airbyte._util.api_util import PyAirbyteInputError
from airbyte._util import api_util
from airbyte.cloud.auth import (
resolve_cloud_api_url,
resolve_cloud_client_id,
Expand All @@ -18,6 +18,7 @@
from airbyte.cloud.connectors import CustomCloudSourceDefinition
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError
from airbyte.mcp._tool_utils import (
check_guid_created_in_session,
mcp_tool,
Expand Down Expand Up @@ -121,6 +122,28 @@ class CloudConnectionDetails(BaseModel):
"""Table prefix applied when syncing to the destination."""


class CloudOrganizationResult(BaseModel):
"""Information about an organization in Airbyte Cloud."""

id: str
"""The organization ID."""
name: str
"""Display name of the organization."""
email: str
"""Email associated with the organization."""


class CloudWorkspaceResult(BaseModel):
"""Information about a workspace in Airbyte Cloud."""

id: str
"""The workspace ID."""
name: str
"""Display name of the workspace."""
organization_id: str
"""ID of the organization this workspace belongs to."""


def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace:
"""Get an authenticated CloudWorkspace.

Expand Down Expand Up @@ -852,6 +875,227 @@ def list_deployed_cloud_connections(
]


def _resolve_organization_id(
organization_id: str | None,
organization_name: str | None,
*,
api_root: str,
client_id: str,
client_secret: str,
) -> str:
"""Resolve organization ID from either ID or exact name match.

Args:
organization_id: The organization ID (if provided directly)
organization_name: The organization name (exact match required)
api_root: The API root URL
client_id: OAuth client ID
client_secret: OAuth client secret

Returns:
The resolved organization ID

Raises:
PyAirbyteInputError: If neither or both parameters are provided,
or if no organization matches the exact name
AirbyteMissingResourceError: If the organization is not found
"""
if organization_id and organization_name:
raise PyAirbyteInputError(
message="Provide either 'organization_id' or 'organization_name', not both."
)
if not organization_id and not organization_name:
raise PyAirbyteInputError(
message="Either 'organization_id' or 'organization_name' must be provided."
)

if organization_id:
return organization_id

# Look up organization by exact name match
orgs = api_util.list_organizations_for_user(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

# Find exact match (case-sensitive)
matching_orgs = [org for org in orgs if org.organization_name == organization_name]

if not matching_orgs:
raise AirbyteMissingResourceError(
resource_type="organization",
context={
"organization_name": organization_name,
"message": f"No organization found with exact name '{organization_name}' "
"for the current user.",
},
)

if len(matching_orgs) > 1:
raise PyAirbyteInputError(
message=f"Multiple organizations found with name '{organization_name}'. "
"Please use 'organization_id' instead to specify the exact organization."
)

return matching_orgs[0].organization_id


@mcp_tool(
domain="cloud",
read_only=True,
idempotent=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def list_cloud_workspaces(
*,
organization_id: Annotated[
str | None,
Field(
description="Organization ID. Required if organization_name is not provided.",
default=None,
),
],
organization_name: Annotated[
str | None,
Field(
description=(
"Organization name (exact match). " "Required if organization_id is not provided."
),
default=None,
),
],
name_contains: Annotated[
str | None,
"Optional substring to filter workspaces by name (server-side filtering)",
] = None,
max_items_limit: Annotated[
int | None,
"Optional maximum number of items to return (default: no limit)",
] = None,
) -> list[CloudWorkspaceResult]:
"""List all workspaces in a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool will NOT list workspaces across all organizations - you must specify
which organization to list workspaces from.
"""
api_root = resolve_cloud_api_url()
client_id = resolve_cloud_client_id()
client_secret = resolve_cloud_client_secret()

resolved_org_id = _resolve_organization_id(
organization_id=organization_id,
organization_name=organization_name,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

workspaces = api_util.list_workspaces_in_organization(
organization_id=resolved_org_id,
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
name_contains=name_contains,
max_items_limit=max_items_limit,
)

return [
CloudWorkspaceResult(
id=ws.get("workspaceId", ""),
name=ws.get("name", ""),
organization_id=ws.get("organizationId", ""),
)
for ws in workspaces
]


@mcp_tool(
domain="cloud",
read_only=True,
idempotent=True,
open_world=True,
extra_help_text=CLOUD_AUTH_TIP_TEXT,
)
def describe_cloud_organization(
*,
organization_id: Annotated[
str | None,
Field(
description="Organization ID. Required if organization_name is not provided.",
default=None,
),
],
organization_name: Annotated[
str | None,
Field(
description=(
"Organization name (exact match). " "Required if organization_id is not provided."
),
default=None,
),
],
) -> CloudOrganizationResult:
"""Get details about a specific organization.

Requires either organization_id OR organization_name (exact match) to be provided.
This tool is useful for looking up an organization's ID from its name, or vice versa.
"""
api_root = resolve_cloud_api_url()
client_id = resolve_cloud_client_id()
client_secret = resolve_cloud_client_secret()

# Get all organizations for the user
orgs = api_util.list_organizations_for_user(
api_root=api_root,
client_id=client_id,
client_secret=client_secret,
)

# Find the matching organization
if organization_id:
matching_orgs = [org for org in orgs if org.organization_id == organization_id]
if not matching_orgs:
raise AirbyteMissingResourceError(
resource_type="organization",
context={
"organization_id": organization_id,
"message": f"No organization found with ID '{organization_id}' "
"for the current user.",
},
)
org = matching_orgs[0]
elif organization_name:
matching_orgs = [org for org in orgs if org.organization_name == organization_name]
if not matching_orgs:
raise AirbyteMissingResourceError(
resource_type="organization",
context={
"organization_name": organization_name,
"message": f"No organization found with exact name '{organization_name}' "
"for the current user.",
},
)
if len(matching_orgs) > 1:
raise PyAirbyteInputError(
message=f"Multiple organizations found with name '{organization_name}'. "
"Please use 'organization_id' instead to specify the exact organization."
)
org = matching_orgs[0]
else:
raise PyAirbyteInputError(
message="Either 'organization_id' or 'organization_name' must be provided."
)

return CloudOrganizationResult(
id=org.organization_id,
name=org.organization_name,
email=org.email,
)


def _get_custom_source_definition_description(
custom_source: CustomCloudSourceDefinition,
) -> str:
Expand Down
Loading