99"""
1010
1111import logging
12- import itertools
12+ import warnings
1313
1414from collections import namedtuple , defaultdict , deque
15- from operator import attrgetter , add
15+ from operator import attrgetter
1616from functools import partial
1717
1818
19- from PyQt4 .QtCore import QObject , QCoreApplication , QEvent
20- from PyQt4 .QtCore import pyqtSignal as Signal
19+ from PyQt4 .QtCore import QObject , QCoreApplication , QEvent , QTimer
20+ from PyQt4 .QtCore import pyqtSignal as Signal , pyqtSlot as Slot
2121
2222
2323from .scheme import SchemeNode , SchemeLink
@@ -83,6 +83,8 @@ def __init__(self, scheme):
8383
8484 # A flag indicating if UpdateRequest event should be rescheduled
8585 self .__reschedule = False
86+ self .__update_timer = QTimer (self , interval = 100 , singleShot = True )
87+ self .__update_timer .timeout .connect (self .__process_next )
8688
8789 def _can_process (self ):
8890 """
@@ -123,6 +125,7 @@ def stop(self):
123125 if self .__state != SignalManager .Stoped :
124126 self .__state = SignalManager .Stoped
125127 self .stateChanged .emit (SignalManager .Stoped )
128+ self .__update_timer .stop ()
126129
127130 def pause (self ):
128131 """
@@ -132,6 +135,7 @@ def pause(self):
132135 if self .__state != SignalManager .Paused :
133136 self .__state = SignalManager .Paused
134137 self .stateChanged .emit (SignalManager .Paused )
138+ self .__update_timer .stop ()
135139
136140 def resume (self ):
137141 if self .__state == SignalManager .Paused :
@@ -141,7 +145,7 @@ def resume(self):
141145
142146 def step (self ):
143147 if self .__state == SignalManager .Paused :
144- self .process_queued (1 )
148+ self .process_queued ()
145149
146150 def state (self ):
147151 """
@@ -294,30 +298,34 @@ def _update_link(self, link):
294298 def process_queued (self , max_nodes = None ):
295299 """
296300 Process queued signals.
301+
302+ Take one node node from the pending input queue and deliver
303+ all scheduled signals.
297304 """
305+ if max_nodes is not None or max_nodes != 1 :
306+ warnings .warn (
307+ "`max_nodes` is deprecated and unused (will always equal 1)" ,
308+ DeprecationWarning , stacklevel = 2 )
309+
298310 if self .__runtime_state == SignalManager .Processing :
299311 raise RuntimeError ("Cannot re-enter 'process_queued'" )
300312
301313 if not self ._can_process ():
302314 raise RuntimeError ("Can't process in state %i" % self .__state )
303315
304- log .info ("Processing queued signals" )
316+ log .info ("SignalManager: Processing queued signals" )
305317
306318 node_update_front = self .node_update_front ()
307-
308- if max_nodes is not None :
309- node_update_front = node_update_front [:max_nodes ]
310-
311- log .debug ("Nodes for update %s" ,
319+ log .debug ("SignalManager: Nodes eligible for update %s" ,
312320 [node .title for node in node_update_front ])
313321
314- self . _set_runtime_state ( SignalManager . Processing )
315- try :
316- # TODO: What if the update front changes in the loop?
317- for node in node_update_front :
322+ if node_update_front :
323+ node = node_update_front [ 0 ]
324+ self . _set_runtime_state ( SignalManager . Processing )
325+ try :
318326 self .process_node (node )
319- finally :
320- self ._set_runtime_state (SignalManager .Waiting )
327+ finally :
328+ self ._set_runtime_state (SignalManager .Waiting )
321329
322330 def process_node (self , node ):
323331 """
@@ -374,9 +382,16 @@ def is_pending(self, node):
374382
375383 def pending_nodes (self ):
376384 """
377- Return a list of pending nodes (in no particular order).
385+ Return a list of pending nodes.
386+
387+ The nodes are returned in the order they were enqueued for
388+ signal delivery.
389+
390+ Returns
391+ -------
392+ nodes : List[SchemeNode]
378393 """
379- return list (set (sig .link .sink_node for sig in self ._input_queue ))
394+ return list (unique (sig .link .sink_node for sig in self ._input_queue ))
380395
381396 def pending_input_signals (self , node ):
382397 """
@@ -422,61 +437,59 @@ def node_update_front(self):
422437 dependents = partial (dependent_nodes , scheme )
423438
424439 blocked_nodes = reduce (set .union ,
425- list ( map (dependents , blocking_nodes ) ),
440+ map (dependents , blocking_nodes ),
426441 set (blocking_nodes ))
427442
428- pending = set (self .pending_nodes ())
429-
443+ pending = self .pending_nodes ()
430444 pending_downstream = reduce (set .union ,
431- list ( map (dependents , pending ) ),
445+ map (dependents , pending ),
432446 set ())
433447
434448 log .debug ("Pending nodes: %s" , pending )
435449 log .debug ("Blocking nodes: %s" , blocking_nodes )
436450
437- return list (pending - pending_downstream - blocked_nodes )
438-
439- def event (self , event ):
440- if event .type () == QEvent .UpdateRequest :
441- if not self .__state == SignalManager .Running :
442- log .debug ("Received 'UpdateRequest' event while not "
443- "in 'Running' state" )
444- event .setAccepted (False )
445- return False
446-
447- if self .__runtime_state == SignalManager .Processing :
448- log .debug ("Received 'UpdateRequest' event while in "
449- "'process_queued'" )
450- # This happens if someone calls QCoreApplication.processEvents
451- # from the signal handlers.
452- self .__reschedule = True
453- event .accept ()
454- return True
455-
456- log .info ("'UpdateRequest' event, queued signals: %i" ,
457- len (self ._input_queue ))
458- if self ._input_queue :
459- self .process_queued (max_nodes = 1 )
460- event .accept ()
461-
462- if self .__reschedule :
463- log .debug ("Rescheduling 'UpdateRequest' event" )
464- self ._update ()
465- self .__reschedule = False
466- elif self .node_update_front ():
467- log .debug ("More nodes are eligible for an update. "
468- "Scheduling another update." )
469- self ._update ()
470-
471- return True
472-
473- return QObject .event (self , event )
451+ noneligible = pending_downstream | blocked_nodes
452+ return [node for node in pending if node not in noneligible ]
453+
454+ @Slot ()
455+ def __process_next (self ):
456+ if not self .__state == SignalManager .Running :
457+ log .debug ("Received 'UpdateRequest' while not in 'Running' state" )
458+ return
459+
460+ if self .__runtime_state == SignalManager .Processing :
461+ # This happens if someone calls QCoreApplication.processEvents
462+ # from the signal handlers.
463+ # A `__process_next` must be rescheduled when exiting
464+ # process_queued.
465+ log .warning ("Received 'UpdateRequest' while in 'process_queued'. "
466+ "An update will be re-scheduled when exiting the "
467+ "current update." )
468+ self .__reschedule = True
469+ return
470+
471+ log .info ("'UpdateRequest' event, queued signals: %i" ,
472+ len (self ._input_queue ))
473+ if self ._input_queue :
474+ self .process_queued ()
475+
476+ if self .__reschedule and self .__state == SignalManager .Running :
477+ self .__reschedule = False
478+ log .debug ("Rescheduling signal update" )
479+ self .__update_timer .start ()
480+
481+ if self .node_update_front ():
482+ log .debug ("More nodes are eligible for an update. "
483+ "Scheduling another update." )
484+ self ._update ()
474485
475486 def _update (self ):
476487 """
477488 Schedule processing at a later time.
478489 """
479- QCoreApplication .postEvent (self , QEvent (QEvent .UpdateRequest ))
490+ if self .__state == SignalManager .Running and \
491+ not self .__update_timer .isActive ():
492+ self .__update_timer .start ()
480493
481494
482495def can_enable_dynamic (link , value ):
@@ -553,3 +566,27 @@ def group_by_all(sequence, key=None):
553566 order_seen .append (item_key )
554567
555568 return [(key , groups [key ]) for key in order_seen ]
569+
570+
571+ def unique (iterable ):
572+ """
573+ Return unique elements of `iterable` while preserving their order.
574+
575+ Parameters
576+ ----------
577+ iterable : Iterable[Hashable]
578+
579+
580+ Returns
581+ -------
582+ unique : Iterable
583+ Unique elements from `iterable`.
584+ """
585+ seen = set ()
586+
587+ def observed (el ):
588+ observed = el in seen
589+ seen .add (el )
590+ return observed
591+
592+ return (el for el in iterable if not observed (el ))
0 commit comments