88
99from awswrangler .utils import calculate_bounders
1010
11- LOGGER = logging .getLogger (__name__ )
11+ logger = logging .getLogger (__name__ )
1212
1313
1414def mkdir_if_not_exists (fs , path ):
@@ -67,14 +67,14 @@ def delete_objects(self, path):
6767 service_name = "s3" , config = self ._session .botocore_config )
6868 procs = []
6969 args = {"Bucket" : bucket , "MaxKeys" : 1000 , "Prefix" : path }
70- LOGGER .debug (f"Arguments: \n { args } " )
70+ logger .debug (f"Arguments: \n { args } " )
7171 next_continuation_token = True
7272 while next_continuation_token :
7373 res = client .list_objects_v2 (** args )
7474 if not res .get ("Contents" ):
7575 break
7676 keys = [{"Key" : x .get ("Key" )} for x in res .get ("Contents" )]
77- LOGGER .debug (f"Number of listed keys: { len (keys )} " )
77+ logger .debug (f"Number of listed keys: { len (keys )} " )
7878 next_continuation_token = res .get ("NextContinuationToken" )
7979 if next_continuation_token :
8080 args ["ContinuationToken" ] = next_continuation_token
@@ -86,24 +86,24 @@ def delete_objects(self, path):
8686 proc .start ()
8787 procs .append (proc )
8888 while len (procs ) >= self ._session .procs_io_bound :
89- LOGGER .debug (
89+ logger .debug (
9090 f"len(procs) ({ len (procs )} ) >= self._session.procs_io_bound ({ self ._session .procs_io_bound } )"
9191 )
9292 procs [0 ].join ()
9393 del procs [0 ]
94- LOGGER .debug (f"Processes deleted from list." )
94+ logger .debug (f"Processes deleted from list." )
9595 else :
96- LOGGER .debug (f"Starting last delete call..." )
96+ logger .debug (f"Starting last delete call..." )
9797 self .delete_objects_batch (self ._session .primitives , bucket ,
9898 keys )
99- LOGGER .debug (f"Waiting final processes..." )
99+ logger .debug (f"Waiting final processes..." )
100100 for proc in procs :
101101 proc .join ()
102102
103103 def delete_listed_objects (self , objects_paths , procs_io_bound = None ):
104104 if not procs_io_bound :
105105 procs_io_bound = self ._session .procs_io_bound
106- LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
106+ logger .debug (f"procs_io_bound: { procs_io_bound } " )
107107 buckets = {}
108108 for path in objects_paths :
109109 path_cleaned = path .replace ("s3://" , "" )
@@ -114,11 +114,11 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
114114
115115 for bucket , batch in buckets .items ():
116116 procs = []
117- LOGGER .debug (f"bucket: { bucket } " )
117+ logger .debug (f"bucket: { bucket } " )
118118 if procs_io_bound > 1 :
119- LOGGER .debug (f"len(batch): { len (batch )} " )
119+ logger .debug (f"len(batch): { len (batch )} " )
120120 bounders = calculate_bounders (len (batch ), procs_io_bound )
121- LOGGER .debug (f"bounders: { bounders } " )
121+ logger .debug (f"bounders: { bounders } " )
122122 for bounder in bounders :
123123 proc = mp .Process (
124124 target = self .delete_objects_batch ,
@@ -142,7 +142,7 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
142142 def delete_not_listed_objects (self , objects_paths , procs_io_bound = None ):
143143 if not procs_io_bound :
144144 procs_io_bound = self ._session .procs_io_bound
145- LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
145+ logger .debug (f"procs_io_bound: { procs_io_bound } " )
146146
147147 partitions = {}
148148 for object_path in objects_paths :
@@ -160,13 +160,13 @@ def delete_not_listed_objects(self, objects_paths, procs_io_bound=None):
160160 proc .start ()
161161 procs .append (proc )
162162 while len (procs ) >= procs_io_bound :
163- LOGGER .debug (
163+ logger .debug (
164164 f"len(procs) ({ len (procs )} ) >= procs_io_bound ({ procs_io_bound } )"
165165 )
166166 procs [0 ].join ()
167167 del procs [0 ]
168- LOGGER .debug (f"Processes deleted from list." )
169- LOGGER .debug (f"Waiting final processes..." )
168+ logger .debug (f"Processes deleted from list." )
169+ logger .debug (f"Waiting final processes..." )
170170 for proc in procs :
171171 proc .join ()
172172
@@ -178,7 +178,7 @@ def delete_not_listed_batch(session_primitives,
178178 session = session_primitives .session
179179 if not procs_io_bound :
180180 procs_io_bound = session .procs_io_bound
181- LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
181+ logger .debug (f"procs_io_bound: { procs_io_bound } " )
182182 keys = session .s3 .list_objects (path = partition_path )
183183 dead_keys = [key for key in keys if key not in batch ]
184184 session .s3 .delete_listed_objects (objects_paths = dead_keys ,
@@ -191,7 +191,7 @@ def delete_objects_batch(session_primitives, bucket, batch):
191191 config = session .botocore_config )
192192 num_requests = int (ceil ((float (len (batch )) / 1000.0 )))
193193 bounders = calculate_bounders (len (batch ), num_requests )
194- LOGGER .debug (f"Bounders: { bounders } " )
194+ logger .debug (f"Bounders: { bounders } " )
195195 for bounder in bounders :
196196 client .delete_objects (
197197 Bucket = bucket ,
@@ -234,30 +234,30 @@ def _get_objects_head_remote(send_pipe, session_primitives, objects_paths):
234234 client = session .boto3_session .client (service_name = "s3" ,
235235 config = session .botocore_config )
236236 objects_sizes = {}
237- LOGGER .debug (f"len(objects_paths): { len (objects_paths )} " )
237+ logger .debug (f"len(objects_paths): { len (objects_paths )} " )
238238 for object_path in objects_paths :
239239 bucket , key = object_path .replace ("s3://" , "" ).split ("/" , 1 )
240240 res = S3 ._head_object_with_retry (client = client ,
241241 bucket = bucket ,
242242 key = key )
243243 size = res .get ("ContentLength" )
244244 objects_sizes [object_path ] = size
245- LOGGER .debug (f"len(objects_sizes): { len (objects_sizes )} " )
245+ logger .debug (f"len(objects_sizes): { len (objects_sizes )} " )
246246 send_pipe .send (objects_sizes )
247247 send_pipe .close ()
248248
249249 def get_objects_sizes (self , objects_paths , procs_io_bound = None ):
250250 if not procs_io_bound :
251251 procs_io_bound = self ._session .procs_io_bound
252- LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
252+ logger .debug (f"procs_io_bound: { procs_io_bound } " )
253253 objects_sizes = {}
254254 procs = []
255255 receive_pipes = []
256256 bounders = calculate_bounders (len (objects_paths ), procs_io_bound )
257- LOGGER .debug (f"len(bounders): { len (bounders )} " )
257+ logger .debug (f"len(bounders): { len (bounders )} " )
258258 for bounder in bounders :
259259 receive_pipe , send_pipe = mp .Pipe ()
260- LOGGER .debug (f"bounder: { bounder } " )
260+ logger .debug (f"bounder: { bounder } " )
261261 proc = mp .Process (
262262 target = self ._get_objects_head_remote ,
263263 args = (
@@ -270,14 +270,14 @@ def get_objects_sizes(self, objects_paths, procs_io_bound=None):
270270 proc .start ()
271271 procs .append (proc )
272272 receive_pipes .append (receive_pipe )
273- LOGGER .debug (f"len(procs): { len (bounders )} " )
273+ logger .debug (f"len(procs): { len (bounders )} " )
274274 for i in range (len (procs )):
275- LOGGER .debug (f"Waiting pipe number: { i } " )
275+ logger .debug (f"Waiting pipe number: { i } " )
276276 receved = receive_pipes [i ].recv ()
277277 objects_sizes .update (receved )
278- LOGGER .debug (f"Waiting proc number: { i } " )
278+ logger .debug (f"Waiting proc number: { i } " )
279279 procs [i ].join ()
280- LOGGER .debug (f"Closing proc number: { i } " )
280+ logger .debug (f"Closing proc number: { i } " )
281281 receive_pipes [i ].close ()
282282 return objects_sizes
283283
@@ -289,15 +289,15 @@ def copy_listed_objects(self,
289289 procs_io_bound = None ):
290290 if not procs_io_bound :
291291 procs_io_bound = self ._session .procs_io_bound
292- LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
293- LOGGER .debug (f"len(objects_paths): { len (objects_paths )} " )
292+ logger .debug (f"procs_io_bound: { procs_io_bound } " )
293+ logger .debug (f"len(objects_paths): { len (objects_paths )} " )
294294 if source_path [- 1 ] == "/" :
295295 source_path = source_path [:- 1 ]
296296 if target_path [- 1 ] == "/" :
297297 target_path = target_path [:- 1 ]
298298
299299 if mode == "overwrite" :
300- LOGGER .debug (f"Deleting to overwrite: { target_path } " )
300+ logger .debug (f"Deleting to overwrite: { target_path } " )
301301 self ._session .s3 .delete_objects (path = target_path )
302302 elif mode == "overwrite_partitions" :
303303 objects_wo_prefix = [
@@ -311,7 +311,7 @@ def copy_listed_objects(self,
311311 f"{ target_path } /{ p } " for p in partitions_paths
312312 ]
313313 for path in target_partitions_paths :
314- LOGGER .debug (f"Deleting to overwrite_partitions: { path } " )
314+ logger .debug (f"Deleting to overwrite_partitions: { path } " )
315315 self ._session .s3 .delete_objects (path = path )
316316
317317 batch = []
@@ -322,7 +322,7 @@ def copy_listed_objects(self,
322322
323323 if procs_io_bound > 1 :
324324 bounders = calculate_bounders (len (objects_paths ), procs_io_bound )
325- LOGGER .debug (f"bounders: { bounders } " )
325+ logger .debug (f"bounders: { bounders } " )
326326 procs = []
327327 for bounder in bounders :
328328 proc = mp .Process (
@@ -346,7 +346,7 @@ def copy_objects_batch(session_primitives, batch):
346346 session = session_primitives .session
347347 resource = session .boto3_session .resource (
348348 service_name = "s3" , config = session .botocore_config )
349- LOGGER .debug (f"len(batch): { len (batch )} " )
349+ logger .debug (f"len(batch): { len (batch )} " )
350350 for source_obj , target_obj in batch :
351351 source_bucket , source_key = S3 .parse_object_path (path = source_obj )
352352 copy_source = {"Bucket" : source_bucket , "Key" : source_key }
0 commit comments