diff --git a/src/dodal/beamlines/i09_1.py b/src/dodal/beamlines/i09_1.py index 24120649d2..52fc02dfeb 100644 --- a/src/dodal/beamlines/i09_1.py +++ b/src/dodal/beamlines/i09_1.py @@ -10,6 +10,11 @@ from dodal.devices.electron_analyser import EnergySource from dodal.devices.electron_analyser.specs import SpecsDetector from dodal.devices.i09_1 import LensMode, PsuMode +from dodal.devices.i09_1_shared.hard_energy import HardEnergy, HardInsertionDeviceEnergy +from dodal.devices.i09_1_shared.hard_undulator_functions import ( + calculate_energy_i09_hu, + calculate_gap_i09_hu, +) from dodal.devices.synchrotron import Synchrotron from dodal.devices.undulator import UndulatorInMm, UndulatorOrder from dodal.log import set_beamline as set_log_beamline @@ -58,3 +63,24 @@ def undulator() -> UndulatorInMm: @device_factory() def harmonics() -> UndulatorOrder: return UndulatorOrder(name="harmonics") + + +@device_factory() +def hu_id_energy() -> HardInsertionDeviceEnergy: + return HardInsertionDeviceEnergy( + undulator_order=harmonics(), + undulator=undulator(), + lut={}, # Placeholder, will be set later_ + gap_to_energy_func=calculate_energy_i09_hu, + energy_to_gap_func=calculate_gap_i09_hu, + name="hu_id_energy", + ) + + +@device_factory() +def hu_energy() -> HardEnergy: + return HardEnergy( + dcm=dcm(), + undulator_energy=hu_id_energy(), + name="hu_energy", + ) diff --git a/src/dodal/devices/i09_1_shared/__init__.py b/src/dodal/devices/i09_1_shared/__init__.py index f42dcdb252..174e219ecb 100644 --- a/src/dodal/devices/i09_1_shared/__init__.py +++ b/src/dodal/devices/i09_1_shared/__init__.py @@ -1,7 +1,14 @@ +from .hard_energy import HardEnergy, HardInsertionDeviceEnergy from .hard_undulator_functions import ( calculate_energy_i09_hu, calculate_gap_i09_hu, get_hu_lut_as_dict, ) -__all__ = ["calculate_gap_i09_hu", "get_hu_lut_as_dict", "calculate_energy_i09_hu"] +__all__ = [ + "calculate_gap_i09_hu", + "get_hu_lut_as_dict", + "calculate_energy_i09_hu", + "HardInsertionDeviceEnergy", + "HardEnergy", +] diff --git a/src/dodal/devices/i09_1_shared/hard_energy.py b/src/dodal/devices/i09_1_shared/hard_energy.py new file mode 100644 index 0000000000..8ec3daeac6 --- /dev/null +++ b/src/dodal/devices/i09_1_shared/hard_energy.py @@ -0,0 +1,111 @@ +from asyncio import gather +from collections.abc import Callable + +from bluesky.protocols import Locatable, Location, Movable +from numpy import ndarray +from ophyd_async.core import ( + AsyncStatus, + Reference, + StandardReadable, + StandardReadableFormat, + derived_signal_rw, + soft_signal_rw, +) + +from dodal.devices.common_dcm import DoubleCrystalMonochromatorBase +from dodal.devices.i09_1_shared.hard_undulator_functions import ( + MAX_ENERGY_COLUMN, + MIN_ENERGY_COLUMN, +) +from dodal.devices.undulator import UndulatorInMm, UndulatorOrder + + +class HardInsertionDeviceEnergy(StandardReadable, Movable[float]): + """ + Compound device to link hard x-ray undulator gap and order to photon energy. + Setting the energy adjusts the undulator gap accordingly. + """ + + def __init__( + self, + undulator_order: UndulatorOrder, + undulator: UndulatorInMm, + lut: dict[int, ndarray], + gap_to_energy_func: Callable[..., float], + energy_to_gap_func: Callable[..., float], + name: str = "", + ) -> None: + self._lut = lut + self.gap_to_energy_func = gap_to_energy_func + self.energy_to_gap_func = energy_to_gap_func + self._undulator_order = Reference(undulator_order) + self._undulator = Reference(undulator) + + self.add_readables([undulator_order, undulator.current_gap]) + with self.add_children_as_readables(StandardReadableFormat.HINTED_SIGNAL): + self.energy_demand = soft_signal_rw(float) + self.energy = derived_signal_rw( + raw_to_derived=self._read_energy, + set_derived=self._set_energy, + current_gap=self._undulator().gap_motor.user_readback, + current_order=self._undulator_order().value, + derived_units="eV", + ) + super().__init__(name=name) + + def _read_energy(self, current_gap: float, current_order: int) -> float: + return self.gap_to_energy_func( + gap=current_gap, + look_up_table=self._lut, + order=current_order, + ) + + async def _set_energy(self, energy: float): + current_order = await self._undulator_order().value.get_value() + min_energy, max_energy = self._lut[current_order][ + MIN_ENERGY_COLUMN : MAX_ENERGY_COLUMN + 1 + ] + if not (min_energy <= energy <= max_energy): + raise ValueError( + f"Requested energy {energy} keV is out of range for harmonic {current_order}: " + f"[{min_energy}, {max_energy}] keV" + ) + + target_gap = self.energy_to_gap_func( + photon_energy_kev=energy, look_up_table=self._lut, order=current_order + ) + await self._undulator().set(target_gap) + + @AsyncStatus.wrap + async def set(self, value: float) -> None: + self.energy_demand.set(value) + await self.energy.set(value) + + +class HardEnergy(StandardReadable, Locatable[float]): + """ + Energy compound device that provides combined change of both DCM energy and undulator gap accordingly. + """ + + def __init__( + self, + dcm: DoubleCrystalMonochromatorBase, + undulator_energy: HardInsertionDeviceEnergy, + name: str = "", + ) -> None: + self._dcm = Reference(dcm) + self._undulator_energy = Reference(undulator_energy) + self.add_readables([undulator_energy, dcm.energy_in_keV]) + super().__init__(name=name) + + @AsyncStatus.wrap + async def set(self, value: float): + await gather( + self._dcm().energy_in_keV.set(value), self._undulator_energy().set(value) + ) + + async def locate(self) -> Location[float]: + return Location( + setpoint=await self._dcm().energy_in_keV.user_setpoint.get_value(), + readback=await self._dcm().energy_in_keV.user_readback.get_value(), + ) diff --git a/tests/devices/i09_1_shared/test_hard_energy.py b/tests/devices/i09_1_shared/test_hard_energy.py new file mode 100644 index 0000000000..b0155f8a92 --- /dev/null +++ b/tests/devices/i09_1_shared/test_hard_energy.py @@ -0,0 +1,235 @@ +import re + +import pytest +from bluesky.plan_stubs import mv +from bluesky.run_engine import RunEngine +from ophyd_async.core import init_devices +from ophyd_async.testing import ( + assert_reading, + partial_reading, + set_mock_value, +) + +from dodal.devices.common_dcm import ( + DoubleCrystalMonochromatorWithDSpacing, + PitchAndRollCrystal, + StationaryCrystal, +) +from dodal.devices.i09_1_shared import ( + HardEnergy, + HardInsertionDeviceEnergy, + calculate_energy_i09_hu, + calculate_gap_i09_hu, + get_hu_lut_as_dict, +) +from dodal.devices.undulator import UndulatorInMm, UndulatorOrder +from dodal.testing.setup import patch_all_motors +from tests.devices.i09_1_shared.test_data import TEST_HARD_UNDULATOR_LUT + + +@pytest.fixture +async def lut_dictionary() -> dict: + return await get_hu_lut_as_dict(TEST_HARD_UNDULATOR_LUT) + + +@pytest.fixture +def undulator_order() -> UndulatorOrder: + with init_devices(mock=True): + order = UndulatorOrder(name="undulator_order") + set_mock_value(order.value, 3) + return order + + +@pytest.fixture +async def dcm() -> DoubleCrystalMonochromatorWithDSpacing: + async with init_devices(mock=True): + dcm = DoubleCrystalMonochromatorWithDSpacing( + "TEST-MO-DCM-01:", PitchAndRollCrystal, StationaryCrystal + ) + patch_all_motors(dcm) + return dcm + + +@pytest.fixture +async def undulator_in_mm() -> UndulatorInMm: + async with init_devices(mock=True): + undulator_mm = UndulatorInMm("UND-02") + patch_all_motors(undulator_mm) + return undulator_mm + + +@pytest.fixture +async def hu_id_energy( + undulator_order: UndulatorOrder, + undulator_in_mm: UndulatorInMm, + lut_dictionary: dict, +) -> HardInsertionDeviceEnergy: + hu = HardInsertionDeviceEnergy( + undulator_order=undulator_order, + undulator=undulator_in_mm, + lut=lut_dictionary, + gap_to_energy_func=calculate_energy_i09_hu, + energy_to_gap_func=calculate_gap_i09_hu, + name="hu_id_energy", + ) + # patch_all_motors(hu) + return hu + + +@pytest.fixture +async def hu_energy( + hu_id_energy: HardInsertionDeviceEnergy, + dcm: DoubleCrystalMonochromatorWithDSpacing, +) -> HardEnergy: + async with init_devices(mock=True): + hu_energy_device = HardEnergy( + dcm=dcm, + undulator_energy=hu_id_energy, + name="hu_energy", + ) + patch_all_motors(hu_energy_device) + return hu_energy_device + + +async def test_hu_id_energy_reading_includes_read_fields( + run_engine: RunEngine, + hu_id_energy: HardInsertionDeviceEnergy, +): + # need to set correct order to avoid errors in reading + await hu_id_energy._undulator_order().value.set(3) + energy_value = 3.1416 + run_engine(mv(hu_id_energy, energy_value)) + await assert_reading( + hu_id_energy, + { + "hu_id_energy-energy": partial_reading(energy_value), + "hu_id_energy-energy_demand": partial_reading(energy_value), + "undulator_mm-current_gap": partial_reading(0.0), + "undulator_order-value": partial_reading(3.0), + }, + ) + + +async def test_hu_id_energy_set_energy_fails( + hu_id_energy: HardInsertionDeviceEnergy, +): + value = 100.5 + with pytest.raises( + ValueError, + match=re.escape(f"Requested energy {value} keV is out of range for harmonic 3"), + ): + await hu_id_energy.set(value) + + +async def test_hu_id_energy_energy_demand_initialised_correctly( + hu_id_energy: HardInsertionDeviceEnergy, + run_engine: RunEngine, +): + value = 3.456 + run_engine(mv(hu_id_energy, value)) + assert await hu_id_energy.energy_demand.get_value() == pytest.approx( + value, abs=0.001 + ) + + +@pytest.mark.parametrize( + "energy_value, order_value, expected_gap", + [ + (2.130, 1, 12.814), + (2.780, 3, 6.053), + (6.240, 5, 7.956), + ], +) +async def test_hu_id_energy_set_energy_updates_gap( + run_engine: RunEngine, + hu_id_energy: HardInsertionDeviceEnergy, + energy_value: float, + order_value: int, + expected_gap: float, +): + await hu_id_energy._undulator_order().value.set(order_value) + assert await hu_id_energy._undulator_order().value.get_value() == order_value + run_engine(mv(hu_id_energy, energy_value)) + assert await hu_id_energy.energy_demand.get_value() == pytest.approx( + energy_value, abs=0.001 + ) + assert await hu_id_energy.energy.get_value() == pytest.approx( + energy_value, abs=0.001 + ) + assert ( + await hu_id_energy._undulator().gap_motor.user_readback.get_value() + == pytest.approx(expected_gap, abs=0.001) + ) + + +async def test_hu_energy_read_include_read_fields( + hu_energy: HardEnergy, + run_engine: RunEngine, +): + await hu_energy._undulator_energy()._undulator_order().value.set(3) + energy_value = 3.1416 + run_engine(mv(hu_energy, energy_value)) + await assert_reading( + hu_energy, + { + "dcm-energy_in_keV": partial_reading(energy_value), + "hu_id_energy-energy": partial_reading(energy_value), + "hu_id_energy-energy_demand": partial_reading(energy_value), + "undulator_mm-current_gap": partial_reading(0.0), + "undulator_order-value": partial_reading(3.0), + }, + ) + + +async def test_hu_energy_set_both_dcm_and_id_energy( + run_engine: RunEngine, + hu_energy: HardEnergy, +): + energy_value = 3.1415 + run_engine(mv(hu_energy, energy_value)) + assert ( + await hu_energy._undulator_energy().energy_demand.get_value() + == pytest.approx(energy_value, abs=0.00001) + ) + assert ( + await hu_energy._dcm().energy_in_keV.user_readback.get_value() + == pytest.approx(energy_value, abs=0.00001) + ) + + +@pytest.mark.parametrize( + "energy_value, order_value, expected_gap", + [ + (2.781, 3, 6.055), + (6.241, 5, 7.957), + ], +) +async def test_hu_energy_set_moves_gap( + run_engine: RunEngine, + hu_energy: HardEnergy, + energy_value: float, + order_value: int, + expected_gap: float, +): + await hu_energy._undulator_energy()._undulator_order().value.set(order_value) + assert ( + await hu_energy._undulator_energy()._undulator_order().value.get_value() + == pytest.approx(order_value, abs=0.00001) + ) + run_engine(mv(hu_energy, energy_value)) + assert ( + await hu_energy._undulator_energy() + ._undulator() + .gap_motor.user_readback.get_value() + == pytest.approx(expected_gap, abs=0.001) + ) + + +async def test_hu_energy_locate( + run_engine: RunEngine, + hu_energy: HardEnergy, +): + energy_value = 3.1415 + run_engine(mv(hu_energy, energy_value)) + located_position = await hu_energy.locate() + assert located_position == {"readback": energy_value, "setpoint": energy_value}