Skip to content

Commit 6f46abd

Browse files
Murko: continue thaw if redis is down (#1437)
* Check redis connection and thaw if connection fails * Clean up tests * Small change * Add test * Move connection check into murko_callback * Fix * Fix and add tests * Move warning into function * Pin dodal * Check connection in start rather than callback init * Improve test * Add test * Update dls-dodal dependency to use main branch * Fix typo --------- Co-authored-by: Dominic Oram <[email protected]>
1 parent c1346e4 commit 6f46abd

File tree

5 files changed

+98
-8
lines changed

5 files changed

+98
-8
lines changed

src/mx_bluesky/beamlines/i04/callbacks/murko_callback.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from dodal.log import LOGGER
88
from event_model.documents import Event, RunStart, RunStop
99
from redis import StrictRedis
10+
from redis.exceptions import ConnectionError
1011

1112
FORWARDING_COMPLETE_MESSAGE = "image_forwarding_complete"
1213

@@ -57,7 +58,20 @@ def __init__(self, redis_host: str, redis_password: str, redis_db: int = 0):
5758
self.last_uuid = None
5859
self.previous_omegas: list[OmegaReading] = []
5960

61+
def _check_redis_connection(self):
62+
try:
63+
self.redis_client.ping()
64+
return True
65+
except ConnectionError:
66+
LOGGER.warning(
67+
f"Failed to connect to redis: {self.redis_client}. Murko callback will not run"
68+
)
69+
return False
70+
6071
def start(self, doc: RunStart) -> RunStart | None:
72+
self.redis_connected = self._check_redis_connection()
73+
if not self.redis_connected:
74+
return doc
6175
self.murko_metadata: dict = {"sample_id": doc.get("sample_id")}
6276
self.last_uuid = None
6377
self.previous_omegas = []
@@ -67,6 +81,8 @@ def start(self, doc: RunStart) -> RunStart | None:
6781
return doc
6882

6983
def event(self, doc: Event) -> Event:
84+
if not self.redis_connected:
85+
return doc
7086
data = doc["data"]
7187
for prefix in ("oav", "oav_full_screen"):
7288
if f"{prefix}-beam_centre_j" in data:
@@ -114,6 +130,8 @@ def call_murko(self, uuid: str, omega: float):
114130
self.redis_client.publish("murko", json.dumps(metadata))
115131

116132
def stop(self, doc: RunStop) -> RunStop | None:
133+
if not self.redis_connected:
134+
return doc
117135
LOGGER.info(f"Finished streaming {self.murko_metadata['sample_id']} to murko")
118136
LOGGER.info(
119137
f"Publishing forwarding complete message: {FORWARDING_COMPLETE_MESSAGE}"

src/mx_bluesky/beamlines/i04/thawing_plan.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def thaw_and_murko_centre(
8888
initial_zoom_level = yield from bps.rd(oav_fs.zoom_controller.level)
8989
initial_velocity = yield from bps.rd(smargon.omega.velocity)
9090
new_velocity = abs(rotation / time_to_thaw) * 2.0
91+
9192
murko_callback = MurkoCallback(
9293
RedisConstants.REDIS_HOST,
9394
RedisConstants.REDIS_PASSWORD,

tests/unit_tests/beamlines/i04/callbacks/test_murko_callback.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
from unittest.mock import MagicMock
2+
from unittest.mock import MagicMock, patch
33

44
import pytest
55
from event_model import Event
@@ -275,3 +275,26 @@ def test_when_murko_called_with_full_screen_and_roi_event_then_metadata_updates_
275275
)
276276
def test_extrapolate_omega(latest_omega, previous_omega, now, expected):
277277
assert extrapolate_omega(latest_omega, previous_omega, now) == expected
278+
279+
280+
@patch(
281+
"mx_bluesky.beamlines.i04.callbacks.murko_callback.MurkoCallback._check_redis_connection"
282+
)
283+
def test_if_redis_connection_fails_then_there_is_no_error(
284+
mock_check_redis_connection: MagicMock,
285+
):
286+
mock_check_redis_connection.return_value = False
287+
callback = MurkoCallback("", "")
288+
doc = {}
289+
callback.start(doc)
290+
callback.event(doc)
291+
callback.stop(doc)
292+
293+
294+
def test_warning_is_logged_if_redis_connection_fails(caplog):
295+
callback = MurkoCallback("", "")
296+
doc = {}
297+
callback.start(doc)
298+
log_message = caplog.records[-1]
299+
assert log_message.levelname == "WARNING"
300+
assert "Failed to connect to redis: " in log_message.message

tests/unit_tests/beamlines/i04/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@
99
def murko_callback() -> MurkoCallback:
1010
callback = MurkoCallback("", "")
1111
callback.redis_client = MagicMock()
12+
callback.redis_connected = True
1213
return callback

tests/unit_tests/beamlines/i04/test_thawing.py

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,9 @@ def test_thaw_and_stream_adds_murko_callback_and_produces_expected_messages(
263263

264264
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback.stop")
265265
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback.call_murko")
266+
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback._check_redis_connection")
266267
def test_thaw_and_stream_will_produce_events_that_call_murko(
268+
patch_check_redis_connection: MagicMock,
267269
patch_murko_call: MagicMock,
268270
patch_stop_call: MagicMock,
269271
smargon: Smargon,
@@ -272,6 +274,8 @@ def test_thaw_and_stream_will_produce_events_that_call_murko(
272274
oav_forwarder: OAVToRedisForwarder,
273275
run_engine: RunEngine,
274276
):
277+
patch_check_redis_connection.return_value = True
278+
275279
class StopPlanError(Exception):
276280
pass
277281

@@ -367,7 +371,7 @@ def _run_thaw_and_stream_and_assert_zoom_changes(
367371

368372
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback")
369373
def test_given_thaw_succeeds_then_thaw_and_stream_sets_zoom_to_1_and_back(
370-
patch_murko_callback,
374+
patch_murko_callback: MagicMock,
371375
smargon: Smargon,
372376
thawer: Thawer,
373377
oav_forwarder: OAVToRedisForwarder,
@@ -383,8 +387,8 @@ def test_given_thaw_succeeds_then_thaw_and_stream_sets_zoom_to_1_and_back(
383387
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback")
384388
@patch("mx_bluesky.beamlines.i04.thawing_plan.bps.monitor")
385389
def test_given_thaw_fails_then_thaw_and_stream_sets_zoom_to_1_and_back(
386-
mock__thaw,
387-
patch_murko_callback,
390+
mock__thaw: MagicMock,
391+
patch_murko_callback: MagicMock,
388392
smargon: Smargon,
389393
thawer: Thawer,
390394
oav_forwarder: OAVToRedisForwarder,
@@ -403,8 +407,8 @@ def test_given_thaw_fails_then_thaw_and_stream_sets_zoom_to_1_and_back(
403407
"mx_bluesky.beamlines.i04.thawing_plan._rotate_in_one_direction_and_stream_to_redis"
404408
)
405409
def test_thaw_and_murko_centre_stages_and_unstages_murko_results_twice(
406-
mock_rotate_and_stream,
407-
patch_murko_callback,
410+
mock_rotate_and_stream: MagicMock,
411+
patch_murko_callback: MagicMock,
408412
smargon: Smargon,
409413
thawer: Thawer,
410414
oav_forwarder: OAVToRedisForwarder,
@@ -424,8 +428,8 @@ def test_thaw_and_murko_centre_stages_and_unstages_murko_results_twice(
424428
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback")
425429
@patch("mx_bluesky.beamlines.i04.thawing_plan.bps.monitor")
426430
def test_given_thaw_and_murko_centre_errors_then_murko_results_still_unstaged(
427-
mock__thaw,
428-
patch_murko_callback,
431+
mock__thaw: MagicMock,
432+
patch_murko_callback: MagicMock,
429433
smargon: Smargon,
430434
thawer: Thawer,
431435
oav_forwarder: OAVToRedisForwarder,
@@ -583,3 +587,46 @@ def test_thawing_plan_with_murko_callback_puts_correct_metadata_into_redis(
583587
assert publish_call_args_list[1].args[1] == json.dumps(FORWARDING_COMPLETE_MESSAGE)
584588
assert publish_call_args_list[2].args[1] == json.dumps(expected_roi_md)
585589
assert publish_call_args_list[3].args[1] == json.dumps(FORWARDING_COMPLETE_MESSAGE)
590+
591+
592+
@patch("mx_bluesky.beamlines.i04.thawing_plan.MurkoCallback._check_redis_connection")
593+
def test_plans_carry_on_thaw_if_redis_connection_check_fails(
594+
patch_callback_check_redis_connection: MagicMock,
595+
smargon: Smargon,
596+
thawer: Thawer,
597+
robot: BartRobot,
598+
oav_forwarder: OAVToRedisForwarder,
599+
run_engine: RunEngine,
600+
):
601+
patch_callback_check_redis_connection.return_value = False
602+
murko_results = MurkoResultsDevice()
603+
murko_results._check_redis_connection = AsyncMock(return_value=False)
604+
for plan in (
605+
thaw_and_murko_centre(
606+
10,
607+
360,
608+
thawer=thawer,
609+
smargon=smargon,
610+
robot=robot,
611+
oav_to_redis_forwarder=oav_forwarder,
612+
murko_results=murko_results,
613+
),
614+
thaw_and_stream_to_redis(
615+
10,
616+
360,
617+
thawer=thawer,
618+
smargon=smargon,
619+
robot=robot,
620+
oav_to_redis_forwarder=oav_forwarder,
621+
),
622+
):
623+
run_engine(plan)
624+
625+
omega_put = get_mock_put(smargon.omega.user_setpoint)
626+
627+
assert omega_put.call_args_list == [
628+
call(360.0, wait=True),
629+
call(0.0, wait=True),
630+
]
631+
632+
omega_put.reset_mock()

0 commit comments

Comments
 (0)