Skip to content

Commit 81258df

Browse files
committed
FreeViz: Offload work to a separate thread
1 parent abdb14b commit 81258df

File tree

5 files changed

+344
-189
lines changed

5 files changed

+344
-189
lines changed

Orange/projection/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import inspect
23
import threading
34

@@ -135,6 +136,7 @@ def proj_variable(i, name):
135136

136137
super().__init__(proj=proj)
137138
self.orig_domain = domain
139+
self.n_components = n_components
138140
var_names = self._get_var_names(n_components)
139141
self.domain = Orange.data.Domain(
140142
[proj_variable(i, var_names[i]) for i in range(n_components)],
@@ -145,6 +147,13 @@ def _get_var_names(self, n):
145147
names = [f"{self.var_prefix}-{postfix}" for postfix in postfixes]
146148
return get_unique_names(self.orig_domain, names)
147149

150+
def copy(self):
151+
proj = copy.deepcopy(self.proj)
152+
model = type(self)(proj, self.domain.copy(), self.n_components)
153+
model.pre_domain = self.pre_domain.copy()
154+
model.name = self.name
155+
return model
156+
148157

149158
class LinearProjector(Projector):
150159
name = "Linear Projection"

Orange/widgets/tests/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,12 +1013,14 @@ def test_invalidated_embedding(self, timeout=DEFAULT_TIMEOUT):
10131013
self.widget.graph.update_coordinates.assert_not_called()
10141014
self.widget.graph.update_point_props.assert_called_once()
10151015

1016-
def test_saved_selection(self):
1016+
def test_saved_selection(self, timeout=DEFAULT_TIMEOUT):
10171017
self.send_signal(self.widget.Inputs.data, self.data)
1018+
self.wait_until_stop_blocking()
10181019
self.widget.graph.select_by_indices(list(range(0, len(self.data), 10)))
10191020
settings = self.widget.settingsHandler.pack_data(self.widget)
10201021
w = self.create_widget(self.widget.__class__, stored_settings=settings)
10211022
self.send_signal(self.widget.Inputs.data, self.data, widget=w)
1023+
self.wait_until_stop_blocking(widget=w)
10221024
self.assertEqual(np.sum(w.graph.selection), 15)
10231025
np.testing.assert_equal(self.widget.graph.selection, w.graph.selection)
10241026

Orange/widgets/utils/concurrent.py

Lines changed: 167 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
# TODO: Rename the module to something that does not conflict with stdlib
55
# concurrent
6+
from typing import Callable, Any
67
import os
78
import threading
89
import atexit
@@ -11,18 +12,15 @@
1112
import weakref
1213
from functools import partial
1314
import concurrent.futures
14-
1515
from concurrent.futures import Future, TimeoutError
1616
from contextlib import contextmanager
17-
from typing import Callable, Optional, Any, List
1817

1918
from AnyQt.QtCore import (
2019
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable, QSemaphore,
21-
QEventLoop, QCoreApplication, QEvent, Q_ARG
20+
QEventLoop, QCoreApplication, QEvent, Q_ARG,
21+
pyqtSignal as Signal, pyqtSlot as Slot
2222
)
2323

24-
from AnyQt.QtCore import pyqtSignal as Signal, pyqtSlot as Slot
25-
2624
_log = logging.getLogger(__name__)
2725

2826

@@ -784,3 +782,167 @@ def __call__(self, *args):
784782
args = [Q_ARG(atype, arg) for atype, arg in zip(self.arg_types, args)]
785783
return QMetaObject.invokeMethod(
786784
self.obj, self.method, self.conntype, *args)
785+
786+
787+
class TaskState(QObject):
788+
789+
status_changed = Signal(str)
790+
_p_status_changed = Signal(str)
791+
792+
progress_changed = Signal(float)
793+
_p_progress_changed = Signal(float)
794+
795+
partial_result_ready = Signal(object)
796+
_p_partial_result_ready = Signal(object)
797+
798+
def __init__(self, *args):
799+
super().__init__(*args)
800+
self.__future = None
801+
self.watcher = FutureWatcher()
802+
self.__interruption_requested = False
803+
self.__progress = 0
804+
# Helpers to route the signal emits via a this object's queue.
805+
# This ensures 'atomic' disconnect from signals for targets/slots
806+
# in the same thread. Requires that the event loop is running in this
807+
# object's thread.
808+
self._p_status_changed.connect(
809+
self.status_changed, Qt.QueuedConnection)
810+
self._p_progress_changed.connect(
811+
self.progress_changed, Qt.QueuedConnection)
812+
self._p_partial_result_ready.connect(
813+
self.partial_result_ready, Qt.QueuedConnection)
814+
815+
@property
816+
def future(self):
817+
# type: () -> Future
818+
return self.__future
819+
820+
def set_status(self, text):
821+
self._p_status_changed.emit(text)
822+
823+
def set_progress_value(self, value):
824+
if round(value, 1) > round(self.__progress, 1):
825+
# Only emit progress when it has changed sufficiently
826+
self._p_progress_changed.emit(value)
827+
self.__progress = value
828+
829+
def set_partial_results(self, value):
830+
self._p_partial_result_ready.emit(value)
831+
832+
def is_interruption_requested(self):
833+
return self.__interruption_requested
834+
835+
def start(self, executor, func=None):
836+
# type: (concurrent.futures.Executor, Callable[[], Any]) -> Future
837+
assert self.future is None
838+
assert not self.__interruption_requested
839+
self.__future = executor.submit(func)
840+
self.watcher.setFuture(self.future)
841+
return self.future
842+
843+
def cancel(self):
844+
assert not self.__interruption_requested
845+
self.__interruption_requested = True
846+
if self.future is not None:
847+
rval = self.future.cancel()
848+
else:
849+
# not even scheduled yet
850+
rval = True
851+
return rval
852+
853+
854+
class InterruptRequested(BaseException):
855+
pass
856+
857+
858+
class ConcurrentWidgetMixin:
859+
def __init__(self):
860+
self.__executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
861+
self.__task = None # type: Optional[TaskState]
862+
863+
@property
864+
def task(self):
865+
return self.__task
866+
867+
def _prepare_task(self, state: TaskState) -> Callable[[], Any]:
868+
raise NotImplementedError
869+
870+
def _set_partial_results(self, result: Any) -> None:
871+
raise NotImplementedError
872+
873+
def _set_results(self, results: Any) -> None:
874+
# NOTE: All of these have already been set by _set_partial_results,
875+
# we double check that they are aliases
876+
for key in results.__dict__:
877+
value = getattr(results, key)
878+
if value is not None:
879+
setattr(self, key, value)
880+
881+
def __set_state_ready(self):
882+
self.progressBarFinished()
883+
self.setBlocking(False)
884+
self.setStatusMessage("")
885+
886+
def __set_state_busy(self):
887+
self.progressBarInit()
888+
self.setBlocking(True)
889+
890+
def __start_task(self, task: Callable[[], Any], state: TaskState):
891+
assert self.__task is None
892+
state.status_changed.connect(self.setStatusMessage)
893+
state.progress_changed.connect(self.progressBarSet)
894+
state.partial_result_ready.connect(self._set_partial_results)
895+
state.watcher.done.connect(self.on_done)
896+
state.start(self.__executor, task)
897+
state.setParent(self)
898+
self.__task = state
899+
900+
def __cancel_task(self, wait=True):
901+
if self.__task is not None:
902+
state, self.__task = self.__task, None
903+
state.cancel()
904+
state.partial_result_ready.disconnect(self._set_partial_results)
905+
state.status_changed.disconnect(self.setStatusMessage)
906+
state.progress_changed.disconnect(self.progressBarSet)
907+
state.watcher.done.disconnect(self.on_done)
908+
if wait:
909+
concurrent.futures.wait([state.future])
910+
state.deleteLater()
911+
else:
912+
w = FutureWatcher(state.future, parent=state)
913+
w.done.connect(state.deleteLater)
914+
915+
def start(self):
916+
""" Call to start the task. """
917+
self.__cancel_task(wait=False)
918+
919+
if self.data is None:
920+
self.__set_state_ready()
921+
return
922+
923+
state = TaskState(self)
924+
task = self._prepare_task(state)
925+
self.__set_state_busy()
926+
self.__start_task(task, state)
927+
928+
def on_done(self, future: Future):
929+
""" Invoked when task is done. """
930+
assert future.done()
931+
assert self.__task is not None
932+
assert self.__task.future is future
933+
assert self.__task.watcher.future() is future
934+
self.__task, task = None, self.__task
935+
task.deleteLater()
936+
self.__set_state_ready()
937+
result = future.result()
938+
self._set_results(result)
939+
940+
def cancel(self):
941+
""" Call to stop the task. """
942+
self.__cancel_task(wait=False)
943+
self.__set_state_ready()
944+
945+
def shutdown(self):
946+
""" Call when widget is deleted (in onDeleteWidget). """
947+
self.__cancel_task(wait=True)
948+
self.__executor.shutdown(True)

0 commit comments

Comments
 (0)