From b42a3a595197e2fa79cd3894e910b66eb9bfab9d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 1 Dec 2025 23:06:13 +0000 Subject: [PATCH 1/4] feat(mcp): add list_cloud_workspaces and describe_cloud_organization tools Add two new MCP tools for organization-scoped workspace discovery: - list_cloud_workspaces: Lists workspaces within a specific organization, requiring either organization_id or organization_name (exact match). Supports name_contains filtering and max_items_limit. - describe_cloud_organization: Gets details about a specific organization, useful for looking up org ID from name or vice versa. These tools enable safe workspace discovery by requiring explicit organization context, preventing accidental access to workspaces across organizations. Also adds supporting api_util functions: - list_organizations_for_user: Lists orgs accessible to current user - list_workspaces_in_organization: Lists workspaces in a specific org Co-Authored-By: AJ Steers --- airbyte/_util/api_util.py | 104 ++++++++++++++++ airbyte/mcp/cloud_ops.py | 246 +++++++++++++++++++++++++++++++++++++- 2 files changed, 349 insertions(+), 1 deletion(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 314a87d85..68462cde7 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -1562,3 +1562,107 @@ def get_connector_builder_project_for_definition_id( client_secret=client_secret, ) return json_result.get("builderProjectId") + + +# Organization and workspace listing + + +def list_organizations_for_user( + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> list[models.OrganizationResponse]: + """List all organizations accessible to the current user. + + Uses the public API endpoint: GET /organizations + + Args: + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + List of OrganizationResponse objects containing organization_id, organization_name, email + """ + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + response = airbyte_instance.organizations.list_organizations_for_user() + + if status_ok(response.status_code) and response.organizations_response: + return response.organizations_response.data + + raise AirbyteError( + message="Failed to list organizations for user.", + context={ + "status_code": response.status_code, + "response": response, + }, + ) + + +def list_workspaces_in_organization( + organization_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, + name_contains: str | None = None, + max_items_limit: int | None = None, +) -> list[dict[str, Any]]: + """List workspaces within a specific organization. + + Uses the Config API endpoint: POST /v1/workspaces/list_by_organization_id + + Args: + organization_id: The organization ID to list workspaces for + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + name_contains: Optional substring filter for workspace names (server-side) + max_items_limit: Optional maximum number of workspaces to return + + Returns: + List of workspace dictionaries containing workspaceId, organizationId, name, slug, etc. + """ + result: list[dict[str, Any]] = [] + page_size = 100 + offset = 0 + + while True: + payload: dict[str, Any] = { + "organizationId": organization_id, + "pagination": { + "pageSize": page_size, + "rowOffset": offset, + }, + } + if name_contains: + payload["nameContains"] = name_contains + + json_result = _make_config_api_request( + path="/workspaces/list_by_organization_id", + json=payload, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + workspaces = json_result.get("workspaces", []) + result.extend(workspaces) + + # Check if we've reached the limit or no more results + if max_items_limit is not None and len(result) >= max_items_limit: + result = result[:max_items_limit] + break + + if len(workspaces) < page_size: + # No more pages + break + + offset += page_size + + return result diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index e1016b5aa..c4e579af8 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -8,7 +8,7 @@ from pydantic import BaseModel, Field from airbyte import cloud, get_destination, get_source -from airbyte._util.api_util import PyAirbyteInputError +from airbyte._util import api_util from airbyte.cloud.auth import ( resolve_cloud_api_url, resolve_cloud_client_id, @@ -18,6 +18,7 @@ from airbyte.cloud.connectors import CustomCloudSourceDefinition from airbyte.cloud.workspaces import CloudWorkspace from airbyte.destinations.util import get_noop_destination +from airbyte.exceptions import AirbyteMissingResourceError, PyAirbyteInputError from airbyte.mcp._tool_utils import ( check_guid_created_in_session, mcp_tool, @@ -121,6 +122,28 @@ class CloudConnectionDetails(BaseModel): """Table prefix applied when syncing to the destination.""" +class CloudOrganizationResult(BaseModel): + """Information about an organization in Airbyte Cloud.""" + + id: str + """The organization ID.""" + name: str + """Display name of the organization.""" + email: str + """Email associated with the organization.""" + + +class CloudWorkspaceResult(BaseModel): + """Information about a workspace in Airbyte Cloud.""" + + id: str + """The workspace ID.""" + name: str + """Display name of the workspace.""" + organization_id: str + """ID of the organization this workspace belongs to.""" + + def _get_cloud_workspace(workspace_id: str | None = None) -> CloudWorkspace: """Get an authenticated CloudWorkspace. @@ -852,6 +875,227 @@ def list_deployed_cloud_connections( ] +def _resolve_organization_id( + organization_id: str | None, + organization_name: str | None, + *, + api_root: str, + client_id: str, + client_secret: str, +) -> str: + """Resolve organization ID from either ID or exact name match. + + Args: + organization_id: The organization ID (if provided directly) + organization_name: The organization name (exact match required) + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The resolved organization ID + + Raises: + PyAirbyteInputError: If neither or both parameters are provided, + or if no organization matches the exact name + AirbyteMissingResourceError: If the organization is not found + """ + if organization_id and organization_name: + raise PyAirbyteInputError( + message="Provide either 'organization_id' or 'organization_name', not both." + ) + if not organization_id and not organization_name: + raise PyAirbyteInputError( + message="Either 'organization_id' or 'organization_name' must be provided." + ) + + if organization_id: + return organization_id + + # Look up organization by exact name match + orgs = api_util.list_organizations_for_user( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + # Find exact match (case-sensitive) + matching_orgs = [org for org in orgs if org.organization_name == organization_name] + + if not matching_orgs: + raise AirbyteMissingResourceError( + resource_type="organization", + context={ + "organization_name": organization_name, + "message": f"No organization found with exact name '{organization_name}' " + "for the current user.", + }, + ) + + if len(matching_orgs) > 1: + raise PyAirbyteInputError( + message=f"Multiple organizations found with name '{organization_name}'. " + "Please use 'organization_id' instead to specify the exact organization." + ) + + return matching_orgs[0].organization_id + + +@mcp_tool( + domain="cloud", + read_only=True, + idempotent=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def list_cloud_workspaces( + *, + organization_id: Annotated[ + str | None, + Field( + description="Organization ID. Required if organization_name is not provided.", + default=None, + ), + ], + organization_name: Annotated[ + str | None, + Field( + description=( + "Organization name (exact match). " "Required if organization_id is not provided." + ), + default=None, + ), + ], + name_contains: Annotated[ + str | None, + "Optional substring to filter workspaces by name (server-side filtering)", + ] = None, + max_items_limit: Annotated[ + int | None, + "Optional maximum number of items to return (default: no limit)", + ] = None, +) -> list[CloudWorkspaceResult]: + """List all workspaces in a specific organization. + + Requires either organization_id OR organization_name (exact match) to be provided. + This tool will NOT list workspaces across all organizations - you must specify + which organization to list workspaces from. + """ + api_root = resolve_cloud_api_url() + client_id = resolve_cloud_client_id() + client_secret = resolve_cloud_client_secret() + + resolved_org_id = _resolve_organization_id( + organization_id=organization_id, + organization_name=organization_name, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + workspaces = api_util.list_workspaces_in_organization( + organization_id=resolved_org_id, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + name_contains=name_contains, + max_items_limit=max_items_limit, + ) + + return [ + CloudWorkspaceResult( + id=ws.get("workspaceId", ""), + name=ws.get("name", ""), + organization_id=ws.get("organizationId", ""), + ) + for ws in workspaces + ] + + +@mcp_tool( + domain="cloud", + read_only=True, + idempotent=True, + open_world=True, + extra_help_text=CLOUD_AUTH_TIP_TEXT, +) +def describe_cloud_organization( + *, + organization_id: Annotated[ + str | None, + Field( + description="Organization ID. Required if organization_name is not provided.", + default=None, + ), + ], + organization_name: Annotated[ + str | None, + Field( + description=( + "Organization name (exact match). " "Required if organization_id is not provided." + ), + default=None, + ), + ], +) -> CloudOrganizationResult: + """Get details about a specific organization. + + Requires either organization_id OR organization_name (exact match) to be provided. + This tool is useful for looking up an organization's ID from its name, or vice versa. + """ + api_root = resolve_cloud_api_url() + client_id = resolve_cloud_client_id() + client_secret = resolve_cloud_client_secret() + + # Get all organizations for the user + orgs = api_util.list_organizations_for_user( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + # Find the matching organization + if organization_id: + matching_orgs = [org for org in orgs if org.organization_id == organization_id] + if not matching_orgs: + raise AirbyteMissingResourceError( + resource_type="organization", + context={ + "organization_id": organization_id, + "message": f"No organization found with ID '{organization_id}' " + "for the current user.", + }, + ) + org = matching_orgs[0] + elif organization_name: + matching_orgs = [org for org in orgs if org.organization_name == organization_name] + if not matching_orgs: + raise AirbyteMissingResourceError( + resource_type="organization", + context={ + "organization_name": organization_name, + "message": f"No organization found with exact name '{organization_name}' " + "for the current user.", + }, + ) + if len(matching_orgs) > 1: + raise PyAirbyteInputError( + message=f"Multiple organizations found with name '{organization_name}'. " + "Please use 'organization_id' instead to specify the exact organization." + ) + org = matching_orgs[0] + else: + raise PyAirbyteInputError( + message="Either 'organization_id' or 'organization_name' must be provided." + ) + + return CloudOrganizationResult( + id=org.organization_id, + name=org.organization_name, + email=org.email, + ) + + def _get_custom_source_definition_description( custom_source: CustomCloudSourceDefinition, ) -> str: From 22eca44c0423761e76e7da57bce5bfa164eff4be Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 1 Dec 2025 23:11:28 +0000 Subject: [PATCH 2/4] fix: use SecretString type hints for client credentials Update _resolve_organization_id helper function to use SecretString type hints for client_id and client_secret parameters, matching the return types of resolve_cloud_client_id() and resolve_cloud_client_secret(). This fixes pyrefly type check errors where str was being passed to functions expecting SecretString. Co-Authored-By: AJ Steers --- airbyte/mcp/cloud_ops.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index c4e579af8..3525c68a4 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -26,6 +26,7 @@ register_tools, ) from airbyte.mcp._util import resolve_config, resolve_list_of_strings +from airbyte.secrets import SecretString CLOUD_AUTH_TIP_TEXT = ( @@ -880,8 +881,8 @@ def _resolve_organization_id( organization_name: str | None, *, api_root: str, - client_id: str, - client_secret: str, + client_id: SecretString, + client_secret: SecretString, ) -> str: """Resolve organization ID from either ID or exact name match. From 95ed65838cc68a819f019e8992d6643b58af6791 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Mon, 1 Dec 2025 23:33:18 +0000 Subject: [PATCH 3/4] refactor: extract _resolve_organization helper and simplify describe_cloud_organization - Create _resolve_organization() that returns full OrganizationResponse object - Simplify _resolve_organization_id() to call _resolve_organization and extract ID - Refactor describe_cloud_organization() to use _resolve_organization (addresses AJ's comment) - Refactor list_workspaces_in_organization() to use explicit has_more_pages flag instead of while True Co-Authored-By: AJ Steers --- airbyte/_util/api_util.py | 39 ++++++++-------- airbyte/mcp/cloud_ops.py | 93 +++++++++++++++++++-------------------- 2 files changed, 66 insertions(+), 66 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 68462cde7..657ccad49 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -1630,19 +1630,21 @@ def list_workspaces_in_organization( """ result: list[dict[str, Any]] = [] page_size = 100 - offset = 0 - - while True: - payload: dict[str, Any] = { - "organizationId": organization_id, - "pagination": { - "pageSize": page_size, - "rowOffset": offset, - }, - } - if name_contains: - payload["nameContains"] = name_contains + # Build base payload + payload: dict[str, Any] = { + "organizationId": organization_id, + "pagination": { + "pageSize": page_size, + "rowOffset": 0, + }, + } + if name_contains: + payload["nameContains"] = name_contains + + # Fetch pages until we have all results or reach the limit + has_more_pages = True + while has_more_pages: json_result = _make_config_api_request( path="/workspaces/list_by_organization_id", json=payload, @@ -1654,15 +1656,14 @@ def list_workspaces_in_organization( workspaces = json_result.get("workspaces", []) result.extend(workspaces) - # Check if we've reached the limit or no more results + # Check if we've reached the limit if max_items_limit is not None and len(result) >= max_items_limit: - result = result[:max_items_limit] - break + return result[:max_items_limit] - if len(workspaces) < page_size: - # No more pages - break + # Check if there are more pages + has_more_pages = len(workspaces) >= page_size - offset += page_size + # Bump offset for next iteration + payload["pagination"]["rowOffset"] += page_size return result diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index 3525c68a4..e90ea825c 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -876,15 +876,15 @@ def list_deployed_cloud_connections( ] -def _resolve_organization_id( +def _resolve_organization( organization_id: str | None, organization_name: str | None, *, api_root: str, client_id: SecretString, client_secret: SecretString, -) -> str: - """Resolve organization ID from either ID or exact name match. +) -> api_util.models.OrganizationResponse: + """Resolve organization from either ID or exact name match. Args: organization_id: The organization ID (if provided directly) @@ -894,7 +894,7 @@ def _resolve_organization_id( client_secret: OAuth client secret Returns: - The resolved organization ID + The resolved OrganizationResponse object Raises: PyAirbyteInputError: If neither or both parameters are provided, @@ -910,17 +910,28 @@ def _resolve_organization_id( message="Either 'organization_id' or 'organization_name' must be provided." ) - if organization_id: - return organization_id - - # Look up organization by exact name match + # Get all organizations for the user orgs = api_util.list_organizations_for_user( api_root=api_root, client_id=client_id, client_secret=client_secret, ) - # Find exact match (case-sensitive) + if organization_id: + # Find by ID + matching_orgs = [org for org in orgs if org.organization_id == organization_id] + if not matching_orgs: + raise AirbyteMissingResourceError( + resource_type="organization", + context={ + "organization_id": organization_id, + "message": f"No organization found with ID '{organization_id}' " + "for the current user.", + }, + ) + return matching_orgs[0] + + # Find by exact name match (case-sensitive) matching_orgs = [org for org in orgs if org.organization_name == organization_name] if not matching_orgs: @@ -939,7 +950,29 @@ def _resolve_organization_id( "Please use 'organization_id' instead to specify the exact organization." ) - return matching_orgs[0].organization_id + return matching_orgs[0] + + +def _resolve_organization_id( + organization_id: str | None, + organization_name: str | None, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str: + """Resolve organization ID from either ID or exact name match. + + This is a convenience wrapper around _resolve_organization that returns just the ID. + """ + org = _resolve_organization( + organization_id=organization_id, + organization_name=organization_name, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + return org.organization_id @mcp_tool( @@ -1048,48 +1081,14 @@ def describe_cloud_organization( client_id = resolve_cloud_client_id() client_secret = resolve_cloud_client_secret() - # Get all organizations for the user - orgs = api_util.list_organizations_for_user( + org = _resolve_organization( + organization_id=organization_id, + organization_name=organization_name, api_root=api_root, client_id=client_id, client_secret=client_secret, ) - # Find the matching organization - if organization_id: - matching_orgs = [org for org in orgs if org.organization_id == organization_id] - if not matching_orgs: - raise AirbyteMissingResourceError( - resource_type="organization", - context={ - "organization_id": organization_id, - "message": f"No organization found with ID '{organization_id}' " - "for the current user.", - }, - ) - org = matching_orgs[0] - elif organization_name: - matching_orgs = [org for org in orgs if org.organization_name == organization_name] - if not matching_orgs: - raise AirbyteMissingResourceError( - resource_type="organization", - context={ - "organization_name": organization_name, - "message": f"No organization found with exact name '{organization_name}' " - "for the current user.", - }, - ) - if len(matching_orgs) > 1: - raise PyAirbyteInputError( - message=f"Multiple organizations found with name '{organization_name}'. " - "Please use 'organization_id' instead to specify the exact organization." - ) - org = matching_orgs[0] - else: - raise PyAirbyteInputError( - message="Either 'organization_id' or 'organization_name' must be provided." - ) - return CloudOrganizationResult( id=org.organization_id, name=org.organization_name, From 0e133e4f2411d20b5b82f6e49a14dacf7362a49f Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 2 Dec 2025 02:51:58 +0000 Subject: [PATCH 4/4] fix: handle pagination edge case when total records is multiple of page size - Add explicit check for empty results before extending result list - Exit early if no workspaces returned (handles out-of-range offset case) - Keep existing check for partial page to avoid unnecessary API calls Co-Authored-By: AJ Steers --- airbyte/_util/api_util.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 657ccad49..3355b00f7 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -1643,8 +1643,7 @@ def list_workspaces_in_organization( payload["nameContains"] = name_contains # Fetch pages until we have all results or reach the limit - has_more_pages = True - while has_more_pages: + while True: json_result = _make_config_api_request( path="/workspaces/list_by_organization_id", json=payload, @@ -1654,14 +1653,20 @@ def list_workspaces_in_organization( ) workspaces = json_result.get("workspaces", []) + + # If no results returned, we've exhausted all pages + if not workspaces: + break + result.extend(workspaces) # Check if we've reached the limit if max_items_limit is not None and len(result) >= max_items_limit: return result[:max_items_limit] - # Check if there are more pages - has_more_pages = len(workspaces) >= page_size + # If we got fewer results than page_size, this was the last page + if len(workspaces) < page_size: + break # Bump offset for next iteration payload["pagination"]["rowOffset"] += page_size