Skip to content

Commit 0c0ecc9

Browse files
committed
utils/concurrent: Fix FutureSetWatcher.wait implementation
Explicitly track done callback invocations.
1 parent 73cc7ad commit 0c0ecc9

File tree

2 files changed

+49
-12
lines changed

2 files changed

+49
-12
lines changed

Orange/widgets/utils/concurrent.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
pass
2222

2323
from AnyQt.QtCore import (
24-
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable,
24+
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable, QSemaphore,
2525
QEventLoop, QCoreApplication, QEvent, Q_ARG
2626
)
2727

@@ -573,6 +573,7 @@ def __init__(self, futures=None, *args, **kwargs):
573573
# type: (List[Future], ...) -> None
574574
super().__init__(*args, **kwargs)
575575
self.__futures = None
576+
self.__semaphore = None
576577
self.__countdone = 0
577578
if futures is not None:
578579
self.setFutures(futures)
@@ -594,21 +595,29 @@ def setFutures(self, futures):
594595
selfweakref = weakref.ref(self)
595596
schedule_emit = methodinvoke(self, "__emitpending", (int, Future))
596597

598+
# Semaphore counting the number of future that have enqueued
599+
# done notifications. Used for the `wait` implementation.
600+
self.__semaphore = semaphore = QSemaphore(0)
601+
597602
for i, future in enumerate(futures):
598603
self.__futures.append(future)
599604

600605
def on_done(index, f):
601-
selfref = selfweakref() # not safe really
602-
if selfref is None:
603-
return
604606
try:
605-
schedule_emit(index, f)
606-
except RuntimeError:
607-
# Ignore RuntimeErrors (when C++ side of QObject is deleted)
608-
# (? Use QObject.destroyed and remove the done callback ?)
609-
pass
607+
selfref = selfweakref() # not safe really
608+
if selfref is None: # pragma: no cover
609+
return
610+
try:
611+
schedule_emit(index, f)
612+
except RuntimeError: # pragma: no cover
613+
# Ignore RuntimeErrors (when C++ side of QObject is deleted)
614+
# (? Use QObject.destroyed and remove the done callback ?)
615+
pass
616+
finally:
617+
semaphore.release()
610618

611619
future.add_done_callback(partial(on_done, i))
620+
612621
if not self.__futures:
613622
# `futures` was an empty sequence.
614623
methodinvoke(self, "doneAll", ())()
@@ -644,13 +653,28 @@ def __emitpending(self, index, future):
644653
def flush(self):
645654
"""
646655
Flush all pending signal emits currently enqueued.
656+
657+
Must only ever be called from the thread this object lives in
658+
(:func:`QObject.thread()`).
647659
"""
648-
assert QThread.currentThread() is self.thread()
660+
if QThread.currentThread() is not self.thread():
661+
raise RuntimeError("`flush()` called from a wrong thread.")
662+
# NOTE: QEvent.MetaCall is the event implementing the
663+
# `Qt.QueuedConnection` method invocation.
649664
QCoreApplication.sendPostedEvents(self, QEvent.MetaCall)
650665

651666
def wait(self):
652-
assert self.__futures is not None
653-
concurrent.futures.wait(self.__futures)
667+
"""
668+
Wait for for all the futures to complete and *enqueue* notifications
669+
to this object, but do not emit any signals.
670+
671+
Use `flush()` to emit all signals after a `wait()`
672+
"""
673+
if self.__futures is None:
674+
raise RuntimeError("Futures were not set.")
675+
676+
self.__semaphore.acquire(len(self.__futures))
677+
self.__semaphore.release(len(self.__futures))
654678

655679

656680
class methodinvoke(object):

Orange/widgets/utils/tests/test_concurrent.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import unittest
2+
import unittest.mock
23
import threading
34
import random
45

@@ -226,6 +227,18 @@ def as_set(seq):
226227
spy = spies(watcher)
227228
self.assertTrue(spy.doneAll.wait())
228229

230+
watcher = FutureSetWatcher()
231+
watcher.setFutures([])
232+
watcher.wait()
233+
234+
watcher = FutureSetWatcher()
235+
with self.assertRaises(RuntimeError):
236+
watcher.wait()
237+
238+
with unittest.mock.patch.object(watcher, "thread", lambda: 42), \
239+
self.assertRaises(RuntimeError):
240+
watcher.flush()
241+
229242

230243
class TestTask(CoreAppTestCase):
231244

0 commit comments

Comments
 (0)