Skip to content

Commit 049b6ed

Browse files
committed
FreeViz: Offload work to a separate thread
1 parent abdb14b commit 049b6ed

File tree

5 files changed

+385
-207
lines changed

5 files changed

+385
-207
lines changed

Orange/projection/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import inspect
23
import threading
34

@@ -135,6 +136,7 @@ def proj_variable(i, name):
135136

136137
super().__init__(proj=proj)
137138
self.orig_domain = domain
139+
self.n_components = n_components
138140
var_names = self._get_var_names(n_components)
139141
self.domain = Orange.data.Domain(
140142
[proj_variable(i, var_names[i]) for i in range(n_components)],
@@ -145,6 +147,13 @@ def _get_var_names(self, n):
145147
names = [f"{self.var_prefix}-{postfix}" for postfix in postfixes]
146148
return get_unique_names(self.orig_domain, names)
147149

150+
def copy(self):
151+
proj = copy.deepcopy(self.proj)
152+
model = type(self)(proj, self.domain.copy(), self.n_components)
153+
model.pre_domain = self.pre_domain.copy()
154+
model.name = self.name
155+
return model
156+
148157

149158
class LinearProjector(Projector):
150159
name = "Linear Projection"

Orange/widgets/tests/base.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1013,12 +1013,18 @@ def test_invalidated_embedding(self, timeout=DEFAULT_TIMEOUT):
10131013
self.widget.graph.update_coordinates.assert_not_called()
10141014
self.widget.graph.update_point_props.assert_called_once()
10151015

1016-
def test_saved_selection(self):
1016+
def test_saved_selection(self, timeout=DEFAULT_TIMEOUT):
10171017
self.send_signal(self.widget.Inputs.data, self.data)
1018+
if self.widget.isBlocking():
1019+
spy = QSignalSpy(self.widget.blockingStateChanged)
1020+
self.assertTrue(spy.wait(timeout))
10181021
self.widget.graph.select_by_indices(list(range(0, len(self.data), 10)))
10191022
settings = self.widget.settingsHandler.pack_data(self.widget)
10201023
w = self.create_widget(self.widget.__class__, stored_settings=settings)
10211024
self.send_signal(self.widget.Inputs.data, self.data, widget=w)
1025+
if w.isBlocking():
1026+
spy = QSignalSpy(w.blockingStateChanged)
1027+
self.assertTrue(spy.wait(timeout))
10221028
self.assertEqual(np.sum(w.graph.selection), 15)
10231029
np.testing.assert_equal(self.widget.graph.selection, w.graph.selection)
10241030

Orange/widgets/utils/concurrent.py

Lines changed: 187 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"""
44
# TODO: Rename the module to something that does not conflict with stdlib
55
# concurrent
6+
from typing import Callable, Any
67
import os
78
import threading
89
import atexit
@@ -11,18 +12,15 @@
1112
import weakref
1213
from functools import partial
1314
import concurrent.futures
14-
1515
from concurrent.futures import Future, TimeoutError
1616
from contextlib import contextmanager
17-
from typing import Callable, Optional, Any, List
1817

1918
from AnyQt.QtCore import (
2019
Qt, QObject, QMetaObject, QThreadPool, QThread, QRunnable, QSemaphore,
21-
QEventLoop, QCoreApplication, QEvent, Q_ARG
20+
QEventLoop, QCoreApplication, QEvent, Q_ARG,
21+
pyqtSignal as Signal, pyqtSlot as Slot
2222
)
2323

24-
from AnyQt.QtCore import pyqtSignal as Signal, pyqtSlot as Slot
25-
2624
_log = logging.getLogger(__name__)
2725

2826

@@ -784,3 +782,187 @@ def __call__(self, *args):
784782
args = [Q_ARG(atype, arg) for atype, arg in zip(self.arg_types, args)]
785783
return QMetaObject.invokeMethod(
786784
self.obj, self.method, self.conntype, *args)
785+
786+
787+
class TaskState(QObject):
788+
789+
status_changed = Signal(str)
790+
_p_status_changed = Signal(str)
791+
792+
progress_changed = Signal(float)
793+
_p_progress_changed = Signal(float)
794+
795+
partial_result_ready = Signal(object)
796+
_p_partial_result_ready = Signal(object)
797+
798+
def __init__(self, *args):
799+
super().__init__(*args)
800+
self.__future = None
801+
self.watcher = FutureWatcher()
802+
self.__interruption_requested = False
803+
self.__progress = 0
804+
# Helpers to route the signal emits via a this object's queue.
805+
# This ensures 'atomic' disconnect from signals for targets/slots
806+
# in the same thread. Requires that the event loop is running in this
807+
# object's thread.
808+
self._p_status_changed.connect(
809+
self.status_changed, Qt.QueuedConnection)
810+
self._p_progress_changed.connect(
811+
self.progress_changed, Qt.QueuedConnection)
812+
self._p_partial_result_ready.connect(
813+
self.partial_result_ready, Qt.QueuedConnection)
814+
815+
@property
816+
def future(self) -> Future:
817+
return self.__future
818+
819+
def set_status(self, text: str):
820+
self._p_status_changed.emit(text)
821+
822+
def set_progress_value(self, value: float):
823+
if round(value, 1) > round(self.__progress, 1):
824+
# Only emit progress when it has changed sufficiently
825+
self._p_progress_changed.emit(value)
826+
self.__progress = value
827+
828+
def set_partial_result(self, value: Any):
829+
self._p_partial_result_ready.emit(value)
830+
831+
def is_interruption_requested(self) -> bool:
832+
return self.__interruption_requested
833+
834+
def start(self, executor: concurrent.futures.Executor,
835+
func: Callable[[], Any] = None) -> Future:
836+
assert self.future is None
837+
assert not self.__interruption_requested
838+
self.__future = executor.submit(func)
839+
self.watcher.setFuture(self.future)
840+
return self.future
841+
842+
def cancel(self) -> bool:
843+
assert not self.__interruption_requested
844+
self.__interruption_requested = True
845+
if self.future is not None:
846+
rval = self.future.cancel()
847+
else:
848+
# not even scheduled yet
849+
rval = True
850+
return rval
851+
852+
853+
class InterruptRequested(BaseException):
854+
pass
855+
856+
857+
class ConcurrentWidgetMixin:
858+
def __init__(self):
859+
self.__executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
860+
self.__task = None # type: Optional[TaskState]
861+
862+
@property
863+
def task(self) -> TaskState:
864+
return self.__task
865+
866+
def on_partial_result(self, result: Any) -> None:
867+
""" Invoked from runner (by state) to send the partial results
868+
The method should handle partial results, i.e. show them in the plot.
869+
870+
:param result: any data structure to hold temporary result
871+
"""
872+
raise NotImplementedError
873+
874+
def on_done(self, result: Any) -> None:
875+
""" Invoked when task is done.
876+
The method should re-set the result (to double check it) and
877+
perform operations with obtained results, eg. send data to the output.
878+
879+
:param result: any data structure to hold temporary result
880+
"""
881+
raise NotImplementedError
882+
883+
def on_exception(self, ex: Exception):
884+
""" Invoked when an exception occurs during the calculation.
885+
Override in order to handle exceptions, eg. show an error
886+
message in the widget.
887+
888+
:param ex: exception
889+
"""
890+
raise ex
891+
892+
def start(self, task: Callable, *args, **kwargs):
893+
""" Call from derived class to start the task.
894+
:param task: runner - a method to run in a thread - should accept
895+
`state` parameter
896+
"""
897+
self.__cancel_task(wait=False)
898+
899+
if self.data is None:
900+
self.__set_state_ready()
901+
return
902+
903+
assert callable(task), "`task` must be callable!"
904+
state = TaskState(self)
905+
task = partial(task, *(args + (state,)), **kwargs)
906+
907+
self.__set_state_busy()
908+
self.__start_task(task, state)
909+
910+
def cancel(self):
911+
""" Call from derived class to stop the task. """
912+
self.__cancel_task(wait=False)
913+
self.__set_state_ready()
914+
915+
def shutdown(self):
916+
""" Call from derived class when the widget is deleted
917+
(in onDeleteWidget).
918+
"""
919+
self.__cancel_task(wait=True)
920+
self.__executor.shutdown(True)
921+
922+
def __set_state_ready(self):
923+
self.progressBarFinished()
924+
self.setBlocking(False)
925+
self.setStatusMessage("")
926+
927+
def __set_state_busy(self):
928+
self.progressBarInit()
929+
self.setBlocking(True)
930+
931+
def __start_task(self, task: Callable[[], Any], state: TaskState):
932+
assert self.__task is None
933+
state.status_changed.connect(self.setStatusMessage)
934+
state.progress_changed.connect(self.progressBarSet)
935+
state.partial_result_ready.connect(self.on_partial_result)
936+
state.watcher.done.connect(self.__on_task_done)
937+
state.start(self.__executor, task)
938+
state.setParent(self)
939+
self.__task = state
940+
941+
def __cancel_task(self, wait: bool = True):
942+
if self.__task is not None:
943+
state, self.__task = self.__task, None
944+
state.cancel()
945+
state.partial_result_ready.disconnect(self.on_partial_result)
946+
state.status_changed.disconnect(self.setStatusMessage)
947+
state.progress_changed.disconnect(self.progressBarSet)
948+
state.watcher.done.disconnect(self.__on_task_done)
949+
if wait:
950+
concurrent.futures.wait([state.future])
951+
state.deleteLater()
952+
else:
953+
w = FutureWatcher(state.future, parent=state)
954+
w.done.connect(state.deleteLater)
955+
956+
def __on_task_done(self, future: Future):
957+
assert future.done()
958+
assert self.__task is not None
959+
assert self.__task.future is future
960+
assert self.__task.watcher.future() is future
961+
self.__task, task = None, self.__task
962+
task.deleteLater()
963+
self.__set_state_ready()
964+
ex = future.exception()
965+
if ex is not None:
966+
self.on_exception(ex)
967+
else:
968+
self.on_done(future.result())

0 commit comments

Comments
 (0)