@@ -63,12 +63,14 @@ def delete_objects(self, path):
6363 service_name = "s3" , config = self ._session .botocore_config )
6464 procs = []
6565 args = {"Bucket" : bucket , "MaxKeys" : 1000 , "Prefix" : path }
66+ LOGGER .debug (f"Arguments: \n { args } " )
6667 next_continuation_token = True
6768 while next_continuation_token :
6869 res = client .list_objects_v2 (** args )
6970 if not res .get ("Contents" ):
7071 break
7172 keys = [{"Key" : x .get ("Key" )} for x in res .get ("Contents" )]
73+ LOGGER .debug (f"Number of listed keys: { len (keys )} " )
7274 next_continuation_token = res .get ("NextContinuationToken" )
7375 if next_continuation_token :
7476 args ["ContinuationToken" ] = next_continuation_token
@@ -79,15 +81,25 @@ def delete_objects(self, path):
7981 proc .daemon = False
8082 proc .start ()
8183 procs .append (proc )
84+ while len (procs ) >= self ._session .procs_io_bound :
85+ LOGGER .debug (
86+ f"len(procs) ({ len (procs )} ) >= self._session.procs_io_bound ({ self ._session .procs_io_bound } )"
87+ )
88+ procs [0 ].join ()
89+ del procs [0 ]
90+ LOGGER .debug (f"Processes deleted from list." )
8291 else :
92+ LOGGER .debug (f"Starting last delete call..." )
8393 self .delete_objects_batch (self ._session .primitives , bucket ,
8494 keys )
95+ LOGGER .debug (f"Waiting final processes..." )
8596 for proc in procs :
8697 proc .join ()
8798
8899 def delete_listed_objects (self , objects_paths , procs_io_bound = None ):
89100 if not procs_io_bound :
90101 procs_io_bound = self ._session .procs_io_bound
102+ LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
91103 buckets = {}
92104 for path in objects_paths :
93105 path_cleaned = path .replace ("s3://" , "" )
@@ -98,8 +110,11 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
98110
99111 for bucket , batch in buckets .items ():
100112 procs = []
113+ LOGGER .debug (f"bucket: { bucket } " )
101114 if procs_io_bound > 1 :
115+ LOGGER .debug (f"len(batch): { len (batch )} " )
102116 bounders = calculate_bounders (len (batch ), procs_io_bound )
117+ LOGGER .debug (f"bounders: { bounders } " )
103118 for bounder in bounders :
104119 proc = mp .Process (
105120 target = self .delete_objects_batch ,
@@ -118,7 +133,11 @@ def delete_listed_objects(self, objects_paths, procs_io_bound=None):
118133 for proc in procs :
119134 proc .join ()
120135
121- def delete_not_listed_objects (self , objects_paths ):
136+ def delete_not_listed_objects (self , objects_paths , procs_io_bound = None ):
137+ if not procs_io_bound :
138+ procs_io_bound = self ._session .procs_io_bound
139+ LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
140+
122141 partitions = {}
123142 for object_path in objects_paths :
124143 partition_path = f"{ object_path .rsplit ('/' , 1 )[0 ]} /"
@@ -129,20 +148,35 @@ def delete_not_listed_objects(self, objects_paths):
129148 for partition_path , batch in partitions .items ():
130149 proc = mp .Process (
131150 target = self .delete_not_listed_batch ,
132- args = (self ._session .primitives , partition_path , batch ),
151+ args = (self ._session .primitives , partition_path , batch , 1 ),
133152 )
134153 proc .daemon = False
135154 proc .start ()
136155 procs .append (proc )
156+ while len (procs ) >= procs_io_bound :
157+ LOGGER .debug (
158+ f"len(procs) ({ len (procs )} ) >= procs_io_bound ({ procs_io_bound } )"
159+ )
160+ procs [0 ].join ()
161+ del procs [0 ]
162+ LOGGER .debug (f"Processes deleted from list." )
163+ LOGGER .debug (f"Waiting final processes..." )
137164 for proc in procs :
138165 proc .join ()
139166
@staticmethod
def delete_not_listed_batch(session_primitives,
                            partition_path,
                            batch,
                            procs_io_bound=None):
    """Delete every object under ``partition_path`` that is not in ``batch``.

    Lists the objects under the partition, keeps the ones named in
    ``batch`` and hands all the others to
    ``session.s3.delete_listed_objects``.

    Args:
        session_primitives: Object whose ``session`` attribute yields the
            session used for listing and deleting.
        partition_path: Path passed to ``session.s3.list_objects``.
        batch: Object paths that must be kept. Assumed to be hashable
            path strings -- TODO confirm against callers.
        procs_io_bound: Parallelism forwarded to ``delete_listed_objects``.
            A falsy value (``None`` or ``0``) falls back to
            ``session.procs_io_bound``.
    """
    session = session_primitives.session
    if not procs_io_bound:
        procs_io_bound = session.procs_io_bound
    LOGGER.debug(f"procs_io_bound: {procs_io_bound}")
    listed_keys = session.s3.list_objects(path=partition_path)
    # Build the lookup once so each listed key costs one hash probe instead
    # of a scan over the whole batch.
    keep = set(batch)
    dead_keys = [key for key in listed_keys if key not in keep]
    # Forward the resolved bound: a hard-coded value here would make the
    # parameter (and the value logged above) meaningless. Callers that want
    # a serial delete pass procs_io_bound=1 explicitly.
    session.s3.delete_listed_objects(objects_paths=dead_keys,
                                     procs_io_bound=procs_io_bound)
146180
147181 @staticmethod
148182 def delete_objects_batch (session_primitives , bucket , batch ):
@@ -151,6 +185,7 @@ def delete_objects_batch(session_primitives, bucket, batch):
151185 config = session .botocore_config )
152186 num_requests = int (ceil ((float (len (batch )) / 1000.0 )))
153187 bounders = calculate_bounders (len (batch ), num_requests )
188+ LOGGER .debug (f"Bounders: { bounders } " )
154189 for bounder in bounders :
155190 client .delete_objects (
156191 Bucket = bucket ,
@@ -193,25 +228,30 @@ def _get_objects_head_remote(send_pipe, session_primitives, objects_paths):
193228 client = session .boto3_session .client (service_name = "s3" ,
194229 config = session .botocore_config )
195230 objects_sizes = {}
231+ LOGGER .debug (f"len(objects_paths): { len (objects_paths )} " )
196232 for object_path in objects_paths :
197233 bucket , key = object_path .replace ("s3://" , "" ).split ("/" , 1 )
198234 res = S3 ._head_object_with_retry (client = client ,
199235 bucket = bucket ,
200236 key = key )
201237 size = res .get ("ContentLength" )
202238 objects_sizes [object_path ] = size
239+ LOGGER .debug (f"len(objects_sizes): { len (objects_sizes )} " )
203240 send_pipe .send (objects_sizes )
204241 send_pipe .close ()
205242
206243 def get_objects_sizes (self , objects_paths , procs_io_bound = None ):
207244 if not procs_io_bound :
208245 procs_io_bound = self ._session .procs_io_bound
246+ LOGGER .debug (f"procs_io_bound: { procs_io_bound } " )
209247 objects_sizes = {}
210248 procs = []
211249 receive_pipes = []
212250 bounders = calculate_bounders (len (objects_paths ), procs_io_bound )
251+ LOGGER .debug (f"len(bounders): { len (bounders )} " )
213252 for bounder in bounders :
214253 receive_pipe , send_pipe = mp .Pipe ()
254+ LOGGER .debug (f"bounder: { bounder } " )
215255 proc = mp .Process (
216256 target = self ._get_objects_head_remote ,
217257 args = (
@@ -224,8 +264,13 @@ def get_objects_sizes(self, objects_paths, procs_io_bound=None):
224264 proc .start ()
225265 procs .append (proc )
226266 receive_pipes .append (receive_pipe )
267+ LOGGER .debug (f"len(procs): { len (bounders )} " )
227268 for i in range (len (procs )):
228- objects_sizes .update (receive_pipes [i ].recv ())
269+ LOGGER .debug (f"Waiting pipe number: { i } " )
270+ receved = receive_pipes [i ].recv ()
271+ objects_sizes .update (receved )
272+ LOGGER .debug (f"Waiting proc number: { i } " )
229273 procs [i ].join ()
274+ LOGGER .debug (f"Closing proc number: { i } " )
230275 receive_pipes [i ].close ()
231276 return objects_sizes
0 commit comments