Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 63 additions & 12 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
from dataclasses import dataclass
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
from typing import TYPE_CHECKING, Any, Literal, overload

import yaml

Expand All @@ -52,6 +52,7 @@
CustomCloudSourceDefinition,
)
from airbyte.destinations.base import Destination
from airbyte.exceptions import AirbyteError
from airbyte.secrets.base import SecretString


Expand All @@ -61,6 +62,20 @@
from airbyte.sources.base import Source


@dataclass
class CloudOrganization:
"""Information about an organization in Airbyte Cloud.

This is a minimal value object returned by CloudWorkspace.get_organization().
"""

organization_id: str
"""The organization ID."""

organization_name: str | None = None
"""Display name of the organization."""


@dataclass
class CloudWorkspace:
"""A remote workspace on the Airbyte Cloud.
Expand Down Expand Up @@ -89,6 +104,7 @@ def _organization_info(self) -> dict[str, Any]:
"""Fetch and cache organization info for this workspace.

Uses the Config API endpoint for an efficient O(1) lookup.
This is an internal method; use get_organization() for public access.
"""
return api_util.get_workspace_organization_info(
workspace_id=self.workspace_id,
Expand All @@ -97,21 +113,56 @@ def _organization_info(self) -> dict[str, Any]:
client_secret=self.client_secret,
)

@property
def organization_id(self) -> str | None:
"""The ID of the organization this workspace belongs to.
@overload
def get_organization(self) -> CloudOrganization: ...

This value is cached after the first lookup.
"""
return self._organization_info.get("organizationId")
@overload
def get_organization(
self,
*,
raise_on_error: Literal[True],
) -> CloudOrganization: ...

@property
def organization_name(self) -> str | None:
"""The name of the organization this workspace belongs to.
@overload
def get_organization(
self,
*,
raise_on_error: Literal[False],
) -> CloudOrganization | None: ...

def get_organization(
self,
*,
raise_on_error: bool = True,
) -> CloudOrganization | None:
"""Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization,
which may not be available with workspace-scoped credentials.

This value is cached after the first lookup.
Args:
raise_on_error: If True (default), raises AirbyteError on permission or API errors.
If False, returns None instead of raising.

Returns:
CloudOrganization object with organization_id and organization_name,
or None if raise_on_error=False and an error occurred.

Raises:
AirbyteError: If raise_on_error=True and the organization info cannot be fetched
(e.g., due to insufficient permissions).
"""
return self._organization_info.get("organizationName")
try:
info = self._organization_info
except AirbyteError:
if raise_on_error:
raise
return None

return CloudOrganization(
organization_id=str(info.get("organizationId", "")),
organization_name=info.get("organizationName"),
)

# Test connection and creds

Expand Down
13 changes: 11 additions & 2 deletions airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,12 +485,21 @@ def check_airbyte_cloud_workspace(
client_secret=client_secret,
)

# Try to get organization info, but fail gracefully if we don't have permissions.
# Fetching organization info requires ORGANIZATION_READER permissions on the organization,
# which may not be available with workspace-scoped credentials.
organization = workspace.get_organization(raise_on_error=False)

return CloudWorkspaceResult(
workspace_id=workspace_response.workspace_id,
workspace_name=workspace_response.name,
workspace_url=workspace.workspace_url,
organization_id=workspace.organization_id or "[error: organization ID not discovered]",
organization_name=workspace.organization_name,
organization_id=(
organization.organization_id
if organization
else "[unavailable - requires ORGANIZATION_READER permission]"
),
organization_name=organization.organization_name if organization else None,
)


Expand Down