From 8c2f0ec0b0810a94d4472af491f1f1ea6ec75192 Mon Sep 17 00:00:00 2001 From: Iain Hall Date: Tue, 16 Dec 2025 11:38:35 +0000 Subject: [PATCH 1/3] Updates for trajectory fly scans --- .../i20_1/plans/__init__.py | 12 +- .../i20_1/plans/direct_turbo_slit_movement.py | 247 +++++++++--------- 2 files changed, 127 insertions(+), 132 deletions(-) diff --git a/src/spectroscopy_bluesky/i20_1/plans/__init__.py b/src/spectroscopy_bluesky/i20_1/plans/__init__.py index cc38001..7d4d1b8 100644 --- a/src/spectroscopy_bluesky/i20_1/plans/__init__.py +++ b/src/spectroscopy_bluesky/i20_1/plans/__init__.py @@ -1,3 +1,11 @@ -from .direct_turbo_slit_movement import fly_scan_ts, fly_sweep, seq_non_linear +from .direct_turbo_slit_movement import ( + fly_scan_ts, + fly_sweep, + fly_sweep_both_ways, + seq_non_linear, + seq_table, + trajectory_fly_scan, +) -__all__ = ["fly_scan_ts", "fly_sweep", "seq_non_linear"] +__all__ = ["fly_scan_ts", "fly_sweep", "fly_sweep_both_ways", + "seq_non_linear", "seq_table", "trajectory_fly_scan"] diff --git a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py index 07b4e59..a9d2b56 100644 --- a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py +++ b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py @@ -1,12 +1,13 @@ import asyncio import math as mt +from itertools import pairwise import bluesky.plan_stubs as bps import bluesky.preprocessors as bpp import numpy as np from aioca import caput from bluesky.utils import MsgGenerator -from dodal.beamlines.i20_1 import turbo_slit +from dodal.beamlines.i20_1 import turbo_slit_pmac from dodal.common.coordination import inject from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator from ophyd_async.core import ( @@ -17,9 +18,9 @@ YamlSettingsProvider, wait_for_value, ) +from ophyd_async.epics.core import epics_signal_rw from ophyd_async.epics.motor import Motor from ophyd_async.epics.pmac import ( - PmacIO, PmacTrajectoryTriggerLogic, ) from ophyd_async.fastcs.panda import ( @@ -39,12 +40,20 @@ retrieve_settings, store_settings, ) -from scanspec.specs import Fly, Line +from scanspec.specs import Fly, Line, Spec -PATH = "/dls/i20-1/data/2023/cm33897-5/bluesky/" +from spectroscopy_bluesky.common.quantity_conversion import ( + energy_to_bragg_angle, + si_111_lattice_spacing, +) MRES = -1 / 10000 + # default offset count to be applied when converting user positions to encoder counts. +ENCODER_OFFSET_COUNTS = 0 + +def get_encoder_counts(user_position, offset=ENCODER_OFFSET_COUNTS): + return user_position/MRES + offset class _StaticPcompTriggerLogic(StaticPcompTriggerLogic): """For controlling the PandA `PcompBlock` when flyscanning.""" @@ -73,7 +82,7 @@ def calculate_stuff(start, stop, num): width = (stop - start) / (num - 1) direction_of_sweep = ( PandaPcompDirection.POSITIVE - if width / MRES > 0 + if get_encoder_counts(width, 0) > 0 else PandaPcompDirection.NEGATIVE ) @@ -81,8 +90,8 @@ def calculate_stuff(start, stop, num): def get_pcomp_info(width, start_pos, direction_of_sweep: PandaPcompDirection, num): - start_pos_pcomp = mt.floor(start_pos / MRES) - rising_edge_step = mt.ceil(abs(width / MRES)) + start_pos_pcomp = mt.floor(get_encoder_counts(start_pos)) + rising_edge_step = mt.ceil(abs(get_encoder_counts(width, 0))) panda_pcomp_info = PcompInfo( start_postion=start_pos_pcomp, @@ -94,12 +103,25 @@ def get_pcomp_info(width, start_pos, direction_of_sweep: PandaPcompDirection, nu return panda_pcomp_info +def setup_trajectory_scan_pvs(prefix : str = "BL20J-MO-STEP-06") : + """ + Set PV values on trajectory scan controller needed for scan to work + (axis label to X, and profile name to PMAC6CS3) + """ + cs_axis_label = epics_signal_rw(str, prefix+":M4:CsAxis", name="cs_axis_label") + cs_profile_name= epics_signal_rw(str, prefix+":ProfileCsName", name="cs_profile_name") + yield from ensure_connected(cs_axis_label, cs_profile_name) + + # set the CS axis label and profile names + yield from bps.mv(cs_axis_label, "X", cs_profile_name, "PMAC6CS3") + yield from bps.sleep(0.5) # wait for the records to update def fly_scan_ts( start: int, stop: int, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 ) -> MsgGenerator: panda_pcomp = StandardFlyer(StaticPcompTriggerLogic(panda.pcomp[1])) @@ -108,7 +130,6 @@ def fly_scan_ts( @bpp.run_decorator() @bpp.stage_decorator([panda, panda_pcomp]) def inner_plan(): - motor = turbo_slit().xfine width = (stop - start) / (num - 1) start_pos = start - (width / 2) stop_pos = stop + (width / 2) @@ -118,12 +139,12 @@ def inner_plan(): time_for_move=num * duration, ) panda_pcomp_info = PcompInfo( - start_postion=mt.ceil(start_pos / (MRES)), + start_postion=mt.ceil(get_encoder_counts(start_pos)), pulse_width=1, - rising_edge_step=mt.ceil(abs(width / MRES)), + rising_edge_step=mt.ceil(abs(get_encoder_counts(width, 0))), number_of_pulses=num, direction=PandaPcompDirection.POSITIVE - if width / MRES > 0 + if get_encoder_counts(width) > 0 else PandaPcompDirection.NEGATIVE, ) @@ -150,6 +171,7 @@ def fly_sweep( stop: float, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 5, runup: float = 0.0, @@ -174,7 +196,6 @@ def inner_squared_plan(start: float | int, stop: float | int): panda_pcomp_info = get_pcomp_info(width, start_pos, direction_of_sweep, num) - motor = turbo_slit().xfine # move motor to initial position yield from bps.prepare(motor, motor_info, wait=True) @@ -219,6 +240,7 @@ def fly_sweep_both_ways( stop: float, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 5, ) -> MsgGenerator: @@ -233,8 +255,6 @@ def inner_squared_plan(start: float, stop: float, panda_pcomp: StandardFlyer): time_for_move=num * duration, ) - motor = turbo_slit().xfine - # move motor to initial position yield from bps.prepare(motor, motor_info, wait=True) @@ -267,7 +287,6 @@ def inner_plan(): time_for_move=num * duration, ) - motor = turbo_slit().xfine yield from bps.prepare(motor, motor_info, wait=True) # prepare both pcomps @@ -305,6 +324,7 @@ def trajectory_fly_scan( stop: float, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 restore: bool = False, ) -> MsgGenerator: @@ -314,19 +334,19 @@ def trajectory_fly_scan( panda_pcomp1 = StandardFlyer(_StaticPcompTriggerLogic(panda.pcomp[1])) panda_pcomp2 = StandardFlyer(_StaticPcompTriggerLogic(panda.pcomp[2])) - motor = Motor(prefix="BL20J-OP-PCHRO-01:TS:XFINE", name="X") - pmac = PmacIO( - prefix="BL20J-MO-STEP-06:", raw_motors=[motor], coord_nums=[3], name="pmac" - ) + pmac = turbo_slit_pmac() yield from ensure_connected(pmac, motor) + yield from setup_trajectory_scan_pvs() + spec = Fly(float(duration) @ (Line(motor, start, stop, num))) trigger_logic = spec pmac_trajectory = PmacTrajectoryTriggerLogic(pmac) pmac_trajectory_flyer = StandardFlyer(pmac_trajectory) + @attach_data_session_metadata_decorator() @bpp.run_decorator() @bpp.stage_decorator([panda, panda_pcomp1, panda_pcomp2]) @@ -365,172 +385,134 @@ def inner_plan(): yield from inner_plan() +# run_plan("seq_table", start=5, stop=10, num=11, duration=0.1, number_of_sweeps=2) def seq_table( start: float, stop: float, - num: int, + num_readouts: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 3, restore: bool = False, -): - # Start the plan by loading the saved design for this scan - if restore: - yield from plan_restore_settings(panda=panda, name="seq_table") - - # Defining the flyers and components of the scan - panda_seq = StandardFlyer(StaticSeqTableTriggerLogic(panda.seq[1])) - motor = Motor(prefix="BL20J-OP-PCHRO-01:TS:XFINE", name="X") - pmac = PmacIO( - prefix="BL20J-MO-STEP-06:", raw_motors=[motor], coord_nums=[3], name="pmac" - ) - yield from ensure_connected(pmac, motor) - - # Prepare Panda trigger info using sequencer table - direction = SeqTrigger.POSA_GT - if start < stop: - direction = SeqTrigger.POSA_LT +) -> MsgGenerator: # Prepare motor info using trajectory scanning - spec = Fly(duration @ (number_of_sweeps * ~Line(motor, start, stop, num))) + spec = Fly(duration @ (number_of_sweeps * ~Line(motor, start, stop, num_readouts))) - positions = [(x / MRES).astype(int) for x in spec.frames().lower[motor]] + positions = spec.frames().lower[motor] - direction = [ - SeqTrigger.POSA_GT if a * MRES < b * MRES else SeqTrigger.POSA_LT - for a, b in zip( - spec.frames().lower[motor], spec.frames().upper[motor], strict=True - ) - ] + # Sequence table has position triggers for one back-and-forth sweep. + # Use with multiple repetitions of sequence table to capture subsequent sweeps. + num_repeats = 1 + if number_of_sweeps > 2: + positions = positions[:2*num_readouts] + num_repeats = mt.ceil(number_of_sweeps/2) - table = SeqTable() # type: ignore - counter = 0 - for d, p in zip(direction, positions, strict=False): - table += SeqTable.row( - repeats=1, - trigger=d, - position=p, - time1=int(duration / 1e-6) - 1, - outa1=True, - time2=1, - outa2=False, - ) - - counter += 1 - - seq_table_info = SeqTableInfo(sequence_table=table, repeats=1, prescale_as_us=1) - - trigger_logic = spec - pmac_trajectory = PmacTrajectoryTriggerLogic(pmac) - pmac_trajectory_flyer = StandardFlyer(pmac_trajectory) - - # Prepare Panda file writer trigger info - panda_hdf_info = TriggerInfo( - number_of_events=len(positions), - trigger=DetectorTrigger.CONSTANT_GATE, - livetime=duration, - deadtime=1e-5, + table = create_seqtable( + positions, time1=1, outa1=True, time2=1, outa2=False ) - @attach_data_session_metadata_decorator() - @bpp.run_decorator() - @bpp.stage_decorator([panda, panda_seq]) - def inner_plan(): - # Prepare pmac with the trajectory - yield from bps.prepare(pmac_trajectory_flyer, trigger_logic, wait=True) - # prepare sequencer table - yield from bps.prepare(panda_seq, seq_table_info, wait=True) - # prepare panda and hdf writer once, at start of scan - yield from bps.prepare(panda, panda_hdf_info, wait=True) - - # kickoff devices waiting for all of them - yield from bps.kickoff(panda, wait=True) - yield from bps.kickoff(panda_seq, wait=True) - yield from bps.kickoff(pmac_trajectory_flyer, wait=True) - - yield from bps.complete_all(pmac_trajectory_flyer, panda_seq, panda, wait=True) - - yield from inner_plan() - + seq_table_info = SeqTableInfo( + sequence_table=table, repeats=num_repeats, prescale_as_us=1 + ) -def Si111_energies_to_Bragg(energy_array): - angles = np.degrees(np.arcsin(1977.59 / np.asarray(energy_array))) - return angles + yield from seq_table_scan( + spec, seq_table_info, motor=motor, panda=panda, restore=restore + ) +# run_plan("seq_non_linear", ei=6000.0, ef=10000.0, de=100.0, duration=0.1) def seq_non_linear( ei: float, ef: float, de: float, duration: float, - panda: HDFPanda = inject("panda"), # noqa: B008) + motor: Motor = inject("turbo_slit_x"), # noqa: B008 + panda: HDFPanda = inject("panda"), # noqa: B008 restore: bool = False, -): +) -> MsgGenerator: # Start the plan by loading the saved design for this scan - if restore: - yield from plan_restore_settings(panda=panda, name="seq_table") - - # Defining the flyers and components of the scan - panda_seq = StandardFlyer(StaticSeqTableTriggerLogic(panda.seq[1])) - motor = Motor(prefix="BL20J-OP-PCHRO-01:TS:XFINE", name="X") - pmac = PmacIO( - prefix="BL20J-MO-STEP-06:", raw_motors=[motor], coord_nums=[3], name="pmac" - ) - yield from ensure_connected(pmac, motor, panda) energies = np.arange(ei, ef + de, de) # include Ef as last point in the array print(f"param\nEi = {ei}, Ef = {ef}, dE = {de}\n") - angle = Si111_energies_to_Bragg(energies) - energies = np.arange(ei, ef + de, de) + angle = energy_to_bragg_angle(si_111_lattice_spacing, energies) # Prepare motor info using trajectory scanning - spec = Fly(float(duration) @ (Line(motor, angle[0], angle[-1], len(angle)))) - positions = [(x / MRES).astype(int) for x in angle] + spec = Fly(duration @ (Line(motor, angle[0], angle[-1], len(angle)))) + + table = create_seqtable( + angle, time1=1, time2=1, outa1=True, outb1=True, outa2=False, outb2=True + ) + seq_table_info = SeqTableInfo(sequence_table=table, repeats=1, prescale_as_us=1) + yield from seq_table_scan(spec, seq_table_info, motor, panda, restore=restore) + + +def create_seqtable(positions: list[float], **kwargs) -> SeqTable: + + # convert user positions to encoder positions + enc_count_positions = [get_encoder_counts(x).astype(int) for x in positions] + + #determine direction of each segment direction = [ - SeqTrigger.POSA_GT if a * MRES < b * MRES else SeqTrigger.POSA_LT - for a, b in zip( - spec.frames().lower[motor], spec.frames().upper[motor], strict=True - ) + SeqTrigger.POSA_GT if current < next else SeqTrigger.POSA_LT + for current, next in pairwise(enc_count_positions) ] + direction.append(direction[-1]) - table = SeqTable() # type: ignore - for d, p in zip(direction, positions, strict=True): + table = SeqTable() + for d, p in zip(direction, enc_count_positions, strict=False): table += SeqTable.row( repeats=1, trigger=d, position=p, - time1=1, - outa1=True, - outb1=True, - time2=1, - outa2=False, - outb2=True, + **kwargs ) + return table - # counter += 1 - seq_table_info = SeqTableInfo(sequence_table=table, repeats=1, prescale_as_us=1) +def seq_table_scan( + scan_spec: Spec, + seq_table_info: SeqTableInfo, + motor: Motor, + panda: HDFPanda, + restore: bool = False, +) -> MsgGenerator: + + if restore: + yield from plan_restore_settings(panda=panda, name="seq_table") + + pmac = turbo_slit_pmac() + + yield from ensure_connected(pmac, motor, panda) + + yield from setup_trajectory_scan_pvs() + + # Defining the flyers and components of the scan + panda_seq = StandardFlyer(StaticSeqTableTriggerLogic(panda.seq[1])) - trigger_logic = spec pmac_trajectory = PmacTrajectoryTriggerLogic(pmac) pmac_trajectory_flyer = StandardFlyer(pmac_trajectory) # Prepare Panda file writer trigger info panda_hdf_info = TriggerInfo( - number_of_events=len(angle), + number_of_events=len(seq_table_info.sequence_table), # same as number of rows in sequence table trigger=DetectorTrigger.CONSTANT_GATE, - livetime=duration, + livetime=scan_spec.duration(), deadtime=1e-5, ) + metadata = {"detectors": {panda.name}} @attach_data_session_metadata_decorator() - @bpp.run_decorator() + @bpp.run_decorator(md=metadata) # can pass metadata in as parameter @bpp.stage_decorator([panda, panda_seq]) def inner_plan(): + yield from bps.declare_stream(panda, name="primary") + # Prepare pmac with the trajectory - yield from bps.prepare(pmac_trajectory_flyer, trigger_logic, wait=True) + yield from bps.prepare(pmac_trajectory_flyer, scan_spec, wait=True) # prepare sequencer table yield from bps.prepare(panda_seq, seq_table_info, wait=True) # prepare panda and hdf writer once, at start of scan @@ -542,6 +524,11 @@ def inner_plan(): yield from bps.kickoff(pmac_trajectory_flyer, wait=True) yield from bps.complete_all(pmac_trajectory_flyer, panda_seq, panda, wait=True) + yield from bps.collect( + panda, + return_payload=True, + name="primary", + ) yield from inner_plan() From 2f052da22643275a9931b4a262d3e188e1fa73e4 Mon Sep 17 00:00:00 2001 From: Iain Hall Date: Tue, 16 Dec 2025 12:54:36 +0000 Subject: [PATCH 2/3] Reformat with ruff and fix some type errors --- .../i20_1/plans/direct_turbo_slit_movement.py | 67 +++++++++---------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py index a9d2b56..c380f35 100644 --- a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py +++ b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py @@ -10,6 +10,7 @@ from dodal.beamlines.i20_1 import turbo_slit_pmac from dodal.common.coordination import inject from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator +from numpy.typing import NDArray from ophyd_async.core import ( DetectorTrigger, FlyMotorInfo, @@ -49,11 +50,13 @@ MRES = -1 / 10000 - # default offset count to be applied when converting user positions to encoder counts. +# default offset count to be applied when converting user positions to encoder counts. ENCODER_OFFSET_COUNTS = 0 + def get_encoder_counts(user_position, offset=ENCODER_OFFSET_COUNTS): - return user_position/MRES + offset + return user_position / MRES + offset + class _StaticPcompTriggerLogic(StaticPcompTriggerLogic): """For controlling the PandA `PcompBlock` when flyscanning.""" @@ -103,25 +106,29 @@ def get_pcomp_info(width, start_pos, direction_of_sweep: PandaPcompDirection, nu return panda_pcomp_info -def setup_trajectory_scan_pvs(prefix : str = "BL20J-MO-STEP-06") : + +def setup_trajectory_scan_pvs(prefix: str = "BL20J-MO-STEP-06"): """ - Set PV values on trajectory scan controller needed for scan to work + Set PV values on trajectory scan controller needed for scan to work (axis label to X, and profile name to PMAC6CS3) """ - cs_axis_label = epics_signal_rw(str, prefix+":M4:CsAxis", name="cs_axis_label") - cs_profile_name= epics_signal_rw(str, prefix+":ProfileCsName", name="cs_profile_name") + cs_axis_label = epics_signal_rw(str, prefix + ":M4:CsAxis", name="cs_axis_label") + cs_profile_name = epics_signal_rw( + str, prefix + ":ProfileCsName", name="cs_profile_name" + ) yield from ensure_connected(cs_axis_label, cs_profile_name) # set the CS axis label and profile names yield from bps.mv(cs_axis_label, "X", cs_profile_name, "PMAC6CS3") - yield from bps.sleep(0.5) # wait for the records to update + yield from bps.sleep(0.5) # wait for the records to update + def fly_scan_ts( start: int, stop: int, num: int, duration: float, - motor: Motor = inject("turbo_slit_x"), # noqa: B008 + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 ) -> MsgGenerator: panda_pcomp = StandardFlyer(StaticPcompTriggerLogic(panda.pcomp[1])) @@ -171,7 +178,7 @@ def fly_sweep( stop: float, num: int, duration: float, - motor: Motor = inject("turbo_slit_x"), # noqa: B008 + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 5, runup: float = 0.0, @@ -240,7 +247,7 @@ def fly_sweep_both_ways( stop: float, num: int, duration: float, - motor: Motor = inject("turbo_slit_x"), # noqa: B008 + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 5, ) -> MsgGenerator: @@ -324,7 +331,7 @@ def trajectory_fly_scan( stop: float, num: int, duration: float, - motor: Motor = inject("turbo_slit_x"), # noqa: B008 + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 restore: bool = False, ) -> MsgGenerator: @@ -346,7 +353,6 @@ def trajectory_fly_scan( pmac_trajectory = PmacTrajectoryTriggerLogic(pmac) pmac_trajectory_flyer = StandardFlyer(pmac_trajectory) - @attach_data_session_metadata_decorator() @bpp.run_decorator() @bpp.stage_decorator([panda, panda_pcomp1, panda_pcomp2]) @@ -391,27 +397,24 @@ def seq_table( stop: float, num_readouts: int, duration: float, - motor: Motor = inject("turbo_slit_x"), # noqa: B008 + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 3, restore: bool = False, ) -> MsgGenerator: - # Prepare motor info using trajectory scanning spec = Fly(duration @ (number_of_sweeps * ~Line(motor, start, stop, num_readouts))) positions = spec.frames().lower[motor] - # Sequence table has position triggers for one back-and-forth sweep. + # Sequence table has position triggers for one back-and-forth sweep. # Use with multiple repetitions of sequence table to capture subsequent sweeps. num_repeats = 1 if number_of_sweeps > 2: - positions = positions[:2*num_readouts] - num_repeats = mt.ceil(number_of_sweeps/2) + positions = positions[: 2 * num_readouts] + num_repeats = mt.ceil(number_of_sweeps / 2) - table = create_seqtable( - positions, time1=1, outa1=True, time2=1, outa2=False - ) + table = create_seqtable(positions, time1=1, outa1=True, time2=1, outa2=False) seq_table_info = SeqTableInfo( sequence_table=table, repeats=num_repeats, prescale_as_us=1 @@ -428,7 +431,7 @@ def seq_non_linear( ef: float, de: float, duration: float, - motor: Motor = inject("turbo_slit_x"), # noqa: B008 + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 restore: bool = False, ) -> MsgGenerator: @@ -450,12 +453,11 @@ def seq_non_linear( yield from seq_table_scan(spec, seq_table_info, motor, panda, restore=restore) -def create_seqtable(positions: list[float], **kwargs) -> SeqTable: - +def create_seqtable(positions: NDArray, **kwargs) -> SeqTable: # convert user positions to encoder positions enc_count_positions = [get_encoder_counts(x).astype(int) for x in positions] - #determine direction of each segment + # determine direction of each segment direction = [ SeqTrigger.POSA_GT if current < next else SeqTrigger.POSA_LT for current, next in pairwise(enc_count_positions) @@ -464,23 +466,17 @@ def create_seqtable(positions: list[float], **kwargs) -> SeqTable: table = SeqTable() for d, p in zip(direction, enc_count_positions, strict=False): - table += SeqTable.row( - repeats=1, - trigger=d, - position=p, - **kwargs - ) + table += SeqTable.row(repeats=1, trigger=d, position=p, **kwargs) return table def seq_table_scan( - scan_spec: Spec, + scan_spec: Fly, seq_table_info: SeqTableInfo, motor: Motor, panda: HDFPanda, restore: bool = False, ) -> MsgGenerator: - if restore: yield from plan_restore_settings(panda=panda, name="seq_table") @@ -498,15 +494,16 @@ def seq_table_scan( # Prepare Panda file writer trigger info panda_hdf_info = TriggerInfo( - number_of_events=len(seq_table_info.sequence_table), # same as number of rows in sequence table + number_of_events=len( + seq_table_info.sequence_table + ), # same as number of rows in sequence table trigger=DetectorTrigger.CONSTANT_GATE, livetime=scan_spec.duration(), deadtime=1e-5, ) - metadata = {"detectors": {panda.name}} @attach_data_session_metadata_decorator() - @bpp.run_decorator(md=metadata) # can pass metadata in as parameter + @bpp.run_decorator() @bpp.stage_decorator([panda, panda_seq]) def inner_plan(): yield from bps.declare_stream(panda, name="primary") From da4cd6a96abda8768992b6ba81823596ab4ad5ba Mon Sep 17 00:00:00 2001 From: Iain Hall Date: Mon, 5 Jan 2026 11:52:47 +0000 Subject: [PATCH 3/3] Add some comments to functions, some minor tidying up --- .../i20_1/plans/direct_turbo_slit_movement.py | 36 ++++++++++++++++--- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py index c380f35..5c11d38 100644 --- a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py +++ b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py @@ -41,20 +41,32 @@ retrieve_settings, store_settings, ) -from scanspec.specs import Fly, Line, Spec +from scanspec.specs import Fly, Line from spectroscopy_bluesky.common.quantity_conversion import ( energy_to_bragg_angle, si_111_lattice_spacing, ) +# Motor resolution used to conert between user position and motor encoder counts MRES = -1 / 10000 # default offset count to be applied when converting user positions to encoder counts. ENCODER_OFFSET_COUNTS = 0 -def get_encoder_counts(user_position, offset=ENCODER_OFFSET_COUNTS): +def get_encoder_counts(user_position: float, offset=ENCODER_OFFSET_COUNTS) -> float: + """Convert from user position to motor encoder counts + (using MRES global variable). + + Args: + user_position : user position + offset (optional): Count offset to be added after the conversion. + Defaults to ENCODER_OFFSET_COUNTS (=0). + + Returns: + motor encoder counts + """ return user_position / MRES + offset @@ -391,7 +403,6 @@ def inner_plan(): yield from inner_plan() -# run_plan("seq_table", start=5, stop=10, num=11, duration=0.1, number_of_sweeps=2) def seq_table( start: float, stop: float, @@ -408,7 +419,7 @@ def seq_table( positions = spec.frames().lower[motor] # Sequence table has position triggers for one back-and-forth sweep. - # Use with multiple repetitions of sequence table to capture subsequent sweeps. + # Use multiple repetitions of sequence table to capture subsequent sweeps. num_repeats = 1 if number_of_sweeps > 2: positions = positions[: 2 * num_readouts] @@ -454,6 +465,21 @@ def seq_non_linear( def create_seqtable(positions: NDArray, **kwargs) -> SeqTable: + """ + Create SeqTable with rows setup to do position based triggering. + +
  • Each position in positions NDArray is converted to a row of the sequence table. +
  • Position values are converted to encoder counts using + 'get_encoder_counts' function. +
  • SeqTrigger direction set to GT or LT depending on when encoder values + increase or decrease. + + :param positions: positions in user coordinates. + :param kwargs: additional kwargs to be used when generating each + row of sequence table (e.g. for setting trigger outputs, trigger length etc.) + :return: SeqTable + """ + # convert user positions to encoder positions enc_count_positions = [get_encoder_counts(x).astype(int) for x in positions] @@ -465,7 +491,7 @@ def create_seqtable(positions: NDArray, **kwargs) -> SeqTable: direction.append(direction[-1]) table = SeqTable() - for d, p in zip(direction, enc_count_positions, strict=False): + for d, p in zip(direction, enc_count_positions, strict=True): table += SeqTable.row(repeats=1, trigger=d, position=p, **kwargs) return table