Skip to content

Commit dc0b3e4

Browse files
committed
Additional system tests for supervisor
1 parent 975e064 commit dc0b3e4

File tree

4 files changed

+288
-101
lines changed

4 files changed

+288
-101
lines changed

src/mx_bluesky/hyperion/plan_runner.py

Lines changed: 50 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,13 @@ class PlanError(Exception):
4040

4141

4242
class PlanRunner(BaseRunner):
43+
EXTERNAL_CALLBACK_POLL_INTERVAL_S = 1
44+
EXTERNAL_CALLBACK_WATCHDOG_TIMER_S = 60
45+
4346
def __init__(self, context: BlueskyContext, dev_mode: bool):
4447
super().__init__(context)
48+
self._callbacks_started = False
49+
self._callback_watchdog_expiry = time.monotonic()
4550
self.is_dev_mode = dev_mode
4651

4752
@abstractmethod
@@ -50,27 +55,63 @@ def decode_and_execute(
5055
) -> MsgGenerator:
5156
pass
5257

53-
@abstractmethod
5458
def reset_callback_watchdog_timer(self):
55-
pass
59+
"""Called periodically to reset the watchdog timer when the external callbacks ping us."""
60+
self._callbacks_started = True
61+
self._callback_watchdog_expiry = (
62+
time.monotonic() + self.EXTERNAL_CALLBACK_WATCHDOG_TIMER_S
63+
)
5664

5765
@property
5866
@abstractmethod
5967
def current_status(self) -> Status:
6068
pass
6169

70+
def check_external_callbacks_are_alive(self):
71+
callback_expiry = time.monotonic() + self.EXTERNAL_CALLBACK_WATCHDOG_TIMER_S
72+
while time.monotonic() < callback_expiry:
73+
if self._callbacks_started:
74+
break
75+
# If on first launch the external callbacks aren't started yet, wait until they are
76+
LOGGER.info("Waiting for external callbacks to start")
77+
yield from bps.sleep(self.EXTERNAL_CALLBACK_POLL_INTERVAL_S)
78+
else:
79+
raise RuntimeError("External callbacks not running - try restarting")
80+
81+
if not self._external_callbacks_are_alive():
82+
raise RuntimeError(
83+
"External callback watchdog timer expired, check external callbacks are running."
84+
)
85+
86+
def request_run_engine_abort(self):
87+
"""Asynchronously request an abort from the run engine. This cannot be done from
88+
inside the main thread."""
89+
90+
def issue_abort():
91+
try:
92+
# abort() causes the run engine to throw a RequestAbort exception
93+
# inside the plan, which will propagate through the contingency wrappers.
94+
# When the plan returns, the run engine will raise RunEngineInterrupted
95+
self.run_engine.abort()
96+
except Exception as e:
97+
LOGGER.warning(
98+
"Exception encountered when issuing abort() to RunEngine:",
99+
exc_info=e,
100+
)
101+
102+
stopping_thread = threading.Thread(target=issue_abort)
103+
stopping_thread.start()
104+
105+
def _external_callbacks_are_alive(self) -> bool:
106+
return time.monotonic() < self._callback_watchdog_expiry
107+
62108

63109
class InProcessRunner(PlanRunner):
64110
"""Runner that executes experiments from inside a running Bluesky plan"""
65111

66-
EXTERNAL_CALLBACK_WATCHDOG_TIMER_S = 60
67-
EXTERNAL_CALLBACK_POLL_INTERVAL_S = 1
68-
69112
def __init__(self, context: BlueskyContext, dev_mode: bool) -> None:
70113
super().__init__(context, dev_mode)
71114
self._current_status: Status = Status.IDLE
72-
self._callbacks_started = False
73-
self._callback_watchdog_expiry = time.monotonic()
74115

75116
def decode_and_execute(
76117
self, current_visit: str | None, parameter_list: Sequence[MxBlueskyParameters]
@@ -115,20 +156,7 @@ def execute_plan(
115156
self._current_status = Status.BUSY
116157

117158
try:
118-
callback_expiry = time.monotonic() + self.EXTERNAL_CALLBACK_WATCHDOG_TIMER_S
119-
while time.monotonic() < callback_expiry:
120-
if self._callbacks_started:
121-
break
122-
# If on first launch the external callbacks aren't started yet, wait until they are
123-
LOGGER.info("Waiting for external callbacks to start")
124-
yield from bps.sleep(self.EXTERNAL_CALLBACK_POLL_INTERVAL_S)
125-
else:
126-
raise RuntimeError("External callbacks not running - try restarting")
127-
128-
if not self._external_callbacks_are_alive():
129-
raise RuntimeError(
130-
"External callback watchdog timer expired, check external callbacks are running."
131-
)
159+
yield from self.check_external_callbacks_are_alive()
132160
yield from experiment()
133161
self._current_status = Status.IDLE
134162
except WarningError as e:
@@ -147,35 +175,12 @@ def shutdown(self):
147175
"""Performs a prompt shutdown. Aborts the run engine and terminates the loop
148176
waiting for messages."""
149177

150-
def issue_abort():
151-
try:
152-
# abort() causes the run engine to throw a RequestAbort exception
153-
# inside the plan, which will propagate through the contingency wrappers.
154-
# When the plan returns, the run engine will raise RunEngineInterrupted
155-
self.run_engine.abort()
156-
except Exception as e:
157-
LOGGER.warning(
158-
"Exception encountered when issuing abort() to RunEngine:",
159-
exc_info=e,
160-
)
161-
162178
LOGGER.info("Shutting down: Stopping the run engine gracefully")
163179
if self.current_status != Status.ABORTING:
164180
self._current_status = Status.ABORTING
165-
stopping_thread = threading.Thread(target=issue_abort)
166-
stopping_thread.start()
181+
self.request_run_engine_abort()
167182
return
168183

169-
def reset_callback_watchdog_timer(self):
170-
"""Called periodically to reset the watchdog timer when the external callbacks ping us."""
171-
self._callbacks_started = True
172-
self._callback_watchdog_expiry = (
173-
time.monotonic() + self.EXTERNAL_CALLBACK_WATCHDOG_TIMER_S
174-
)
175-
176-
def _external_callbacks_are_alive(self) -> bool:
177-
return time.monotonic() < self._callback_watchdog_expiry
178-
179184
@property
180185
def current_status(self) -> Status:
181186
return self._current_status

src/mx_bluesky/hyperion/supervisor/_supervisor.py

Lines changed: 66 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from pathlib import Path
33

44
from blueapi.client.client import BlueapiClient
5+
from blueapi.client.event_bus import BlueskyStreamingError
56
from blueapi.config import ApplicationConfig, ConfigLoader
67
from blueapi.core import BlueskyContext
78
from blueapi.service.model import TaskRequest
@@ -13,7 +14,7 @@
1314
from mx_bluesky.common.utils.log import LOGGER
1415
from mx_bluesky.hyperion.parameters.components import UDCCleanup, UDCDefaultState, Wait
1516
from mx_bluesky.hyperion.parameters.load_centre_collect import LoadCentreCollect
16-
from mx_bluesky.hyperion.plan_runner import PlanRunner
17+
from mx_bluesky.hyperion.plan_runner import PlanError, PlanRunner
1718

1819

1920
def create_context() -> BlueskyContext:
@@ -36,49 +37,72 @@ def __init__(
3637
):
3738
super().__init__(bluesky_context, dev_mode)
3839
self.blueapi_client = BlueapiClient.from_config(client_config)
40+
self._current_status = Status.IDLE
3941

4042
def decode_and_execute(
4143
self, current_visit: str | None, parameter_list: Sequence[MxBlueskyParameters]
4244
) -> MsgGenerator:
45+
try:
46+
yield from self.check_external_callbacks_are_alive()
47+
except Exception as e:
48+
raise PlanError(f"Exception raised during plan execution: {e}") from e
4349
# TODO determine what is the instrument session for udc_default_state etc.
4450
instrument_session = current_visit or "NO_VISIT"
45-
for parameters in parameter_list:
46-
LOGGER.info(
47-
f"Executing plan with parameters: {parameters.model_dump_json(indent=2)}"
48-
)
49-
match parameters:
50-
case LoadCentreCollect(): # TODO
51-
pass
52-
case Wait():
53-
yield from bps.sleep(parameters.duration_s)
54-
case UDCDefaultState():
55-
task_request = TaskRequest(
56-
name="move_to_udc_default_state",
57-
params={},
58-
instrument_session=instrument_session,
59-
)
60-
self.blueapi_client.run_task(task_request)
61-
yield from bps.null()
62-
case UDCCleanup():
63-
task_request = TaskRequest(
64-
name="clean_up_udc",
65-
params={"visit": current_visit},
66-
instrument_session=instrument_session,
67-
)
68-
self.blueapi_client.run_task(task_request)
69-
yield from bps.null()
70-
case _:
71-
raise AssertionError(
72-
f"Unsupported instruction decoded from agamemnon {type(parameters)}"
73-
)
51+
try:
52+
if self._current_status == Status.ABORTING:
53+
raise PlanError("Plan execution cancelled, supervisor is shutting down")
54+
self._current_status = Status.BUSY
55+
for parameters in parameter_list:
56+
LOGGER.info(
57+
f"Executing plan with parameters: {parameters.model_dump_json(indent=2)}"
58+
)
59+
match parameters:
60+
case LoadCentreCollect(): # TODO
61+
pass
62+
case Wait():
63+
yield from bps.sleep(parameters.duration_s)
64+
case UDCDefaultState():
65+
task_request = TaskRequest(
66+
name="move_to_udc_default_state",
67+
params={},
68+
instrument_session=instrument_session,
69+
)
70+
self.blueapi_client.run_task(task_request)
71+
yield from bps.null()
72+
case UDCCleanup():
73+
task_request = TaskRequest(
74+
name="clean_up_udc",
75+
params={"visit": current_visit},
76+
instrument_session=instrument_session,
77+
)
78+
try:
79+
self.blueapi_client.run_task(task_request)
80+
except BlueskyStreamingError as e:
81+
# We will receive a BlueskyStreamingError if the remote server
82+
# processed an abort during plan execution, but this is not
83+
# the only possible cause.
84+
if self.current_status == Status.ABORTING:
85+
LOGGER.info("Aborting local runner...")
86+
self.request_run_engine_abort()
87+
else:
88+
raise PlanError(
89+
f"Exception raised during plan execution: {e}"
90+
) from e
91+
yield from bps.null()
92+
case _:
93+
raise AssertionError(
94+
f"Unsupported instruction decoded from agamemnon {type(parameters)}"
95+
)
96+
except:
97+
self._current_status = Status.FAILED
98+
raise
99+
else:
100+
self._current_status = Status.IDLE
74101
return current_visit
75102

76-
def reset_callback_watchdog_timer(self):
77-
pass
78-
79103
@property
80104
def current_status(self) -> Status:
81-
return Status.FAILED # TODO
105+
return self._current_status
82106

83107
def is_connected(self) -> bool:
84108
try:
@@ -89,4 +113,11 @@ def is_connected(self) -> bool:
89113
return True
90114

91115
def shutdown(self):
92-
pass
116+
LOGGER.info(
117+
"Hyperion supervisor received shutdown request, signalling abort to BlueAPI server..."
118+
)
119+
if self.current_status != Status.BUSY:
120+
self.request_run_engine_abort()
121+
else:
122+
self._current_status = Status.ABORTING
123+
self.blueapi_client.abort()

tests/system_tests/hyperion/supervisor/dummy_plans.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from dodal.devices.robot import BartRobot
88
from dodal.devices.smargon import Smargon
99

10+
from mx_bluesky.common.utils.exceptions import WarningError
11+
1012

1113
def publish_event(plan_name: str):
1214
yield from bps.open_run(md={"plan_name": plan_name})
@@ -23,3 +25,11 @@ def clean_up_udc(
2325
detector_motion: DetectorMotion = inject("detector_motion"),
2426
) -> MsgGenerator:
2527
yield from publish_event("clean_up_udc")
28+
match visit:
29+
case "raise_warning_error":
30+
raise WarningError("Test warning error")
31+
case "raise_other_error":
32+
raise RuntimeError("Test unexpected error")
33+
case "wait_for_abort":
34+
while True:
35+
yield from bps.sleep(1)

0 commit comments

Comments
 (0)