Skip to content

Commit 4e35a87

Browse files
committed
Add retry mechanism for InternalError on s3 object deletion.
1 parent 5b8657a commit 4e35a87

File tree

2 files changed

+72
-9
lines changed

2 files changed

+72
-9
lines changed

awswrangler/s3/_delete.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
import datetime
55
import itertools
66
import logging
7-
from typing import Dict, List, Optional, Union
7+
import time
8+
from typing import Any, Dict, List, Optional, Union
9+
from urllib.parse import unquote_plus as _unquote_plus
810

911
import boto3 # type: ignore
1012

@@ -26,17 +28,24 @@ def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]:
2628
return buckets
2729

2830

29-
def _delete_objects(bucket: str, keys: List[str], client_s3: boto3.client) -> None:
31+
def _delete_objects(bucket: str, keys: List[str], client_s3: boto3.client, attempt: int = 1) -> None:
3032
_logger.debug("len(keys): %s", len(keys))
3133
batch: List[Dict[str, str]] = [{"Key": key} for key in keys]
3234
res = client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch})
33-
deleted = res.get("Deleted")
34-
if deleted is not None:
35-
for i in deleted:
36-
_logger.debug("s3://%s/%s has been deleted.", bucket, i.get("Key"))
37-
errors = res.get("Errors")
38-
if errors is not None: # pragma: no cover
39-
raise exceptions.ServiceApiError(errors)
35+
deleted: List[Dict[str, Any]] = res.get("Deleted", [])
36+
for obj in deleted:
37+
_logger.debug("s3://%s/%s has been deleted.", bucket, obj.get("Key"))
38+
errors: List[Dict[str, Any]] = res.get("Errors", [])
39+
internal_errors: List[str] = []
40+
for error in errors:
41+
if error["Code"] != "InternalError":
42+
raise exceptions.ServiceApiError(errors)
43+
internal_errors.append(_unquote_plus(error["Key"]))
44+
if len(internal_errors) > 0:
45+
if attempt > 5: # Maximum of 5 attempts
46+
raise exceptions.ServiceApiError(errors)
47+
time.sleep(attempt) # Incremental delay (linear)
48+
_delete_objects(bucket=bucket, keys=internal_errors, client_s3=client_s3, attempt=(attempt + 1))
4049

4150

4251
def delete_objects(

tests/test_s3.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import logging
2+
import time
3+
from unittest.mock import patch
4+
5+
import botocore
6+
import pytest
7+
8+
import awswrangler as wr
9+
10+
from ._utils import extract_cloudformation_outputs
11+
12+
API_CALL = botocore.client.BaseClient._make_api_call
13+
14+
logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s")
15+
logging.getLogger("awswrangler").setLevel(logging.DEBUG)
16+
logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL)
17+
18+
19+
@pytest.fixture(scope="module")
20+
def cloudformation_outputs():
21+
yield extract_cloudformation_outputs()
22+
23+
24+
@pytest.fixture(scope="module")
25+
def bucket(cloudformation_outputs):
26+
if "BucketName" in cloudformation_outputs:
27+
bucket = cloudformation_outputs["BucketName"]
28+
else:
29+
raise Exception("You must deploy/update the test infrastructure (CloudFormation)")
30+
yield bucket
31+
32+
33+
def test_delete_internal_error(bucket):
34+
response = {
35+
"Errors": [
36+
{
37+
"Key": "foo/dt=2020-01-01 00%3A00%3A00/boo.txt",
38+
"Code": "InternalError",
39+
"Message": "We encountered an internal error. Please try again.",
40+
}
41+
]
42+
}
43+
44+
def mock_make_api_call(self, operation_name, kwarg):
45+
if operation_name == "DeleteObjects":
46+
return response
47+
return API_CALL(self, operation_name, kwarg)
48+
49+
start = time.time()
50+
with patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call):
51+
path = f"s3://{bucket}/foo/dt=2020-01-01 00:00:00/boo.txt"
52+
with pytest.raises(wr.exceptions.ServiceApiError):
53+
wr.s3.delete_objects(path=[path])
54+
assert 15 <= (time.time() - start) <= 20

0 commit comments

Comments
 (0)