diff --git a/src/genie_python/genie.py b/src/genie_python/genie.py index b32ccfbf..18aec8b7 100644 --- a/src/genie_python/genie.py +++ b/src/genie_python/genie.py @@ -44,7 +44,7 @@ ) from genie_python.version import VERSION # noqa E402 -PVBaseValue = bool | int | float | str +PVBaseValue = bool | int | float | str | bytes PVValue = PVBaseValue | list[PVBaseValue] | npt.NDArray | None # pyright: ignore # because we don't want to make PVValue generic @@ -2554,3 +2554,15 @@ def get_detector_table() -> str | None: """ assert _genie_api.dae is not None return _genie_api.dae.get_table_path("Detector") + + +@usercommand +@log_command_and_handle_exception +def change_autosave(freq: float) -> None: + """Change the rate of ICP autosave + + Args: + freq (float): frequency of autosave in Hz. + """ + assert _genie_api.dae is not None + _genie_api.dae.change_autosave_freq(freq) diff --git a/src/genie_python/genie_change_cache.py b/src/genie_python/genie_change_cache.py index 3ba0a83a..3a904651 100644 --- a/src/genie_python/genie_change_cache.py +++ b/src/genie_python/genie_change_cache.py @@ -1,14 +1,19 @@ +import xml.etree.ElementTree as ET from builtins import object, str +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from genie_python.genie import PVValue class ChangeCache(object): - def __init__(self): + def __init__(self) -> None: self.wiring = None self.detector = None self.spectra = None - self.mon_spect = None - self.mon_from = None - self.mon_to = None + self.mon_spect: int | None = None + self.mon_from: float | None = None + self.mon_to: float | None = None self.dae_sync = None self.tcb_file = None self.tcb_tables = [] @@ -30,13 +35,14 @@ def __init__(self): self.periods_seq = None self.periods_delay = None self.periods_settings = [] + self.autosave_freq: float | None = None - def set_monitor(self, spec, low, high): + def set_monitor(self, spec: int | None, low: float | None, high: float | None) -> None: self.mon_spect = spec self.mon_from = low self.mon_to = high - def clear_vetos(self): + def clear_vetos(self) -> None: self.smp_veto = 0 self.ts2_veto = 0 self.hz50_veto = 0 @@ -45,12 +51,12 @@ def clear_vetos(self): self.ext2_veto = 0 self.ext3_veto = 0 - def set_fermi(self, enable, delay=1.0, width=1.0): + def set_fermi(self, enable: int | bool, delay: float = 1.0, width: float = 1.0) -> None: self.fermi_veto = 1 if enable else 0 self.fermi_delay = delay self.fermi_width = width - def change_dae_settings(self, root): + def change_dae_settings(self, root: ET.Element) -> bool: changed = self._change_xml(root, "String", "Wiring Table", self.wiring) changed |= self._change_xml(root, "String", "Detector Table", self.detector) changed |= self._change_xml(root, "String", "Spectra Table", self.spectra) @@ -68,7 +74,7 @@ def change_dae_settings(self, root): changed |= self._change_vetos(root) return changed - def _change_vetos(self, root): + def _change_vetos(self, root: ET.Element) -> bool: changed = self._change_xml(root, "EW", "SMP (Chopper) Veto", self.smp_veto) changed |= self._change_xml(root, "EW", " TS2 Pulse Veto", self.ts2_veto) changed |= self._change_xml(root, "EW", " ISIS 50Hz Veto", self.hz50_veto) @@ -78,17 +84,17 @@ def _change_vetos(self, root): changed |= self._change_xml(root, "EW", "Veto 3", self.ext3_veto) return changed - def change_tcb_calculation_method(self, root): + def change_tcb_calculation_method(self, root: ET.Element) -> bool: changed = self._change_xml(root, "U16", "Calculation Method", self.tcb_calculation_method) return changed - def change_tcb_settings(self, root): + def change_tcb_settings(self, root: ET.Element) -> bool: changed = self._change_xml(root, "String", "Time Channel File", self.tcb_file) changed |= self.change_tcb_calculation_method(root) changed |= self._change_tcb_table(root) return changed - def _change_tcb_table(self, root): + def _change_tcb_table(self, root: ET.Element) -> bool: changed = False for row in self.tcb_tables: regime = str(row[0]) @@ -101,7 +107,7 @@ def _change_tcb_table(self, root): changed |= self.change_tcb_calculation_method(root) return changed - def change_period_settings(self, root): + def change_period_settings(self, root: ET.Element) -> bool: changed = self._change_xml(root, "EW", "Period Type", self.periods_type) changed |= self._change_xml( root, "I32", "Number Of Software Periods", self.periods_soft_num @@ -113,7 +119,7 @@ def change_period_settings(self, root): changed |= self._change_period_table(root) return changed - def _change_period_table(self, root): + def _change_period_table(self, root: ET.Element) -> bool: changed = False for row in self.periods_settings: period = row[0] @@ -127,7 +133,11 @@ def _change_period_table(self, root): changed |= self._change_xml(root, "String", "Label %s" % period, label) return changed - def _change_xml(self, xml, node, name, value): + def change_autosave_settings(self, root: ET.Element) -> bool: + changed = self._change_xml(root, "U32", " Frequency", self.autosave_freq) + return changed + + def _change_xml(self, xml: ET.Element, node: str, name: str, value: "PVValue") -> bool: """ Helper func to change the xml. Will not be set if the input is None. diff --git a/src/genie_python/genie_dae.py b/src/genie_python/genie_dae.py index 26c41ab4..936e6e84 100644 --- a/src/genie_python/genie_dae.py +++ b/src/genie_python/genie_dae.py @@ -3,6 +3,7 @@ import json import os import re +import typing import xml.etree.ElementTree as ET import zlib from binascii import hexlify @@ -127,6 +128,8 @@ "specintegrals_size": "DAE:SPECINTEGRALS.NORD", "specdata": "DAE:SPECDATA", "specdata_size": "DAE:SPECDATA.NORD", + "updatesettings": "DAE:UPDATESETTINGS", + "updatesettings_sp": "DAE:UPDATESETTINGS:SP", } DAE_CONFIG_FILE_PATHS = [ @@ -1201,6 +1204,7 @@ def change_finish(self) -> None: self._change_dae_settings() self._change_tcb_settings() self._change_period_settings() + self._change_autosave_freq() self.change_cache = ChangeCache() def change_tables( @@ -1884,6 +1888,20 @@ def _change_period_settings(self) -> None: "set a number that is too large for the DAE memory. Try a smaller number!" ) + def _change_autosave_freq(self) -> None: + update_settings = typing.cast( + bytes, self._get_pv_value(self._get_dae_pv_name("updatesettings"), to_string=True) + ) + root = ET.fromstring(update_settings) + + changed = self.change_cache.change_autosave_settings(root) + if changed: + update_settings_sp: bytes = ET.tostring(root).strip() + + self._set_pv_value( + self._get_dae_pv_name("updatesettings_sp"), update_settings_sp, wait=True + ) + def get_spectrum( self, spectrum: int, period: int = 1, dist: bool = True, use_numpy: bool | None = None ) -> "_GetspectrumReturn": @@ -2234,3 +2252,19 @@ def integrate_spectrum( # run sum of terms, note in the case that the high and low partials # are in the same bin this still works return full_count + partial_count_high - partial_count_low + + def change_autosave_freq(self, freq: float) -> None: + """Change the rate of ICP autosave + + Args: + freq (float): frequency of autosave + """ + did_change = False + if not self.in_change: + self.change_start() + did_change = True + + self.change_cache.autosave_freq = freq + + if did_change: + self.change_finish() diff --git a/src/genie_python/genie_simulate_impl.py b/src/genie_python/genie_simulate_impl.py index 50709756..08927fa4 100644 --- a/src/genie_python/genie_simulate_impl.py +++ b/src/genie_python/genie_simulate_impl.py @@ -116,6 +116,7 @@ def __init__(self) -> None: self.periods_seq = None self.periods_delay = None self.periods_settings = [] + self.autosave_freq: float | None = None def set_monitor(self, spec: int, low: float, high: float) -> None: self.mon_spect = spec @@ -257,6 +258,10 @@ def _change_period_table(self, root: ET.Element, changed: bool) -> bool: changed = True return changed + def change_autosave_settings(self, root: ET.Element) -> bool: + self._change_xml(root, "U32", " Frequency", self.autosave_freq) + return True + def _change_xml( self, xml: ET.Element, node: str, name: str, value: str | int | float | None ) -> None: @@ -1091,6 +1096,22 @@ def get_table_path(self, table_type: str) -> str: if table_type == "Spectra": return self.change_cache.spectra + def change_autosave_freq(self, freq: float) -> None: + """Change the rate of ICP autosave + + Args: + freq (float): frequency of autosave + """ + did_change = False + if not self.in_change: + self.change_start() + did_change = True + + self.change_cache.autosave_freq = freq + + if did_change: + self.change_finish() + class API(object): def __init__( diff --git a/tests/test_genie_dae.py b/tests/test_genie_dae.py index ebff76ba..c3bbe48c 100644 --- a/tests/test_genie_dae.py +++ b/tests/test_genie_dae.py @@ -112,6 +112,16 @@ """ +UPDATE_SETTINGS_XML = """ + DAE Updates + 3 + + Frequency + 5000 + + +""" + YC_RETURN = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0] Y_RETURN = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0] YC_NORD_RETURN = 4 @@ -123,13 +133,19 @@ SPECDATA = [1.0, 2.0, 3.0, 4.0] -def get_mock_pv_value(pv_name, to_string, use_numpy): +def get_mock_pv_value( + name: str, + to_string: bool = False, + attempts: int = 3, + is_local: bool = False, + use_numpy: bool | None = None, +): """ Mock method for testing changes to DAE settings. It returns example XML data if the pv name is one of DAESETTINGS, TCBSETTINGS or HARDWAREPERIODS. Args: - pv_name: the name of the pv - to_string: whether to convert the value to a string. Not used in this method, but included since the method + name: the name of the pv + all other args: Not used in this method, but included since the method it is mocking is called with this keyword argument. Returns: @@ -139,8 +155,9 @@ def get_mock_pv_value(pv_name, to_string, use_numpy): "DAE:DAESETTINGS": DAE_SETTINGS_XML, "DAE:TCBSETTINGS": compress_and_hex(TCB_SETTINGS_XML), "DAE:HARDWAREPERIODS": PERIOD_SETTINGS_XML, + "DAE:UPDATESETTINGS": UPDATE_SETTINGS_XML, } - return mock_data[pv_name] + return mock_data[name] class TestGenieDAE(unittest.TestCase): @@ -408,6 +425,25 @@ def test_WHEN_change_vetos_called_with_clearall_false_THEN_nothing_happens(self) def test_WHEN_change_vetos_called_with_unknown_veto_THEN_exception_thrown(self): self.assertRaises(Exception, self.dae.change_vetos, bad_veto=True) + def test_WHEN_change_autosave_frequency_called_THEN_freq_is_set(self): + self.dae.in_change = False + self.dae.get_run_state = MagicMock(return_value="SETUP") + self.dae.in_transition = MagicMock(return_value=False) + self.dae.api.get_pv_value = get_mock_pv_value + + self.dae.change_autosave_freq(1.0) + + func = self.api.set_pv_value + + check_xml = ( + b"\n DAE Updates\n 3\n" + b" \n Frequency\n 1.0" + b"\n \n" + ) + + func.assert_called_with("DAE:UPDATESETTINGS:SP", check_xml, True) + self.assertEqual(self.dae.in_change, False) + def test_WHEN_fifo_veto_enabled_at_runtime_THEN_correct_PV_set_with_correct_value(self): self.dae.change_vetos(fifo=True)