|
| 1 | +import asyncio |
| 2 | +from math import radians |
| 3 | + |
| 4 | +from bluesky.protocols import Movable |
| 5 | +from bluesky.utils import maybe_await |
| 6 | +from ophyd_async.core import ( |
| 7 | + SignalR, |
| 8 | + SignalRW, |
| 9 | + derived_signal_rw, |
| 10 | +) |
| 11 | + |
| 12 | +from dodal.common.maths import rotate_clockwise, rotate_counter_clockwise |
| 13 | + |
| 14 | + |
| 15 | +async def _get_angle_deg(angle_deg: SignalR[float] | float) -> float: |
| 16 | + if isinstance(angle_deg, SignalR): |
| 17 | + return await angle_deg.get_value() |
| 18 | + return angle_deg |
| 19 | + |
| 20 | + |
| 21 | +def create_rotational_ij_component_signals( |
| 22 | + i_read: SignalR[float], |
| 23 | + j_read: SignalR[float], |
| 24 | + i_write: Movable[float], |
| 25 | + j_write: Movable[float], |
| 26 | + angle_deg: float | SignalR[float], |
| 27 | + clockwise_frame: bool = True, |
| 28 | +) -> tuple[SignalRW[float], SignalRW[float]]: |
| 29 | + """Create virtual i/j axes representing a Cartesian coordinate frame |
| 30 | + that is rotated by a given angle relative to the underlying equipment axes. |
| 31 | +
|
| 32 | + The returned signals expose the position of the system in a *rotated frame |
| 33 | + of reference* (e.g. the sample or stage frame), while transparently mapping |
| 34 | + reads and writes onto the real i/j signals in the fixed equipment (lab) frame. |
| 35 | +
|
| 36 | + From the user's point of view, i and j behave like ordinary orthogonal |
| 37 | + Cartesian axes attached to the rotating object. Internally, all reads apply |
| 38 | + a rotation to the real motor positions, and all writes apply the inverse |
| 39 | + rotation so that the requested motion is achieved in the rotated frame. |
| 40 | +
|
| 41 | + Args: |
| 42 | + i_read (SignalR): SignalR representing the i motor readback. |
| 43 | + j_read (SignalR): representing the j motor readback. |
| 44 | + i_write (Movable): object for setting the i position. |
| 45 | + j_write (Movable): object for setting the j position. |
| 46 | + angle_deg (float | SignalR): Rotation angle in degrees. |
| 47 | + clockwise_frame (boolean): If True, the rotated frame is defined using a |
| 48 | + clockwise rotation; otherwise, a counter-clockwise rotation is used. |
| 49 | +
|
| 50 | + Returns: |
| 51 | + tuple[SignalRW[float], SignalRW[float]] Two virtual read/write signals |
| 52 | + corresponding to the rotated i and j components. |
| 53 | + """ |
| 54 | + rotate = rotate_clockwise if clockwise_frame else rotate_counter_clockwise |
| 55 | + inverse_rotate = rotate_counter_clockwise if clockwise_frame else rotate_clockwise |
| 56 | + |
| 57 | + async def _read_rotated() -> tuple[float, float, float]: |
| 58 | + i, j, ang = await asyncio.gather( |
| 59 | + i_read.get_value(), |
| 60 | + j_read.get_value(), |
| 61 | + _get_angle_deg(angle_deg), |
| 62 | + ) |
| 63 | + return (*rotate(radians(ang), i, j), ang) |
| 64 | + |
| 65 | + async def _write_rotated(i_rot: float, j_rot: float, ang: float) -> None: |
| 66 | + i_new, j_new = inverse_rotate(radians(ang), i_rot, j_rot) |
| 67 | + await asyncio.gather( |
| 68 | + maybe_await(i_write.set(i_new)), |
| 69 | + maybe_await(j_write.set(j_new)), |
| 70 | + ) |
| 71 | + |
| 72 | + def _read_i(i: float, j: float, ang: float) -> float: |
| 73 | + return rotate(radians(ang), i, j)[0] |
| 74 | + |
| 75 | + async def _set_i(value: float) -> None: |
| 76 | + i_rot, j_rot, ang = await _read_rotated() |
| 77 | + await _write_rotated(value, j_rot, ang) |
| 78 | + |
| 79 | + def _read_j(i: float, j: float, ang: float) -> float: |
| 80 | + return rotate(radians(ang), i, j)[1] |
| 81 | + |
| 82 | + async def _set_j(value: float) -> None: |
| 83 | + i_rot, j_rot, ang = await _read_rotated() |
| 84 | + await _write_rotated(i_rot, value, ang) |
| 85 | + |
| 86 | + return ( |
| 87 | + derived_signal_rw(_read_i, _set_i, i=i_read, j=j_read, ang=angle_deg), |
| 88 | + derived_signal_rw(_read_j, _set_j, i=i_read, j=j_read, ang=angle_deg), |
| 89 | + ) |
0 commit comments