diff --git a/src/spectroscopy_bluesky/i20_1/plans/__init__.py b/src/spectroscopy_bluesky/i20_1/plans/__init__.py index cc38001..7d4d1b8 100644 --- a/src/spectroscopy_bluesky/i20_1/plans/__init__.py +++ b/src/spectroscopy_bluesky/i20_1/plans/__init__.py @@ -1,3 +1,11 @@ -from .direct_turbo_slit_movement import fly_scan_ts, fly_sweep, seq_non_linear +from .direct_turbo_slit_movement import ( + fly_scan_ts, + fly_sweep, + fly_sweep_both_ways, + seq_non_linear, + seq_table, + trajectory_fly_scan, +) -__all__ = ["fly_scan_ts", "fly_sweep", "seq_non_linear"] +__all__ = ["fly_scan_ts", "fly_sweep", "fly_sweep_both_ways", + "seq_non_linear", "seq_table", "trajectory_fly_scan"] diff --git a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py index 07b4e59..5c11d38 100644 --- a/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py +++ b/src/spectroscopy_bluesky/i20_1/plans/direct_turbo_slit_movement.py @@ -1,14 +1,16 @@ import asyncio import math as mt +from itertools import pairwise import bluesky.plan_stubs as bps import bluesky.preprocessors as bpp import numpy as np from aioca import caput from bluesky.utils import MsgGenerator -from dodal.beamlines.i20_1 import turbo_slit +from dodal.beamlines.i20_1 import turbo_slit_pmac from dodal.common.coordination import inject from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator +from numpy.typing import NDArray from ophyd_async.core import ( DetectorTrigger, FlyMotorInfo, @@ -17,9 +19,9 @@ YamlSettingsProvider, wait_for_value, ) +from ophyd_async.epics.core import epics_signal_rw from ophyd_async.epics.motor import Motor from ophyd_async.epics.pmac import ( - PmacIO, PmacTrajectoryTriggerLogic, ) from ophyd_async.fastcs.panda import ( @@ -41,10 +43,32 @@ ) from scanspec.specs import Fly, Line -PATH = "/dls/i20-1/data/2023/cm33897-5/bluesky/" +from spectroscopy_bluesky.common.quantity_conversion import ( + energy_to_bragg_angle, + si_111_lattice_spacing, +) +# Motor resolution used to conert between user position and motor encoder counts MRES = -1 / 10000 +# default offset count to be applied when converting user positions to encoder counts. +ENCODER_OFFSET_COUNTS = 0 + + +def get_encoder_counts(user_position: float, offset=ENCODER_OFFSET_COUNTS) -> float: + """Convert from user position to motor encoder counts + (using MRES global variable). + + Args: + user_position : user position + offset (optional): Count offset to be added after the conversion. + Defaults to ENCODER_OFFSET_COUNTS (=0). + + Returns: + motor encoder counts + """ + return user_position / MRES + offset + class _StaticPcompTriggerLogic(StaticPcompTriggerLogic): """For controlling the PandA `PcompBlock` when flyscanning.""" @@ -73,7 +97,7 @@ def calculate_stuff(start, stop, num): width = (stop - start) / (num - 1) direction_of_sweep = ( PandaPcompDirection.POSITIVE - if width / MRES > 0 + if get_encoder_counts(width, 0) > 0 else PandaPcompDirection.NEGATIVE ) @@ -81,8 +105,8 @@ def calculate_stuff(start, stop, num): def get_pcomp_info(width, start_pos, direction_of_sweep: PandaPcompDirection, num): - start_pos_pcomp = mt.floor(start_pos / MRES) - rising_edge_step = mt.ceil(abs(width / MRES)) + start_pos_pcomp = mt.floor(get_encoder_counts(start_pos)) + rising_edge_step = mt.ceil(abs(get_encoder_counts(width, 0))) panda_pcomp_info = PcompInfo( start_postion=start_pos_pcomp, @@ -95,11 +119,28 @@ def get_pcomp_info(width, start_pos, direction_of_sweep: PandaPcompDirection, nu return panda_pcomp_info +def setup_trajectory_scan_pvs(prefix: str = "BL20J-MO-STEP-06"): + """ + Set PV values on trajectory scan controller needed for scan to work + (axis label to X, and profile name to PMAC6CS3) + """ + cs_axis_label = epics_signal_rw(str, prefix + ":M4:CsAxis", name="cs_axis_label") + cs_profile_name = epics_signal_rw( + str, prefix + ":ProfileCsName", name="cs_profile_name" + ) + yield from ensure_connected(cs_axis_label, cs_profile_name) + + # set the CS axis label and profile names + yield from bps.mv(cs_axis_label, "X", cs_profile_name, "PMAC6CS3") + yield from bps.sleep(0.5) # wait for the records to update + + def fly_scan_ts( start: int, stop: int, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 ) -> MsgGenerator: panda_pcomp = StandardFlyer(StaticPcompTriggerLogic(panda.pcomp[1])) @@ -108,7 +149,6 @@ def fly_scan_ts( @bpp.run_decorator() @bpp.stage_decorator([panda, panda_pcomp]) def inner_plan(): - motor = turbo_slit().xfine width = (stop - start) / (num - 1) start_pos = start - (width / 2) stop_pos = stop + (width / 2) @@ -118,12 +158,12 @@ def inner_plan(): time_for_move=num * duration, ) panda_pcomp_info = PcompInfo( - start_postion=mt.ceil(start_pos / (MRES)), + start_postion=mt.ceil(get_encoder_counts(start_pos)), pulse_width=1, - rising_edge_step=mt.ceil(abs(width / MRES)), + rising_edge_step=mt.ceil(abs(get_encoder_counts(width, 0))), number_of_pulses=num, direction=PandaPcompDirection.POSITIVE - if width / MRES > 0 + if get_encoder_counts(width) > 0 else PandaPcompDirection.NEGATIVE, ) @@ -150,6 +190,7 @@ def fly_sweep( stop: float, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 5, runup: float = 0.0, @@ -174,7 +215,6 @@ def inner_squared_plan(start: float | int, stop: float | int): panda_pcomp_info = get_pcomp_info(width, start_pos, direction_of_sweep, num) - motor = turbo_slit().xfine # move motor to initial position yield from bps.prepare(motor, motor_info, wait=True) @@ -219,6 +259,7 @@ def fly_sweep_both_ways( stop: float, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 5, ) -> MsgGenerator: @@ -233,8 +274,6 @@ def inner_squared_plan(start: float, stop: float, panda_pcomp: StandardFlyer): time_for_move=num * duration, ) - motor = turbo_slit().xfine - # move motor to initial position yield from bps.prepare(motor, motor_info, wait=True) @@ -267,7 +306,6 @@ def inner_plan(): time_for_move=num * duration, ) - motor = turbo_slit().xfine yield from bps.prepare(motor, motor_info, wait=True) # prepare both pcomps @@ -305,6 +343,7 @@ def trajectory_fly_scan( stop: float, num: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 restore: bool = False, ) -> MsgGenerator: @@ -314,13 +353,12 @@ def trajectory_fly_scan( panda_pcomp1 = StandardFlyer(_StaticPcompTriggerLogic(panda.pcomp[1])) panda_pcomp2 = StandardFlyer(_StaticPcompTriggerLogic(panda.pcomp[2])) - motor = Motor(prefix="BL20J-OP-PCHRO-01:TS:XFINE", name="X") - pmac = PmacIO( - prefix="BL20J-MO-STEP-06:", raw_motors=[motor], coord_nums=[3], name="pmac" - ) + pmac = turbo_slit_pmac() yield from ensure_connected(pmac, motor) + yield from setup_trajectory_scan_pvs() + spec = Fly(float(duration) @ (Line(motor, start, stop, num))) trigger_logic = spec @@ -368,160 +406,125 @@ def inner_plan(): def seq_table( start: float, stop: float, - num: int, + num_readouts: int, duration: float, + motor: Motor = inject("turbo_slit_x"), # noqa: B008 panda: HDFPanda = inject("panda"), # noqa: B008 number_of_sweeps: int = 3, restore: bool = False, -): - # Start the plan by loading the saved design for this scan - if restore: - yield from plan_restore_settings(panda=panda, name="seq_table") - - # Defining the flyers and components of the scan - panda_seq = StandardFlyer(StaticSeqTableTriggerLogic(panda.seq[1])) - motor = Motor(prefix="BL20J-OP-PCHRO-01:TS:XFINE", name="X") - pmac = PmacIO( - prefix="BL20J-MO-STEP-06:", raw_motors=[motor], coord_nums=[3], name="pmac" - ) - yield from ensure_connected(pmac, motor) - - # Prepare Panda trigger info using sequencer table - direction = SeqTrigger.POSA_GT - if start < stop: - direction = SeqTrigger.POSA_LT - +) -> MsgGenerator: # Prepare motor info using trajectory scanning - spec = Fly(duration @ (number_of_sweeps * ~Line(motor, start, stop, num))) + spec = Fly(duration @ (number_of_sweeps * ~Line(motor, start, stop, num_readouts))) - positions = [(x / MRES).astype(int) for x in spec.frames().lower[motor]] + positions = spec.frames().lower[motor] - direction = [ - SeqTrigger.POSA_GT if a * MRES < b * MRES else SeqTrigger.POSA_LT - for a, b in zip( - spec.frames().lower[motor], spec.frames().upper[motor], strict=True - ) - ] + # Sequence table has position triggers for one back-and-forth sweep. + # Use multiple repetitions of sequence table to capture subsequent sweeps. + num_repeats = 1 + if number_of_sweeps > 2: + positions = positions[: 2 * num_readouts] + num_repeats = mt.ceil(number_of_sweeps / 2) - table = SeqTable() # type: ignore - counter = 0 - for d, p in zip(direction, positions, strict=False): - table += SeqTable.row( - repeats=1, - trigger=d, - position=p, - time1=int(duration / 1e-6) - 1, - outa1=True, - time2=1, - outa2=False, - ) - - counter += 1 - - seq_table_info = SeqTableInfo(sequence_table=table, repeats=1, prescale_as_us=1) + table = create_seqtable(positions, time1=1, outa1=True, time2=1, outa2=False) - trigger_logic = spec - pmac_trajectory = PmacTrajectoryTriggerLogic(pmac) - pmac_trajectory_flyer = StandardFlyer(pmac_trajectory) - - # Prepare Panda file writer trigger info - panda_hdf_info = TriggerInfo( - number_of_events=len(positions), - trigger=DetectorTrigger.CONSTANT_GATE, - livetime=duration, - deadtime=1e-5, + seq_table_info = SeqTableInfo( + sequence_table=table, repeats=num_repeats, prescale_as_us=1 ) - @attach_data_session_metadata_decorator() - @bpp.run_decorator() - @bpp.stage_decorator([panda, panda_seq]) - def inner_plan(): - # Prepare pmac with the trajectory - yield from bps.prepare(pmac_trajectory_flyer, trigger_logic, wait=True) - # prepare sequencer table - yield from bps.prepare(panda_seq, seq_table_info, wait=True) - # prepare panda and hdf writer once, at start of scan - yield from bps.prepare(panda, panda_hdf_info, wait=True) - - # kickoff devices waiting for all of them - yield from bps.kickoff(panda, wait=True) - yield from bps.kickoff(panda_seq, wait=True) - yield from bps.kickoff(pmac_trajectory_flyer, wait=True) - - yield from bps.complete_all(pmac_trajectory_flyer, panda_seq, panda, wait=True) - - yield from inner_plan() - - -def Si111_energies_to_Bragg(energy_array): - angles = np.degrees(np.arcsin(1977.59 / np.asarray(energy_array))) - return angles + yield from seq_table_scan( + spec, seq_table_info, motor=motor, panda=panda, restore=restore + ) +# run_plan("seq_non_linear", ei=6000.0, ef=10000.0, de=100.0, duration=0.1) def seq_non_linear( ei: float, ef: float, de: float, duration: float, - panda: HDFPanda = inject("panda"), # noqa: B008) + motor: Motor = inject("turbo_slit_x"), # noqa: B008 + panda: HDFPanda = inject("panda"), # noqa: B008 restore: bool = False, -): +) -> MsgGenerator: # Start the plan by loading the saved design for this scan - if restore: - yield from plan_restore_settings(panda=panda, name="seq_table") - - # Defining the flyers and components of the scan - panda_seq = StandardFlyer(StaticSeqTableTriggerLogic(panda.seq[1])) - motor = Motor(prefix="BL20J-OP-PCHRO-01:TS:XFINE", name="X") - pmac = PmacIO( - prefix="BL20J-MO-STEP-06:", raw_motors=[motor], coord_nums=[3], name="pmac" - ) - yield from ensure_connected(pmac, motor, panda) energies = np.arange(ei, ef + de, de) # include Ef as last point in the array print(f"param\nEi = {ei}, Ef = {ef}, dE = {de}\n") - angle = Si111_energies_to_Bragg(energies) - energies = np.arange(ei, ef + de, de) + angle = energy_to_bragg_angle(si_111_lattice_spacing, energies) # Prepare motor info using trajectory scanning - spec = Fly(float(duration) @ (Line(motor, angle[0], angle[-1], len(angle)))) - positions = [(x / MRES).astype(int) for x in angle] + spec = Fly(duration @ (Line(motor, angle[0], angle[-1], len(angle)))) + + table = create_seqtable( + angle, time1=1, time2=1, outa1=True, outb1=True, outa2=False, outb2=True + ) + seq_table_info = SeqTableInfo(sequence_table=table, repeats=1, prescale_as_us=1) + yield from seq_table_scan(spec, seq_table_info, motor, panda, restore=restore) + + +def create_seqtable(positions: NDArray, **kwargs) -> SeqTable: + """ + Create SeqTable with rows setup to do position based triggering. + +