Skip to content

Commit e3236b2

Browse files
committed
signalmanager: Better schedule ordering
Process pending nodes in FIFO order
1 parent 832bbb7 commit e3236b2

File tree

1 file changed

+58
-23
lines changed

1 file changed

+58
-23
lines changed

Orange/canvas/scheme/signalmanager.py

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
"""
1010

1111
import logging
12-
import itertools
12+
import warnings
1313

1414
from collections import namedtuple, defaultdict, deque
15-
from operator import attrgetter, add
15+
from operator import attrgetter
1616
from functools import partial
1717

1818

@@ -145,7 +145,7 @@ def resume(self):
145145

146146
def step(self):
147147
if self.__state == SignalManager.Paused:
148-
self.process_queued(1)
148+
self.process_queued()
149149

150150
def state(self):
151151
"""
@@ -298,30 +298,34 @@ def _update_link(self, link):
298298
def process_queued(self, max_nodes=None):
299299
"""
300300
Process queued signals.
301+
302+
Take one node node from the pending input queue and deliver
303+
all scheduled signals.
301304
"""
305+
if max_nodes is not None or max_nodes != 1:
306+
warnings.warn(
307+
"`max_nodes` is deprecated and unused (will always equal 1)",
308+
DeprecationWarning, stacklevel=2)
309+
302310
if self.__runtime_state == SignalManager.Processing:
303311
raise RuntimeError("Cannot re-enter 'process_queued'")
304312

305313
if not self._can_process():
306314
raise RuntimeError("Can't process in state %i" % self.__state)
307315

308-
log.info("Processing queued signals")
316+
log.info("SignalManager: Processing queued signals")
309317

310318
node_update_front = self.node_update_front()
311-
312-
if max_nodes is not None:
313-
node_update_front = node_update_front[:max_nodes]
314-
315-
log.debug("Nodes for update %s",
319+
log.debug("SignalManager: Nodes eligible for update %s",
316320
[node.title for node in node_update_front])
317321

318-
self._set_runtime_state(SignalManager.Processing)
319-
try:
320-
# TODO: What if the update front changes in the loop?
321-
for node in node_update_front:
322+
if node_update_front:
323+
node = node_update_front[0]
324+
self._set_runtime_state(SignalManager.Processing)
325+
try:
322326
self.process_node(node)
323-
finally:
324-
self._set_runtime_state(SignalManager.Waiting)
327+
finally:
328+
self._set_runtime_state(SignalManager.Waiting)
325329

326330
def process_node(self, node):
327331
"""
@@ -378,9 +382,16 @@ def is_pending(self, node):
378382

379383
def pending_nodes(self):
380384
"""
381-
Return a list of pending nodes (in no particular order).
385+
Return a list of pending nodes.
386+
387+
The nodes are returned in the order they were enqueued for
388+
signal delivery.
389+
390+
Returns
391+
-------
392+
nodes : List[SchemeNode]
382393
"""
383-
return list(set(sig.link.sink_node for sig in self._input_queue))
394+
return list(unique(sig.link.sink_node for sig in self._input_queue))
384395

385396
def pending_input_signals(self, node):
386397
"""
@@ -426,19 +437,19 @@ def node_update_front(self):
426437
dependents = partial(dependent_nodes, scheme)
427438

428439
blocked_nodes = reduce(set.union,
429-
list(map(dependents, blocking_nodes)),
440+
map(dependents, blocking_nodes),
430441
set(blocking_nodes))
431442

432-
pending = set(self.pending_nodes())
433-
443+
pending = self.pending_nodes()
434444
pending_downstream = reduce(set.union,
435-
list(map(dependents, pending)),
445+
map(dependents, pending),
436446
set())
437447

438448
log.debug("Pending nodes: %s", pending)
439449
log.debug("Blocking nodes: %s", blocking_nodes)
440450

441-
return list(pending - pending_downstream - blocked_nodes)
451+
noneligible = pending_downstream | blocked_nodes
452+
return [node for node in pending if node not in noneligible]
442453

443454
@Slot()
444455
def __process_next(self):
@@ -460,7 +471,7 @@ def __process_next(self):
460471
log.info("'UpdateRequest' event, queued signals: %i",
461472
len(self._input_queue))
462473
if self._input_queue:
463-
self.process_queued(max_nodes=1)
474+
self.process_queued()
464475

465476
if self.__reschedule and self.__state == SignalManager.Running:
466477
self.__reschedule = False
@@ -555,3 +566,27 @@ def group_by_all(sequence, key=None):
555566
order_seen.append(item_key)
556567

557568
return [(key, groups[key]) for key in order_seen]
569+
570+
571+
def unique(iterable):
572+
"""
573+
Return unique elements of `iterable` while preserving their order.
574+
575+
Parameters
576+
----------
577+
iterable : Iterable[Hashable]
578+
579+
580+
Returns
581+
-------
582+
unique : Iterable
583+
Unique elements from `iterable`.
584+
"""
585+
seen = set()
586+
587+
def observed(el):
588+
observed = el in seen
589+
seen.add(el)
590+
return observed
591+
592+
return (el for el in iterable if not observed(el))

0 commit comments

Comments
 (0)