diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 3f72bcfbc..314a87d85 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -538,13 +538,57 @@ def get_source( def delete_source( source_id: str, *, + source_name: str | None = None, api_root: str, client_id: SecretString, client_secret: SecretString, workspace_id: str | None = None, + safe_mode: bool = True, ) -> None: - """Delete a source.""" + """Delete a source. + + Args: + source_id: The source ID to delete + source_name: Optional source name. If not provided and safe_mode is enabled, + the source name will be fetched from the API to perform safety checks. + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + workspace_id: The workspace ID (not currently used) + safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. + + Raises: + PyAirbyteInputError: If safe_mode is True and the source name does not meet + the safety requirements. + """ _ = workspace_id # Not used (yet) + + if safe_mode: + if source_name is None: + source_info = get_source( + source_id=source_id, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + source_name = source_info.name + + if not _is_safe_name_to_delete(source_name): + raise PyAirbyteInputError( + message=( + f"Cannot delete source '{source_name}' with safe_mode enabled. " + "To authorize deletion, the source name must contain 'delete-me' or 'deleteme' " + "(case insensitive).\n\n" + "Please rename the source to meet this requirement before attempting deletion." + ), + context={ + "source_id": source_id, + "source_name": source_name, + "safe_mode": True, + }, + ) + airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, @@ -700,7 +744,7 @@ def get_destination( # the destination API response is of the wrong type. # https://github.com/airbytehq/pyairbyte/issues/320 raw_response: dict[str, Any] = json.loads(response.raw_response.text) - raw_configuration: dict[str, Any] = raw_response["configuration"] + raw_configuration: dict[str, Any] | None = raw_response.get("configuration") destination_type = raw_response.get("destinationType") destination_mapping = { @@ -710,7 +754,7 @@ def get_destination( "duckdb": models.DestinationDuckdb, } - if destination_type in destination_mapping: + if destination_type in destination_mapping and raw_configuration is not None: response.destination_response.configuration = destination_mapping[ destination_type # pyrefly: ignore[index-error] ](**raw_configuration) @@ -726,13 +770,58 @@ def get_destination( def delete_destination( destination_id: str, *, + destination_name: str | None = None, api_root: str, client_id: SecretString, client_secret: SecretString, workspace_id: str | None = None, + safe_mode: bool = True, ) -> None: - """Delete a destination.""" + """Delete a destination. + + Args: + destination_id: The destination ID to delete + destination_name: Optional destination name. If not provided and safe_mode is enabled, + the destination name will be fetched from the API to perform safety checks. + api_root: The API root URL + client_id: OAuth client ID + client_secret: OAuth client secret + workspace_id: The workspace ID (not currently used) + safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. + + Raises: + PyAirbyteInputError: If safe_mode is True and the destination name does not meet + the safety requirements. + """ _ = workspace_id # Not used (yet) + + if safe_mode: + if destination_name is None: + destination_info = get_destination( + destination_id=destination_id, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + destination_name = destination_info.name + + if not _is_safe_name_to_delete(destination_name): + raise PyAirbyteInputError( + message=( + f"Cannot delete destination '{destination_name}' with safe_mode enabled. " + "To authorize deletion, the destination name must contain 'delete-me' or " + "'deleteme' (case insensitive).\n\n" + "Please rename the destination to meet this requirement " + "before attempting deletion." + ), + context={ + "destination_id": destination_id, + "destination_name": destination_name, + "safe_mode": True, + }, + ) + airbyte_instance = get_airbyte_server_instance( client_id=client_id, client_secret=client_secret, @@ -902,13 +991,74 @@ def get_connection_by_name( return found[0] +def _is_safe_name_to_delete(name: str) -> bool: + """Check if a name is safe to delete. + + Requires the name to contain either "delete-me" or "deleteme" (case insensitive). + """ + name_lower = name.lower() + return any( + { + "delete-me" in name_lower, + "deleteme" in name_lower, + } + ) + + def delete_connection( connection_id: str, + connection_name: str | None = None, + *, api_root: str, workspace_id: str, client_id: SecretString, client_secret: SecretString, + safe_mode: bool = True, ) -> None: + """Delete a connection. + + Args: + connection_id: The connection ID to delete + connection_name: Optional connection name. If not provided and safe_mode is enabled, + the connection name will be fetched from the API to perform safety checks. + api_root: The API root URL + workspace_id: The workspace ID + client_id: OAuth client ID + client_secret: OAuth client secret + safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. + + Raises: + PyAirbyteInputError: If safe_mode is True and the connection name does not meet + the safety requirements. + """ + if safe_mode: + if connection_name is None: + connection_info = get_connection( + workspace_id=workspace_id, + connection_id=connection_id, + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + connection_name = connection_info.name + + if not _is_safe_name_to_delete(connection_name): + raise PyAirbyteInputError( + message=( + f"Cannot delete connection '{connection_name}' with safe_mode enabled. " + "To authorize deletion, the connection name must contain 'delete-me' or " + "'deleteme' (case insensitive).\n\n" + "Please rename the connection to meet this requirement " + "before attempting deletion." + ), + context={ + "connection_id": connection_id, + "connection_name": connection_name, + "safe_mode": True, + }, + ) + _ = workspace_id # Not used (yet) airbyte_instance = get_airbyte_server_instance( client_id=client_id, @@ -1317,9 +1467,8 @@ def delete_custom_yaml_source_definition( api_root: The API root URL client_id: OAuth client ID client_secret: OAuth client secret - safe_mode: If True, requires the connector name to either start with "delete:" - or contain "delete-me" (case insensitive) to prevent accidental deletion. - Defaults to True. + safe_mode: If True, requires the connector name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. Raises: PyAirbyteInputError: If safe_mode is True and the connector name does not meet @@ -1335,19 +1484,14 @@ def delete_custom_yaml_source_definition( ) connector_name = definition_info.name - def is_safe_to_delete(name: str) -> bool: - name_lower = name.lower() - return name_lower.startswith("delete:") or "delete-me" in name_lower - - if not is_safe_to_delete(definition_info.name): + if not _is_safe_name_to_delete(definition_info.name): raise PyAirbyteInputError( message=( f"Cannot delete custom connector definition '{connector_name}' " "with safe_mode enabled. " - "To authorize deletion, the connector name must either:\n" - " 1. Start with 'delete:' (case insensitive), OR\n" - " 2. Contain 'delete-me' (case insensitive)\n\n" - "Please rename the connector to meet one of these requirements " + "To authorize deletion, the connector name must contain 'delete-me' or " + "'deleteme' (case insensitive).\n\n" + "Please rename the connector to meet this requirement " "before attempting deletion." ), context={ diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index d1edd85c4..768319fa1 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -500,9 +500,8 @@ def permanently_delete( """Permanently delete this custom source definition. Args: - safe_mode: If True, requires the connector name to either start with "delete:" - or contain "delete-me" (case insensitive) to prevent accidental deletion. - Defaults to True. + safe_mode: If True, requires the connector name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. """ if self.definition_type == "yaml": api_util.delete_custom_yaml_source_definition( diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index af201e4c9..7957a9e29 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -250,10 +250,17 @@ def deploy_destination( def permanently_delete_source( self, source: str | CloudSource, + *, + safe_mode: bool = True, ) -> None: """Delete a source from the workspace. You can pass either the source ID `str` or a deployed `Source` object. + + Args: + source: The source ID or CloudSource object to delete + safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. """ if not isinstance(source, (str, CloudSource)): raise exc.PyAirbyteInputError( @@ -263,9 +270,11 @@ def permanently_delete_source( api_util.delete_source( source_id=source.connector_id if isinstance(source, CloudSource) else source, + source_name=source.name if isinstance(source, CloudSource) else None, api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + safe_mode=safe_mode, ) # Deploy and delete destinations @@ -273,10 +282,17 @@ def permanently_delete_source( def permanently_delete_destination( self, destination: str | CloudDestination, + *, + safe_mode: bool = True, ) -> None: """Delete a deployed destination from the workspace. You can pass either the `Cache` class or the deployed destination ID as a `str`. + + Args: + destination: The destination ID or CloudDestination object to delete + safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. """ if not isinstance(destination, (str, CloudDestination)): raise exc.PyAirbyteInputError( @@ -288,9 +304,13 @@ def permanently_delete_destination( destination_id=( destination if isinstance(destination, str) else destination.destination_id ), + destination_name=( + destination.name if isinstance(destination, CloudDestination) else None + ), api_root=self.api_root, client_id=self.client_id, client_secret=self.client_secret, + safe_mode=safe_mode, ) # Deploy and delete connections @@ -351,8 +371,19 @@ def permanently_delete_connection( *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, + safe_mode: bool = True, ) -> None: - """Delete a deployed connection from the workspace.""" + """Delete a deployed connection from the workspace. + + Args: + connection: The connection ID or CloudConnection object to delete + cascade_delete_source: If True, also delete the source after deleting the connection + cascade_delete_destination: If True, also delete the destination after deleting + the connection + safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" + (case insensitive) to prevent accidental deletion. Defaults to True. Also applies + to cascade deletes. + """ if connection is None: raise ValueError("No connection ID provided.") @@ -364,16 +395,24 @@ def permanently_delete_connection( api_util.delete_connection( connection_id=connection.connection_id, + connection_name=connection.name, api_root=self.api_root, workspace_id=self.workspace_id, client_id=self.client_id, client_secret=self.client_secret, + safe_mode=safe_mode, ) if cascade_delete_source: - self.permanently_delete_source(source=connection.source_id) + self.permanently_delete_source( + source=connection.source_id, + safe_mode=safe_mode, + ) if cascade_delete_destination: - self.permanently_delete_destination(destination=connection.destination_id) + self.permanently_delete_destination( + destination=connection.destination_id, + safe_mode=safe_mode, + ) # List sources, destinations, and connections diff --git a/airbyte/mcp/_tool_utils.py b/airbyte/mcp/_tool_utils.py index 96ecc5252..c9fd333b7 100644 --- a/airbyte/mcp/_tool_utils.py +++ b/airbyte/mcp/_tool_utils.py @@ -50,6 +50,8 @@ def register_guid_created_in_session(guid: str) -> None: def check_guid_created_in_session(guid: str) -> None: """Check if a GUID was created in this session. + This is a no-op if AIRBYTE_CLOUD_MCP_SAFE_MODE is set to "0". + Raises SafeModeError if the GUID was not created in this session and AIRBYTE_CLOUD_MCP_SAFE_MODE is set to 1. diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index 1cba32d63..849d72921 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -8,6 +8,7 @@ from pydantic import BaseModel, Field from airbyte import cloud, get_destination, get_source +from airbyte._util.api_util import PyAirbyteInputError from airbyte.cloud.auth import ( resolve_cloud_api_url, resolve_cloud_client_id, @@ -127,33 +128,29 @@ def deploy_source_to_cloud( and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API. """ - try: - source = get_source( - source_connector_name, - no_executor=True, - ) - config_dict = resolve_config( - config=config, - config_secret_name=config_secret_name, - config_spec_jsonschema=source.config_spec, - ) - source.set_config(config_dict, validate=True) + source = get_source( + source_connector_name, + no_executor=True, + ) + config_dict = resolve_config( + config=config, + config_secret_name=config_secret_name, + config_spec_jsonschema=source.config_spec, + ) + source.set_config(config_dict, validate=True) - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - deployed_source = workspace.deploy_source( - name=source_name, - source=source, - unique=unique, - ) + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + deployed_source = workspace.deploy_source( + name=source_name, + source=source, + unique=unique, + ) - except Exception as ex: - return f"Failed to deploy source '{source_name}': {ex}" - else: - register_guid_created_in_session(deployed_source.connector_id) - return ( - f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" - f" and URL: {deployed_source.connector_url}" - ) + register_guid_created_in_session(deployed_source.connector_id) + return ( + f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" + f" and URL: {deployed_source.connector_url}" + ) @mcp_tool( @@ -205,33 +202,29 @@ def deploy_destination_to_cloud( and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API. """ - try: - destination = get_destination( - destination_connector_name, - no_executor=True, - ) - config_dict = resolve_config( - config=config, - config_secret_name=config_secret_name, - config_spec_jsonschema=destination.config_spec, - ) - destination.set_config(config_dict, validate=True) + destination = get_destination( + destination_connector_name, + no_executor=True, + ) + config_dict = resolve_config( + config=config, + config_secret_name=config_secret_name, + config_spec_jsonschema=destination.config_spec, + ) + destination.set_config(config_dict, validate=True) - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - deployed_destination = workspace.deploy_destination( - name=destination_name, - destination=destination, - unique=unique, - ) + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + deployed_destination = workspace.deploy_destination( + name=destination_name, + destination=destination, + unique=unique, + ) - except Exception as ex: - return f"Failed to deploy destination '{destination_name}': {ex}" - else: - register_guid_created_in_session(deployed_destination.connector_id) - return ( - f"Successfully deployed destination '{destination_name}' " - f"with ID: {deployed_destination.connector_id}" - ) + register_guid_created_in_session(deployed_destination.connector_id) + return ( + f"Successfully deployed destination '{destination_name}' " + f"with ID: {deployed_destination.connector_id}" + ) @mcp_tool( @@ -284,25 +277,21 @@ def create_connection_on_cloud( Airbyte Cloud API. """ resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams) - try: - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - deployed_connection = workspace.deploy_connection( - connection_name=connection_name, - source=source_id, - destination=destination_id, - selected_streams=resolved_streams_list, - table_prefix=table_prefix, - ) + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + deployed_connection = workspace.deploy_connection( + connection_name=connection_name, + source=source_id, + destination=destination_id, + selected_streams=resolved_streams_list, + table_prefix=table_prefix, + ) - except Exception as ex: - return f"Failed to create connection '{connection_name}': {ex}" - else: - register_guid_created_in_session(deployed_connection.connection_id) - return ( - f"Successfully created connection '{connection_name}' " - f"with ID '{deployed_connection.connection_id}' and " - f"URL: {deployed_connection.connection_url}" - ) + register_guid_created_in_session(deployed_connection.connection_id) + return ( + f"Successfully created connection '{connection_name}' " + f"with ID '{deployed_connection.connection_id}' and " + f"URL: {deployed_connection.connection_url}" + ) @mcp_tool( @@ -343,26 +332,18 @@ def run_cloud_sync( and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API. """ - try: - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - connection = workspace.get_connection(connection_id=connection_id) - sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + connection = workspace.get_connection(connection_id=connection_id) + sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) - except Exception as ex: - return f"Failed to run sync for connection '{connection_id}': {ex}" - else: - if wait: - status = sync_result.get_job_status() - return ( - f"Sync completed with status: {status}. " # Sync completed. - f"Job ID is '{sync_result.job_id}' and " - f"job URL is: {sync_result.job_url}" - ) + if wait: + status = sync_result.get_job_status() return ( - f"Sync started. " # Sync started. + f"Sync completed with status: {status}. " f"Job ID is '{sync_result.job_id}' and " f"job URL is: {sync_result.job_url}" ) + return f"Sync started. Job ID is '{sync_result.job_id}' and job URL is: {sync_result.job_url}" @mcp_tool( @@ -389,18 +370,14 @@ def check_airbyte_cloud_workspace( Returns workspace ID and workspace URL for verification. """ - try: - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - workspace.connect() + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + workspace.connect() - except Exception as ex: - return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}" - else: - return ( - f"✅ Successfully connected to Airbyte Cloud workspace.\n" - f"Workspace ID: {workspace.workspace_id}\n" - f"Workspace URL: {workspace.workspace_url}" - ) + return ( + f"✅ Successfully connected to Airbyte Cloud workspace.\n" + f"Workspace ID: {workspace.workspace_id}\n" + f"Workspace URL: {workspace.workspace_url}" + ) @mcp_tool( @@ -425,23 +402,19 @@ def deploy_noop_destination_to_cloud( and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API. """ - try: - destination = get_noop_destination() - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - deployed_destination = workspace.deploy_destination( - name=name, - destination=destination, - unique=unique, - ) - except Exception as ex: - return f"Failed to deploy No-op Destination: {ex}" - else: - register_guid_created_in_session(deployed_destination.connector_id) - return ( - f"Successfully deployed No-op Destination " - f"with ID '{deployed_destination.connector_id}' and " - f"URL: {deployed_destination.connector_url}" - ) + destination = get_noop_destination() + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + deployed_destination = workspace.deploy_destination( + name=name, + destination=destination, + unique=unique, + ) + register_guid_created_in_session(deployed_destination.connector_id) + return ( + f"Successfully deployed No-op Destination " + f"with ID '{deployed_destination.connector_id}' and " + f"URL: {deployed_destination.connector_url}" + ) @mcp_tool( @@ -486,49 +459,40 @@ def get_cloud_sync_status( and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API. """ - try: - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - connection = workspace.get_connection(connection_id=connection_id) - - # If a job ID is provided, get the job by ID. - sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) - - if not sync_result: - return {"status": None, "job_id": None, "attempts": []} - - result = { - "status": sync_result.get_job_status(), - "job_id": sync_result.job_id, - "bytes_synced": sync_result.bytes_synced, - "records_synced": sync_result.records_synced, - "start_time": sync_result.start_time.isoformat(), - "job_url": sync_result.job_url, - "attempts": [], - } + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + connection = workspace.get_connection(connection_id=connection_id) - if include_attempts: - attempts = sync_result.get_attempts() - result["attempts"] = [ - { - "attempt_number": attempt.attempt_number, - "attempt_id": attempt.attempt_id, - "status": attempt.status, - "bytes_synced": attempt.bytes_synced, - "records_synced": attempt.records_synced, - "created_at": attempt.created_at.isoformat(), - } - for attempt in attempts - ] - - return result # noqa: TRY300 - - except Exception as ex: - return { - "status": None, - "job_id": job_id, - "error": f"Failed to get sync status for connection '{connection_id}': {ex}", - "attempts": [], - } + # If a job ID is provided, get the job by ID. + sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) + + if not sync_result: + return {"status": None, "job_id": None, "attempts": []} + + result = { + "status": sync_result.get_job_status(), + "job_id": sync_result.job_id, + "bytes_synced": sync_result.bytes_synced, + "records_synced": sync_result.records_synced, + "start_time": sync_result.start_time.isoformat(), + "job_url": sync_result.job_url, + "attempts": [], + } + + if include_attempts: + attempts = sync_result.get_attempts() + result["attempts"] = [ + { + "attempt_number": attempt.attempt_number, + "attempt_id": attempt.attempt_id, + "status": attempt.status, + "bytes_synced": attempt.bytes_synced, + "records_synced": attempt.records_synced, + "created_at": attempt.created_at.isoformat(), + } + for attempt in attempts + ] + + return result @mcp_tool( @@ -673,44 +637,40 @@ def get_cloud_sync_logs( and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the Airbyte Cloud API. """ - try: - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - connection = workspace.get_connection(connection_id=connection_id) + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + connection = workspace.get_connection(connection_id=connection_id) - sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) + sync_result: cloud.SyncResult | None = connection.get_sync_result(job_id=job_id) - if not sync_result: - return f"No sync job found for connection '{connection_id}'" + if not sync_result: + return f"No sync job found for connection '{connection_id}'" - attempts = sync_result.get_attempts() + attempts = sync_result.get_attempts() - if not attempts: - return f"No attempts found for job '{sync_result.job_id}'" + if not attempts: + return f"No attempts found for job '{sync_result.job_id}'" - if attempt_number is not None: - target_attempt = None - for attempt in attempts: - if attempt.attempt_number == attempt_number: - target_attempt = attempt - break + if attempt_number is not None: + target_attempt = None + for attempt in attempts: + if attempt.attempt_number == attempt_number: + target_attempt = attempt + break - if target_attempt is None: - return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'" - else: - target_attempt = max(attempts, key=lambda a: a.attempt_number) + if target_attempt is None: + return f"Attempt number {attempt_number} not found for job '{sync_result.job_id}'" + else: + target_attempt = max(attempts, key=lambda a: a.attempt_number) - logs = target_attempt.get_full_log_text() + logs = target_attempt.get_full_log_text() - if not logs: - return ( - f"No logs available for job '{sync_result.job_id}', " - f"attempt {target_attempt.attempt_number}" - ) - - return logs # noqa: TRY300 + if not logs: + return ( + f"No logs available for job '{sync_result.job_id}', " + f"attempt {target_attempt.attempt_number}" + ) - except Exception as ex: - return f"Failed to get logs for connection '{connection_id}': {ex}" + return logs @mcp_tool( @@ -829,29 +789,25 @@ def publish_custom_source_definition( Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available. """ - try: - processed_manifest = manifest_yaml - if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: - processed_manifest = Path(manifest_yaml) - - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - custom_source = workspace.publish_custom_source_definition( - name=name, - manifest_yaml=processed_manifest, - unique=unique, - pre_validate=pre_validate, - ) - except Exception as ex: - return f"Failed to publish custom source definition '{name}': {ex}" - else: - register_guid_created_in_session(custom_source.definition_id) - return ( - "Successfully published custom YAML source definition:\n" - + _get_custom_source_definition_description( - custom_source=custom_source, - ) - + "\n" + processed_manifest = manifest_yaml + if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: + processed_manifest = Path(manifest_yaml) + + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + custom_source = workspace.publish_custom_source_definition( + name=name, + manifest_yaml=processed_manifest, + unique=unique, + pre_validate=pre_validate, + ) + register_guid_created_in_session(custom_source.definition_id) + return ( + "Successfully published custom YAML source definition:\n" + + _get_custom_source_definition_description( + custom_source=custom_source, ) + + "\n" + ) @mcp_tool( @@ -929,29 +885,25 @@ def update_custom_source_definition( Docker-based custom sources are not yet available. """ check_guid_created_in_session(definition_id) - try: - processed_manifest = manifest_yaml - if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: - processed_manifest = Path(manifest_yaml) - - workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) - definition = workspace.get_custom_source_definition( - definition_id=definition_id, - definition_type="yaml", - ) - custom_source: CustomCloudSourceDefinition = definition.update_definition( - manifest_yaml=processed_manifest, - pre_validate=pre_validate, - ) - except Exception as ex: - return f"Failed to update custom source definition '{definition_id}': {ex}" - else: - return ( - "Successfully updated custom YAML source definition:\n" - + _get_custom_source_definition_description( - custom_source=custom_source, - ) + processed_manifest = manifest_yaml + if isinstance(manifest_yaml, str) and "\n" not in manifest_yaml: + processed_manifest = Path(manifest_yaml) + + workspace: CloudWorkspace = _get_cloud_workspace(workspace_id) + definition = workspace.get_custom_source_definition( + definition_id=definition_id, + definition_type="yaml", + ) + custom_source: CustomCloudSourceDefinition = definition.update_definition( + manifest_yaml=processed_manifest, + pre_validate=pre_validate, + ) + return ( + "Successfully updated custom YAML source definition:\n" + + _get_custom_source_definition_description( + custom_source=custom_source, ) + ) @mcp_tool( @@ -964,6 +916,10 @@ def permanently_delete_custom_source_definition( str, Field(description="The ID of the custom source definition to delete."), ], + name: Annotated[ + str, + Field(description="The expected name of the custom source definition (for verification)."), + ], *, workspace_id: Annotated[ str | None, @@ -975,14 +931,16 @@ def permanently_delete_custom_source_definition( ) -> str: """Permanently delete a custom YAML source definition from Airbyte Cloud. - IMPORTANT: This operation requires the connector name to either: - 1. Start with "delete:" (case insensitive), OR - 2. Contain "delete-me" (case insensitive) + IMPORTANT: This operation requires the connector name to contain "delete-me" or "deleteme" + (case insensitive). - If the connector does not meet these requirements, the deletion will be rejected with a + If the connector does not meet this requirement, the deletion will be rejected with a helpful error message. Instruct the user to rename the connector appropriately to authorize the deletion. + The provided name must match the actual name of the definition for the operation to proceed. + This is a safety measure to ensure you are deleting the correct resource. + Note: Only YAML (declarative) connectors are currently supported. Docker-based custom sources are not yet available. """ @@ -992,13 +950,225 @@ def permanently_delete_custom_source_definition( definition_id=definition_id, definition_type="yaml", ) - definition_name: str = definition.name # Capture name before deletion + actual_name: str = definition.name + + # Verify the name matches + if actual_name != name: + raise PyAirbyteInputError( + message=( + f"Name mismatch: expected '{name}' but found '{actual_name}'. " + "The provided name must exactly match the definition's actual name. " + "This is a safety measure to prevent accidental deletion." + ), + context={ + "definition_id": definition_id, + "expected_name": name, + "actual_name": actual_name, + }, + ) + definition.permanently_delete( safe_mode=True, # Hard-coded safe mode for extra protection when running in LLM agents. ) - return ( - f"Successfully deleted custom source definition '{definition_name}' (ID: {definition_id})" + return f"Successfully deleted custom source definition '{actual_name}' (ID: {definition_id})" + + +@mcp_tool( + domain="cloud", + destructive=True, + open_world=True, +) +def permanently_delete_cloud_source( + source_id: Annotated[ + str, + Field(description="The ID of the deployed source to delete."), + ], + name: Annotated[ + str, + Field(description="The expected name of the source (for verification)."), + ], +) -> str: + """Permanently delete a deployed source connector from Airbyte Cloud. + + IMPORTANT: This operation requires the source name to contain "delete-me" or "deleteme" + (case insensitive). + + If the source does not meet this requirement, the deletion will be rejected with a + helpful error message. Instruct the user to rename the source appropriately to authorize + the deletion. + + The provided name must match the actual name of the source for the operation to proceed. + This is a safety measure to ensure you are deleting the correct resource. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + check_guid_created_in_session(source_id) + workspace: CloudWorkspace = _get_cloud_workspace() + source = workspace.get_source(source_id=source_id) + actual_name: str = cast(str, source.name) + + # Verify the name matches + if actual_name != name: + raise PyAirbyteInputError( + message=( + f"Name mismatch: expected '{name}' but found '{actual_name}'. " + "The provided name must exactly match the source's actual name. " + "This is a safety measure to prevent accidental deletion." + ), + context={ + "source_id": source_id, + "expected_name": name, + "actual_name": actual_name, + }, + ) + + # Safe mode is hard-coded to True for extra protection when running in LLM agents + workspace.permanently_delete_source( + source=source_id, + safe_mode=True, # Requires name to contain "delete-me" or "deleteme" (case insensitive) + ) + return f"Successfully deleted source '{actual_name}' (ID: {source_id})" + + +@mcp_tool( + domain="cloud", + destructive=True, + open_world=True, +) +def permanently_delete_cloud_destination( + destination_id: Annotated[ + str, + Field(description="The ID of the deployed destination to delete."), + ], + name: Annotated[ + str, + Field(description="The expected name of the destination (for verification)."), + ], +) -> str: + """Permanently delete a deployed destination connector from Airbyte Cloud. + + IMPORTANT: This operation requires the destination name to contain "delete-me" or "deleteme" + (case insensitive). + + If the destination does not meet this requirement, the deletion will be rejected with a + helpful error message. Instruct the user to rename the destination appropriately to authorize + the deletion. + + The provided name must match the actual name of the destination for the operation to proceed. + This is a safety measure to ensure you are deleting the correct resource. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + check_guid_created_in_session(destination_id) + workspace: CloudWorkspace = _get_cloud_workspace() + destination = workspace.get_destination(destination_id=destination_id) + actual_name: str = cast(str, destination.name) + + # Verify the name matches + if actual_name != name: + raise PyAirbyteInputError( + message=( + f"Name mismatch: expected '{name}' but found '{actual_name}'. " + "The provided name must exactly match the destination's actual name. " + "This is a safety measure to prevent accidental deletion." + ), + context={ + "destination_id": destination_id, + "expected_name": name, + "actual_name": actual_name, + }, + ) + + # Safe mode is hard-coded to True for extra protection when running in LLM agents + workspace.permanently_delete_destination( + destination=destination_id, + safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") + ) + return f"Successfully deleted destination '{actual_name}' (ID: {destination_id})" + + +@mcp_tool( + domain="cloud", + destructive=True, + open_world=True, +) +def permanently_delete_cloud_connection( + connection_id: Annotated[ + str, + Field(description="The ID of the connection to delete."), + ], + name: Annotated[ + str, + Field(description="The expected name of the connection (for verification)."), + ], + *, + cascade_delete_source: Annotated[ + bool, + Field( + description=( + "Whether to also delete the source connector associated with this connection." + ), + default=False, + ), + ] = False, + cascade_delete_destination: Annotated[ + bool, + Field( + description=( + "Whether to also delete the destination connector associated with this connection." + ), + default=False, + ), + ] = False, +) -> str: + """Permanently delete a connection from Airbyte Cloud. + + IMPORTANT: This operation requires the connection name to contain "delete-me" or "deleteme" + (case insensitive). + + If the connection does not meet this requirement, the deletion will be rejected with a + helpful error message. Instruct the user to rename the connection appropriately to authorize + the deletion. + + The provided name must match the actual name of the connection for the operation to proceed. + This is a safety measure to ensure you are deleting the correct resource. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + check_guid_created_in_session(connection_id) + workspace: CloudWorkspace = _get_cloud_workspace() + connection = workspace.get_connection(connection_id=connection_id) + actual_name: str = cast(str, connection.name) + + # Verify the name matches + if actual_name != name: + raise PyAirbyteInputError( + message=( + f"Name mismatch: expected '{name}' but found '{actual_name}'. " + "The provided name must exactly match the connection's actual name. " + "This is a safety measure to prevent accidental deletion." + ), + context={ + "connection_id": connection_id, + "expected_name": name, + "actual_name": actual_name, + }, + ) + + # Safe mode is hard-coded to True for extra protection when running in LLM agents + workspace.permanently_delete_connection( + safe_mode=True, # Requires name-based delete disposition ("delete-me" or "deleteme") + connection=connection_id, + cascade_delete_source=cascade_delete_source, + cascade_delete_destination=cascade_delete_destination, ) + return f"Successfully deleted connection '{actual_name}' (ID: {connection_id})" @mcp_tool( diff --git a/tests/integration_tests/cloud/test_cloud_workspaces.py b/tests/integration_tests/cloud/test_cloud_workspaces.py index e9900b9cb..419383be5 100644 --- a/tests/integration_tests/cloud/test_cloud_workspaces.py +++ b/tests/integration_tests/cloud/test_cloud_workspaces.py @@ -18,7 +18,7 @@ def test_deploy_destination( ) -> None: """Test deploying a source to a workspace.""" cloud_destination = cloud_workspace.deploy_destination( - name="test-destination", + name="test-destination-deleteme", destination=deployable_dummy_destination, random_name_suffix=True, ) @@ -68,18 +68,18 @@ def test_deploy_connection( """Test deploying a source and cache to a workspace as a new connection.""" stream_names = deployable_dummy_source.get_selected_streams() cloud_source = cloud_workspace.deploy_source( - name="test-source", + name="test-source-deleteme", source=deployable_dummy_source, random_name_suffix=True, ) cloud_destination = cloud_workspace.deploy_destination( - name="test-destination", + name="test-destination-deleteme", destination=deployable_dummy_destination, random_name_suffix=True, ) connection: CloudConnection = cloud_workspace.deploy_connection( - connection_name="test-connection", + connection_name="test-connection-deleteme", source=cloud_source, destination=cloud_destination, selected_streams=stream_names, diff --git a/tests/integration_tests/cloud/test_custom_definitions.py b/tests/integration_tests/cloud/test_custom_definitions.py index fbabf469e..779d082ec 100644 --- a/tests/integration_tests/cloud/test_custom_definitions.py +++ b/tests/integration_tests/cloud/test_custom_definitions.py @@ -126,7 +126,6 @@ def test_yaml_validation_error( "name_template,expect_allow", [ ("test-yaml-source-{suffix}", False), - ("delete:test-yaml-source-{suffix}", True), ("test-delete-me-yaml-source-{suffix}", True), ], ) @@ -164,7 +163,7 @@ def test_safe_mode_deletion( error_message = str(exc_info.value).lower() assert "safe_mode" in error_message - assert "delete:" in error_message or "delete-me" in error_message + assert "delete-me" in error_message or "deleteme" in error_message finally: definition.permanently_delete(safe_mode=False)