Skip to content

Commit b7160f9

Browse files
committed
Improving orchestration of I/O bound processes with wait_process_release.
1 parent ae2224c commit b7160f9

File tree

2 files changed

+27
-19
lines changed

2 files changed

+27
-19
lines changed

awswrangler/s3.py

Lines changed: 7 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import s3fs
77
import tenacity
88

9-
from awswrangler.utils import calculate_bounders
9+
from awswrangler.utils import calculate_bounders, wait_process_release
1010

1111
logger = logging.getLogger(__name__)
1212

@@ -92,13 +92,8 @@ def delete_objects(self, path):
9292
proc.daemon = False
9393
proc.start()
9494
procs.append(proc)
95-
while len(procs) >= self._session.procs_io_bound:
96-
logger.debug(
97-
f"len(procs) ({len(procs)}) >= self._session.procs_io_bound ({self._session.procs_io_bound})"
98-
)
99-
procs[0].join()
100-
del procs[0]
101-
logger.debug(f"Processes deleted from list.")
95+
if len(procs) == self._session.procs_io_bound:
96+
wait_process_release(procs)
10297
else:
10398
logger.debug(f"Starting last delete call...")
10499
self.delete_objects_batch(self._session.primitives, bucket,
@@ -166,13 +161,8 @@ def delete_not_listed_objects(self, objects_paths, procs_io_bound=None):
166161
proc.daemon = False
167162
proc.start()
168163
procs.append(proc)
169-
while len(procs) >= procs_io_bound:
170-
logger.debug(
171-
f"len(procs) ({len(procs)}) >= procs_io_bound ({procs_io_bound})"
172-
)
173-
procs[0].join()
174-
del procs[0]
175-
logger.debug(f"Processes deleted from list.")
164+
if len(procs) == self._session.procs_io_bound:
165+
wait_process_release(procs)
176166
logger.debug(f"Waiting final processes...")
177167
for proc in procs:
178168
proc.join()
@@ -280,8 +270,8 @@ def get_objects_sizes(self, objects_paths, procs_io_bound=None):
280270
logger.debug(f"len(procs): {len(bounders)}")
281271
for i in range(len(procs)):
282272
logger.debug(f"Waiting pipe number: {i}")
283-
receved = receive_pipes[i].recv()
284-
objects_sizes.update(receved)
273+
received = receive_pipes[i].recv()
274+
objects_sizes.update(received)
285275
logger.debug(f"Waiting proc number: {i}")
286276
procs[i].join()
287277
logger.debug(f"Closing proc number: {i}")

awswrangler/utils.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from math import ceil
2-
from math import gcd
1+
from math import ceil, gcd
2+
from time import sleep
33
import logging
44

55
from awswrangler.exceptions import InvalidArguments
@@ -29,6 +29,24 @@ def calculate_bounders(num_items, num_groups=None, max_size=None):
2929
raise InvalidArguments("You must give num_groups or max_size!")
3030

3131

32+
def wait_process_release(processes, poll_interval=0.1):
    """
    Block until one of the processes has finished, then drop it from the list.

    The list is scanned in order and the first entry whose ``is_alive()``
    returns false is removed in place. When every entry is still alive, the
    scan is repeated after sleeping ``poll_interval`` seconds.

    :param processes: List of processes (mutated in place: one entry is removed)
    :param poll_interval: Seconds to sleep between full scans of the list
    :return: None
    """
    # Nothing to wait for; indexing an empty list would raise IndexError.
    if not processes:
        return None
    while True:
        for i, proc in enumerate(processes):
            if not proc.is_alive():
                # Free exactly one slot and hand control back to the caller.
                del processes[i]
                return None
        # Every process is still running: back off before polling again.
        sleep(poll_interval)
48+
49+
3250
def lcm(a, b):
3351
"""
3452
Least Common Multiple

0 commit comments

Comments
 (0)