Skip to content

Commit ea8ab0b

Browse files
run mqtt publish in another thread so we aren't blocking the main checks
1 parent a25f17c commit ea8ab0b

File tree

2 files changed

+86
-60
lines changed

2 files changed

+86
-60
lines changed

pioreactor/actions/pump.py

Lines changed: 83 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
from pioreactor.logging import create_logger
2424
from pioreactor.logging import CustomLogger
2525
from pioreactor.pubsub import Client
26-
from pioreactor.pubsub import QOS
2726
from pioreactor.types import PumpCalibrationDevices
2827
from pioreactor.utils.pwm import PWM
2928
from pioreactor.utils.timing import catchtime
@@ -53,7 +52,7 @@ def is_default_calibration(cal: structs.SimplePeristalticPumpCalibration):
5352

5453
# Initialize the thread pool with a worker threads.
5554
# a pool is needed to avoid eventual memory overflow when multiple threads are created and allocated over time.
56-
_thread_pool = ThreadPoolExecutor(max_workers=3) # one for each pump
55+
_thread_pool = ThreadPoolExecutor(max_workers=3)
5756

5857

5958
class PWMPump:
@@ -105,6 +104,7 @@ def by_volume(self, ml: pt.mL, block: bool = True) -> None:
105104
if ml < 0:
106105
raise ValueError("ml must be greater than or equal to 0")
107106
if ml == 0:
107+
self.stop()
108108
return
109109
seconds = self.ml_to_duration(ml)
110110
self.by_duration(seconds, block=block)
@@ -113,6 +113,7 @@ def by_duration(self, seconds: pt.Seconds, block: bool = True) -> None:
113113
if seconds < 0:
114114
raise ValueError("seconds must be >= 0")
115115
if seconds == 0:
116+
self.stop() # need to set the interrupt!
116117
return
117118
if block:
118119
self.start(self.calibration.dc)
@@ -151,26 +152,8 @@ def _get_calibration(pump_device: PumpCalibrationDevices) -> structs.SimplePeris
151152
return cal
152153

153154

154-
def _publish_pump_action(
155-
pump_action: str,
156-
ml: pt.mL,
157-
unit: str,
158-
experiment: str,
159-
mqtt_client: Client,
160-
source_of_event: Optional[str] = None,
161-
) -> structs.DosingEvent:
162-
dosing_event = structs.DosingEvent(
163-
volume_change=ml,
164-
event=pump_action,
165-
source_of_event=source_of_event,
166-
timestamp=current_utc_datetime(),
167-
)
168-
169-
mqtt_client.publish(
170-
f"pioreactor/{unit}/{experiment}/dosing_events",
171-
encode(dosing_event),
172-
)
173-
return dosing_event
155+
def publish_async(client, topic, payload, **kwargs):
156+
_thread_pool.submit(client.publish, topic, payload, **kwargs)
174157

175158

176159
def _to_human_readable_action(
@@ -309,52 +292,56 @@ def _get_pump_action(pump_device: PumpCalibrationDevices) -> str:
309292
duration = pt.Seconds(duration)
310293
ml = pt.mL(ml)
311294

312-
dosing_event = structs.DosingEvent(
295+
empty_dosing_event = structs.DosingEvent(
313296
volume_change=0.0,
314297
event=action_name,
315298
source_of_event=source_of_event,
316299
timestamp=current_utc_datetime(),
317300
)
318301

319302
if manually:
320-
dosing_event = replace(dosing_event, volume_change=ml)
321-
303+
publish_async(
304+
mqtt_client,
305+
f"pioreactor/{unit}/{experiment}/dosing_events",
306+
encode(replace(empty_dosing_event, volume_change=ml)),
307+
)
322308
return 0.0
323309

324310
with PWMPump(
325311
unit, experiment, pin, calibration=calibration, mqtt_client=mqtt_client, logger=logger
326312
) as pump:
313+
sub_duration = 1.0
314+
volume_moved_ml = 0.0
315+
327316
pump_start_time = time.monotonic()
328317

329318
if not continuously:
330-
sub_duration = 1.0
331-
volume_moved_ml = 0.0
332-
333319
pump.by_duration(duration, block=False) # start pump
334320

335321
while not pump.interrupt.is_set():
336322
sub_volume_moved_ml = 0.0
337323
time_left = duration - (time.monotonic() - pump_start_time)
338-
339-
if time_left < 0:
324+
if time_left <= 0:
325+
# this is an edge case where the time has surpassed, but the interrupt isn't set yet.
340326
pump.interrupt.wait()
341327
break
342328

343-
elif time_left >= sub_duration > 0:
329+
elif time_left >= sub_duration:
344330
sub_volume_moved_ml = pump.duration_to_ml(sub_duration)
345-
dosing_event = replace(
346-
dosing_event, timestamp=current_utc_datetime(), volume_change=sub_volume_moved_ml
347-
)
348-
volume_moved_ml += sub_volume_moved_ml
349331

350-
elif sub_duration > time_left > 0:
332+
elif sub_duration > time_left:
333+
# last remaining bit.
351334
sub_volume_moved_ml = ml - volume_moved_ml
352-
dosing_event = replace(
353-
dosing_event, timestamp=current_utc_datetime(), volume_change=sub_volume_moved_ml
354-
)
355-
volume_moved_ml += sub_volume_moved_ml
356335

357-
mqtt_client.publish(
336+
dosing_event = replace(
337+
empty_dosing_event,
338+
timestamp=current_utc_datetime(),
339+
volume_change=sub_volume_moved_ml,
340+
)
341+
volume_moved_ml += sub_volume_moved_ml
342+
343+
publish_async(
344+
mqtt_client,
358345
f"pioreactor/{unit}/{experiment}/dosing_events",
359346
encode(dosing_event),
360347
)
@@ -364,40 +351,70 @@ def _get_pump_action(pump_device: PumpCalibrationDevices) -> str:
364351
pump_stop_time = time.monotonic()
365352

366353
# ended early. We should calculate how much _wasnt_ added, and update that.
367-
logger.info(f"Stopped {pump_device} early.")
368354
actual_volume_moved_ml = pump.duration_to_ml(pump_stop_time - pump_start_time)
369355
correction_factor = (
370356
actual_volume_moved_ml - volume_moved_ml
371357
) # reported too much since we log first before dosing
372358

373359
dosing_event = replace(
374-
dosing_event, timestamp=current_utc_datetime(), volume_change=correction_factor
360+
empty_dosing_event,
361+
timestamp=current_utc_datetime(),
362+
volume_change=correction_factor,
375363
)
376-
mqtt_client.publish(
364+
publish_async(
365+
mqtt_client,
377366
f"pioreactor/{unit}/{experiment}/dosing_events",
378367
encode(dosing_event),
379368
)
380369

370+
logger.info(f"Stopped {pump_device} early.")
381371
return actual_volume_moved_ml
382372

383373
return volume_moved_ml
384374

385375
else:
386-
# continously path
387-
pump.continuously(block=False)
388-
389-
# we only break out of this while loop via a interrupt or MQTT signal => event.set()
390-
while not state.exit_event.wait(duration):
391-
# republish information
392-
dosing_event = replace(dosing_event, timestamp=current_utc_datetime())
393-
mqtt_client.publish(
376+
pump.continuously(block=False) # start pump
377+
378+
while True:
379+
sub_volume_moved_ml = pump.duration_to_ml(sub_duration)
380+
381+
dosing_event = replace(
382+
empty_dosing_event,
383+
timestamp=current_utc_datetime(),
384+
volume_change=sub_volume_moved_ml,
385+
)
386+
volume_moved_ml += sub_volume_moved_ml
387+
388+
publish_async(
389+
mqtt_client,
394390
f"pioreactor/{unit}/{experiment}/dosing_events",
395391
encode(dosing_event),
396392
)
397-
pump.stop()
398-
logger.info(f"Stopped {pump_device}.")
399393

400-
return ml
394+
if state.exit_event.wait(sub_duration):
395+
# this is the only way it stops?
396+
pump.interrupt.set()
397+
pump_stop_time = time.monotonic()
398+
399+
actual_volume_moved_ml = pump.duration_to_ml(pump_stop_time - pump_start_time)
400+
401+
correction_factor = (
402+
actual_volume_moved_ml - volume_moved_ml
403+
) # reported too much since we log first before dosing
404+
405+
dosing_event = replace(
406+
empty_dosing_event,
407+
timestamp=current_utc_datetime(),
408+
volume_change=correction_factor,
409+
)
410+
publish_async(
411+
mqtt_client,
412+
f"pioreactor/{unit}/{experiment}/dosing_events",
413+
encode(dosing_event),
414+
)
415+
416+
logger.info(f"Stopped {pump_device}.")
417+
return actual_volume_moved_ml
401418

402419

403420
def _liquid_circulation(
@@ -485,7 +502,17 @@ def _get_pump_action(pump_device: PumpCalibrationDevices) -> str:
485502
logger.info("Running waste continuously.")
486503

487504
# assume they run it long enough such that the waste efflux position is reached.
488-
_publish_pump_action("remove_waste", 20, unit, experiment, mqtt_client, source_of_event)
505+
dosing_event = structs.DosingEvent(
506+
volume_change=20,
507+
event="remove_waste",
508+
source_of_event=source_of_event,
509+
timestamp=current_utc_datetime(),
510+
)
511+
publish_async(
512+
mqtt_client,
513+
f"pioreactor/{unit}/{experiment}/dosing_events",
514+
encode(dosing_event),
515+
)
489516

490517
with catchtime() as running_waste_duration:
491518
waste_pump.continuously(block=False)

pioreactor/tests/test_pumps.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -150,10 +150,9 @@ def collect_updates(msg):
150150
resulting_ml = t.join()
151151

152152
assert resulting_ml < expected_ml
153-
assert len(volume_updates) == 2
154-
print(volume_updates[0])
155-
assert volume_updates[0]["volume_change"] == expected_ml
156-
assert -expected_ml < volume_updates[1]["volume_change"] < 0 # fire off a negative volume change
153+
154+
assert volume_updates[0]["volume_change"] > 0
155+
assert -expected_ml < volume_updates[-1]["volume_change"] < 0 # fire off a negative volume change
157156

158157

159158
def test_continuously_running_pump_will_disconnect_via_mqtt() -> None:

0 commit comments

Comments
 (0)