Skip to content

Commit f39d763

Browse files
committed
everything moved over but untested
- needs big clean-up - improve interfaces - lots of testing new features going forward - change phenomena dynamically (for non distributed only)
1 parent 0a89691 commit f39d763

File tree

6 files changed

+265
-153
lines changed

6 files changed

+265
-153
lines changed

bobocep/cep/action/handler.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,10 @@ def get_handler_response(self) -> Optional[BoboHandlerResponse]:
117117
return queue.get_nowait()
118118
return None
119119

120+
def empty(self) -> bool:
121+
with self._lock:
122+
return self._get_queue().empty()
123+
120124
def size(self) -> int:
121125
"""
122126
:return: The size of the handler queue.

bobocep/cep/engine/decider/decider.py

Lines changed: 106 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ class BoboDeciderError(BoboEngineTaskError):
3737
"""
3838

3939

40-
class BoboDecider(BoboDeciderPublisher,
41-
BoboReceiverSubscriber,
42-
BoboDistributedSubscriber):
40+
class BoboDecider(BoboDeciderPublisher, BoboDistributedSubscriber):
4341
"""
4442
A decider task.
4543
"""
@@ -85,6 +83,111 @@ def __init__(self,
8583
self._cache_halted: Optional[Deque[BoboRunSerial]] = \
8684
deque(maxlen=max_cache) if self._caching else None
8785

86+
@property
87+
def gen_event_id(self):
88+
"""
89+
:return: Event ID generator.
90+
"""
91+
return self._gen_event_id
92+
93+
@property
94+
def gen_run_id(self):
95+
"""
96+
:return: Run ID generator.
97+
"""
98+
return self._gen_run_id
99+
100+
@property
101+
def gen_timestamp(self):
102+
"""
103+
:return: Timestamp generator.
104+
"""
105+
return self._gen_timestamp
106+
107+
@property
108+
def phenomena(self) -> Tuple[BoboPhenomenon, ...]:
109+
"""
110+
:return: All phenomena under consideration by the decider.
111+
"""
112+
with self._lock:
113+
return tuple(*self._phenomena.values())
114+
115+
def phenomenon(self, name: str) -> Optional[BoboPhenomenon]:
116+
"""
117+
:param name: Name of the phenomenon.
118+
119+
:return: `BoboPhenomenon` that matches the name; `None` if not found.
120+
"""
121+
with self._lock:
122+
if name not in self._phenomena:
123+
return None
124+
return self._phenomena[name]
125+
126+
def all_runs(self) -> Tuple[BoboRun, ...]:
127+
"""
128+
:return: All active runs in the decider.
129+
"""
130+
with self._lock:
131+
runs: List[BoboRun] = []
132+
for phenomenon_name, dict_patterns in self._runs.items():
133+
for pattern_name, dict_runs in dict_patterns.items():
134+
for _, drun in dict_runs.items():
135+
runs.append(drun)
136+
return tuple(runs)
137+
138+
def runs_from(self,
139+
phenomenon_name: str,
140+
pattern_name: str) -> Tuple[BoboRun, ...]:
141+
"""
142+
:param phenomenon_name: A phenomenon name.
143+
:param pattern_name: A pattern name.
144+
:return: The runs associated with the given
145+
phenomenon and pattern name.
146+
"""
147+
with self._lock:
148+
if (
149+
phenomenon_name in self._runs and
150+
pattern_name in self._runs[phenomenon_name]
151+
):
152+
return tuple(
153+
*self._runs[phenomenon_name][pattern_name].values()
154+
)
155+
return tuple()
156+
157+
def run_at(self,
158+
phenomenon_name: str,
159+
pattern_name: str,
160+
run_id: str) -> Optional[BoboRun]:
161+
"""
162+
:param phenomenon_name: A phenomenon name.
163+
:param pattern_name: A pattern name.
164+
:param run_id: A run ID.
165+
:return: A run associated with the given phenomenon and pattern name;
166+
or None if no such run exists.
167+
"""
168+
with self._lock:
169+
if (
170+
phenomenon_name in self._runs and
171+
pattern_name in self._runs[phenomenon_name] and
172+
run_id in self._runs[phenomenon_name][pattern_name]
173+
):
174+
return self._runs[phenomenon_name][pattern_name][run_id]
175+
return None
176+
177+
def close(self) -> None:
178+
"""
179+
Closes the Decider.
180+
"""
181+
with self._lock:
182+
self._closed = True
183+
184+
def is_closed(self) -> bool:
185+
"""
186+
:return: `True` if decider is set to close; `False` otherwise.
187+
"""
188+
with self._lock:
189+
return self._closed
190+
88191
def subscribe(self, subscriber: BoboDeciderSubscriber) -> None:
89192
"""
90193
:param subscriber: Subscriber to Decider data.
@@ -406,78 +509,6 @@ def _get_pattern(self,
406509
return pattern
407510
return None
408511

409-
def phenomena(self) -> Tuple[BoboPhenomenon, ...]:
410-
"""
411-
:return: All phenomena under consideration by the decider.
412-
"""
413-
with self._lock:
414-
return tuple(*self._phenomena.values())
415-
416-
def all_runs(self) -> Tuple[BoboRun, ...]:
417-
"""
418-
:return: All active runs in the decider.
419-
"""
420-
with self._lock:
421-
runs: List[BoboRun] = []
422-
for phenomenon_name, dict_patterns in self._runs.items():
423-
for pattern_name, dict_runs in dict_patterns.items():
424-
for _, drun in dict_runs.items():
425-
runs.append(drun)
426-
return tuple(runs)
427-
428-
def runs_from(self,
429-
phenomenon_name: str,
430-
pattern_name: str) -> Tuple[BoboRun, ...]:
431-
"""
432-
:param phenomenon_name: A phenomenon name.
433-
:param pattern_name: A pattern name.
434-
:return: The runs associated with the given
435-
phenomenon and pattern name.
436-
"""
437-
with self._lock:
438-
if (
439-
phenomenon_name in self._runs and
440-
pattern_name in self._runs[phenomenon_name]
441-
):
442-
return tuple(
443-
*self._runs[phenomenon_name][pattern_name].values()
444-
)
445-
return tuple()
446-
447-
def run_at(self,
448-
phenomenon_name: str,
449-
pattern_name: str,
450-
run_id: str) -> Optional[BoboRun]:
451-
"""
452-
:param phenomenon_name: A phenomenon name.
453-
:param pattern_name: A pattern name.
454-
:param run_id: A run ID.
455-
:return: A run associated with the given phenomenon and pattern name;
456-
or None if no such run exists.
457-
"""
458-
with self._lock:
459-
if (
460-
phenomenon_name in self._runs and
461-
pattern_name in self._runs[phenomenon_name] and
462-
run_id in self._runs[phenomenon_name][pattern_name]
463-
):
464-
return self._runs[phenomenon_name][pattern_name][run_id]
465-
return None
466-
467-
def close(self) -> None:
468-
"""
469-
Closes the Decider.
470-
"""
471-
with self._lock:
472-
self._closed = True
473-
474-
def is_closed(self) -> bool:
475-
"""
476-
:return: `True` if decider is set to close; `False` otherwise.
477-
"""
478-
with self._lock:
479-
return self._closed
480-
481512
def _process_event(self, data: Any, timestamp: int) -> \
482513
Tuple[List[BoboRun], List[BoboRun], List[BoboRun]]:
483514
"""

0 commit comments

Comments
 (0)