Skip to content

Commit c869613

Browse files
an event based turbidostat
1 parent 875437b commit c869613

File tree

3 files changed

+332
-0
lines changed

3 files changed

+332
-0
lines changed

core/pioreactor/automations/dosing/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@
88
from .fed_batch import FedBatch
99
from .pid_morbidostat import PIDMorbidostat
1010
from .silent import Silent as DosingSilent
11+
from .turbidostat import EventBasedTurbidostat
1112
from .turbidostat import Turbidostat

core/pioreactor/automations/dosing/turbidostat.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
# -*- coding: utf-8 -*-
2+
from threading import Lock
3+
from threading import Thread
24
from typing import cast
35
from typing import Optional
46

@@ -138,3 +140,186 @@ def _execute_target_nod(self) -> Optional[events.DilutionEvent]:
138140
)
139141
else:
140142
return None
143+
144+
145+
class EventBasedTurbidostat(DosingAutomationJob):
146+
"""
147+
Event-based turbidostat - react immediately to new OD messages instead of polling on a timer.
148+
149+
Experimental!
150+
151+
Algorithm:
152+
1. Subscribe to OD updates via AutomationJob callbacks.
153+
2. On each new OD (or nOD) message, update internal readings.
154+
3. If targeting that signal, attempt to acquire a lock and run once.
155+
4. If a run is already in progress, skip this trigger (no queuing).
156+
"""
157+
158+
automation_name = "turbidostat_event"
159+
published_settings = {
160+
"exchange_volume_ml": {"datatype": "float", "settable": True, "unit": "mL"},
161+
"target_normalized_od": {"datatype": "float", "settable": True, "unit": "AU"},
162+
"target_od": {"datatype": "float", "settable": True, "unit": "OD"},
163+
"duration": {"datatype": "float", "settable": False, "unit": "min"},
164+
}
165+
target_od = None
166+
target_normalized_od = None
167+
168+
def __init__(
169+
self,
170+
exchange_volume_ml: float | str,
171+
target_normalized_od: Optional[float | str] = None,
172+
target_od: Optional[float | str] = None,
173+
**kwargs,
174+
) -> None:
175+
self._run_lock = Lock() # serialize event-triggered runs across MQTT callbacks
176+
self._event_based_ready = False # guard against callbacks firing before init completes
177+
super().__init__(**kwargs)
178+
179+
with local_persistent_storage("active_calibrations") as cache:
180+
if "media_pump" not in cache:
181+
raise CalibrationError("Media pump calibration must be performed first.")
182+
elif "waste_pump" not in cache:
183+
raise CalibrationError("Waste pump calibration must be performed first.")
184+
185+
if target_normalized_od is not None and target_od is not None:
186+
raise ValueError("Only provide target nOD or target OD, not both.")
187+
elif target_normalized_od is None and target_od is None:
188+
raise ValueError("Provide a target nOD or target OD.")
189+
190+
if target_normalized_od is not None:
191+
self.target_normalized_od = float(target_normalized_od)
192+
elif target_od is not None:
193+
self.target_od = float(target_od)
194+
195+
self.exchange_volume_ml = float(exchange_volume_ml)
196+
self.ema_od = ExponentialMovingAverage(
197+
config.getfloat("turbidostat.config", "od_smoothing_ema", fallback=0.5)
198+
)
199+
self._event_based_ready = True
200+
201+
def set_duration(self, value: float | None) -> None:
202+
# disable periodic polling in event-based mode
203+
self.duration = 0.0
204+
try:
205+
self.run_thread.cancel() # type: ignore[attr-defined]
206+
except AttributeError:
207+
pass
208+
209+
@property
210+
def is_targeting_nOD(self) -> bool:
211+
return self.target_normalized_od is not None
212+
213+
@property
214+
def _od_channel(self) -> pt.PdChannel:
215+
return cast(
216+
pt.PdChannel,
217+
config.get("turbidostat.config", "signal_channel", fallback="2"),
218+
)
219+
220+
def execute(self) -> Optional[events.DilutionEvent]:
221+
if self.is_targeting_nOD:
222+
return self._execute_target_nod()
223+
else:
224+
return self._execute_target_od()
225+
226+
def set_target_normalized_od(self, new_target: float) -> None:
227+
if not self.is_targeting_nOD:
228+
self.logger.warning("You are currently targeting OD, and can only change that.")
229+
else:
230+
self.target_normalized_od = float(new_target)
231+
232+
def set_target_od(self, new_target: float) -> None:
233+
if self.is_targeting_nOD:
234+
self.logger.warning("You are currently targeting nOD, and can only change that.")
235+
else:
236+
self.target_od = float(new_target)
237+
238+
def _execute_target_od(self) -> Optional[events.DilutionEvent]:
239+
assert self.target_od is not None
240+
smoothed_od = self.ema_od.update(self.latest_od[self._od_channel])
241+
if smoothed_od >= self.target_od:
242+
self.ema_od.clear()
243+
latest_od_before_dosing = smoothed_od
244+
target_od_before_dosing = self.target_od
245+
246+
results = self.execute_io_action(
247+
media_ml=self.exchange_volume_ml, waste_ml=self.exchange_volume_ml
248+
)
249+
250+
data = {
251+
"latest_od": latest_od_before_dosing,
252+
"target_od": target_od_before_dosing,
253+
"exchange_volume_ml": self.exchange_volume_ml,
254+
"volume_actually_moved_ml": results["media_ml"],
255+
}
256+
257+
return events.DilutionEvent(
258+
f"Latest OD = {latest_od_before_dosing:.2f} ≥ Target OD = {target_od_before_dosing:.2f}; cycled {results['media_ml']:.2f} mL",
259+
data,
260+
)
261+
else:
262+
return None
263+
264+
def _execute_target_nod(self) -> Optional[events.DilutionEvent]:
265+
assert self.target_normalized_od is not None
266+
if self.latest_normalized_od >= self.target_normalized_od:
267+
latest_normalized_od_before_dosing = self.latest_normalized_od
268+
target_normalized_od_before_dosing = self.target_normalized_od
269+
results = self.execute_io_action(
270+
media_ml=self.exchange_volume_ml, waste_ml=self.exchange_volume_ml
271+
)
272+
273+
data = {
274+
"latest_normalized_od": latest_normalized_od_before_dosing,
275+
"target_normalized_od": target_normalized_od_before_dosing,
276+
"exchange_volume_ml": self.exchange_volume_ml,
277+
"volume_actually_moved_ml": results["media_ml"],
278+
}
279+
280+
return events.DilutionEvent(
281+
f"Latest Normalized OD = {latest_normalized_od_before_dosing:.2f} ≥ Target nOD = {target_normalized_od_before_dosing:.2f}; cycled {results['media_ml']:.2f} mL",
282+
data,
283+
)
284+
else:
285+
return None
286+
287+
def _queue_event_based_run(self) -> None:
288+
if not self._event_based_ready:
289+
return
290+
291+
if self.state != self.READY:
292+
return
293+
294+
if not self._run_lock.acquire(blocking=False):
295+
return
296+
297+
def _runner() -> None:
298+
try:
299+
self.run(timeout=5.0)
300+
finally:
301+
self._run_lock.release()
302+
303+
Thread(target=_runner, daemon=True).start()
304+
305+
def _set_ods(self, message: pt.MQTTMessage) -> None:
306+
if not message.payload:
307+
return
308+
309+
super()._set_ods(message)
310+
311+
if self.is_targeting_nOD or message.retain:
312+
return
313+
314+
self._queue_event_based_run()
315+
316+
def _set_normalized_od(self, message: pt.MQTTMessage) -> None:
317+
if not message.payload:
318+
return
319+
320+
super()._set_normalized_od(message)
321+
322+
if (not self.is_targeting_nOD) or message.retain:
323+
return
324+
325+
self._queue_event_based_run()

core/tests/test_turbidostat.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
# -*- coding: utf-8 -*-
2+
import time
3+
from datetime import datetime
4+
from datetime import timezone
5+
from typing import Callable
6+
7+
from msgspec.json import encode
8+
from pioreactor import structs
9+
from pioreactor.automations.dosing.turbidostat import EventBasedTurbidostat
10+
from pioreactor.utils import get_running_pio_job_id
11+
from pioreactor.utils.job_manager import JobManager
12+
from pioreactor.utils.timing import current_utc_datetime
13+
from pioreactor.whoami import get_unit_name
14+
15+
16+
unit = get_unit_name()
17+
18+
19+
def wait_for(predicate: Callable[[], bool], timeout: float = 5.0, check_interval: float = 0.01) -> bool:
20+
deadline = time.time() + timeout
21+
while time.time() < deadline:
22+
try:
23+
if predicate():
24+
return True
25+
except Exception:
26+
pass
27+
time.sleep(check_interval)
28+
return False
29+
30+
31+
class DummyMessage:
32+
def __init__(self, payload: bytes, retain: bool = False, topic: str = "") -> None:
33+
self.payload = payload
34+
self.retain = retain
35+
self.topic = topic
36+
37+
38+
def setup_function() -> None:
39+
job_id = get_running_pio_job_id("dosing_automation")
40+
if job_id is not None:
41+
with JobManager() as jm:
42+
jm.set_not_running(job_id)
43+
44+
cal = structs.SimplePeristalticPumpCalibration(
45+
calibration_name="setup_function",
46+
curve_data_=structs.PolyFitCoefficients(coefficients=[1.0, 0.0]),
47+
recorded_data={"x": [], "y": []},
48+
dc=60,
49+
hz=100,
50+
created_at=datetime(2010, 1, 1, tzinfo=timezone.utc),
51+
voltage=-1.0,
52+
calibrated_on_pioreactor_unit=unit,
53+
)
54+
cal.set_as_active_calibration_for_device("media_pump")
55+
cal.set_as_active_calibration_for_device("alt_media_pump")
56+
cal.set_as_active_calibration_for_device("waste_pump")
57+
58+
59+
def test_event_based_turbidostat_targeting_nod() -> None:
60+
experiment = "test_event_based_turbidostat_targeting_nod"
61+
target_nod = 1.0
62+
queued_runs = {"count": 0}
63+
64+
def queue_run() -> None:
65+
queued_runs["count"] += 1
66+
67+
with EventBasedTurbidostat(
68+
target_normalized_od=target_nod,
69+
exchange_volume_ml=0.1,
70+
unit=unit,
71+
experiment=experiment,
72+
) as algo:
73+
assert wait_for(lambda: algo.state == algo.READY, timeout=5.0)
74+
algo._queue_event_based_run = queue_run # type: ignore[method-assign]
75+
76+
algo._set_normalized_od(
77+
DummyMessage(
78+
payload=encode(structs.ODFiltered(od_filtered=1.05, timestamp=current_utc_datetime()))
79+
)
80+
)
81+
assert queued_runs["count"] == 1
82+
83+
algo._set_ods(
84+
DummyMessage(
85+
payload=encode(
86+
structs.ODReadings(
87+
timestamp=current_utc_datetime(),
88+
ods={
89+
"2": structs.RawODReading(
90+
ir_led_intensity=80.0,
91+
timestamp=current_utc_datetime(),
92+
angle="45",
93+
od=0.5,
94+
channel="2",
95+
)
96+
},
97+
)
98+
)
99+
)
100+
)
101+
assert queued_runs["count"] == 1
102+
103+
104+
def test_event_based_turbidostat_targeting_od() -> None:
105+
experiment = "test_event_based_turbidostat_targeting_od"
106+
target_od = 0.1
107+
queued_runs = {"count": 0}
108+
109+
def queue_run() -> None:
110+
queued_runs["count"] += 1
111+
112+
with EventBasedTurbidostat(
113+
target_od=target_od,
114+
exchange_volume_ml=0.1,
115+
unit=unit,
116+
experiment=experiment,
117+
) as algo:
118+
assert wait_for(lambda: algo.state == algo.READY, timeout=5.0)
119+
algo._queue_event_based_run = queue_run # type: ignore[method-assign]
120+
121+
algo._set_ods(
122+
DummyMessage(
123+
payload=encode(
124+
structs.ODReadings(
125+
timestamp=current_utc_datetime(),
126+
ods={
127+
"2": structs.RawODReading(
128+
ir_led_intensity=80.0,
129+
timestamp=current_utc_datetime(),
130+
angle="45",
131+
od=0.5,
132+
channel="2",
133+
)
134+
},
135+
)
136+
)
137+
)
138+
)
139+
assert queued_runs["count"] == 1
140+
141+
algo._set_normalized_od(
142+
DummyMessage(
143+
payload=encode(structs.ODFiltered(od_filtered=1.2, timestamp=current_utc_datetime()))
144+
)
145+
)
146+
assert queued_runs["count"] == 1

0 commit comments

Comments
 (0)