1717from pioreactor import types as pt
1818from pioreactor .config import config
1919from pioreactor .config import leader_hostname
20+ from pioreactor .exc import DodgingTimingError
2021from pioreactor .exc import JobPresentError
2122from pioreactor .exc import NotActiveWorkerError
2223from pioreactor .logging import create_logger
@@ -1033,6 +1034,44 @@ def _noop():
10331034 pass
10341035
10351036
1037+ def compute_od_timing (
1038+ * ,
1039+ interval : float ,
1040+ first_od_obs_time : float ,
1041+ now : float ,
1042+ od_duration : float ,
1043+ pre_delay : float ,
1044+ post_delay : float ,
1045+ after_action : float ,
1046+ ) -> dict [str , float ]:
1047+ """
1048+ Compute the time budget between OD readings.
1049+
1050+ The OD job runs every `interval` seconds, taking `od_duration` seconds of that window. We also
1051+ reserve `pre_delay` seconds before the next OD and `post_delay` seconds after the previous OD.
1052+ Whatever time is left, minus the runtime of the post-OD action (`after_action`), is the
1053+ "wait window" where the main activity can run normally.
1054+
1055+ wait_window = interval - od_duration - (pre_delay + post_delay) - after_action
1056+
1057+ If the wait window is non-positive, dodging is impossible with the current timings.
1058+
1059+ time_to_next_od aligns the next timer fire with the OD schedule based on the first observation
1060+ timestamp and the current clock.
1061+ """
1062+
1063+ wait_window = interval - od_duration - (pre_delay + post_delay ) - after_action
1064+
1065+ if wait_window <= 0 :
1066+ raise DodgingTimingError (
1067+ f"Insufficient time budget: interval={ interval } , od_duration={ od_duration } , pre_delay={ pre_delay } , post_delay={ post_delay } , after_action={ after_action } "
1068+ )
1069+
1070+ time_to_next_od = interval - ((now - first_od_obs_time ) % interval )
1071+
1072+ return {"wait_window" : wait_window , "time_to_next_od" : time_to_next_od }
1073+
1074+
10361075class BackgroundJobWithDodging (_BackgroundJob ):
10371076 """
10381077 This utility class allows for a change in behaviour when an OD reading is about to taken. Example: shutting
@@ -1102,6 +1141,7 @@ def __init__(self, *args, source="app", enable_dodging_od=False, **kwargs) -> No
11021141 self .add_to_published_settings ("enable_dodging_od" , {"datatype" : "boolean" , "settable" : True })
11031142 self .add_to_published_settings ("currently_dodging_od" , {"datatype" : "boolean" , "settable" : False })
11041143 self ._event_is_dodging_od = threading .Event ()
1144+ self ._dodging_init_called_once = False
11051145 self .enable_dodging_od = enable_dodging_od
11061146
11071147 def __post__init__ (self ):
@@ -1115,37 +1155,67 @@ def __post__init__(self):
11151155 )
11161156 super ().__post__init__ () # set ready
11171157
1158+ def _desired_dodging_mode (self , enable_dodging_od : bool , od_state : str | None ) -> bool :
1159+ """Return True if we should dodge based on enable flag and OD state."""
1160+ if not enable_dodging_od :
1161+ return False
1162+ # enable_dodging_od is true - user wants it on
1163+ if od_state is None :
1164+ return False
1165+ if od_state in {self .READY , self .SLEEPING , self .INIT }:
1166+ return True
1167+ if od_state in {self .LOST , self .DISCONNECTED }:
1168+ return False
1169+ return False
1170+
11181171 def set_currently_dodging_od (self , value : bool ):
1172+ """
1173+ Recall: currently_dodging_od is read-only. This function is called when other settings & variables are satisfied (it's "computed").
1174+ """
11191175 if self .state not in (self .READY , self .INIT ):
11201176 return
11211177
1178+ if self ._dodging_init_called_once and self .currently_dodging_od == value :
1179+ # noop
1180+ return
1181+
11221182 self .currently_dodging_od = value
1183+ self ._dodging_init_called_once = True
11231184 if self .currently_dodging_od :
1185+ self .logger .debug ("Dodging enabled." )
11241186 self ._event_is_dodging_od .clear ()
11251187 self .initialize_dodging_operation () # user defined
11261188 self ._action_to_do_before_od_reading = self .action_to_do_before_od_reading
11271189 self ._action_to_do_after_od_reading = self .action_to_do_after_od_reading
11281190 self ._setup_timer ()
11291191 else :
1192+ self .logger .debug ("Dodging disabled; running continuously." )
11301193 self ._event_is_dodging_od .set ()
1131- self .sneak_in_timer .cancel ()
1194+ try :
1195+ self .sneak_in_timer .cancel ()
1196+ except AttributeError :
1197+ pass
11321198 self .initialize_continuous_operation () # user defined
11331199 self ._action_to_do_before_od_reading = _noop
11341200 self ._action_to_do_after_od_reading = _noop
11351201
11361202 def set_enable_dodging_od (self , value : bool ):
1203+ """Turn dodging on/off based on user intent, then align mode with current OD state."""
11371204 self .enable_dodging_od = value
1138- if self .enable_dodging_od :
1139- if is_pio_job_running ("od_reading" ):
1140- self .logger .debug ("Will attempt to dodge OD readings." )
1141- self .set_currently_dodging_od (True )
1142- else :
1143- self .logger .debug ("Will attempt to dodge OD readings when they start." )
1144- self .set_currently_dodging_od (False )
1145- else :
1146- if is_pio_job_running ("od_reading" ):
1147- self .logger .debug ("Running continuously through OD readings." )
1148- self .set_currently_dodging_od (False )
1205+ od_running = is_pio_job_running ("od_reading" )
1206+ od_state = self .READY if od_running else self .DISCONNECTED
1207+
1208+ desired = self ._desired_dodging_mode (self .enable_dodging_od , od_state )
1209+ self .set_currently_dodging_od (desired )
1210+
1211+ def _od_reading_changed_status (self , state_msg : pt .MQTTMessage ) -> None :
1212+ """React to OD job state changes by flipping dodging mode when needed."""
1213+ if not self .enable_dodging_od :
1214+ return
1215+
1216+ new_state = state_msg .payload .decode ()
1217+ desired = self ._desired_dodging_mode (self .enable_dodging_od , new_state )
1218+ self .set_currently_dodging_od (desired )
11491219
11501220 def action_to_do_after_od_reading (self ) -> None :
11511221 pass
@@ -1159,22 +1229,6 @@ def initialize_dodging_operation(self) -> None:
11591229 def initialize_continuous_operation (self ) -> None :
11601230 pass
11611231
1162- def _od_reading_changed_status (self , state_msg : pt .MQTTMessage ) -> None :
1163- if self .enable_dodging_od :
1164- new_state = state_msg .payload .decode ()
1165- # only act if our internal state is discordant with the external state
1166- if new_state in {self .READY , self .SLEEPING , self .INIT } and not self .currently_dodging_od :
1167- # turned off
1168- self .logger .debug ("OD reading present. Dodging!" )
1169- self .set_currently_dodging_od (True )
1170- elif new_state in {self .LOST , self .DISCONNECTED } and self .currently_dodging_od :
1171- self .logger .debug ("OD reading turned off. Stop dodging." )
1172- self .set_currently_dodging_od (False )
1173- return
1174- else :
1175- # ignore
1176- return
1177-
11781232 def _setup_timer (self ) -> None :
11791233 self .sneak_in_timer .cancel ()
11801234
@@ -1187,7 +1241,7 @@ def _setup_timer(self) -> None:
11871241 if pre_delay < 0.25 :
11881242 self .logger .warning ("For optimal OD readings, keep `pre_delay_duration` more than 0.25 seconds." )
11891243
1190- def sneak_in (ads_interval : float , post_delay : float , pre_delay : float ) -> None :
1244+ def sneak_in () -> None :
11911245 if self .state != self .READY or not self .currently_dodging_od :
11921246 return
11931247
@@ -1196,17 +1250,25 @@ def sneak_in(ads_interval: float, post_delay: float, pre_delay: float) -> None:
11961250
11971251 action_after_duration = timer ()
11981252
1199- if ads_interval - self .OD_READING_DURATION - (post_delay + pre_delay ) - action_after_duration < 0 :
1200- raise ValueError (
1201- "samples_per_second is too high, or post_delay is too high, or pre_delay is too high, or action_to_do_after_od_reading takes too long."
1253+ try :
1254+ timing = compute_od_timing (
1255+ interval = ads_interval ,
1256+ first_od_obs_time = ads_start_time ,
1257+ now = time (),
1258+ od_duration = self .OD_READING_DURATION ,
1259+ pre_delay = pre_delay ,
1260+ post_delay = post_delay ,
1261+ after_action = action_after_duration ,
12021262 )
1263+ except DodgingTimingError as e :
1264+ self .logger .error (e )
1265+ self .clean_up ()
1266+ return
12031267
12041268 if self .state != self .READY or not self .currently_dodging_od :
12051269 return
12061270
1207- self ._event_is_dodging_od .wait (
1208- ads_interval - self .OD_READING_DURATION - (post_delay + pre_delay ) - action_after_duration
1209- ) # we use an Event here to allow for quick stopping of the timer.
1271+ self ._event_is_dodging_od .wait (timing ["wait_window" ]) # allow quick stopping of timer.
12101272
12111273 if self .state != self .READY or not self .currently_dodging_od :
12121274 return
@@ -1236,7 +1298,6 @@ def sneak_in(ads_interval: float, post_delay: float, pre_delay: float) -> None:
12361298 ads_interval ,
12371299 sneak_in ,
12381300 job_name = self .job_name ,
1239- args = (ads_interval , post_delay , pre_delay ),
12401301 run_immediately = True ,
12411302 run_after = time_to_next_ads_reading + (post_delay + self .OD_READING_DURATION ),
12421303 logger = self .logger ,
0 commit comments