Skip to content
Draft
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/dodal/plan_stubs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
from .wrapped import move, move_relative, set_absolute, set_relative, sleep, wait
from .wrapped import (
move,
move_relative,
rd,
set_absolute,
set_relative,
sleep,
stop,
wait,
)

__all__ = ["move", "move_relative", "set_absolute", "set_relative", "sleep", "wait"]
__all__ = [
"move",
"move_relative",
"rd",
"set_absolute",
"set_relative",
"sleep",
"stop",
"wait",
]
26 changes: 25 additions & 1 deletion src/dodal/plan_stubs/wrapped.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Annotated, TypeVar

import bluesky.plan_stubs as bps
from bluesky.protocols import Movable
from bluesky.protocols import Movable, Readable, Stoppable
from bluesky.utils import MsgGenerator

"""
Expand Down Expand Up @@ -146,3 +146,27 @@ def wait(
"""

return (yield from bps.wait(group, timeout=timeout))


def rd(readable: Readable) -> MsgGenerator:
"""Reads a single-value non-triggered object, wrapper for `bp.rd`.

Args:
readable (Readable): The device to be read

Returns:
Iterator[MsgGenerator]: Bluesky messages
"""
return (yield from bps.rd(readable))


def stop(stoppable: Stoppable) -> MsgGenerator:
"""Stop a device, wrapper for `bp.stop`.

Args:
stoppable (Stoppable): Device to be stopped

Returns:
Iterator[MsgGenerator]: Bluesky messages
"""
return (yield from bps.stop(stoppable))
25 changes: 23 additions & 2 deletions src/dodal/plans/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,25 @@
from .scanspec import spec_scan
from .wrapped import count
from .wrapped import (
count,
grid_num_rscan,
grid_num_scan,
list_grid_rscan,
list_grid_scan,
list_rscan,
list_scan,
num_rscan,
num_scan,
)

__all__ = ["count", "spec_scan"]
__all__ = [
"count",
"grid_num_rscan",
"grid_num_scan",
"list_grid_rscan",
"list_grid_scan",
"list_rscan",
"list_scan",
"num_rscan",
"num_scan",
"spec_scan",
]
280 changes: 278 additions & 2 deletions src/dodal/plans/wrapped.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import itertools
from collections.abc import Sequence
from typing import Annotated, Any

import bluesky.plans as bp
from bluesky.protocols import Readable
from bluesky.protocols import Movable, Readable
from ophyd_async.core import AsyncReadable
from pydantic import Field, NonNegativeFloat, validate_call

from dodal.common import MsgGenerator
from dodal.devices.motors import Motor
from dodal.plan_stubs.data_session import attach_data_session_metadata_decorator

"""This module wraps plan(s) from bluesky.plans until required handling for them is
Expand All @@ -27,7 +30,7 @@
@validate_call(config={"arbitrary_types_allowed": True})
def count(
detectors: Annotated[
set[Readable],
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
Expand Down Expand Up @@ -55,3 +58,276 @@ def count(
metadata = metadata or {}
metadata["shape"] = (num,)
yield from bp.count(tuple(detectors), num, delay=delay, md=metadata)


def _make_args(
movers: Sequence[Movable | Motor],
params: list[Any] | Sequence[Any],
num_params: int,
):
movers_len = len(movers)
params_len = len(params)
if params_len % movers_len != 0 or params_len % num_params != 0:
raise ValueError(f"params must contain {num_params} values for each movable")

args = []
it = iter(params)
param_chunks = iter(lambda: tuple(itertools.islice(it, num_params)), ())
for movable, param_chunk in zip(movers, param_chunks, strict=False):
args.append(movable)
args.extend(param_chunk)
return args


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def num_scan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
list[float],
Field(
description="Start and stop points for each movable, 'start1, stop1, ...,"
"startN, stopN' for every movable in `movers`."
),
],
num: Annotated[int, Field(description="Number of points")],
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over one multi-motor trajectory.
Wraps bluesky.plans.scan(det, *args, num, md=metadata)"""
metadata = metadata or {}
metadata["shape"] = (num,)
args = _make_args(movers=movers, params=params, num_params=2)
yield from bp.scan(tuple(detectors), *args, num=num, md=metadata)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def num_rscan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
list[float],
Field(
description="Start and stop points for each movable, 'start1, stop1, ...,"
"startN, stopN' for every movable in `movers`."
),
],
num: Annotated[int, Field(description="Number of points")],
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over one multi-motor trajectory, relative to current position.
Wraps bluesky.plans.rel_scan(det, *args, num, md=metadata)"""
metadata = metadata or {}
metadata["shape"] = (num,)
args = _make_args(movers=movers, params=params, num_params=2)
yield from bp.rel_scan(tuple(detectors), *args, num=num, md=metadata)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def grid_num_scan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
Sequence[float | int],
Field(
description="Start and stop points for each movable, 'start1, stop1, ...,"
"startN, stopN' for every movable in `movers`."
),
],
snake_axes: list | bool | None = None,
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over a mesh; each motor is on an independent trajectory.
Wraps bluesky.plans.grid_scan(det, *args, snake_axes, md=metadata)"""
metadata = metadata or {}
args = _make_args(movers=movers, params=params, num_params=3)
yield from bp.grid_scan(tuple(detectors), *args, snake_axes=snake_axes, md=metadata)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def grid_num_rscan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
Sequence[float | int],
Field(
description="Start and stop points for each movable, 'start1, stop1, ...,"
"startN, stopN' for every movable in `movers`."
),
],
snake_axes: list | bool | None = None,
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over a mesh relative to current position.
Wraps bluesky.plans.rel_grid_scan(det, *args, snake_axes, md=metadata)"""
metadata = metadata or {}
args = _make_args(movers=movers, params=params, num_params=3)
yield from bp.rel_grid_scan(
tuple(detectors), *args, snake_axes=snake_axes, md=metadata
)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def list_scan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
list[list[Any]],
Field(
description="List of points for each movable, '[point1, point2, ..., ], "
"[point1, point2, ...], ...' for every movable in `movers`."
),
],
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over one or more variables in steps simultaneously.
Wraps bluesky.plans.list_scan(det, *args, md=metadata)."""
metadata = metadata or {}
args = _make_args(movers=movers, params=params, num_params=1)
yield from bp.list_scan(tuple(detectors), *args, md=metadata)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def list_rscan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
list[list[Any]],
Field(
description="List of points for each movable, '[point1, point2, ..., ], "
"[point1, point2, ...], ...' for every movable in `movers`."
),
],
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over one or more variables simultaneously relative to current position.
Wraps bluesky.plans.rel_list_scan(det, *args, md=metadata)."""
metadata = metadata or {}
args = _make_args(movers=movers, params=params, num_params=1)
yield from bp.rel_list_scan(tuple(detectors), *args, md=metadata)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def list_grid_scan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
list[list[Any]],
Field(
description="List of points for each movable, '[point1, point2, ..., ], "
"[point1, point2, ...], ...' for every movable in `movers`."
),
],
snake_axes: bool = False, # Currently specifying axes to snake is not supported
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over one or more variables for each given point on independent trajectories.
Wraps bluesky.plans.list_grid_scan(det, *args, md=metadata)."""
metadata = metadata or {}
args = _make_args(movers=movers, params=params, num_params=1)
yield from bp.list_grid_scan(
tuple(detectors), *args, snake_axes=snake_axes, md=metadata
)


@attach_data_session_metadata_decorator()
@validate_call(config={"arbitrary_types_allowed": True})
def list_grid_rscan(
detectors: Annotated[
Sequence[Readable | AsyncReadable],
Field(
description="Set of readable devices, will take a reading at each point",
min_length=1,
),
],
movers: Annotated[
Sequence[Movable | Motor],
Field(description="One or more movable to move during the scan."),
],
params: Annotated[
list[list[Any]],
Field(
description="List of points for each movable, '[point1, point2, ..., ], "
"[point1, point2, ...], ...' for every movable in `movers`."
),
],
snake_axes: bool = False, # Currently specifying axes to snake is not supported
metadata: dict[str, Any] | None = None,
) -> MsgGenerator:
"""Scan over some variables for each given point relative to current position.
Wraps bluesky.plans.rel_list_grid_scan(det, *args, md=metadata)."""
metadata = metadata or {}
args = _make_args(movers=movers, params=params, num_params=1)
yield from bp.rel_list_grid_scan(
tuple(detectors), *args, snake_axes=snake_axes, md=metadata
)
2 changes: 1 addition & 1 deletion system_tests/test_adsim.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def documents_from_num(
) -> dict[str, list[DocumentType]]:
docs: dict[str, list[DocumentType]] = {}
run_engine(
count({det}, num=request.param),
count([det], num=request.param),
lambda name, doc: docs.setdefault(name, []).append(doc),
)
return docs
Expand Down
Loading