Skip to content

Commit c37a8b8

Browse files
committed
inefficient...
1 parent 3e1d819 commit c37a8b8

File tree

2 files changed

+66
-88
lines changed

2 files changed

+66
-88
lines changed

bobocep/cep/engine/engine.py

Lines changed: 66 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
CEP engine.
77
"""
88

9-
from queue import Queue
9+
from queue import Queue, Full as QueueFull
1010
from threading import RLock
1111
from typing import Any, List, Optional, Tuple
1212

@@ -25,6 +25,7 @@
2525

2626
_EXC_QUEUE_FULL_IN = "incoming queue is full (max size: {})"
2727
_EXC_QUEUE_FULL_OUT = "outgoing queue is full (max size: {})"
28+
_EXC_RUNSERIAL_PHENOM_N0T_FOUND = "run serial phenomenon name {} not found"
2829

2930

3031
class BoboEngineError(BoboError):
@@ -46,8 +47,7 @@ def __init__(
4647
gen_data: Optional[BoboGenData] = None,
4748
max_size_in: int = 0,
4849
max_size_out: int = 0,
49-
fail_on_queue_full: bool = False,
50-
empty_queue_priority: bool = True,
50+
fail_on_queue_full: bool = True,
5151
actions_on_local_only: bool = True
5252
):
5353
super().__init__()
@@ -62,10 +62,11 @@ def __init__(
6262
self._gen_data: Optional[BoboGenData] = gen_data
6363

6464
self._queue_in: Queue[Any] = Queue(max(0, max_size_in))
65-
self._queue_out: Queue[Tuple[BoboRunSerial, bool]] = Queue(max(0, max_size_out))
65+
self._queue_out: Queue[Tuple[BoboRunSerial, bool]] = (
66+
Queue(max(0, max_size_out))
67+
)
6668

6769
self._fail_on_queue_full: bool = fail_on_queue_full
68-
self._empty_queue_priority: bool = empty_queue_priority
6970
self._local_only: bool = actions_on_local_only
7071

7172
self._decider.subscribe(self)
@@ -103,29 +104,8 @@ def run(self) -> None:
103104
"""
104105
Runs the engine until it is closed. This is a blocking operation.
105106
"""
106-
while True:
107-
with self._lock:
108-
if self._closed:
109-
return
110-
111-
self.update()
112-
113-
def add_data(self, data: Any) -> None:
114-
"""
115-
:param data: Data to add to the incoming queue.
116-
117-
:raises BoboEngineError: Incoming queue is full.
118-
"""
119-
with self._lock:
120-
if self._closed:
121-
return
122-
123-
if not self._queue_in.full():
124-
self._queue_in.put(data)
125-
126-
elif self._fail_on_queue_full:
127-
raise BoboEngineError(
128-
_EXC_QUEUE_FULL_IN.format(self._queue_in.maxsize))
107+
while self.update():
108+
pass
129109

130110
def update(self) -> bool:
131111
"""
@@ -141,56 +121,35 @@ def update(self) -> bool:
141121
if self._closed:
142122
return False
143123

144-
# Receiver
145-
self._rec_update()
146-
if self._empty_queue_priority:
147-
while self._rec_update():
148-
pass
149-
150-
# Producer
151-
self._pro_update()
152-
if self._empty_queue_priority:
153-
while not self._queue_out.empty():
154-
self._pro_update()
124+
self._rec_dec_process()
125+
self._pro_process()
126+
self._for_process()
155127

156-
# Forwarder
157-
self._for_update()
158-
if self._empty_queue_priority:
159-
while not self._handler.empty():
160-
self._for_update()
161-
162-
return True
163-
164-
def _rec_update(self) -> bool:
165-
data_in = None
166-
167-
# Process data from incoming queue
168-
if not self._queue_in.empty():
169-
data_in = self._queue_in.get_nowait()
170-
self._rec_update_process(data_in)
171-
172-
# Process data from data generator
173-
if self._gen_data is not None:
174-
data_gen = self._gen_data.maybe_generate()
128+
return True
175129

176-
if data_gen is not None:
177-
self._rec_update_process(data_gen)
130+
def add_data(self, data: Any) -> None:
131+
"""
132+
:param data: Data to add to the incoming queue.
178133
179-
return data_in is not None
134+
:raises BoboEngineError: Incoming queue is full.
135+
"""
136+
with self._lock:
137+
if self._closed:
138+
return
180139

181-
def _rec_update_process(self, data: Any) -> bool:
182140
# Ensure data are valid
183-
if self._validator is not None and not self._validator.is_valid(data):
184-
for s in self._subscribers:
185-
s.on_invalid_data_received(data)
186-
187-
return False
188-
189-
for s in self._subscribers:
190-
s.on_valid_data_received(data)
191-
192-
# Pass to decider
193-
return self.decider.process(data)
141+
if (
142+
self._validator is not None and
143+
not self._validator.is_valid(data)
144+
):
145+
return
146+
147+
try:
148+
self._queue_in.put_nowait(data)
149+
except QueueFull:
150+
if self._fail_on_queue_full:
151+
raise BoboEngineError(
152+
_EXC_QUEUE_FULL_IN.format(self._queue_in.maxsize))
194153

195154
def on_decider_update(
196155
self,
@@ -202,14 +161,37 @@ def on_decider_update(
202161
if self._closed:
203162
return
204163

205-
for run in completed:
206-
if not self._queue_out.full():
207-
self._queue_out.put((run, local))
208-
else:
209-
raise BoboEngineError(
210-
_EXC_QUEUE_FULL_OUT.format(self._queue_out.maxsize))
164+
for runserial in completed:
165+
try:
166+
self._queue_out.put_nowait((runserial, local))
167+
except QueueFull:
168+
if self._fail_on_queue_full:
169+
raise BoboEngineError(
170+
_EXC_QUEUE_FULL_OUT.format(self._queue_out.maxsize))
171+
172+
def _rec_dec_process(self) -> bool:
173+
data_in = None
174+
data_gen = None
211175

212-
def _pro_update(self) -> bool:
176+
# Process data from incoming queue
177+
if not self._queue_in.empty():
178+
data_in = self._queue_in.get_nowait()
179+
180+
for s in self._subscribers:
181+
s.on_valid_data_received(data_in)
182+
183+
self.decider.process(data_in)
184+
185+
# Process data from data generator
186+
if self._gen_data is not None:
187+
data_gen = self._gen_data.maybe_generate()
188+
189+
if data_gen is not None:
190+
self.decider.process(data_gen)
191+
192+
return data_in is not None or data_gen is not None
193+
194+
def _pro_process(self) -> bool:
213195
# Process data from outgoing queue
214196
if not self._queue_out.empty():
215197
runserial, local = self._queue_out.get_nowait()
@@ -218,7 +200,9 @@ def _pro_update(self) -> bool:
218200
runserial.phenomenon_name)
219201

220202
if phenom is None:
221-
raise BoboEngineError(runserial.phenomenon_name)
203+
raise BoboEngineError(
204+
_EXC_RUNSERIAL_PHENOM_N0T_FOUND.format(
205+
runserial.phenomenon_name))
222206

223207
event = BoboEventComplex(
224208
event_id=self._decider.gen_event_id.generate(),
@@ -241,7 +225,7 @@ def _pro_update(self) -> bool:
241225

242226
return False
243227

244-
def _for_update(self) -> bool:
228+
def _for_process(self) -> bool:
245229
hres: Optional[BoboHandlerResponse] = \
246230
self._handler.get_handler_response()
247231

bobocep/cep/engine/pubsub.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ def on_valid_data_received(self, data: Any) -> None:
2121
:param data: Valid data received by engine.
2222
"""
2323

24-
@abstractmethod
25-
def on_invalid_data_received(self, data: Any) -> None:
26-
"""
27-
:param data: Invalid data received by engine.
28-
"""
29-
3024

3125
class BoboEnginePublisher(ABC):
3226
"""

0 commit comments

Comments
 (0)