diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 314a87d85..3355b00f7 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -1562,3 +1562,113 @@ def get_connector_builder_project_for_definition_id( client_secret=client_secret, ) return json_result.get("builderProjectId") + + +# Organization and workspace listing + + +def list_organizations_for_user( + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> list[models.OrganizationResponse]: + """List all organizations accessible to the current user. + + Uses the public API endpoint: GET /organizations + + Args: + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + List of OrganizationResponse objects containing organization_id, organization_name, email + """ + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + response = airbyte_instance.organizations.list_organizations_for_user() + + if status_ok(response.status_code) and response.organizations_response: + return response.organizations_response.data + + raise AirbyteError( + message="Failed to list organizations for user.", + context={ + "status_code": response.status_code, + "response": response, + }, + ) + + +def list_workspaces_in_organization( + organization_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, + name_contains: str | None = None, + max_items_limit: int | None = None, +) -> list[dict[str, Any]]: + """List workspaces within a specific organization. + + Uses the Config API endpoint: POST /v1/workspaces/list_by_organization_id + + Args: + organization_id: The organization ID to list workspaces for + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + name_contains: Optional substring filter for workspace names (server-side) + max_items_limit: Optional maximum number of workspaces to return + + Returns: + List of workspace dictionaries containing workspaceId, organizationId, name, slug, etc. + """ + result: list[dict[str, Any]] = [] + page_size = 100 + + # Build base payload + payload: dict[str, Any] = { + "organizationId": organization_id, + "pagination": { + "pageSize": page_size, + "rowOffset": 0, + }, + } + if name_contains: + payload["nameContains"] = name_contains + + # Fetch pages until we have all results or reach the limit + while True: + json_result = _make_config_api_request( + path="/workspaces/list_by_organization_id", + json=payload, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + workspaces = json_result.get("workspaces", []) + + # If no results returned, we've exhausted all pages + if not workspaces: + break + + result.extend(workspaces) + + # Check if we've reached the limit + if max_items_limit is not None and len(result) >= max_items_limit: + return result[:max_items_limit] + + # If we got fewer results than page_size, this was the last page + if len(workspaces) < page_size: + break + + # Bump offset for next iteration + payload["pagination"]["rowOffset"] += page_size + + return result diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index e1016b5aa..e90ea825c 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -8,7 +8,7 @@ from pydantic import BaseModel, Field from airbyte import cloud, get_destination, get_source -from airbyte._util.api_util import PyAirbyteInputError +from airbyte._util import api_util from airbyte.cloud.auth import ( resolve_cloud_api_url, resolve_cloud_client_id, @@ -18,6 +18,7 @@ from airbyte.cloud.connectors import CustomCloudSourceDefinition from airbyte.cloud.workspaces import CloudWorkspace from airbyte.destinations.util import get_noop_destination +from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError from airbyte.mcp._tool_utils import ( check_guid_created_in_session, mcp_tool, @@ -25,6 +26,7 @@ register_tools, ) from airbyte.mcp._util import resolve_config, resolve_list_of_strings +from airbyte.secrets import SecretString CLOUD_AUTH_TIP_TEXT = ( @@ -121,6 +123,28 @@ class CloudConnectionDetails(BaseModel): """Table prefix applied when syncing to the destination.""" +class CloudOrganizationResult(BaseModel): + """Information about an organization in Airbyte Cloud.""" + + id: str + """The organization ID.""" + name: str + """Display name of the organization.""" + email: str + """Email associated with the organization.""" + + +class CloudWorkspaceResult(BaseModel): + """Information about a workspace in Airbyte Cloud.""" + + id: str + """The workspace ID.""" + name: str + """Display name of the workspace.""" + organization_id: str + """ID of the organization this workspace belongs to.""" + + def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace: """Get an authenticated CloudWorkspace. @@ -852,6 +876,226 @@ def list_deployed_cloud_connections( ] +def _resolve_organization( + organization_id: str | None, + organization_name: str | None, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> api_util.models.OrganizationResponse: + """Resolve organization from either ID or exact name match. + + Args: + organization_id: The organization ID (if provided directly) + organization_name: The organization name (exact match required) + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The resolved OrganizationResponse object + + Raises: + PyAirbyteInputError: If neither or both parameters are provided, + or if no organization matches the exact name + AirbyteMissingResourceError: If the organization is not found + """ + if organization_id and organization_name: + raise PyAirbyteInputError( + message="Provide either 'organization_id' or 'organization_name', not both." + ) + if not organization_id and not organization_name: + raise PyAirbyteInputError( + message="Either 'organization_id' or 'organization_name' must be provided." + ) + + # Get all organizations for the user + orgs = api_util.list_organizations_for_user( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + if organization_id: + # Find by ID + matching_orgs = [org for org in orgs if org.organization_id == organization_id] + if not matching_orgs: + raise AirbyteMissingResourceError( + resource_type="organization", + context={ + "organization_id": organization_id, + "message": f"No organization found with ID '{organization_id}' " + "for the current user.", + }, + ) + return matching_orgs[0] + + # Find by exact name match (case-sensitive) + matching_orgs = [org for org in orgs if org.organization_name == organization_name] + + if not matching_orgs: + raise AirbyteMissingResourceError( + resource_type="organization", + context={ + "organization_name": organization_name, + "message": f"No organization found with exact name '{organization_name}' " + "for the current user.", + }, + ) + + if len(matching_orgs) > 1: + raise PyAirbyteInputError( + message=f"Multiple organizations found with name '{organization_name}'. " + "Please use 'organization_id' instead to specify the exact organization." + ) + + return matching_orgs[0] + + +def _resolve_organization_id( + organization_id: str | None, + organization_name: str | None, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str: + """Resolve organization ID from either ID or exact name match. + + This is a convenience wrapper around _resolve_organization that returns just the ID. + """ + org = _resolve_organization( + organization_id=organization_id, + organization_name=organization_name, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + return org.organization_id + + +@mcp_tool( + domain="cloud", + read_only=True, + idempotent=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def list_cloud_workspaces( + *, + organization_id: Annotated[ + str | None, + Field( + description="Organization ID. Required if organization_name is not provided.", + default=None, + ), + ], + organization_name: Annotated[ + str | None, + Field( + description=( + "Organization name (exact match). " "Required if organization_id is not provided." + ), + default=None, + ), + ], + name_contains: Annotated[ + str | None, + "Optional substring to filter workspaces by name (server-side filtering)", + ] = None, + max_items_limit: Annotated[ + int | None, + "Optional maximum number of items to return (default: no limit)", + ] = None, +) -> list[CloudWorkspaceResult]: + """List all workspaces in a specific organization. + + Requires either organization_id OR organization_name (exact match) to be provided. + This tool will NOT list workspaces across all organizations - you must specify + which organization to list workspaces from. + """ + api_root = resolve_cloud_api_url() + client_id = resolve_cloud_client_id() + client_secret = resolve_cloud_client_secret() + + resolved_org_id = _resolve_organization_id( + organization_id=organization_id, + organization_name=organization_name, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + workspaces = api_util.list_workspaces_in_organization( + organization_id=resolved_org_id, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + name_contains=name_contains, + max_items_limit=max_items_limit, + ) + + return [ + CloudWorkspaceResult( + id=ws.get("workspaceId", ""), + name=ws.get("name", ""), + organization_id=ws.get("organizationId", ""), + ) + for ws in workspaces + ] + + +@mcp_tool( + domain="cloud", + read_only=True, + idempotent=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def describe_cloud_organization( + *, + organization_id: Annotated[ + str | None, + Field( + description="Organization ID. Required if organization_name is not provided.", + default=None, + ), + ], + organization_name: Annotated[ + str | None, + Field( + description=( + "Organization name (exact match). " "Required if organization_id is not provided." + ), + default=None, + ), + ], +) -> CloudOrganizationResult: + """Get details about a specific organization. + + Requires either organization_id OR organization_name (exact match) to be provided. + This tool is useful for looking up an organization's ID from its name, or vice versa. + """ + api_root = resolve_cloud_api_url() + client_id = resolve_cloud_client_id() + client_secret = resolve_cloud_client_secret() + + org = _resolve_organization( + organization_id=organization_id, + organization_name=organization_name, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + return CloudOrganizationResult( + id=org.organization_id, + name=org.organization_name, + email=org.email, + ) + + def _get_custom_source_definition_description( custom_source: CustomCloudSourceDefinition, ) -> str: