Skip to content

Commit bae5b96

Browse files
Merge pull request #61 from jeromekelleher/progress-fixes
Fix progress deadlocks
2 parents 880c3af + 682536a commit bae5b96

File tree

1 file changed

+45
-42
lines changed

1 file changed

+45
-42
lines changed

bio2zarr/core.py

Lines changed: 45 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import multiprocessing
55
import threading
66
import logging
7-
import functools
87
import time
98

109
import zarr
@@ -159,32 +158,6 @@ def set_progress(value):
159158
_progress_counter.value = value
160159

161160

162-
def progress_thread_worker(config):
163-
pbar = tqdm.tqdm(
164-
total=config.total,
165-
desc=f"{config.title:>7}",
166-
unit_scale=True,
167-
unit=config.units,
168-
smoothing=0.1,
169-
disable=not config.show,
170-
)
171-
172-
while (current := get_progress()) < config.total:
173-
inc = current - pbar.n
174-
pbar.update(inc)
175-
time.sleep(config.poll_interval)
176-
# TODO figure out why we're sometimes going over total
177-
# if get_progress() != config.total:
178-
# print("HOW DID THIS HAPPEN!!")
179-
# print(get_progress())
180-
# print(config)
181-
# assert get_progress() == config.total
182-
inc = config.total - pbar.n
183-
pbar.update(inc)
184-
pbar.close()
185-
# print("EXITING PROGRESS THREAD")
186-
187-
188161
class ParallelWorkManager(contextlib.AbstractContextManager):
189162
def __init__(self, worker_processes=1, progress_config=None):
190163
if worker_processes <= 0:
@@ -194,18 +167,42 @@ def __init__(self, worker_processes=1, progress_config=None):
194167
self.executor = cf.ProcessPoolExecutor(
195168
max_workers=worker_processes,
196169
)
170+
self.futures = []
171+
197172
set_progress(0)
198173
if progress_config is None:
199174
progress_config = ProgressConfig()
200-
self.bar_thread = threading.Thread(
201-
target=progress_thread_worker,
202-
args=(progress_config,),
203-
name="progress",
204-
daemon=True,
205-
)
206-
self.bar_thread.start()
207175
self.progress_config = progress_config
208-
self.futures = []
176+
self.progress_bar = tqdm.tqdm(
177+
total=progress_config.total,
178+
desc=f"{progress_config.title:>7}",
179+
unit_scale=True,
180+
unit=progress_config.units,
181+
smoothing=0.1,
182+
disable=not progress_config.show,
183+
)
184+
self.completed = False
185+
self.completed_lock = threading.Lock()
186+
self.progress_thread = threading.Thread(
187+
target=self._update_progress_worker,
188+
name="progress-update",
189+
)
190+
self.progress_thread.start()
191+
192+
def _update_progress(self):
193+
current = get_progress()
194+
inc = current - self.progress_bar.n
195+
# print("UPDATE PROGRESS: current = ", current, self.progress_config.total, inc)
196+
self.progress_bar.update(inc)
197+
198+
def _update_progress_worker(self):
199+
completed = False
200+
while not completed:
201+
self._update_progress()
202+
time.sleep(self.progress_config.poll_interval)
203+
with self.completed_lock:
204+
completed = self.completed
205+
logger.debug("Exit progress thread")
209206

210207
def submit(self, *args, **kwargs):
211208
self.futures.append(self.executor.submit(*args, **kwargs))
@@ -217,13 +214,19 @@ def results_as_completed(self):
217214
def __exit__(self, exc_type, exc_val, exc_tb):
218215
if exc_type is None:
219216
wait_on_futures(self.futures)
220-
# Note: this doesn't seem to be working correctly. If
221-
# we set a timeout of None we get deadlocks
222-
set_progress(self.progress_config.total)
223-
timeout = 0.1
224217
else:
225218
cancel_futures(self.futures)
226-
timeout = 0
227-
self.bar_thread.join(timeout)
228-
self.executor.shutdown()
219+
# There's probably a much cleaner way of doing this with a Condition
220+
# or something, but this seems to work OK for now. This setup might
221+
# make small conversions a bit laggy as we wait on the sleep interval
222+
# though.
223+
with self.completed_lock:
224+
self.completed = True
225+
self.executor.shutdown(wait=False)
226+
# FIXME there's currently some thing weird happening at the end of
227+
# Encode 1D for 1kg-p3. The progress bar disappears, like we're
228+
# setting a total of zero or something.
229+
self.progress_thread.join()
230+
self._update_progress()
231+
self.progress_bar.close()
229232
return False

0 commit comments

Comments
 (0)