Skip to content

Commit e247777

Browse files
improvements to creating mqtt clients; adding logic to handle if the BJ subclass's __init__ fails - we gracefully clean up
1 parent cdcd532 commit e247777

File tree

16 files changed

+91
-24
lines changed

16 files changed

+91
-24
lines changed

pioreactor/actions/pump.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(
7777
self.calibration.hz,
7878
experiment=experiment,
7979
unit=unit,
80-
pubsub_client=mqtt_client,
80+
pub_client=mqtt_client,
8181
logger=logger,
8282
)
8383
self.pwm.lock()

pioreactor/automations/dosing/pid_morbidostat.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self, target_growth_rate: float | str, target_normalized_od: float
5555
experiment=self.experiment,
5656
job_name=self.job_name,
5757
target_name="growth_rate",
58+
pub_client=self.pub_client,
5859
)
5960

6061
assert isinstance(self.duration, float)

pioreactor/automations/temperature/thermostat.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self, target_temperature: float | str, **kwargs) -> None:
3333
job_name=self.job_name,
3434
target_name="temperature",
3535
output_limits=(-25, 25), # avoid whiplashing
36+
pub_client=self.pub_client,
3637
)
3738

3839
self.set_target_temperature(target_temperature)
@@ -50,6 +51,10 @@ def _clamp_target_temperature(self, target_temperature: float) -> float:
5051

5152
return clamp(0.0, target_temperature, self.MAX_TARGET_TEMP)
5253

54+
def on_disconnected(self) -> None:
55+
super().on_disconnected()
56+
self.pid.clean_up()
57+
5358
def execute(self) -> UpdatedHeaterDC:
5459
while not hasattr(self, "pid"):
5560
# sometimes when initializing, this execute can run before the subclasses __init__ is resolved.

pioreactor/background_jobs/base.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,22 @@ class _BackgroundJob(metaclass=PostInitCaller):
249249
# See pt.PublishableSetting type
250250
published_settings: dict[str, pt.PublishableSetting] = dict()
251251

252+
def __init_subclass__(cls, **kwargs):
253+
super().__init_subclass__(**kwargs)
254+
orig_init = cls.__init__
255+
256+
def wrapped_init(self, *args, **kwargs):
257+
try:
258+
orig_init(self, *args, **kwargs)
259+
except Exception:
260+
try:
261+
self.clean_up()
262+
except Exception:
263+
pass
264+
raise
265+
266+
cls.__init__ = wrapped_init
267+
252268
def __init__(self, unit: str, experiment: str, source: str = "app") -> None:
253269
if self.job_name in DISALLOWED_JOB_NAMES:
254270
raise ValueError("Job name not allowed.")

pioreactor/background_jobs/stirring.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,8 @@ def __init__(
242242
config.getfloat("stirring.config", "pwm_hz"),
243243
unit=self.unit,
244244
experiment=self.experiment,
245-
pubsub_client=self.pub_client,
245+
pub_client=self.pub_client,
246+
logger=self.logger,
246247
)
247248
self.pwm.start(0)
248249
self.pwm.lock()
@@ -268,6 +269,7 @@ def __init__(
268269
job_name=self.job_name,
269270
target_name="rpm",
270271
output_limits=(-7.5, 7.5), # avoid whiplashing
272+
pub_client=self.pub_client,
271273
)
272274

273275
def action_to_do_before_od_reading(self):
@@ -355,6 +357,8 @@ def on_disconnected(self) -> None:
355357
self.rpm_check_repeated_timer.cancel()
356358
with suppress(AttributeError):
357359
self.pwm.clean_up()
360+
with suppress(AttributeError):
361+
self.pid.clean_up()
358362
with suppress(AttributeError):
359363
if self.rpm_calculator:
360364
self.rpm_calculator.clean_up()

pioreactor/background_jobs/temperature_automation.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,14 @@ def setup_pwm(self) -> PWM:
299299
# impact (mainly: current sink), over the second. Ex: imagine freq=1hz, dc=40%, and the pump needs to run for
300300
# 0.3s. The influence of when the heat is one on the pump can be significant in a power-constrained system.
301301
pin = hardware.PWM_TO_PIN[hardware.HEATER_PWM_TO_PIN]
302-
pwm = PWM(pin, hertz, unit=self.unit, experiment=self.experiment, pubsub_client=self.pub_client)
302+
pwm = PWM(
303+
pin,
304+
hertz,
305+
unit=self.unit,
306+
experiment=self.experiment,
307+
pub_client=self.pub_client,
308+
logger=self.logger,
309+
)
303310
pwm.start(0)
304311
return pwm
305312

pioreactor/pubsub.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ def __init__(self, log_level: str, unit: str, experiment: str) -> None:
346346
allow_retained=False,
347347
client_id=f"{self.unit}_{self.experiment}_{self.log_level}_log_collector",
348348
)
349-
assert self.client.is_connected
350349

351350
def _collect_logs_into_bucket(self, message):
352351
# load the message

pioreactor/tests/test_automation_yamls.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313

1414

1515
def get_specific_yaml(path):
16-
data = get(f"https://raw.githubusercontent.com/Pioreactor/pioreactorui/master/{path}")
16+
r = get(f"https://raw.githubusercontent.com/Pioreactor/pioreactorui/master/{path}")
17+
r.raise_for_status()
18+
data = r.content
1719
return load(data.content, Loader=Loader)
1820

1921

pioreactor/tests/test_background_job.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,32 @@ def update_state(msg: MQTTMessage) -> None:
162162
assert not is_pio_job_running("testjob")
163163

164164

165+
def test_what_happens_when_an_error_occurs_in_init_but_we_dont_catch() -> None:
166+
class TestJob(BackgroundJob):
167+
job_name = "testjob"
168+
169+
def __init__(self, unit: str, experiment: str) -> None:
170+
super(TestJob, self).__init__(unit=unit, experiment=experiment)
171+
raise ZeroDivisionError()
172+
173+
exp = "test_what_happens_when_an_error_occurs_in_init_but_we_dont_catch"
174+
publish(f"pioreactor/unit/{exp}/testjob/$state", None, retain=True)
175+
state = []
176+
177+
def update_state(msg: MQTTMessage) -> None:
178+
state.append(msg.payload.decode())
179+
180+
subscribe_and_callback(update_state, f"pioreactor/unit/{exp}/testjob/$state")
181+
182+
with pytest.raises(ZeroDivisionError):
183+
with TestJob(unit="unit", experiment=exp):
184+
pass
185+
186+
pause()
187+
assert state[-1] == "disconnected"
188+
assert not is_pio_job_running("testjob")
189+
190+
165191
def test_state_transition_callbacks() -> None:
166192
class TestJob(BackgroundJob):
167193
job_name = "testjob"

pioreactor/tests/test_dosing_automation.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1391,9 +1391,8 @@ def execute(self):
13911391
assert len(bucket) >= 1
13921392

13931393

1394-
@pytest.mark.xfail
13951394
def test_a_failing_automation_cleans_duration_attr_in_mqtt_up() -> None:
1396-
experiment = "test_a_failing_automation_cleans_itself_up"
1395+
experiment = "test_a_failing_automation_cleans_duration_attr_in_mqtt_up"
13971396

13981397
pubsub.publish(f"pioreactor/{get_unit_name()}/{experiment}/dosing_automation/duration", None, retain=True)
13991398

0 commit comments

Comments
 (0)