From 87942f452c3581b853005e669932aab9e95495b8 Mon Sep 17 00:00:00 2001
From: Avinash Santhanagopalan <43074786+Avinash-1394@users.noreply.github.com>
Date: Mon, 18 Aug 2025 10:08:44 -0400
Subject: [PATCH] Support retrying when S3 deletion fails

---
 .../unreleased/Fixes-20250818-100707.yaml    |   6 +
 .../src/dbt/adapters/athena/exceptions.py    |  23 +++
 dbt-athena/src/dbt/adapters/athena/impl.py   | 133 ++++++++++++----
 dbt-athena/tests/unit/test_adapter.py        | 145 ++++++++++++++++-
 4 files changed, 278 insertions(+), 29 deletions(-)
 create mode 100644 dbt-athena/.changes/unreleased/Fixes-20250818-100707.yaml

diff --git a/dbt-athena/.changes/unreleased/Fixes-20250818-100707.yaml b/dbt-athena/.changes/unreleased/Fixes-20250818-100707.yaml
new file mode 100644
index 000000000..4f3f3f893
--- /dev/null
+++ b/dbt-athena/.changes/unreleased/Fixes-20250818-100707.yaml
@@ -0,0 +1,6 @@
+kind: Fixes
+body: Retry when S3 deletion fails instead of immediately failing
+time: 2025-08-18T10:07:07.057583-04:00
+custom:
+  Author: Avinash-1394
+  Issue: "1270"
diff --git a/dbt-athena/src/dbt/adapters/athena/exceptions.py b/dbt-athena/src/dbt/adapters/athena/exceptions.py
index adf6928f3..0782601dc 100644
--- a/dbt-athena/src/dbt/adapters/athena/exceptions.py
+++ b/dbt-athena/src/dbt/adapters/athena/exceptions.py
@@ -7,3 +7,26 @@ class SnapshotMigrationRequired(CompilationError):
 
 class S3LocationException(DbtRuntimeError):
     pass
+
+
+class S3DeleteRetriableException(DbtRuntimeError):
+    """Raised when S3 object deletion fails due to a retriable error (e.g., SlowDown, InternalError)."""
+
+    def __init__(self, msg: str) -> None:
+        """
+        Initializes the S3DeleteRetriableException with a message.
+
+        Args:
+            msg (str): The message describing the error.
+        """
+        super().__init__(msg)
+        self.msg = msg
+
+    def __str__(self) -> str:
+        """
+        Returns a string representation of the exception.
+
+        Returns:
+            str: The message of the exception.
+        """
+        return self.msg
diff --git a/dbt-athena/src/dbt/adapters/athena/impl.py b/dbt-athena/src/dbt/adapters/athena/impl.py
index f646504bc..b3b9384ee 100755
--- a/dbt-athena/src/dbt/adapters/athena/impl.py
+++ b/dbt-athena/src/dbt/adapters/athena/impl.py
@@ -28,6 +28,7 @@
     TableVersionTypeDef,
 )
 from pyathena.error import OperationalError
+from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential
 
 from dbt.adapters.athena import AthenaConnectionManager
 from dbt.adapters.athena.column import AthenaColumn
@@ -35,6 +36,7 @@
 from dbt.adapters.athena.connections import AthenaCursor
 from dbt.adapters.athena.constants import LOGGER
 from dbt.adapters.athena.exceptions import (
+    S3DeleteRetriableException,
     S3LocationException,
     SnapshotMigrationRequired,
 )
@@ -502,41 +504,116 @@ def upload_seed_to_s3(
     @available
     def delete_from_s3(self, s3_path: str) -> None:
         """
-        Deletes files from s3 given a s3 path in the format: s3://my_bucket/prefix
-        Additionally, parses the response from the s3 delete request and raises
-        a DbtRuntimeError in case it included errors.
+        Deletes the S3 path specified by `s3_path`. If the path does not exist, it does nothing.
+
+        Args:
+            s3_path: The S3 path to delete, e.g., "s3://my-bucket/my-prefix/".
+
+        Raises:
+            DbtRuntimeError: If the S3 delete operation fails after retries.
""" conn = self.connections.get_thread_connection() creds = conn.credentials client = conn.handle bucket_name, prefix = self._parse_s3_path(s3_path) - if self._s3_path_exists(bucket_name, prefix): - s3_resource = client.session.resource( - "s3", - region_name=client.region_name, - config=get_boto3_config(num_retries=creds.effective_num_retries), + if not self._s3_path_exists(bucket_name, prefix): + LOGGER.debug("S3 path does not exist, nothing to delete") + return + s3_resource = client.session.resource( + "s3", + region_name=client.region_name, + config=get_boto3_config(num_retries=creds.effective_num_retries), + ) + s3_bucket = s3_resource.Bucket(bucket_name) + LOGGER.debug( + f"Deleting table data: path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'" + ) + try: + self._delete_with_app_retry(s3_bucket, bucket_name, prefix) + except Exception as e: + LOGGER.exception( + f"S3 delete operation failed after retries for: " + f"s3_path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'" ) - s3_bucket = s3_resource.Bucket(bucket_name) - LOGGER.debug( - f"Deleting table data: path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'" + raise DbtRuntimeError( + f"Failed to delete S3 path '{s3_path}', bucket='{bucket_name}', prefix='{prefix}' " + f"after {creds.effective_num_retries} retries. " + ) from e + + @retry( + stop=stop_after_attempt(4), # Up to 4x boto3 retries + wait=wait_exponential(multiplier=30, min=30, max=300), # 30s, 60s, 120s, 240s + retry=retry_if_exception( + lambda e: ( + ( + isinstance(e, ClientError) + and e.response.get("Error", {}).get("Code", "") + in ["SlowDown", "InternalError"] + ) + or isinstance(e, S3DeleteRetriableException) ) - response = s3_bucket.objects.filter(Prefix=prefix).delete() - is_all_successful = True - for res in response: - if "Errors" in res: - for err in res["Errors"]: - is_all_successful = False - LOGGER.error( - "Failed to delete files: Key='{}', Code='{}', Message='{}', s3_bucket='{}'", - err["Key"], - err["Code"], - err["Message"], - bucket_name, - ) - if is_all_successful is False: - raise DbtRuntimeError("Failed to delete files from S3.") - else: - LOGGER.debug("S3 path does not exist") + ), + reraise=True, + before_sleep=lambda retry_state: LOGGER.warning( + f"Retrying S3 delete (attempt {retry_state.attempt_number}) due to error: " + f"{retry_state.outcome.exception() if retry_state.outcome is not None else 'Unknown error'}" + ), + ) + def _delete_with_app_retry(self, s3_bucket: Any, bucket_name: str, prefix: str) -> None: + """ + Deletes all objects in the S3 bucket with the given prefix. + + Args: + s3_bucket: The S3 bucket resource. + bucket_name: The name of the S3 bucket. + prefix: The prefix to filter objects to delete. + + Raises: + S3DeleteRetriableException: If there were retriable errors but they did not go + away after retries (e.g. SlowDown, InternalError). + DbtRuntimeError: If there were only non retryiable errors in the response + from the S3 delete operation. + """ + response = s3_bucket.objects.filter(Prefix=prefix).delete() + self._check_s3_delete_response(response, bucket_name, prefix) + + def _check_s3_delete_response(self, response: Any, bucket_name: str, prefix: str) -> None: + """ + Checks the response from the S3 delete operation. + + Args: + response: The response from the S3 delete operation. + bucket_name: The name of the S3 bucket. + Raises: + S3DeleteRetriableException: If retriable errors are found. + DbtRuntimeError: If non-retriable errors are found. 
+ """ + retriable_errors = {"SlowDown", "InternalError"} + error_codes = set() + for res in response: + if "Errors" in res: + for err in res["Errors"]: + error_codes.add(err["Code"]) + LOGGER.error( + "Failed to delete files: Key='{}', Code='{}', Message='{}', s3_bucket='{}'", + err["Key"], + err["Code"], + err["Message"], + bucket_name, + ) + if not error_codes: + msg = "S3 delete operation completed successfully with no errors." + LOGGER.debug(msg) + return + # If there is at least one retriable error, we raise a S3DeleteRetriableException + # So that we try our best effort to delete the files. + if error_codes & retriable_errors: + raise S3DeleteRetriableException( + f"Retriable S3 delete errors: {error_codes} occurred when deleting prefix: {prefix}" + ) + raise DbtRuntimeError("Failed to delete files from S3") @staticmethod def _parse_s3_path(s3_path: str) -> Tuple[str, str]: diff --git a/dbt-athena/tests/unit/test_adapter.py b/dbt-athena/tests/unit/test_adapter.py index f6391d47e..1d00113bd 100644 --- a/dbt-athena/tests/unit/test_adapter.py +++ b/dbt-athena/tests/unit/test_adapter.py @@ -9,6 +9,8 @@ import boto3 import botocore import pytest + +from botocore.exceptions import ClientError from dbt_common.clients import agate_helper from dbt_common.exceptions import ConnectionError, DbtRuntimeError from moto import mock_aws @@ -19,7 +21,10 @@ from dbt.adapters.athena import Plugin as AthenaPlugin from dbt.adapters.athena.column import AthenaColumn from dbt.adapters.athena.connections import AthenaCursor, AthenaParameterFormatter -from dbt.adapters.athena.exceptions import S3LocationException +from dbt.adapters.athena.exceptions import ( + S3DeleteRetriableException, + S3LocationException, +) from dbt.adapters.athena.relation import AthenaRelation, TableType from dbt.adapters.athena.utils import AthenaCatalogType from dbt.adapters.contracts.connection import ConnectionState @@ -1377,6 +1382,144 @@ def test_drop_glue_database(self): self.adapter.drop_glue_database(database_name=test_input["Name"]) assert glue_client.get_databases()["DatabaseList"] == [] + @pytest.mark.parametrize( + "path_exists, delete_side_effect, expected_exception", + [ + # 1. Path does not exist + (False, None, None), + # 2. _delete_with_app_retry returns nothing + (True, None, None), + # 3. _delete_with_app_retry raises ClientError + ( + True, + ClientError( + error_response={ + "Error": {"Code": "InternalError", "Message": "Something went wrong"} + }, + operation_name="DeleteObjects", + ), + DbtRuntimeError, + ), + # 4. 
+            (
+                True,
+                DbtRuntimeError("Failed to delete files from S3."),
+                DbtRuntimeError,
+            ),
+        ],
+    )
+    @mock_aws
+    def test_delete_from_s3(self, path_exists, delete_side_effect, expected_exception):
+        adapter = self.adapter
+        adapter.acquire_connection("dummy")
+        s3_path = "s3://bucket/prefix"
+
+        # Patch _s3_path_exists and _delete_with_app_retry so no AWS call is made
+        with (
+            patch.object(adapter, "_s3_path_exists", return_value=path_exists),
+            patch.object(adapter, "_delete_with_app_retry") as mock_delete_with_app_retry,
+        ):
+            if delete_side_effect:
+                mock_delete_with_app_retry.side_effect = delete_side_effect
+
+            if expected_exception:
+                with pytest.raises(expected_exception):
+                    adapter.delete_from_s3(s3_path)
+            else:
+                adapter.delete_from_s3(s3_path)
+
+    @pytest.mark.parametrize(
+        "response, expected_exception",
+        [
+            (
+                # Single retriable error in response
+                [
+                    {
+                        "Errors": [
+                            {
+                                "Key": "some/key/file1",
+                                "Code": "InternalError",
+                                "Message": "An internal error occurred",
+                            }
+                        ]
+                    }
+                ],
+                S3DeleteRetriableException,
+            ),
+            (
+                # Multiple retriable errors in response
+                [
+                    {
+                        "Errors": [
+                            {
+                                "Key": "some/key/file1",
+                                "Code": "InternalError",
+                                "Message": "An internal error occurred",
+                            },
+                            {
+                                "Key": "some/key/file2",
+                                "Code": "SlowDown",
+                                "Message": "Reduce your request rate",
+                            },
+                        ]
+                    }
+                ],
+                S3DeleteRetriableException,
+            ),
+            (
+                # No errors in response
+                [{}],
+                None,
+            ),
+            (
+                # Mixed: one page with errors, one without
+                [
+                    {},
+                    {
+                        "Errors": [
+                            {
+                                "Key": "some/key/file3",
+                                "Code": "AccessDenied",
+                                "Message": "Access Denied",
+                            }
+                        ]
+                    },
+                ],
+                DbtRuntimeError,
+            ),
+            (
+                # Mixed: one retriable error, one non-retriable error
+                [
+                    {
+                        "Errors": [
+                            {
+                                "Key": "some/key/file1",
+                                "Code": "InternalError",
+                                "Message": "An internal error occurred",
+                            },
+                            {
+                                "Key": "some/key/file3",
+                                "Code": "AccessDenied",
+                                "Message": "Access Denied",
+                            },
+                        ]
+                    }
+                ],
+                S3DeleteRetriableException,
+            ),
+        ],
+    )
+    @mock_aws
+    def test_check_s3_delete_response(self, response, expected_exception):
+        bucket_name = "my-bucket"
+        prefix = "test_prefix"
+        if expected_exception:
+            with pytest.raises(expected_exception):
+                self.adapter._check_s3_delete_response(response, bucket_name, prefix)
+        else:
+            # Should not raise if there are no errors
+            self.adapter._check_s3_delete_response(response, bucket_name, prefix)
+
 
 class TestAthenaFilterCatalog:
     def test__catalog_filter_table(self):
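
Reviewer note on the retry design: each delete first goes through boto3's built-in
retries (config=get_boto3_config(num_retries=creds.effective_num_retries)) and only
then through the tenacity-driven application-level loop this patch adds. The sketch
below is a minimal, AWS-free illustration of that tenacity pattern, not code from
the patch itself; the FlakyBucket stub and the fast wait values are invented for
the demo.

    from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential


    class S3DeleteRetriableException(Exception):
        """Stand-in for dbt.adapters.athena.exceptions.S3DeleteRetriableException."""


    class FlakyBucket:
        """Hypothetical stub: reports a retriable error code twice, then succeeds."""

        def __init__(self) -> None:
            self.calls = 0

        def delete(self) -> list:
            self.calls += 1
            if self.calls < 3:
                # Shaped like one entry of an S3 batch-delete response with errors.
                return [{"Errors": [{"Key": "k", "Code": "SlowDown", "Message": "slow down"}]}]
            return [{}]


    @retry(
        stop=stop_after_attempt(4),
        wait=wait_exponential(multiplier=0.01, max=0.1),  # fast waits so the demo finishes quickly
        retry=retry_if_exception(lambda e: isinstance(e, S3DeleteRetriableException)),
        reraise=True,
    )
    def delete_with_app_retry(bucket: FlakyBucket) -> None:
        # Raising the retriable exception makes tenacity re-run the whole delete,
        # mirroring _delete_with_app_retry/_check_s3_delete_response above.
        for res in bucket.delete():
            if any(err["Code"] in {"SlowDown", "InternalError"} for err in res.get("Errors", [])):
                raise S3DeleteRetriableException("retriable S3 delete error")


    bucket = FlakyBucket()
    delete_with_app_retry(bucket)  # fails twice, then succeeds on the third attempt
    print(bucket.calls)  # -> 3

Because reraise=True, a fourth consecutive retriable failure would surface the
S3DeleteRetriableException itself, which delete_from_s3 then wraps in a
DbtRuntimeError.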