Skip to content

Commit a643b00

Browse files
committed
added multiple ways to view progress; removed unnecessary code.
1 parent 34b4d0e commit a643b00

File tree

1 file changed

+24
-30
lines changed

1 file changed

+24
-30
lines changed

ezpq/Queue.py

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(self,
9696
self._qid = str(uuid4())[:8]
9797
else:
9898
self._qid = qid
99+
self.show_progress = show_progress
99100
self._max_size = max_size
100101
self._n_submitted = 0
101102
self._n_completed = 0
@@ -141,7 +142,7 @@ def __call__(self, fun, *args, **kwargs):
141142
def wrapped_f(iterable, *args, **kwargs):
142143
for x in iterable:
143144
self.put(function=fun, args=[x]+list(args), kwargs=kwargs)
144-
self.wait()
145+
self.wait(show_progress = self.show_progress)
145146
job_data = self.collect()
146147
self.dispose()
147148
return job_data
@@ -407,7 +408,7 @@ def wait_worker(self, poll=0.1, timeout=0):
407408

408409
return not self.is_busy()
409410

410-
def wait(self, poll=0.1, timeout=0, _tqdm=None):
411+
def wait(self, poll=0.1, timeout=0, show_progress=False):
411412
"""Waits for jobs to be completed by the queue system.
412413
413414
Args:
@@ -422,18 +423,23 @@ def wait(self, poll=0.1, timeout=0, _tqdm=None):
422423
0 if the expected number of jobs completed. > 0 otherwise.
423424
"""
424425

425-
n_pending = self.size(waiting=True, working=True)
426+
n_pending = 0
426427

427-
if n_pending > 0:
428-
start = time.time()
428+
if show_progress:
429+
n_pending = self.waitpb(poll=poll, timeout=timeout)
430+
else:
431+
n_pending = self.size(waiting=True, working=True)
429432

430-
while n_pending > 0 and (timeout==0 or time.time() - start < timeout):
431-
time.sleep(poll)
432-
n_pending = self.size(waiting=True, working=True)
433+
if n_pending > 0:
434+
start = time.time()
435+
436+
while n_pending > 0 and (timeout==0 or time.time() - start < timeout):
437+
time.sleep(poll)
438+
n_pending = self.size(waiting=True, working=True)
433439

434440
return n_pending
435441

436-
def waitpb(self, poll=0.1, timeout=0, _tqdm=None):
442+
def waitpb(self, poll=0.1, timeout=0):
437443
"""Waits for jobs to be completed by the queue system.
438444
439445
Args:
@@ -451,13 +457,12 @@ def waitpb(self, poll=0.1, timeout=0, _tqdm=None):
451457
n_pending = self.size(waiting=True, working=True)
452458

453459
if n_pending > 0:
454-
if _tqdm is None:
455-
from tqdm.auto import tqdm
456-
_tqdm = tqdm
460+
461+
from tqdm.auto import tqdm
457462

458463
start = time.time()
459464

460-
with _tqdm(total=n_pending, unit='op') as pb:
465+
with tqdm(total=n_pending, unit='op') as pb:
461466
while n_pending > 0 and (timeout==0 or time.time() - start < timeout):
462467
time.sleep(poll)
463468
tmp = self.size(waiting=True, working=True)
@@ -556,7 +561,7 @@ def submit(self, job):
556561

557562
return job._id
558563

559-
def put(self, function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0):
564+
def put(self, *args, **kwargs): # function, args=None, kwargs=None, name=None, priority=100, lane=None, timeout=0):
560565
"""Creates a ezpq.Job object with the given parameters, then submits it to the ezpq.Queue system.
561566
562567
Args:
@@ -582,11 +587,11 @@ def put(self, function, args=None, kwargs=None, name=None, priority=100, lane=No
582587
The number of jobs submitted to the queue.
583588
"""
584589

585-
job = ezpq.Job(function=function, args=args, kwargs=kwargs, name=name, priority=priority, lane=lane, timeout=timeout)
590+
job = ezpq.Job(*args, **kwargs) #function=function, args=args, kwargs=kwargs, name=name, priority=priority, lane=lane, timeout=timeout)
586591

587592
return self.submit(job)
588593

589-
def map(self, function, iterable, args=None, kwargs=None, ordered=True, timeout=0, show_progress=False):
594+
def map(self, function, iterable, args=None, kwargs=None, timeout=0, show_progress=False):
590595

591596
assert hasattr(iterable, '__iter__')
592597

@@ -598,24 +603,13 @@ def map(self, function, iterable, args=None, kwargs=None, ordered=True, timeout=
598603
args = [None]
599604
else:
600605
args = [None] + list(args)
601-
602-
if ordered and self.is_started():
603-
self._stop()
604606

605-
for i,x in enumerate(iterable):
607+
for x in iterable:
606608
args[0] = x
607-
job = ezpq.Job(function=function, args=list(args), kwargs=kwargs, priority=1, timeout=timeout)
608-
if ordered: job.priority = i
609+
job = ezpq.Job(function=function, args=list(args), kwargs=kwargs, timeout=timeout)
609610
self.submit(job)
610-
611-
if ordered:
612-
self.start()
613611

614-
if show_progress is True:
615-
from tqdm.auto import tqdm
616-
self.waitpb(_tqdm = tqdm)
617-
else:
618-
self.wait()
612+
self.wait(show_progress=show_progress)
619613

620614
return self.collect()
621615

0 commit comments

Comments
 (0)