@@ -169,7 +169,6 @@ def _publish_pump_action(
169169 mqtt_client .publish (
170170 f"pioreactor/{ unit } /{ experiment } /dosing_events" ,
171171 encode (dosing_event ),
172- qos = QOS .EXACTLY_ONCE ,
173172 )
174173 return dosing_event
175174
@@ -310,29 +309,79 @@ def _get_pump_action(pump_device: PumpCalibrationDevices) -> str:
310309 duration = pt .Seconds (duration )
311310 ml = pt .mL (ml )
312311
312+ dosing_event = structs .DosingEvent (
313+ volume_change = 0.0 ,
314+ event = action_name ,
315+ source_of_event = source_of_event ,
316+ timestamp = current_utc_datetime (),
317+ )
318+
313319 if manually :
314- _publish_pump_action ( action_name , ml , unit , experiment , mqtt_client , source_of_event )
320+ dosing_event = replace ( dosing_event , volume_change = ml )
315321
316322 return 0.0
317323
318324 with PWMPump (
319325 unit , experiment , pin , calibration = calibration , mqtt_client = mqtt_client , logger = logger
320326 ) as pump :
321- # publish this first, as downstream jobs need to know about it.
322- dosing_event = _publish_pump_action (
323- action_name , ml , unit , experiment , mqtt_client , source_of_event
324- )
325-
326327 pump_start_time = time .monotonic ()
327328
328329 if not continuously :
329- pump .by_duration (duration , block = False )
330- # how does this work? What's up with the (or True)?
331- # exit_event.wait returns True iff the event is set, i.e by an interrupt.
332- # If we timeout (good path), then we eval (False or True), hence we break out of this while loop.
330+ sub_duration = 1.0
331+ volume_moved_ml = 0.0
332+
333+ pump .by_duration (duration , block = False ) # start pump
334+
335+ while not pump .interrupt .is_set ():
336+ sub_volume_moved_ml = 0.0
337+ time_left = duration - (time .monotonic () - pump_start_time )
338+
339+ if time_left < 0 :
340+ pump .interrupt .wait ()
341+ break
342+
343+ elif time_left >= sub_duration > 0 :
344+ sub_volume_moved_ml = pump .duration_to_ml (sub_duration )
345+ dosing_event = replace (
346+ dosing_event , timestamp = current_utc_datetime (), volume_change = sub_volume_moved_ml
347+ )
348+ volume_moved_ml += sub_volume_moved_ml
349+
350+ elif sub_duration > time_left > 0 :
351+ sub_volume_moved_ml = ml - volume_moved_ml
352+ dosing_event = replace (
353+ dosing_event , timestamp = current_utc_datetime (), volume_change = sub_volume_moved_ml
354+ )
355+ volume_moved_ml += sub_volume_moved_ml
356+
357+ mqtt_client .publish (
358+ f"pioreactor/{ unit } /{ experiment } /dosing_events" ,
359+ encode (dosing_event ),
360+ )
361+
362+ if state .exit_event .wait (min (sub_duration , time_left )):
363+ pump .interrupt .set ()
364+ pump_stop_time = time .monotonic ()
365+
366+ # ended early. We should calculate how much _wasnt_ added, and update that.
367+ logger .info (f"Stopped { pump_device } early." )
368+ actual_volume_moved_ml = pump .duration_to_ml (pump_stop_time - pump_start_time )
369+ correction_factor = (
370+ actual_volume_moved_ml - volume_moved_ml
371+ ) # reported too much since we log first before dosing
372+
373+ dosing_event = replace (
374+ dosing_event , timestamp = current_utc_datetime (), volume_change = correction_factor
375+ )
376+ mqtt_client .publish (
377+ f"pioreactor/{ unit } /{ experiment } /dosing_events" ,
378+ encode (dosing_event ),
379+ )
380+
381+ return actual_volume_moved_ml
382+
383+ return volume_moved_ml
333384
334- while not (state .exit_event .wait (duration ) or True ):
335- pump .interrupt .set ()
336385 else :
337386 # continously path
338387 pump .continuously (block = False )
@@ -344,23 +393,11 @@ def _get_pump_action(pump_device: PumpCalibrationDevices) -> str:
344393 mqtt_client .publish (
345394 f"pioreactor/{ unit } /{ experiment } /dosing_events" ,
346395 encode (dosing_event ),
347- qos = QOS .AT_MOST_ONCE , # we don't need the same level of accuracy here
348396 )
349397 pump .stop ()
350398 logger .info (f"Stopped { pump_device } ." )
351399
352- if state .exit_event .is_set ():
353- # ended early. We should calculate how much _wasnt_ added, and update that.
354- shortened_duration = time .monotonic () - pump_start_time
355- actually_dosed = pump .duration_to_ml (shortened_duration )
356- logger .info (f"Stopped { pump_device } early." )
357-
358- # need to reverse the data that was fired, dosing _can_ be negative, so we publish actually_dosed - ml s.t. ml + (actually_dosed - ml) = actually_dosed.
359- _publish_pump_action (
360- action_name , actually_dosed - ml , unit , experiment , mqtt_client , source_of_event
361- )
362- return actually_dosed
363- return ml
400+ return ml
364401
365402
366403def _liquid_circulation (
0 commit comments