Skip to content

Commit c98b412

Browse files
authored
decorate _select_object_content with a retry (#1780)
1 parent 2a2ba0f commit c98b412

File tree

3 files changed

+46
-3
lines changed

3 files changed

+46
-3
lines changed

awswrangler/_utils.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
import random
99
import time
1010
from concurrent.futures import FIRST_COMPLETED, Future, wait
11-
from typing import Any, Callable, Dict, Generator, List, Optional, Sequence, Tuple, Union, cast
11+
from functools import partial, wraps
12+
from typing import Any, Callable, Dict, Generator, List, Optional, Sequence, Tuple, Type, Union, cast
1213

1314
import boto3
1415
import botocore.config
@@ -333,9 +334,42 @@ def check_duplicated_columns(df: pd.DataFrame) -> Any:
333334
)
334335

335336

337+
def retry(
    ex: Type[Exception],
    ex_code: Optional[str] = None,
    base: float = 1.0,
    max_num_tries: int = 3,
) -> Callable[..., Any]:
    """
    Decorate function with decorrelated Jitter retries.

    Every call to the decorated function is routed through ``try_it``,
    which re-invokes the function when ``ex`` is raised.

    Parameters
    ----------
    ex : Type[Exception]
        Exception class to retry on
    ex_code : Optional[str]
        Response error code. When given, ``try_it`` compares it with
        ``exception.response["Error"]["Code"]`` for exceptions that carry a
        ``response`` attribute.
    base : float
        Base delay (the initial delay used by ``try_it``)
    max_num_tries : int
        Maximum number of tries (attempts), including the first call

    Returns
    -------
    Callable[..., Any]
        Decorator that wraps a function with the retry behaviour
    """

    def wrapper(f: Callable[..., Any]) -> Any:
        # A real function (rather than a functools.partial) is returned so that
        # the decorated callable is a descriptor: it binds ``self`` correctly
        # when the decorator is applied to a method, and introspection tools
        # see an ordinary function.
        @wraps(f)
        def inner(*args: Any, **kwargs: Any) -> Any:
            # Call-time keywords take precedence over the decorator settings,
            # matching how functools.partial merges keyword arguments.
            options: Dict[str, Any] = {"ex_code": ex_code, "base": base, "max_num_tries": max_num_tries}
            options.update(kwargs)
            return try_it(f, ex, *args, **options)

        return inner

    return wrapper
367+
368+
336369
def try_it(
337370
f: Callable[..., Any],
338371
ex: Any,
372+
*args: Any,
339373
ex_code: Optional[str] = None,
340374
base: float = 1.0,
341375
max_num_tries: int = 3,
@@ -348,7 +382,7 @@ def try_it(
348382
delay: float = base
349383
for i in range(max_num_tries):
350384
try:
351-
return f(**kwargs)
385+
return f(*args, **kwargs)
352386
except ex as exception:
353387
if ex_code is not None and hasattr(exception, "response"):
354388
if exception.response["Error"]["Code"] != ex_code:

awswrangler/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,3 +115,7 @@ class FailedQualityCheck(Exception):
115115

116116
class AlreadyExists(Exception):
    """Exception signalling an "already exists" condition.

    Adds no behaviour on top of ``Exception``.
    """
118+
119+
120+
class S3SelectRequestIncomplete(Exception):
    """Raised when an S3 Select request finishes without the End Event.

    Without the End Event the returned results may be incomplete.
    Adds no behaviour on top of ``Exception``.
    """

awswrangler/s3/_select.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ def _gen_scan_range(obj_size: int, scan_range_chunk_size: Optional[int] = None)
3535

3636

3737
@engine.dispatch_on_engine
38+
@_utils.retry(
39+
ex=exceptions.S3SelectRequestIncomplete,
40+
)
3841
def _select_object_content(
3942
boto3_session: Optional[boto3.Session],
4043
args: Dict[str, Any],
@@ -64,7 +67,9 @@ def _select_object_content(
6467
request_complete = True
6568
# If the End Event is not received, the results may be incomplete
6669
if not request_complete:
67-
raise Exception(f"S3 Select request for path {args['Key']} is incomplete as End Event was not received")
70+
raise exceptions.S3SelectRequestIncomplete(
71+
f"S3 Select request for path {args['Key']} is incomplete as End Event was not received"
72+
)
6873

6974
return _utils.list_to_arrow_table(mapping=payload_records)
7075

0 commit comments

Comments
 (0)