Skip to content

Commit abfaf39

Browse files
committed
back to list to prevent having to change everything for now...
1 parent ac21cfd commit abfaf39

File tree

4 files changed

+40
-27
lines changed

4 files changed

+40
-27
lines changed

bobocep/cep/engine/decider/decider.py

Lines changed: 33 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -228,12 +228,12 @@ def update(self) -> bool:
228228
rl_completed, rl_halted, rl_updated = \
229229
self._process_event(self._queue.get_nowait())
230230

231-
completed: Tuple[BoboRunSerial, ...] = \
232-
tuple(run_c.serialize() for run_c in rl_completed)
233-
halted: Tuple[BoboRunSerial, ...] = \
234-
tuple(run_h.serialize() for run_h in rl_halted)
235-
updated: Tuple[BoboRunSerial, ...] = \
236-
tuple(run_u.serialize() for run_u in rl_updated)
231+
completed: List[BoboRunSerial] = \
232+
[run_c.serialize() for run_c in rl_completed]
233+
halted: List[BoboRunSerial] = \
234+
[run_h.serialize() for run_h in rl_halted]
235+
updated: List[BoboRunSerial] = \
236+
[run_u.serialize() for run_u in rl_updated]
237237

238238
# Cache local changes
239239
self._maybe_cache(completed, halted)
@@ -268,7 +268,11 @@ def snapshot(self) -> Tuple[
268268
if self._closed:
269269
return [], [], []
270270

271-
if self._caching:
271+
if (
272+
self._caching and
273+
self._cache_completed is not None and
274+
self._cache_halted is not None
275+
):
272276
# Get completed from cache
273277
r_completed = [c for c in self._cache_completed]
274278

@@ -290,9 +294,9 @@ def snapshot(self) -> Tuple[
290294

291295
def on_distributed_update(
292296
self,
293-
completed: Tuple[BoboRunSerial, ...],
294-
halted: Tuple[BoboRunSerial, ...],
295-
updated: Tuple[BoboRunSerial, ...]) -> None:
297+
completed: List[BoboRunSerial],
298+
halted: List[BoboRunSerial],
299+
updated: List[BoboRunSerial]) -> None:
296300
"""
297301
:param completed: Completed runs.
298302
:param halted: Halted runs.
@@ -305,7 +309,10 @@ def on_distributed_update(
305309
remove_indices_completed = []
306310
remove_indices_halted = []
307311
remove_indices_updated = []
308-
runlocal: Optional[BoboRun] = None # here because of mypy...
312+
313+
# here because of mypy...
314+
runlocal: Optional[BoboRun] = None
315+
runs: Optional[List[BoboRun]] = None
309316

310317
# Remove any invalid remote changes
311318
completed, halted, updated = \
@@ -335,7 +342,7 @@ def on_distributed_update(
335342

336343
continue
337344

338-
runs: List[BoboRun] = self.runs_pattern(
345+
runs = self.runs_pattern(
339346
runremote.phenomenon_name,
340347
pattern.name
341348
)
@@ -379,7 +386,7 @@ def on_distributed_update(
379386

380387
if pattern.singleton:
381388
# If singleton, use active run if exists...
382-
runs: List[BoboRun] = self.runs_pattern(
389+
runs = self.runs_pattern(
383390
runremote.phenomenon_name,
384391
pattern.name
385392
)
@@ -429,10 +436,6 @@ def on_distributed_update(
429436
del remlist[i]
430437

431438
# Notify subscribers
432-
completed = tuple(completed)
433-
halted = tuple(halted)
434-
updated = tuple(updated)
435-
436439
for subscriber in self._subscribers:
437440
subscriber.on_decider_update(
438441
completed=completed,
@@ -630,7 +633,11 @@ def _maybe_cache(
630633
:param completed: Completed runs.
631634
:param halted: Halted runs.
632635
"""
633-
if self._caching:
636+
if (
637+
self._caching and
638+
self._cache_completed is not None and
639+
self._cache_halted is not None
640+
):
634641
# Cache runs that have been locally completed
635642
for c in completed:
636643
self._cache_completed.append(c)
@@ -641,9 +648,9 @@ def _maybe_cache(
641648

642649
def _maybe_check_against_cache(
643650
self,
644-
completed: Tuple[BoboRunSerial, ...],
645-
halted: Tuple[BoboRunSerial, ...],
646-
updated: Tuple[BoboRunSerial, ...]) \
651+
completed: List[BoboRunSerial],
652+
halted: List[BoboRunSerial],
653+
updated: List[BoboRunSerial]) \
647654
-> Tuple[
648655
List[BoboRunSerial],
649656
List[BoboRunSerial],
@@ -661,7 +668,11 @@ def _maybe_check_against_cache(
661668
(2) halted runs kept if not halted locally; and
662669
(3) updated runs kept if not completed or halted locally.
663670
"""
664-
if self._caching:
671+
if (
672+
self._caching and
673+
self._cache_completed is not None and
674+
self._cache_halted is not None
675+
):
665676
# Keep completed IDs if not completed locally
666677
# Complete takes precedent over halt and update
667678
completed = [

bobocep/cep/engine/decider/pubsub.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ class BoboDeciderSubscriber(ABC):
2020
@abstractmethod
2121
def on_decider_update(
2222
self,
23-
completed: Tuple[BoboRunSerial, ...],
24-
halted: Tuple[BoboRunSerial, ...],
25-
updated: Tuple[BoboRunSerial, ...],
23+
completed: List[BoboRunSerial],
24+
halted: List[BoboRunSerial],
25+
updated: List[BoboRunSerial],
2626
local: bool
2727
) -> None:
2828
"""

bobocep/dist/devman.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,9 @@ def clear_last(self) -> None:
138138
self._last_attempt = 0
139139

140140
def stash(self) -> Tuple[
141-
List[BoboRunSerial], List[BoboRunSerial], List[BoboRunSerial]
141+
List[BoboRunSerial],
142+
List[BoboRunSerial],
143+
List[BoboRunSerial]
142144
]:
143145
"""
144146
:return: The device's stash.

bobocep/setup/simple.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(
4949
super().__init__()
5050

5151
self._phenomena: List[BoboPhenomenon] = phenomena
52-
self._validator: BoboValidator = validator
52+
self._validator: Optional[BoboValidator] = validator
5353
self._handler: BoboActionHandler = handler
5454
self._gen_event: Optional[BoboGenEvent] = gen_event
5555
self._urn: Optional[str] = urn

0 commit comments

Comments
 (0)