Skip to content

Commit 8901f54

Browse files
committed
FreeViz: Offload work to a separate thread
1 parent abdb14b commit 8901f54

File tree

4 files changed

+350
-190
lines changed

4 files changed

+350
-190
lines changed

Orange/widgets/tests/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,12 +1013,14 @@ def test_invalidated_embedding(self, timeout=DEFAULT_TIMEOUT):
10131013
self.widget.graph.update_coordinates.assert_not_called()
10141014
self.widget.graph.update_point_props.assert_called_once()
10151015

1016-
def test_saved_selection(self):
1016+
def test_saved_selection(self, timeout=DEFAULT_TIMEOUT):
10171017
self.send_signal(self.widget.Inputs.data, self.data)
1018+
self.wait_until_stop_blocking()
10181019
self.widget.graph.select_by_indices(list(range(0, len(self.data), 10)))
10191020
settings = self.widget.settingsHandler.pack_data(self.widget)
10201021
w = self.create_widget(self.widget.__class__, stored_settings=settings)
10211022
self.send_signal(self.widget.Inputs.data, self.data, widget=w)
1023+
self.wait_until_stop_blocking(widget=w)
10221024
self.assertEqual(np.sum(w.graph.selection), 15)
10231025
np.testing.assert_equal(self.widget.graph.selection, w.graph.selection)
10241026

Orange/widgets/utils/concurrent.py

Lines changed: 177 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
# TODO: Rename the module to something that does not conflict with stdlib
55
# concurrent
6+
from typing import Tuple, Callable, Any
67
import os
78
import threading
89
import atexit
@@ -11,18 +12,15 @@
1112
import weakref
1213
from functools import partial
1314
import concurrent.futures
14-
1515
from concurrent.futures import Future, TimeoutError
1616
from contextlib import contextmanager
17-
from typing import Callable, Optional, Any, List
1817

1918
from AnyQt.QtCore import (
2019
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable, QSemaphore,
21-
QEventLoop, QCoreApplication, QEvent, Q_ARG
20+
QEventLoop, QCoreApplication, QEvent, Q_ARG,
21+
pyqtSignal as Signal, pyqtSlot as Slot
2222
)
2323

24-
from AnyQt.QtCore import pyqtSignal as Signal, pyqtSlot as Slot
25-
2624
_log = logging.getLogger(__name__)
2725

2826

@@ -784,3 +782,177 @@ def __call__(self, *args):
784782
args = [Q_ARG(atype, arg) for atype, arg in zip(self.arg_types, args)]
785783
return QMetaObject.invokeMethod(
786784
self.obj, self.method, self.conntype, *args)
785+
786+
787+
class TaskState(QObject):
788+
789+
status_changed = Signal(str)
790+
_p_status_changed = Signal(str)
791+
792+
progress_changed = Signal(float)
793+
_p_progress_changed = Signal(float)
794+
795+
partial_result_ready = Signal(object)
796+
_p_partial_result_ready = Signal(object)
797+
798+
stopped = Signal()
799+
800+
def __init__(self, *args):
801+
super().__init__(*args)
802+
self.__future = None
803+
self.watcher = FutureWatcher()
804+
self.__interruption_requested = False
805+
self.__progress = 0
806+
# Helpers to route the signal emits via a this object's queue.
807+
# This ensures 'atomic' disconnect from signals for targets/slots
808+
# in the same thread. Requires that the event loop is running in this
809+
# object's thread.
810+
self._p_status_changed.connect(
811+
self.status_changed, Qt.QueuedConnection)
812+
self._p_progress_changed.connect(
813+
self.progress_changed, Qt.QueuedConnection)
814+
self._p_partial_result_ready.connect(
815+
self.partial_result_ready, Qt.QueuedConnection)
816+
817+
@property
818+
def future(self):
819+
# type: () -> Future
820+
return self.__future
821+
822+
def set_status(self, text):
823+
self._p_status_changed.emit(text)
824+
825+
def set_progress_value(self, value):
826+
if round(value, 1) > round(self.__progress, 1):
827+
# Only emit progress when it has changed sufficiently
828+
self._p_progress_changed.emit(value)
829+
self.__progress = value
830+
831+
def set_partial_results(self, value):
832+
self._p_partial_result_ready.emit(value)
833+
834+
def is_interruption_requested(self):
835+
return self.__interruption_requested
836+
837+
def start(self, executor, func=None):
838+
# type: (concurrent.futures.Executor, Callable[[], Any]) -> Future
839+
assert self.future is None
840+
assert not self.__interruption_requested
841+
self.__future = executor.submit(func)
842+
self.watcher.setFuture(self.future)
843+
return self.future
844+
845+
def cancel(self):
846+
assert not self.__interruption_requested
847+
self.__interruption_requested = True
848+
if self.future is not None:
849+
rval = self.future.cancel()
850+
else:
851+
# not even scheduled yet
852+
rval = True
853+
return rval
854+
855+
def stop(self):
856+
self.stopped.emit()
857+
858+
859+
class InterruptRequested(BaseException):
860+
pass
861+
862+
863+
class ConcurrentWidgetMixin:
864+
def __init__(self):
865+
self.__executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
866+
self.__task = None # type: Optional[TaskState]
867+
868+
@property
869+
def task(self):
870+
return self.__task
871+
872+
def _start_partial(self, state: TaskState) -> Callable[[], Any]:
873+
raise NotImplementedError
874+
875+
def _set_partial_results(self, result: Tuple[str, Any]) -> None:
876+
raise NotImplementedError
877+
878+
def __set_results(self, results: "Results"):
879+
for key in results.__dict__:
880+
value = getattr(results, key)
881+
if value is not None:
882+
setattr(self, key, value)
883+
884+
def __set_state_ready(self):
885+
self.progressBarFinished()
886+
self.setBlocking(False)
887+
self.setStatusMessage("")
888+
889+
def __set_state_busy(self):
890+
self.progressBarInit()
891+
self.setBlocking(True)
892+
893+
def __start_task(self, task: Callable[[], Any], state: TaskState):
894+
assert self.__task is None
895+
state.status_changed.connect(self.setStatusMessage)
896+
state.progress_changed.connect(self.progressBarSet)
897+
state.partial_result_ready.connect(self._set_partial_results)
898+
state.stopped.connect(self.canceled)
899+
state.watcher.done.connect(self.finish)
900+
state.start(self.__executor, task)
901+
state.setParent(self)
902+
self.__task = state
903+
904+
def __cancel_task(self, wait=True):
905+
if self.__task is not None:
906+
state, self.__task = self.__task, None
907+
state.cancel()
908+
state.partial_result_ready.disconnect(self._set_partial_results)
909+
state.status_changed.disconnect(self.setStatusMessage)
910+
state.progress_changed.disconnect(self.progressBarSet)
911+
state.watcher.done.disconnect(self.finish)
912+
if wait:
913+
concurrent.futures.wait([state.future])
914+
state.deleteLater()
915+
else:
916+
w = FutureWatcher(state.future, parent=state)
917+
w.done.connect(state.deleteLater)
918+
919+
def start(self):
920+
""" Call to start the task. """
921+
self.__cancel_task(wait=False)
922+
923+
if self.data is None:
924+
self.__set_state_ready()
925+
return
926+
927+
state = TaskState(self)
928+
task = self._start_partial(state)
929+
self.__set_state_busy()
930+
self.__start_task(task, state)
931+
932+
def finish(self, future: Future):
933+
""" Invoked when task is done. """
934+
assert future.done()
935+
assert self.__task is not None
936+
assert self.__task.future is future
937+
assert self.__task.watcher.future() is future
938+
self.__task, task = None, self.__task
939+
task.deleteLater()
940+
self.__set_state_ready()
941+
result = future.result()
942+
self.__set_results(result)
943+
944+
def cancel(self):
945+
""" Call to stop the task. """
946+
self.__cancel_task(wait=False)
947+
self.__set_state_ready()
948+
949+
def canceled(self):
950+
""" Invoked when task is stopped. """
951+
pass
952+
953+
def cancel_task_no_wait(self):
954+
self.__cancel_task(wait=False)
955+
956+
def on_delete_widget(self):
957+
self.__cancel_task(wait=True)
958+
self.__executor.shutdown(True)

0 commit comments

Comments
 (0)