@@ -57,6 +57,10 @@ def parse_path(path):
5757 path += "/"
5858 return bucket , path
5959
@staticmethod
def parse_object_path(path):
    """Split an S3 object path into its bucket name and object key.

    Only a leading "s3://" scheme is removed. Any later occurrence of
    that text is part of the key and is kept untouched.

    Args:
        path: Object location such as "s3://bucket/some/key". The
            scheme is optional.

    Returns:
        The result of splitting the scheme-less path once on the first
        "/": ``[bucket, key]``, or ``[bucket]`` when the path holds no
        "/" after the bucket name.
    """
    scheme = "s3://"
    # Strip the scheme as a prefix only; str.replace() would also erase
    # the same text from inside the key.
    if path.startswith(scheme):
        path = path[len(scheme):]
    return path.split("/", 1)
63+
6064 def delete_objects (self , path ):
6165 bucket , path = self .parse_path (path = path )
6266 client = self ._session .boto3_session .client (
@@ -127,11 +131,13 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
127131 proc .daemon = False
128132 proc .start ()
129133 procs .append (proc )
134+ for proc in procs :
135+ proc .join ()
130136 else :
131- self .delete_objects_batch (self . _session . primitives , bucket ,
132- batch )
133- for proc in procs :
134- proc . join ( )
137+ self .delete_objects_batch (
138+ session_primitives = self . _session . primitives ,
139+ bucket = bucket ,
140+ batch = batch )
135141
136142 def delete_not_listed_objects (self , objects_paths , procs_io_bound = None ):
137143 if not procs_io_bound :
@@ -274,3 +280,75 @@ def get_objects_sizes(self, objects_paths, procs_io_bound=None):
274280 LOGGER .debug (f"Closing proc number: { i } " )
275281 receive_pipes [i ].close ()
276282 return objects_sizes
283+
def copy_listed_objects(self,
                        objects_paths,
                        source_path,
                        target_path,
                        mode="append",
                        procs_io_bound=None):
    """Copy the listed objects from under source_path to target_path.

    Each object keeps its key relative to ``source_path`` and is copied
    to the same relative key under ``target_path``.

    Args:
        objects_paths: Sized collection of full object paths to copy.
            Each one is expected to start with ``source_path``.
        source_path: Prefix the listed objects live under. One trailing
            "/" is ignored.
        target_path: Prefix to copy the objects to. One trailing "/" is
            ignored.
        mode: "overwrite" deletes everything under ``target_path``
            before copying. "overwrite_partitions" deletes only the
            target directories that are about to receive objects. Any
            other value (including the default "append") deletes
            nothing.
        procs_io_bound: Number of worker processes to spread the copy
            over. Falls back to ``self._session.procs_io_bound`` when
            falsy.
    """
    if not procs_io_bound:
        procs_io_bound = self._session.procs_io_bound
    LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
    LOGGER.debug(f"len(objects_paths): {len(objects_paths)}")
    # endswith() instead of [-1] so an empty path does not raise IndexError.
    if source_path.endswith("/"):
        source_path = source_path[:-1]
    if target_path.endswith("/"):
        target_path = target_path[:-1]

    # Key of every object relative to the source prefix. The prefix is
    # removed only from the start of the path, so the same text appearing
    # later inside a key is preserved.
    source_prefix = f"{source_path}/"
    relative_keys = [
        obj[len(source_prefix):] if obj.startswith(source_prefix) else obj
        for obj in objects_paths
    ]

    if mode == "overwrite":
        LOGGER.debug(f"Deleting to overwrite: {target_path}")
        self._session.s3.delete_objects(path=target_path)
    elif mode == "overwrite_partitions":
        # Distinct parent directories (relative to the prefix) of the
        # objects being copied.
        partitions = {f"{key.rpartition('/')[0]}/" for key in relative_keys}
        for partition in partitions:
            path = f"{target_path}/{partition}"
            LOGGER.debug(f"Deleting to overwrite_partitions: {path}")
            self._session.s3.delete_objects(path=path)

    # (source object, target object) pairs, in the order of objects_paths.
    batch = [(obj, f"{target_path}/{key}")
             for obj, key in zip(objects_paths, relative_keys)]

    if procs_io_bound > 1:
        bounders = calculate_bounders(len(objects_paths), procs_io_bound)
        LOGGER.debug(f"bounders: {bounders}")
        procs = []
        for bounder in bounders:
            proc = mp.Process(
                target=self.copy_objects_batch,
                args=(
                    self._session.primitives,
                    batch[bounder[0]:bounder[1]],
                ),
            )
            proc.daemon = False
            proc.start()
            procs.append(proc)
        # join() only waits for each worker to finish; worker exit codes
        # are not inspected here.
        for proc in procs:
            proc.join()
    else:
        self.copy_objects_batch(
            session_primitives=self._session.primitives, batch=batch)
343+
@staticmethod
def copy_objects_batch(session_primitives, batch):
    """Copy every (source, target) object pair of a batch, one by one.

    Args:
        session_primitives: Object whose ``session`` attribute exposes
            ``boto3_session`` and ``botocore_config``.
        batch: Sized collection of ``(source_object_path,
            target_object_path)`` pairs.
    """
    sess = session_primitives.session
    s3_resource = sess.boto3_session.resource(
        service_name="s3", config=sess.botocore_config)
    LOGGER.debug(f"len(batch): {len(batch)}")
    s3_client = s3_resource.meta.client
    for src_path, dst_path in batch:
        src_bucket, src_key = S3.parse_object_path(path=src_path)
        dst_bucket, dst_key = S3.parse_object_path(path=dst_path)
        s3_client.copy(
            {"Bucket": src_bucket, "Key": src_key},
            dst_bucket,
            dst_key,
        )
0 commit comments