diff --git a/.ruff.toml b/.ruff.toml index f0b61926a..7c3f9689f 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -93,6 +93,7 @@ ignore = [ "PLW0603", # Using the global statement to update _cache is discouraged "PLW0108", # Lambda may be unnecessary; consider inlining inner function "TRY003", # Allow exceptions to receive strings in constructors. + "TRY300", # Consider moving statement to else block (conflicts with RET505) # "TD003", # Require links for TODOs (now enabled) "UP038", # Allow tuples instead of "|" syntax in `isinstance()` checks ("|" is sometimes slower) ] diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 3f72bcfbc..242f6ac18 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -1010,8 +1010,8 @@ def get_bearer_token( "accept": "application/json", }, json={ - "client_id": client_id, - "client_secret": client_secret, + "client_id": str(client_id), + "client_secret": str(client_secret), }, ) if not status_ok(response.status_code): @@ -1418,3 +1418,384 @@ def get_connector_builder_project_for_definition_id( client_secret=client_secret, ) return json_result.get("builderProjectId") + + +def get_connector_version( + *, + connector_id: str, + connector_type: Literal["source", "destination"], + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> dict[str, Any]: + """Get the current version for a source or destination connector. + + Uses the Config API endpoint: + - /v1/actor_definition_versions/get_for_source + - /v1/actor_definition_versions/get_for_destination + + See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml + + Args: + connector_id: The source or destination ID + connector_type: Either "source" or "destination" + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + A dictionary containing version information including: + - dockerImageTag: The version string (e.g., "0.1.0") + - actorDefinitionId: The connector definition ID + - actorDefinitionVersionId: The specific version ID + - isOverrideApplied: Whether a version override is active + """ + endpoint = f"/actor_definition_versions/get_for_{connector_type}" + return _make_config_api_request( + path=endpoint, + json={ + f"{connector_type}Id": connector_id, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + +def resolve_connector_version( + *, + actor_definition_id: str, + connector_type: Literal["source", "destination"], + version: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str: + """Resolve a semver version string to an actor_definition_version_id. + + Uses the Config API endpoint: + /v1/actor_definition_versions/resolve + + See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml + + Args: + actor_definition_id: The connector definition ID + connector_type: Either "source" or "destination" + version: The semver version string (e.g., "0.1.0") + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The actor_definition_version_id (UUID as string) + + Raises: + AirbyteError: If the version cannot be resolved + """ + json_result = _make_config_api_request( + path="/actor_definition_versions/resolve", + json={ + "actorDefinitionId": actor_definition_id, + "actorType": connector_type, + "dockerImageTag": version, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + version_id = json_result.get("versionId") + if not version_id: + raise AirbyteError( + message=f"Could not resolve version '{version}' for connector", + context={ + "actor_definition_id": actor_definition_id, + "connector_type": connector_type, + "version": version, + "response": json_result, + }, + ) + return version_id + + +def _find_connector_version_override_id( + *, + connector_id: str, + actor_definition_id: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str | None: + """Find the scoped configuration ID for a connector version override. + + Uses the /v1/scoped_configuration/get_context endpoint to retrieve the active + configuration for the given scope without needing to list all configurations. + + Args: + connector_id: The source or destination ID + actor_definition_id: The connector definition ID + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The scoped configuration ID if found, None otherwise + """ + json_result = _make_config_api_request( + path="/scoped_configuration/get_context", + json={ + "config_key": "connector_version", + "scope_type": "actor", + "scope_id": connector_id, + "resource_type": "actor_definition", + "resource_id": actor_definition_id, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + active_config = json_result.get("activeConfiguration") + if active_config: + return active_config.get("id") + + return None + + +def get_connector_version_override_info( + *, + connector_id: str, + actor_definition_id: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> dict[str, Any] | None: + """Get full information about a connector version override if one exists. + + Uses the /v1/scoped_configuration/get_context endpoint to retrieve the active + configuration for the given scope. + + Args: + connector_id: The source or destination ID + actor_definition_id: The connector definition ID + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + Dictionary with override info including 'id', 'origin' (user UUID), 'description', etc. + Returns None if no override exists. + """ + json_result = _make_config_api_request( + path="/scoped_configuration/get_context", + json={ + "config_key": "connector_version", + "scope_type": "actor", + "scope_id": connector_id, + "resource_type": "actor_definition", + "resource_id": actor_definition_id, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + return json_result.get("activeConfiguration") + + +def clear_connector_version_override( + *, + connector_id: str, + actor_definition_id: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> bool: + """Clear a version override for a source or destination connector. + + Deletes the scoped configuration that pins the connector to a specific version. + + Uses the Config API endpoints: + - /v1/scoped_configuration/get_context (to find the override) + - /v1/scoped_configuration/delete (to remove it) + + See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml + + Args: + connector_id: The source or destination ID + actor_definition_id: The connector definition ID + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + True if an override was found and deleted, False if no override existed + """ + config_id = _find_connector_version_override_id( + connector_id=connector_id, + actor_definition_id=actor_definition_id, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + if not config_id: + return False + + _make_config_api_request( + path="/scoped_configuration/delete", + json={ + "scopedConfigurationId": config_id, + }, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + return True + + +def get_user_id_by_email( + *, + email: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str: + """Get a user's UUID by their email address. + + Uses the Config API endpoint: + /v1/users/list_instance_admin + + Args: + email: The user's email address + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The user's UUID as a string + + Raises: + ValueError: If no user with the given email is found + """ + response = _make_config_api_request( + path="/users/list_instance_admin", + json={}, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + users = response.get("users", []) + for user in users: + if user.get("email") == email: + return user["userId"] + + raise ValueError(f"No user found with email: {email}") + + +def get_user_email_by_id( + *, + user_id: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> str | None: + """Get a user's email address by their UUID. + + Uses the Config API endpoint: + /v1/users/list_instance_admin + + Args: + user_id: The user's UUID + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + + Returns: + The user's email address, or None if not found + """ + response = _make_config_api_request( + path="/users/list_instance_admin", + json={}, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + users = response.get("users", []) + for user in users: + if user.get("userId") == user_id: + return user.get("email") + + return None + + +def set_connector_version_override( # noqa: PLR0913 + *, + connector_id: str, + actor_definition_id: str, + actor_definition_version_id: str, + override_reason: str, + user_email: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, + override_reason_reference_url: str | None = None, +) -> dict[str, Any]: + """Set a version override for a source or destination connector. + + Creates a scoped configuration at the ACTOR level to pin the connector + to a specific version. + + Uses the Config API endpoint: + /v1/scoped_configuration/create + + See: https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml + + Args: + connector_id: The source or destination ID + actor_definition_id: The connector definition ID + actor_definition_version_id: The version ID to pin to + override_reason: Explanation for why the version override is being set + user_email: Email address of the user creating the override + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + override_reason_reference_url: Optional URL with more context (e.g., issue link) + + Returns: + The created scoped configuration response + """ + user_id = get_user_id_by_email( + email=user_email, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body: dict[str, Any] = { + "config_key": "connector_version", + "value": actor_definition_version_id, + "resource_type": "actor_definition", + "resource_id": actor_definition_id, + "scope_type": "actor", + "scope_id": connector_id, + "origin": user_id, + "origin_type": "user", + "description": override_reason, + } + if override_reason_reference_url: + request_body["reference_url"] = override_reason_reference_url + + # Note: The ScopedConfiguration API also supports an optional "expiresAt" field + # (ISO 8601 datetime string) to automatically expire the override after a certain date. + # This could be added in the future if there are business cases for time-limited overrides. + + return _make_config_api_request( + path="/scoped_configuration/create", + json=request_body, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) diff --git a/airbyte/_util/biz_logic.py b/airbyte/_util/biz_logic.py new file mode 100644 index 000000000..010bf4e13 --- /dev/null +++ b/airbyte/_util/biz_logic.py @@ -0,0 +1,104 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Business logic utilities for PyAirbyte. + +This module contains business logic functions that enforce policies and rules +for various operations in PyAirbyte. +""" + +from __future__ import annotations + + +def validate_version_override_permission( + *, + current_version: str, + new_version: str | None, + is_setting_override: bool, + override_reason: str, + user_email: str, + existing_override_creator_email: str | None = None, +) -> tuple[bool, str]: + """Validate whether a version override operation should be permitted. + + This function encapsulates the business logic for determining when version + overrides should be allowed. It can be refined iteratively based on feedback + from the team. + + Args: + current_version: The current version string (e.g., "0.1.0", "0.1.0-dev", "0.1.0-rc") + new_version: The version to pin to (None if unsetting) + is_setting_override: True if setting an override, False if clearing + override_reason: The reason provided for the override + user_email: Email of the user performing the operation + existing_override_creator_email: Email of who created the existing override (if any) + + Returns: + A tuple of (is_permitted, reason_if_not_permitted) + - is_permitted: True if the operation should be allowed + - reason_if_not_permitted: Empty string if permitted, otherwise an explanation + + Business Rules (to be refined): + 1. Dev versions (-dev suffix): + - Only the creator can unpin their own dev version override + - Anyone can pin to a dev version (for testing) + + 2. Release candidates (-rc suffix): + - Any admin can pin/unpin RC versions + - These are for pre-release testing + + 3. Production versions (no suffix): + - Should require strong justification + - Override reason must clearly indicate customer request or support investigation + - Consider requiring additional approval or stricter validation + + Note: + This is a placeholder implementation. The actual business logic should be + refined with input from Catherine Noll and other stakeholders. + """ + + def _get_version_suffix(version: str) -> str | None: + """Extract suffix from version string (e.g., '-dev', '-rc', or None for prod).""" + if "-dev" in version: + return "-dev" + if "-rc" in version: + return "-rc" + return None + + current_suffix = _get_version_suffix(current_version) + new_suffix = _get_version_suffix(new_version) if new_version else None + + if ( + current_suffix == "-dev" + and not is_setting_override + and existing_override_creator_email + and user_email != existing_override_creator_email + ): + return ( + False, + f"Cannot unpin dev version override created by {existing_override_creator_email}. " + "Only the creator can remove their own dev version pins.", + ) + + if is_setting_override and new_suffix is None: + reason_lower = override_reason.lower() + valid_keywords = ["customer", "support", "investigation", "requested", "incident"] + if not any(keyword in reason_lower for keyword in valid_keywords): + return ( + False, + "Pinning to a production version requires strong justification. " + "Override reason must mention customer request, support investigation, " + "or similar critical need. " + "Keywords: customer, support, investigation, requested, incident", + ) + + if not is_setting_override and current_suffix is None: + reason_lower = override_reason.lower() + valid_keywords = ["resolved", "fixed", "safe", "tested", "approved"] + if not any(keyword in reason_lower for keyword in valid_keywords): + return ( + False, + "Clearing a production version override requires justification. " + "Override reason must indicate the issue is resolved or it's safe to revert. " + "Keywords: resolved, fixed, safe, tested, approved", + ) + + return (True, "") diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index d1edd85c4..07417af38 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -46,11 +46,14 @@ import yaml from airbyte_api import models as api_models # noqa: TC002 +from pydantic import BaseModel from airbyte import exceptions as exc -from airbyte._util import api_util, text_util +from airbyte._util import api_util, biz_logic, text_util +MIN_OVERRIDE_REASON_LENGTH = 10 + if TYPE_CHECKING: from airbyte.cloud.workspaces import CloudWorkspace @@ -84,6 +87,20 @@ def __repr__(self) -> str: ) +class CloudConnectorVersionInfo(BaseModel): + """Information about a cloud connector's version.""" + + version: str + """The version string (e.g., '0.1.0').""" + + is_version_pinned: bool + """Whether a version override is active for this connector.""" + + def __str__(self) -> str: + """Return a string representation of the version.""" + return self.version if not self.is_version_pinned else f"{self.version} (pinned)" + + class CloudConnector(abc.ABC): """A cloud connector is a deployed source or destination on Airbyte Cloud. @@ -257,6 +274,165 @@ def _from_source_response( result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API return result + def get_connector_version(self) -> CloudConnectorVersionInfo: + """Get the current version information for this source. + + Returns: + CloudConnectorVersionInfo with version string and pinned status + """ + version_data = api_util.get_connector_version( + connector_id=self.connector_id, + connector_type=self.connector_type, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return CloudConnectorVersionInfo( + version=version_data["dockerImageTag"], + is_version_pinned=version_data.get("isVersionOverrideApplied", False), + ) + + def _set_connector_version_override( + self, + version: str | None = None, + *, + unset: bool = False, + override_reason: str | None = None, + override_reason_reference_url: str | None = None, + user_email: str, + ) -> bool: + """Set or clear a version override for this source. + + **Internal use only.** This method is only for internal Airbyte admin use. + Endpoints are not enabled for end user access. + + You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. + When setting a version, `override_reason` is required. + + Args: + version: The semver version string to pin to (e.g., `"0.1.0"`) + unset: If `True`, removes any existing version override + override_reason: Required when setting a version. Explanation for the override. + override_reason_reference_url: Optional URL with more context (e.g., issue link) + user_email: Email of the user creating the override (from AIRBYTE_INTERNAL_ADMIN_USER). + + Returns: + `True` if the operation succeeded, `False` if no override existed (unset only) + + Raises: + exc.PyAirbyteInputError: If both or neither parameters are provided, or if + override_reason is missing when setting a version + """ + if (version is None) == (not unset): + raise exc.PyAirbyteInputError( + message=( + "Must specify EXACTLY ONE of version (to set) OR " + "unset=True (to clear), but not both" + ), + context={ + "version_provided": version is not None, + "unset": unset, + }, + ) + + if version is not None and not override_reason: + raise exc.PyAirbyteInputError( + message="override_reason is required when setting a version override", + context={ + "version": version, + "override_reason": override_reason, + }, + ) + + if override_reason is not None and len(override_reason) < MIN_OVERRIDE_REASON_LENGTH: + raise exc.PyAirbyteInputError( + message=( + f"override_reason must be at least {MIN_OVERRIDE_REASON_LENGTH} " + "characters long" + ), + context={ + "override_reason": override_reason, + "length": len(override_reason), + }, + ) + + connector_info = self._fetch_connector_info() + actor_definition_id = connector_info.definition_id + + current_version_info = self.get_connector_version() + current_version = current_version_info.version + + existing_override_info = api_util.get_connector_version_override_info( + connector_id=self.connector_id, + actor_definition_id=actor_definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + existing_override_creator_email = None + if existing_override_info and existing_override_info.get("origin"): + creator_user_id = existing_override_info["origin"] + existing_override_creator_email = api_util.get_user_email_by_id( + user_id=creator_user_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + is_permitted, denial_reason = biz_logic.validate_version_override_permission( + current_version=current_version, + new_version=version, + is_setting_override=not unset, + override_reason=override_reason or "", + user_email=user_email, + existing_override_creator_email=existing_override_creator_email, + ) + + if not is_permitted: + raise exc.PyAirbyteInputError( + message=f"Version override operation not permitted: {denial_reason}", + context={ + "current_version": current_version, + "new_version": version, + "is_setting_override": not unset, + "user_email": user_email, + "existing_override_creator_email": existing_override_creator_email, + }, + ) + + if unset: + return api_util.clear_connector_version_override( + connector_id=self.connector_id, + actor_definition_id=actor_definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + actor_definition_version_id = api_util.resolve_connector_version( + actor_definition_id=actor_definition_id, + connector_type=self.connector_type, + version=version, # type: ignore[arg-type] + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + api_util.set_connector_version_override( + connector_id=self.connector_id, + actor_definition_id=actor_definition_id, + actor_definition_version_id=actor_definition_version_id, + override_reason=override_reason, # type: ignore[arg-type] + user_email=user_email, # type: ignore[arg-type] + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + override_reason_reference_url=override_reason_reference_url, + ) + + return True + class CloudDestination(CloudConnector): """A cloud destination is a destination that is deployed on Airbyte Cloud.""" @@ -339,6 +515,165 @@ def _from_destination_response( result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API return result + def get_connector_version(self) -> CloudConnectorVersionInfo: + """Get the current version information for this destination. + + Returns: + CloudConnectorVersionInfo with version string and pinned status + """ + version_data = api_util.get_connector_version( + connector_id=self.connector_id, + connector_type=self.connector_type, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return CloudConnectorVersionInfo( + version=version_data["dockerImageTag"], + is_version_pinned=version_data.get("isVersionOverrideApplied", False), + ) + + def _set_connector_version_override( + self, + version: str | None = None, + *, + unset: bool = False, + override_reason: str | None = None, + override_reason_reference_url: str | None = None, + user_email: str, + ) -> bool: + """Set or clear a version override for this destination. + + **Internal use only.** This method is only for internal Airbyte admin use. + Endpoints are not enabled for end user access. + + You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. + When setting a version, `override_reason` is required. + + Args: + version: The semver version string to pin to (e.g., `"0.1.0"`) + unset: If `True`, removes any existing version override + override_reason: Required when setting a version. Explanation for the override. + override_reason_reference_url: Optional URL with more context (e.g., issue link) + user_email: Email of the user creating the override (from AIRBYTE_INTERNAL_ADMIN_USER). + + Returns: + `True` if the operation succeeded, `False` if no override existed (unset only) + + Raises: + exc.PyAirbyteInputError: If both or neither parameters are provided, or if + override_reason is missing when setting a version + """ + if (version is None) == (not unset): + raise exc.PyAirbyteInputError( + message=( + "Must specify EXACTLY ONE of version (to set) OR " + "unset=True (to clear), but not both" + ), + context={ + "version_provided": version is not None, + "unset": unset, + }, + ) + + if version is not None and not override_reason: + raise exc.PyAirbyteInputError( + message="override_reason is required when setting a version override", + context={ + "version": version, + "override_reason": override_reason, + }, + ) + + if override_reason is not None and len(override_reason) < MIN_OVERRIDE_REASON_LENGTH: + raise exc.PyAirbyteInputError( + message=( + f"override_reason must be at least {MIN_OVERRIDE_REASON_LENGTH} " + "characters long" + ), + context={ + "override_reason": override_reason, + "length": len(override_reason), + }, + ) + + connector_info = self._fetch_connector_info() + actor_definition_id = connector_info.definition_id + + current_version_info = self.get_connector_version() + current_version = current_version_info.version + + existing_override_info = api_util.get_connector_version_override_info( + connector_id=self.connector_id, + actor_definition_id=actor_definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + existing_override_creator_email = None + if existing_override_info and existing_override_info.get("origin"): + creator_user_id = existing_override_info["origin"] + existing_override_creator_email = api_util.get_user_email_by_id( + user_id=creator_user_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + is_permitted, denial_reason = biz_logic.validate_version_override_permission( + current_version=current_version, + new_version=version, + is_setting_override=not unset, + override_reason=override_reason or "", + user_email=user_email, + existing_override_creator_email=existing_override_creator_email, + ) + + if not is_permitted: + raise exc.PyAirbyteInputError( + message=f"Version override operation not permitted: {denial_reason}", + context={ + "current_version": current_version, + "new_version": version, + "is_setting_override": not unset, + "user_email": user_email, + "existing_override_creator_email": existing_override_creator_email, + }, + ) + + if unset: + return api_util.clear_connector_version_override( + connector_id=self.connector_id, + actor_definition_id=actor_definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + actor_definition_version_id = api_util.resolve_connector_version( + actor_definition_id=actor_definition_id, + connector_type=self.connector_type, + version=version, # type: ignore[arg-type] + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + api_util.set_connector_version_override( + connector_id=self.connector_id, + actor_definition_id=actor_definition_id, + actor_definition_version_id=actor_definition_version_id, + override_reason=override_reason, # type: ignore[arg-type] + user_email=user_email, # type: ignore[arg-type] + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + override_reason_reference_url=override_reason_reference_url, + ) + + return True + class CustomCloudSourceDefinition: """A custom source connector definition in Airbyte Cloud. diff --git a/airbyte/mcp/_annotations.py b/airbyte/mcp/_annotations.py index 78cd82f05..f18577d68 100644 --- a/airbyte/mcp/_annotations.py +++ b/airbyte/mcp/_annotations.py @@ -50,3 +50,13 @@ FastMCP default if not specified: True """ + +AIRBYTE_INTERNAL = "airbyte_internal" +"""Custom Airbyte annotation indicating the tool is for internal admin use only. + +This is not a standard MCP annotation. When True, the tool requires both +AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER environment +variables to be properly configured for registration. + +Default if not specified: False +""" diff --git a/airbyte/mcp/_tool_utils.py b/airbyte/mcp/_tool_utils.py index 21f6ff3ba..5e0c50ba4 100644 --- a/airbyte/mcp/_tool_utils.py +++ b/airbyte/mcp/_tool_utils.py @@ -12,6 +12,7 @@ from typing import Any, Literal, TypeVar from airbyte.mcp._annotations import ( + AIRBYTE_INTERNAL, DESTRUCTIVE_HINT, IDEMPOTENT_HINT, OPEN_WORLD_HINT, @@ -66,11 +67,18 @@ def should_register_tool(annotations: dict[str, Any]) -> bool: """Check if a tool should be registered based on mode settings. Args: - annotations: Tool annotations dict containing domain, readOnlyHint, and destructiveHint + annotations: Tool annotations dict containing domain, readOnlyHint, destructiveHint, + and airbyte_internal Returns: True if the tool should be registered, False if it should be filtered out """ + if annotations.get(AIRBYTE_INTERNAL): + admin_flag = os.environ.get("AIRBYTE_INTERNAL_ADMIN_FLAG") + admin_user = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") + if admin_flag != "airbyte.io" or not admin_user or not admin_user.endswith("@airbyte.io"): + return False + if annotations.get("domain") != "cloud": return True @@ -106,6 +114,7 @@ def mcp_tool( destructive: bool = False, idempotent: bool = False, open_world: bool = False, + airbyte_internal: bool = False, extra_help_text: str | None = None, ) -> Callable[[F], F]: """Decorator to tag an MCP tool function with annotations for deferred registration. @@ -119,6 +128,7 @@ def mcp_tool( destructive: If True, tool modifies/deletes existing data (default: False) idempotent: If True, repeated calls have same effect (default: False) open_world: If True, tool interacts with external systems (default: False) + airbyte_internal: If True, tool is only for internal Airbyte admin use. extra_help_text: Optional text to append to the function's docstring with a newline delimiter @@ -136,6 +146,7 @@ def list_sources(): DESTRUCTIVE_HINT: destructive, IDEMPOTENT_HINT: idempotent, OPEN_WORLD_HINT: open_world, + AIRBYTE_INTERNAL: airbyte_internal, } def decorator(func: F) -> F: diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index 305731006..bea3e5795 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -1,6 +1,7 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. """Airbyte Cloud MCP operations.""" +import os from pathlib import Path from typing import Annotated, Any @@ -27,6 +28,44 @@ from airbyte.mcp._util import resolve_config, resolve_list_of_strings +def _check_internal_admin_flag() -> bool: + """Check if internal admin flag is properly configured. + + Returns: + True if both AIRBYTE_INTERNAL_ADMIN_FLAG and AIRBYTE_INTERNAL_ADMIN_USER are set correctly. + """ + admin_flag = os.environ.get("AIRBYTE_INTERNAL_ADMIN_FLAG") + admin_user = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") + + if admin_flag != "airbyte.io": + print( + "Warning: Invalid AIRBYTE_INTERNAL_ADMIN_FLAG configuration. " + "Remove or correct the environment variable." + ) + return False + + if not admin_user: + print( + "Warning: Invalid AIRBYTE_INTERNAL_ADMIN_USER configuration. " + "Remove or correct the environment variable." + ) + return False + + return True + + +def _get_admin_user_email() -> str | None: + """Get the admin user email from environment variable. + + Returns: + The admin user email from AIRBYTE_INTERNAL_ADMIN_USER, or None if not configured. + """ + if not _check_internal_admin_flag(): + return None + + return os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") + + def _get_cloud_workspace() -> CloudWorkspace: """Get an authenticated CloudWorkspace using environment variables.""" return CloudWorkspace( @@ -427,7 +466,7 @@ def get_cloud_sync_status( for attempt in attempts ] - return result # noqa: TRY300 + return result except Exception as ex: return { @@ -534,7 +573,7 @@ def get_cloud_sync_logs( f"attempt {target_attempt.attempt_number}" ) - return logs # noqa: TRY300 + return logs except Exception as ex: return f"Failed to get logs for connection '{connection_id}': {ex}" @@ -759,6 +798,222 @@ def permanently_delete_custom_source_definition( ) +@mcp_tool( + domain="cloud", + read_only=True, + idempotent=True, + open_world=True, +) +def get_cloud_source_connector_version( + source_id: Annotated[ + str, + Field(description="The ID of the deployed source connector."), + ], +) -> str: + """Get the current version information for a deployed source connector. + + Returns version details including the current version string and whether an override + is applied. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + workspace: CloudWorkspace = _get_cloud_workspace() + source = workspace.get_source(source_id=source_id) + version_info = source.get_connector_version() + return str(version_info) + + +@mcp_tool( + domain="cloud", + read_only=True, + idempotent=True, + open_world=True, +) +def get_cloud_destination_connector_version( + destination_id: Annotated[ + str, + Field(description="The ID of the deployed destination connector."), + ], +) -> str: + """Get the current version information for a deployed destination connector. + + Returns version details including the current version string and whether an override + is applied. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + workspace: CloudWorkspace = _get_cloud_workspace() + destination = workspace.get_destination(destination_id=destination_id) + version_info = destination.get_connector_version() + return str(version_info) + + +@mcp_tool( + domain="cloud", + destructive=True, + idempotent=False, + open_world=True, + airbyte_internal=True, +) +def set_cloud_source_connector_version_override( + source_id: Annotated[ + str, + Field(description="The ID of the deployed source connector."), + ], + version: Annotated[ + str | None, + Field( + description="The semver version string to pin to (e.g., '0.1.0'). " + "Must be None if unset is True.", + default=None, + ), + ] = None, + *, + unset: Annotated[ + bool, + Field( + description="If True, removes any existing version override. " + "Cannot be True if version is provided.", + default=False, + ), + ] = False, + override_reason: Annotated[ + str | None, + Field( + description=( + "Required when setting a version. " + "Explanation for the override (min 10 characters)." + ), + default=None, + ), + ] = None, + override_reason_reference_url: Annotated[ + str | None, + Field( + description="Optional URL with more context (e.g., issue link).", + default=None, + ), + ] = None, +) -> str: + """Set or clear a version override for a deployed source connector. + + You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. + When setting a version, `override_reason` is required and must be at least 10 characters. + + This is an admin-only operation. Requires environment variables: + - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io + - AIRBYTE_INTERNAL_ADMIN_USER= + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + admin_user_email = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") + if not admin_user_email: + raise ValueError("AIRBYTE_INTERNAL_ADMIN_USER environment variable is required") + + workspace: CloudWorkspace = _get_cloud_workspace() + source = workspace.get_source(source_id=source_id) + result = source._set_connector_version_override( # noqa: SLF001 # Accessing Non-Public API + version=version, + unset=unset, + override_reason=override_reason, + override_reason_reference_url=override_reason_reference_url, + user_email=admin_user_email, + ) + + if unset: + if result: + return f"Successfully cleared version override for source '{source_id}'" + return f"No version override was set for source '{source_id}'" + return f"Successfully set version override to '{version}' for source '{source_id}'" + + +@mcp_tool( + domain="cloud", + destructive=True, + idempotent=False, + open_world=True, + airbyte_internal=True, +) +def set_cloud_destination_connector_version_override( + destination_id: Annotated[ + str, + Field(description="The ID of the deployed destination connector."), + ], + version: Annotated[ + str | None, + Field( + description="The semver version string to pin to (e.g., '0.1.0'). " + "Must be None if unset is True.", + default=None, + ), + ] = None, + *, + unset: Annotated[ + bool, + Field( + description="If True, removes any existing version override. " + "Cannot be True if version is provided.", + default=False, + ), + ] = False, + override_reason: Annotated[ + str | None, + Field( + description=( + "Required when setting a version. " + "Explanation for the override (min 10 characters)." + ), + default=None, + ), + ] = None, + override_reason_reference_url: Annotated[ + str | None, + Field( + description="Optional URL with more context (e.g., issue link).", + default=None, + ), + ] = None, +) -> str: + """Set or clear a version override for a deployed destination connector. + + You must specify EXACTLY ONE of `version` OR `unset=True`, but not both. + When setting a version, `override_reason` is required and must be at least 10 characters. + + This is an admin-only operation. Requires environment variables: + - AIRBYTE_INTERNAL_ADMIN_FLAG=airbyte.io + - AIRBYTE_INTERNAL_ADMIN_USER= + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + admin_user_email = os.environ.get("AIRBYTE_INTERNAL_ADMIN_USER") + if not admin_user_email: + raise ValueError("AIRBYTE_INTERNAL_ADMIN_USER environment variable is required") + + workspace: CloudWorkspace = _get_cloud_workspace() + destination = workspace.get_destination(destination_id=destination_id) + result = destination._set_connector_version_override( # noqa: SLF001 # Accessing Non-Public API + version=version, + unset=unset, + override_reason=override_reason, + override_reason_reference_url=override_reason_reference_url, + user_email=admin_user_email, + ) + + if unset: + if result: + return f"Successfully cleared version override for destination '{destination_id}'" + return f"No version override was set for destination '{destination_id}'" + return f"Successfully set version override to '{version}' for destination '{destination_id}'" + + @mcp_tool( domain="cloud", open_world=True,