6 changes: 6 additions & 0 deletions dbt-athena/.changes/unreleased/Fixes-20250818-100707.yaml
@@ -0,0 +1,6 @@
kind: Fixes
body: Retry when S3 deletion fails instead of immediately failing
time: 2025-08-18T10:07:07.057583-04:00
custom:
  Author: Avinash-1394
  Issue: "1270"
25 changes: 25 additions & 0 deletions dbt-athena/src/dbt/adapters/athena/exceptions.py
@@ -1,3 +1,5 @@
from dbt_common.exceptions import CompilationError, DbtRuntimeError


@@ -7,3 +9,26 @@ class SnapshotMigrationRequired(CompilationError):

class S3LocationException(DbtRuntimeError):
    pass


class S3DeleteRetriableException(DbtRuntimeError):
    """Raised when S3 object deletion fails due to a retriable error (e.g., SlowDown, InternalError)."""

    def __init__(self, msg: str) -> None:
        """
        Initializes the S3DeleteRetriableException with a message.

        Args:
            msg (str): The message describing the error.
        """
        super().__init__(msg)
        self.msg = msg

    def __str__(self) -> str:
        """
        Returns a string representation of the exception.

        Returns:
            str: The message of the exception.
        """
        return self.msg
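
A quick illustrative snippet (an editor's sketch, not part of the diff) showing the intended contract: tenacity treats this exception as retriable via an `isinstance` predicate, which is how `impl.py` wires it up below. Only `tenacity` and the class above are assumed.

from tenacity import Retrying, retry_if_exception, stop_after_attempt

calls = {"n": 0}

def flaky_delete() -> None:
    # Fails twice with a retriable error, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise S3DeleteRetriableException("SlowDown: transient S3 throttling")

Retrying(
    retry=retry_if_exception(lambda e: isinstance(e, S3DeleteRetriableException)),
    stop=stop_after_attempt(3),
    reraise=True,
)(flaky_delete)
assert calls["n"] == 3  # two failed attempts, one success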
131 changes: 103 additions & 28 deletions dbt-athena/src/dbt/adapters/athena/impl.py
@@ -28,13 +28,15 @@
TableVersionTypeDef,
)
from pyathena.error import OperationalError
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

from dbt.adapters.athena import AthenaConnectionManager
from dbt.adapters.athena.column import AthenaColumn
from dbt.adapters.athena.config import get_boto3_config
from dbt.adapters.athena.connections import AthenaCursor
from dbt.adapters.athena.constants import LOGGER
from dbt.adapters.athena.exceptions import (
S3DeleteRetriableException,
S3LocationException,
SnapshotMigrationRequired,
)
@@ -502,41 +504,114 @@ def upload_seed_to_s3(
    @available
    def delete_from_s3(self, s3_path: str) -> None:
        """
        Deletes the S3 path specified by `s3_path`. If the path does not exist, it does nothing.

        Args:
            s3_path: The S3 path to delete, e.g., "s3://my-bucket/my-prefix/".

        Raises:
            DbtRuntimeError: If the S3 delete operation fails after retries.
        """
        conn = self.connections.get_thread_connection()
        creds = conn.credentials
        client = conn.handle
        bucket_name, prefix = self._parse_s3_path(s3_path)
        if not self._s3_path_exists(bucket_name, prefix):
            LOGGER.debug("S3 path does not exist, nothing to delete")
            return
        s3_resource = client.session.resource(
            "s3",
            region_name=client.region_name,
            config=get_boto3_config(num_retries=creds.effective_num_retries),
        )
        s3_bucket = s3_resource.Bucket(bucket_name)
        LOGGER.debug(
            f"Deleting table data: path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'"
        )
        try:
            self._delete_with_app_retry(s3_bucket, bucket_name, prefix)
        except Exception as e:
            LOGGER.exception(
                "S3 delete operation failed after retries for: "
                f"s3_path='{s3_path}', bucket='{bucket_name}', prefix='{prefix}'"
            )
            raise DbtRuntimeError(
                f"Failed to delete S3 path '{s3_path}', bucket='{bucket_name}', prefix='{prefix}' "
                f"after {creds.effective_num_retries} retries."
            ) from e

    @retry(
        stop=stop_after_attempt(4),  # Up to 4 application-level attempts, on top of boto3's own retries
        wait=wait_exponential(multiplier=30, min=30, max=300),  # exponential backoff, each wait bounded to 30-300s
Contributor: Are these supposed to be hard-coded? Shouldn't this use the s3_deletion_retry_* config?

Contributor Author: I checked the possible connection configs here -
https://github.com/dbt-labs/dbt-adapters/blob/main/dbt-athena/src/dbt/adapters/athena/connections.py
I couldn't find one specific to S3 retries. Should I add one there, or are you referring to another config I have access to?

Contributor: In your PR description you mentioned that you were going to add those:

    Configuration options added:
    s3_deletion_retry_attempts: Number of retry attempts (default: 3)
    s3_deletion_retry_base_delay: Base delay in seconds (default: 1)
    s3_deletion_retry_max_delay: Maximum delay in seconds (default: 30)

Are you adding those?

Contributor Author: Ah, sorry for including that. I tried doing it, but since the connection parameters were not available to the decorator, I could not find a way to make it work. I've removed that from the description 👍🏽

Contributor: I think if you nest _delete_with_app_retry within the delete_from_s3 function it should be accessible (that's the way it's done in AthenaCursor.execute).

Contributor Author: Hey, just wanted to provide an update: I'm trying to do this and will reach out to you over Slack when I'm able to get it working with tests.
        retry=retry_if_exception(
            lambda e: (
                (
                    isinstance(e, ClientError)
                    and e.response.get("Error", {}).get("Code", "")
                    in ["SlowDown", "InternalError"]
                )
                or isinstance(e, S3DeleteRetriableException)
            )
        ),
        reraise=True,
        before_sleep=lambda retry_state: LOGGER.warning(
            f"Retrying S3 delete (attempt {retry_state.attempt_number}) due to error: "
            f"{retry_state.outcome.exception() if retry_state.outcome is not None else 'Unknown error'}"
        ),
    )
    def _delete_with_app_retry(self, s3_bucket: Any, bucket_name: str, prefix: str) -> None:
        """
        Deletes all objects in the S3 bucket with the given prefix.

        Args:
            s3_bucket: The S3 bucket resource.
            bucket_name: The name of the S3 bucket.
            prefix: The prefix to filter objects to delete.

        Raises:
            S3DeleteRetriableException: If the delete response contains retriable errors
                (e.g. SlowDown, InternalError); the @retry decorator retries on this and
                re-raises it once the attempts are exhausted.
            DbtRuntimeError: If the response contains only non-retriable errors.
        """
        response = s3_bucket.objects.filter(Prefix=prefix).delete()
        self._check_s3_delete_response(response, bucket_name, prefix)

    def _check_s3_delete_response(self, response: Any, bucket_name: str, prefix: str) -> None:
        """
        Checks the response from the S3 delete operation.

        Args:
            response: The response from the S3 delete operation.
            bucket_name: The name of the S3 bucket.
            prefix: The prefix whose objects were deleted.

        Raises:
            S3DeleteRetriableException: If retriable errors are found.
            DbtRuntimeError: If only non-retriable errors are found.
        """
        retriable_errors = {"SlowDown", "InternalError"}
        error_codes = set()
        for res in response:
            if "Errors" in res:
                for err in res["Errors"]:
                    error_codes.add(err["Code"])
                    LOGGER.error(
                        "Failed to delete files: Key='{}', Code='{}', Message='{}', s3_bucket='{}'",
                        err["Key"],
                        err["Code"],
                        err["Message"],
                        bucket_name,
                    )
        if not error_codes:
            LOGGER.debug("S3 delete operation completed successfully with no errors.")
            return
        # If at least one retriable error occurred, raise an S3DeleteRetriableException
        # so the delete is retried on a best-effort basis.
        if error_codes & retriable_errors:
            raise S3DeleteRetriableException(
                f"Retriable S3 delete errors: {error_codes} occurred when deleting prefix: {prefix}"
            )
        raise DbtRuntimeError("Failed to delete files from S3")
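
For context (an editor's illustration, not part of the diff): `Bucket.objects.filter(...).delete()` issues batched DeleteObjects calls and returns roughly one response dict per batch of up to 1,000 keys, so the checker above iterates a list shaped like the following. Field names follow the S3 DeleteObjects API; the keys and messages here are made up.

response = [
    {
        "Deleted": [{"Key": "prefix/part-000.parquet"}],
        "Errors": [
            {
                "Key": "prefix/part-001.parquet",
                "Code": "SlowDown",
                "Message": "Please reduce your request rate.",
            }
        ],
    }
]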

@staticmethod
def _parse_s3_path(s3_path: str) -> Tuple[str, str]:
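To make the reviewer's nesting suggestion above concrete, here is a minimal sketch (an editor's illustration, not code from this PR) of building the retry policy at call time so that per-connection settings are in scope. The `s3_deletion_retry_*` fields are the options quoted from the PR description; they are hypothetical and do not exist on the Athena credentials today.

from typing import Any

from tenacity import Retrying, retry_if_exception_type, stop_after_attempt, wait_exponential

from dbt.adapters.athena.exceptions import S3DeleteRetriableException


def delete_prefix_with_configurable_retry(s3_bucket: Any, prefix: str, creds: Any) -> None:
    # Fall back to the PR description's proposed defaults when the
    # (hypothetical) credential fields are absent.
    attempts = getattr(creds, "s3_deletion_retry_attempts", 3)
    base_delay = getattr(creds, "s3_deletion_retry_base_delay", 1)
    max_delay = getattr(creds, "s3_deletion_retry_max_delay", 30)

    retryer = Retrying(
        stop=stop_after_attempt(attempts),
        wait=wait_exponential(multiplier=base_delay, max=max_delay),
        retry=retry_if_exception_type(S3DeleteRetriableException),
        reraise=True,
    )
    # Retrying.__call__ re-invokes the callable until it succeeds or the stop
    # condition trips, then re-raises the last exception (reraise=True).
    retryer(lambda: s3_bucket.objects.filter(Prefix=prefix).delete())

Because the policy is constructed per call, the same approach works whether the helper is nested inside delete_from_s3 or kept as a separate method.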
145 changes: 144 additions & 1 deletion dbt-athena/tests/unit/test_adapter.py
@@ -9,6 +9,8 @@
import boto3
import botocore
import pytest

from botocore.exceptions import ClientError
from dbt_common.clients import agate_helper
from dbt_common.exceptions import ConnectionError, DbtRuntimeError
from moto import mock_aws
@@ -19,7 +21,10 @@
from dbt.adapters.athena import Plugin as AthenaPlugin
from dbt.adapters.athena.column import AthenaColumn
from dbt.adapters.athena.connections import AthenaCursor, AthenaParameterFormatter
from dbt.adapters.athena.exceptions import (
    S3DeleteRetriableException,
    S3LocationException,
)
from dbt.adapters.athena.relation import AthenaRelation, TableType
from dbt.adapters.athena.utils import AthenaCatalogType
from dbt.adapters.contracts.connection import ConnectionState
@@ -1377,6 +1382,144 @@ def test_drop_glue_database(self):
self.adapter.drop_glue_database(database_name=test_input["Name"])
assert glue_client.get_databases()["DatabaseList"] == []

@pytest.mark.parametrize(
"path_exists, delete_side_effect, expected_exception",
[
# 1. Path does not exist
(False, None, None),
# 2. _delete_with_app_retry returns nothing
(True, None, None),
# 3. _delete_with_app_retry raises ClientError
(
True,
ClientError(
error_response={
"Error": {"Code": "InternalError", "Message": "Something went wrong"}
},
operation_name="DeleteObjects",
),
DbtRuntimeError,
),
# 4. _delete_with_app_retry raises DbtRuntimeError
(
True,
DbtRuntimeError("Failed to delete files from S3."),
DbtRuntimeError,
),
],
)
@mock_aws
def test_delete_from_s3(self, path_exists, delete_side_effect, expected_exception):
adapter = self.adapter
adapter.acquire_connection("dummy")
s3_path = "s3://bucket/prefix"

        # Patch _s3_path_exists and _delete_with_app_retry
with (
patch.object(adapter, "_s3_path_exists", return_value=path_exists),
patch.object(adapter, "_delete_with_app_retry") as mock_delete_with_app_retry,
):
if delete_side_effect:
mock_delete_with_app_retry.side_effect = delete_side_effect

if expected_exception:
with pytest.raises(expected_exception):
adapter.delete_from_s3(s3_path)
else:
adapter.delete_from_s3(s3_path)

@pytest.mark.parametrize(
"response, expected_exception",
[
(
# Single error in response
[
{
"Errors": [
{
"Key": "some/key/file1",
"Code": "InternalError",
"Message": "An internal error occurred",
}
]
}
],
S3DeleteRetriableException,
),
(
# Multiple errors in response
[
{
"Errors": [
{
"Key": "some/key/file1",
"Code": "InternalError",
"Message": "An internal error occurred",
},
{
"Key": "some/key/file2",
"Code": "SlowDown",
"Message": "Reduce your request rate",
},
]
}
],
S3DeleteRetriableException,
),
(
# No errors in response
[{}],
None,
),
(
# Mixed: one with errors, one without
[
{},
{
"Errors": [
{
"Key": "some/key/file3",
"Code": "AccessDenied",
"Message": "Access Denied",
}
]
},
],
DbtRuntimeError,
),
(
# Mixed: one retriable error, one non-retriable error
[
{
"Errors": [
{
"Key": "some/key/file1",
"Code": "InternalError",
"Message": "An internal error occurred",
},
{
"Key": "some/key/file3",
"Code": "AccessDenied",
"Message": "Access Denied",
},
]
}
],
S3DeleteRetriableException,
),
],
)
@mock_aws
def test_check_s3_delete_response(self, response, expected_exception):
bucket_name = "my-bucket"
prefix = "test_prefix"
if expected_exception:
with pytest.raises(expected_exception):
self.adapter._check_s3_delete_response(response, bucket_name, prefix)
else:
# Should not raise if there are no errors
self.adapter._check_s3_delete_response(response, bucket_name, prefix)


class TestAthenaFilterCatalog:
def test__catalog_filter_table(self):