1
1
import multiprocessing
2
+ import random
3
+ import time
2
4
import warnings
3
5
from pathlib import Path
4
6
from queue import Empty
5
- from typing import Generator , List , Tuple , Union
7
+ from typing import Callable , Generator , List , Tuple , Union
6
8
7
9
import boto3
10
+ import botocore .exceptions
8
11
9
12
from .file import File , Status
10
13
@@ -18,10 +21,13 @@ def __init__(
18
21
aws_secret_access_key : str ,
19
22
region_name : str ,
20
23
bucket_name : str ,
21
- buffer_size : int = 1000 ,
22
- n_workers = 32 ,
23
- worker_batch_size = 100 ,
24
- callback = lambda x : x ,
24
+ buffer_size : int = 1024 ,
25
+ n_workers : int = 32 ,
26
+ worker_batch_size : int = 128 ,
27
+ n_retries : int = 3 ,
28
+ backoff_factor : float = 0.5 ,
29
+ verbose : bool = False ,
30
+ callback : Callable = lambda x : x ,
25
31
ordered : bool = False ,
26
32
):
27
33
self .paths = multiprocessing .Manager ().list (list (enumerate (paths ))[::- 1 ])
@@ -33,6 +39,9 @@ def __init__(
33
39
self .n_workers = n_workers
34
40
self .buffer_size = min (buffer_size , len (paths ))
35
41
self .worker_batch_size = worker_batch_size
42
+ self .n_retries = n_retries
43
+ self .backoff_factor = backoff_factor
44
+ self .verbose = verbose
36
45
self .ordered = ordered
37
46
self .callback = callback
38
47
@@ -58,18 +67,41 @@ def _create_s3_client(self):
58
67
def download_batch (self , batch : List [Tuple [int , Union [Path , str ]]]):
59
68
client = self ._create_s3_client ()
60
69
for index , path in batch :
61
- try :
62
- file = File (
63
- content = self .callback (
64
- client .get_object (Bucket = self .bucket_name , Key = str (path ))[
65
- "Body"
66
- ].read ()
67
- ),
68
- path = path ,
69
- status = Status .succeeded ,
70
- )
71
- except Exception as e :
72
- file = File (content = None , path = path , status = Status .failed , exception = e )
70
+ for attempt in range (self .n_retries ):
71
+ try :
72
+ file = File (
73
+ content = self .callback (
74
+ client .get_object (Bucket = self .bucket_name , Key = str (path ))[
75
+ "Body"
76
+ ].read ()
77
+ ),
78
+ path = path ,
79
+ status = Status .succeeded ,
80
+ )
81
+ break
82
+ except (
83
+ botocore .exceptions .EndpointConnectionError ,
84
+ botocore .exceptions .NoCredentialsError ,
85
+ botocore .exceptions .PartialCredentialsError ,
86
+ botocore .exceptions .SSLError ,
87
+ botocore .exceptions .ClientError ,
88
+ botocore .exceptions .BotoCoreError ,
89
+ ConnectionError ,
90
+ ) as e :
91
+ wait_time = self .backoff_factor * (2 ** attempt ) + random .uniform (
92
+ 0 , 1
93
+ )
94
+ if self .verbose :
95
+ print (
96
+ f"Retrying { path } due to: { e } . Waiting { wait_time :.2f} seconds before retrying..."
97
+ )
98
+ time .sleep (wait_time )
99
+ file = File (
100
+ content = None , path = path , status = Status .failed , exception = e
101
+ )
102
+ else :
103
+ if self .verbose :
104
+ print (f"Failed to download { path } after { self .n_retries } retries" )
73
105
if self .ordered :
74
106
self .results [index ] = file
75
107
else :
0 commit comments