Skip to content

Commit a6aafd6

Browse files
authored
Merge pull request #2834 from ales-erjavec/fixes/future-set-watcher-empty
[FIX] utils/concurrent: Handle an empty futures list
2 parents 67fa9df + 0c0ecc9 commit a6aafd6

File tree

2 files changed

+81
-11
lines changed

2 files changed

+81
-11
lines changed

Orange/widgets/utils/concurrent.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
pass
2222

2323
from AnyQt.QtCore import (
24-
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable,
24+
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable, QSemaphore,
2525
QEventLoop, QCoreApplication, QEvent, Q_ARG
2626
)
2727

@@ -570,10 +570,12 @@ class FutureSetWatcher(QObject):
570570
doneAll = Signal()
571571

572572
def __init__(self, futures=None, *args, **kwargs):
573+
# type: (List[Future], ...) -> None
573574
super().__init__(*args, **kwargs)
574575
self.__futures = None
576+
self.__semaphore = None
575577
self.__countdone = 0
576-
if futures:
578+
if futures is not None:
577579
self.setFutures(futures)
578580

579581
def setFutures(self, futures):
@@ -593,22 +595,33 @@ def setFutures(self, futures):
593595
selfweakref = weakref.ref(self)
594596
schedule_emit = methodinvoke(self, "__emitpending", (int, Future))
595597

598+
# Semaphore counting the number of future that have enqueued
599+
# done notifications. Used for the `wait` implementation.
600+
self.__semaphore = semaphore = QSemaphore(0)
601+
596602
for i, future in enumerate(futures):
597603
self.__futures.append(future)
598604

599605
def on_done(index, f):
600-
selfref = selfweakref() # not safe really
601-
if selfref is None:
602-
return
603606
try:
604-
schedule_emit(index, f)
605-
except RuntimeError:
606-
# Ignore RuntimeErrors (when C++ side of QObject is deleted)
607-
# (? Use QObject.destroyed and remove the done callback ?)
608-
pass
607+
selfref = selfweakref() # not safe really
608+
if selfref is None: # pragma: no cover
609+
return
610+
try:
611+
schedule_emit(index, f)
612+
except RuntimeError: # pragma: no cover
613+
# Ignore RuntimeErrors (when C++ side of QObject is deleted)
614+
# (? Use QObject.destroyed and remove the done callback ?)
615+
pass
616+
finally:
617+
semaphore.release()
609618

610619
future.add_done_callback(partial(on_done, i))
611620

621+
if not self.__futures:
622+
# `futures` was an empty sequence.
623+
methodinvoke(self, "doneAll", ())()
624+
612625
@Slot(int, Future)
613626
def __emitpending(self, index, future):
614627
# type: (int, Future) -> None
@@ -640,10 +653,29 @@ def __emitpending(self, index, future):
640653
def flush(self):
641654
"""
642655
Flush all pending signal emits currently enqueued.
656+
657+
Must only ever be called from the thread this object lives in
658+
(:func:`QObject.thread()`).
643659
"""
644-
assert QThread.currentThread() is self.thread()
660+
if QThread.currentThread() is not self.thread():
661+
raise RuntimeError("`flush()` called from a wrong thread.")
662+
# NOTE: QEvent.MetaCall is the event implementing the
663+
# `Qt.QueuedConnection` method invocation.
645664
QCoreApplication.sendPostedEvents(self, QEvent.MetaCall)
646665

666+
def wait(self):
667+
"""
668+
Wait for for all the futures to complete and *enqueue* notifications
669+
to this object, but do not emit any signals.
670+
671+
Use `flush()` to emit all signals after a `wait()`
672+
"""
673+
if self.__futures is None:
674+
raise RuntimeError("Futures were not set.")
675+
676+
self.__semaphore.acquire(len(self.__futures))
677+
self.__semaphore.release(len(self.__futures))
678+
647679

648680
class methodinvoke(object):
649681
"""

Orange/widgets/utils/tests/test_concurrent.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
import unittest.mock
23
import threading
34
import random
45

@@ -201,6 +202,43 @@ def as_set(seq):
201202
self.assertSetEqual(as_set(spy.resultAt), {(0, True)})
202203
self.assertSetEqual(as_set(spy.exceptionAt), set())
203204

205+
# doneAll must always be emitted after the doneAt signals.
206+
executor = ThreadPoolExecutor(max_workers=2)
207+
futures = [executor.submit(pow, 1000, 1000) for _ in range(100)]
208+
watcher = FutureSetWatcher(futures)
209+
emithistory = []
210+
watcher.doneAt.connect(lambda i, f: emithistory.append(("doneAt", i, f)))
211+
watcher.doneAll.connect(lambda: emithistory.append(("doneAll", )))
212+
213+
spy = spies(watcher)
214+
watcher.wait()
215+
self.assertEqual(len(spy.doneAll), 0)
216+
self.assertEqual(len(spy.doneAt), 0)
217+
watcher.flush()
218+
self.assertEqual(len(spy.doneAt), 100)
219+
self.assertEqual(list(spy.doneAll), [[]])
220+
self.assertSetEqual(set(emithistory[:-1]),
221+
{("doneAt", i, f) for i, f in enumerate(futures)})
222+
self.assertEqual(emithistory[-1], ("doneAll",))
223+
224+
# doneAll must be emitted even when on an empty futures list
225+
watcher = FutureSetWatcher()
226+
watcher.setFutures([])
227+
spy = spies(watcher)
228+
self.assertTrue(spy.doneAll.wait())
229+
230+
watcher = FutureSetWatcher()
231+
watcher.setFutures([])
232+
watcher.wait()
233+
234+
watcher = FutureSetWatcher()
235+
with self.assertRaises(RuntimeError):
236+
watcher.wait()
237+
238+
with unittest.mock.patch.object(watcher, "thread", lambda: 42), \
239+
self.assertRaises(RuntimeError):
240+
watcher.flush()
241+
204242

205243
class TestTask(CoreAppTestCase):
206244

0 commit comments

Comments
 (0)