diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index 1b08c1db5..57b0bfd5b 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -38,7 +38,7 @@ from dataclasses import dataclass from functools import cached_property from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal +from typing import TYPE_CHECKING, Any, Literal, overload import yaml @@ -52,6 +52,7 @@ CustomCloudSourceDefinition, ) from airbyte.destinations.base import Destination +from airbyte.exceptions import AirbyteError from airbyte.secrets.base import SecretString @@ -61,6 +62,20 @@ from airbyte.sources.base import Source +@dataclass +class CloudOrganization: + """Information about an organization in Airbyte Cloud. + + This is a minimal value object returned by CloudWorkspace.get_organization(). + """ + + organization_id: str + """The organization ID.""" + + organization_name: str | None = None + """Display name of the organization.""" + + @dataclass class CloudWorkspace: """A remote workspace on the Airbyte Cloud. @@ -89,6 +104,7 @@ def _organization_info(self) -> dict[str, Any]: """Fetch and cache organization info for this workspace. Uses the Config API endpoint for an efficient O(1) lookup. + This is an internal method; use get_organization() for public access. """ return api_util.get_workspace_organization_info( workspace_id=self.workspace_id, @@ -97,21 +113,71 @@ def _organization_info(self) -> dict[str, Any]: client_secret=self.client_secret, ) - @property - def organization_id(self) -> str | None: - """The ID of the organization this workspace belongs to. + @overload + def get_organization(self) -> CloudOrganization: ... - This value is cached after the first lookup. - """ - return self._organization_info.get("organizationId") + @overload + def get_organization( + self, + *, + raise_on_error: Literal[True], + ) -> CloudOrganization: ... - @property - def organization_name(self) -> str | None: - """The name of the organization this workspace belongs to. + @overload + def get_organization( + self, + *, + raise_on_error: Literal[False], + ) -> CloudOrganization | None: ... + + def get_organization( + self, + *, + raise_on_error: bool = True, + ) -> CloudOrganization | None: + """Get the organization this workspace belongs to. + + Fetching organization info requires ORGANIZATION_READER permissions on the organization, + which may not be available with workspace-scoped credentials. + + Args: + raise_on_error: If True (default), raises AirbyteError on permission or API errors. + If False, returns None instead of raising. - This value is cached after the first lookup. + Returns: + CloudOrganization object with organization_id and organization_name, + or None if raise_on_error=False and an error occurred. + + Raises: + AirbyteError: If raise_on_error=True and the organization info cannot be fetched + (e.g., due to insufficient permissions or missing data). """ - return self._organization_info.get("organizationName") + try: + info = self._organization_info + except AirbyteError: + if raise_on_error: + raise + return None + + organization_id = info.get("organizationId") + organization_name = info.get("organizationName") + + # Validate that both organization_id and organization_name are non-null and non-empty + if not organization_id or not organization_name: + if raise_on_error: + raise AirbyteError( + message="Organization info is incomplete.", + context={ + "organization_id": organization_id, + "organization_name": organization_name, + }, + ) + return None + + return CloudOrganization( + organization_id=organization_id, + organization_name=organization_name, + ) # Test connection and creds diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index 1eddca451..970dad2ba 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -159,9 +159,9 @@ class CloudWorkspaceResult(BaseModel): workspace_url: str | None = None """URL to access the workspace in Airbyte Cloud.""" organization_id: str - """ID of the organization this workspace belongs to.""" + """ID of the organization (requires ORGANIZATION_READER permission).""" organization_name: str | None = None - """Name of the organization this workspace belongs to.""" + """Name of the organization (requires ORGANIZATION_READER permission).""" class LogReadResult(BaseModel): @@ -485,12 +485,21 @@ def check_airbyte_cloud_workspace( client_secret=client_secret, ) + # Try to get organization info, but fail gracefully if we don't have permissions. + # Fetching organization info requires ORGANIZATION_READER permissions on the organization, + # which may not be available with workspace-scoped credentials. + organization = workspace.get_organization(raise_on_error=False) + return CloudWorkspaceResult( workspace_id=workspace_response.workspace_id, workspace_name=workspace_response.name, workspace_url=workspace.workspace_url, - organization_id=workspace.organization_id or "[error: organization ID not discovered]", - organization_name=workspace.organization_name, + organization_id=( + organization.organization_id + if organization + else "[unavailable - requires ORGANIZATION_READER permission]" + ), + organization_name=organization.organization_name if organization else None, )