1010from collections import deque
1111from queue import Queue
1212from threading import RLock
13- from typing import Tuple , Dict , List , Optional , Deque
13+ from typing import Tuple , Dict , List , Optional , Deque , Any
1414
1515from bobocep .cep .engine .decider .pubsub import BoboDeciderPublisher , \
1616 BoboDeciderSubscriber
1717from bobocep .cep .engine .decider .run import BoboRun
1818from bobocep .cep .engine .decider .runserial import BoboRunSerial
1919from bobocep .cep .engine .receiver .pubsub import BoboReceiverSubscriber
2020from bobocep .cep .engine .task import BoboEngineTaskError , BoboEngineTask
21- from bobocep .cep .event import BoboHistory , BoboEvent
21+ from bobocep .cep .event import BoboHistory , BoboEvent , BoboEventSimple
22+ from bobocep .cep .gen import BoboGenTimestamp
2223from bobocep .cep .gen .event_id import BoboGenEventID
2324from bobocep .cep .phenom .pattern .pattern import BoboPattern
2425from bobocep .cep .phenom .phenom import BoboPhenomenon
@@ -36,8 +37,7 @@ class BoboDeciderError(BoboEngineTaskError):
3637 """
3738
3839
39- class BoboDecider (BoboEngineTask ,
40- BoboDeciderPublisher ,
40+ class BoboDecider (BoboDeciderPublisher ,
4141 BoboReceiverSubscriber ,
4242 BoboDistributedSubscriber ):
4343 """
@@ -48,16 +48,14 @@ def __init__(self,
4848 phenomena : List [BoboPhenomenon ],
4949 gen_event_id : BoboGenEventID ,
5050 gen_run_id : BoboGenEventID ,
51- max_cache : int = 0 ,
52- max_size : int = 0 ):
51+ gen_timestamp : BoboGenTimestamp ,
52+ max_cache : int = 0 ):
5353 """
5454 :param phenomena: List of phenomena.
5555 :param gen_event_id: Event ID generator.
5656 :param gen_run_id: Run ID generator.
5757 :param max_cache: Max cache size (<=0 means no caching).
5858 Default: 0.
59- :param max_size: Max queue size.
60- Default: 0 (unbounded).
6159 """
6260 super ().__init__ ()
6361
@@ -76,11 +74,10 @@ def __init__(self,
7674
7775 self ._gen_event_id : BoboGenEventID = gen_event_id
7876 self ._gen_run_id : BoboGenEventID = gen_run_id
77+ self ._gen_timestamp : BoboGenTimestamp = gen_timestamp
7978 # Phenomenon Name => Pattern Name => Run ID => Run
8079 self ._runs : Dict [str , Dict [str , Dict [str , BoboRun ]]] = {}
8180 self ._stub_history : BoboHistory = BoboHistory ({})
82- self ._max_size : int = max (0 , max_size )
83- self ._queue : Queue [BoboEvent ] = Queue (self ._max_size )
8481
8582 self ._caching : bool = max_cache > 0
8683 self ._cache_completed : Optional [Deque [BoboRunSerial ]] = \
@@ -96,7 +93,7 @@ def subscribe(self, subscriber: BoboDeciderSubscriber) -> None:
9693 if subscriber not in self ._subscribers :
9794 self ._subscribers .append (subscriber )
9895
99- def update (self ) -> bool :
96+ def process (self , data : Any ) -> bool :
10097 """
10198 Performs an update cycle of the decider that takes an event from its
10299 queue and checks it against phenomena and existing runs.
@@ -108,10 +105,12 @@ def update(self) -> bool:
108105 if self ._closed :
109106 return False
110107
111- if not self ._queue .empty ():
108+ if data is not None :
109+ timestamp : int = self ._gen_timestamp .generate ()
110+
112111 # Process event and collect changes to decider
113112 rl_completed , rl_halted , rl_updated = \
114- self ._process_event (self . _queue . get_nowait () )
113+ self ._process_event (data , timestamp )
115114
116115 completed : List [BoboRunSerial ] = \
117116 [run_c .serialize () for run_c in rl_completed ]
@@ -200,20 +199,6 @@ def _maybe_cache(
200199 for h in halted :
201200 self ._cache_halted .append (h )
202201
203- def on_receiver_update (self , event : BoboEvent ) -> None :
204- """
205- :param event: Event from Receiver.
206- """
207- with self ._lock :
208- if self ._closed :
209- return
210-
211- if not self ._queue .full ():
212- self ._queue .put (event )
213- else :
214- raise BoboDeciderError (
215- _EXC_QUEUE_FULL .format (self ._max_size ))
216-
217202 def _maybe_check_against_cache (
218203 self ,
219204 completed : List [BoboRunSerial ],
@@ -380,7 +365,9 @@ def on_distributed_update(
380365 phenomenon_name = runremote .phenomenon_name ,
381366 pattern = pattern ,
382367 block_index = runremote .block_index ,
383- history = runremote .history )
368+ history = runremote .history ,
369+ gen_event_id = self ._gen_event_id
370+ )
384371
385372 self ._add_run (
386373 runremote .phenomenon_name ,
@@ -424,7 +411,7 @@ def phenomena(self) -> Tuple[BoboPhenomenon, ...]:
424411 :return: All phenomena under consideration by the decider.
425412 """
426413 with self ._lock :
427- return tuple (self ._phenomena .values ())
414+ return tuple (* self ._phenomena .values ())
428415
429416 def all_runs (self ) -> Tuple [BoboRun , ...]:
430417 """
@@ -453,7 +440,8 @@ def runs_from(self,
453440 pattern_name in self ._runs [phenomenon_name ]
454441 ):
455442 return tuple (
456- self ._runs [phenomenon_name ][pattern_name ].values ())
443+ * self ._runs [phenomenon_name ][pattern_name ].values ()
444+ )
457445 return tuple ()
458446
459447 def run_at (self ,
@@ -476,13 +464,6 @@ def run_at(self,
476464 return self ._runs [phenomenon_name ][pattern_name ][run_id ]
477465 return None
478466
479- def size (self ) -> int :
480- """
481- :return: The total number of events in the decider's queue.
482- """
483- with self ._lock :
484- return self ._queue .qsize ()
485-
486467 def close (self ) -> None :
487468 """
488469 Closes the Decider.
@@ -497,19 +478,19 @@ def is_closed(self) -> bool:
497478 with self ._lock :
498479 return self ._closed
499480
500- def _process_event (self , event : BoboEvent ) -> \
481+ def _process_event (self , data : Any , timestamp : int ) -> \
501482 Tuple [List [BoboRun ], List [BoboRun ], List [BoboRun ]]:
502483 """
503484 :param event: An event.
504485
505486 :return: Runs that had a state change due to the event.
506487 """
507- r_halt_com , r_halt_incom , r_upd = self ._check_against_runs (event )
508- p_halt_com , p_upd = self ._check_against_patterns (event )
488+ r_halt_com , r_halt_incom , r_upd = self ._check_against_runs (data , timestamp )
489+ p_halt_com , p_upd = self ._check_against_patterns (data , timestamp )
509490
510491 return (r_halt_com + p_halt_com ), r_halt_incom , (r_upd + p_upd )
511492
512- def _check_against_runs (self , event : BoboEvent ) -> \
493+ def _check_against_runs (self , data : Any , timestamp : int ) -> \
513494 Tuple [List [BoboRun ], List [BoboRun ], List [BoboRun ]]:
514495 """
515496 :param event: An event.
@@ -528,7 +509,7 @@ def _check_against_runs(self, event: BoboEvent) -> \
528509 # If an internal state change occurs in the run...
529510 run_eval : bool
530511 try :
531- run_eval = run .process (event )
512+ run_eval = run .process (data , timestamp )
532513 except (Exception ,):
533514 continue
534515
@@ -549,7 +530,7 @@ def _check_against_runs(self, event: BoboEvent) -> \
549530
550531 return runs_halted_complete , runs_halted_incomplete , runs_updated
551532
552- def _check_against_patterns (self , event : BoboEvent ) -> \
533+ def _check_against_patterns (self , data : Any , timestamp : int ) -> \
553534 Tuple [List [BoboRun ], List [BoboRun ]]:
554535 """
555536 :param event: An event.
@@ -565,22 +546,34 @@ def _check_against_patterns(self, event: BoboEvent) -> \
565546 any_eval : bool = False
566547 for predicate in pattern .blocks [0 ].predicates :
567548 try :
568- if predicate .evaluate (event , self ._stub_history ):
549+ if predicate .evaluate (
550+ data , timestamp , self ._stub_history
551+ ):
569552 any_eval = True
570553 break
571554 except (Exception ,):
572555 pass
573556
574557 # ...create a run.
575558 if any_eval :
559+ if isinstance (data , BoboEvent ):
560+ event = data
561+ else :
562+ event = BoboEventSimple (
563+ event_id = self ._gen_event_id .generate (),
564+ timestamp = timestamp ,
565+ data = data )
566+
576567 newrun = BoboRun (
577568 run_id = self ._gen_run_id .generate (),
578569 phenomenon_name = phenomenon .name ,
579570 pattern = pattern ,
580571 block_index = 1 ,
581572 history = BoboHistory ({
582573 pattern .blocks [0 ].group : [event ]
583- }))
574+ }),
575+ gen_event_id = self ._gen_event_id
576+ )
584577
585578 if newrun .is_halted () and newrun .is_complete ():
586579 runs_halted_complete .append (newrun )
0 commit comments