diff --git a/.gitignore b/.gitignore index 64cb2f3d4..a696c4a7b 100644 --- a/.gitignore +++ b/.gitignore @@ -36,6 +36,9 @@ nosetests.xml coverage.xml cover/* +# hypothesis +.hypothesis/ + # PyBuilder target/ diff --git a/CHANGELOG.md b/CHANGELOG.md index adc621e93..3a0778107 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ Write the date in place of the "Unreleased" in the case a new version is release ## Unreleased +### Changed + +- Array client fully supports slicing when communicating with the server + and only fetches the data needed to satisfy the slice. + +## v0.2.7 (2026-02-27) + ### Fixed - A potential race condition when subscribing to an already started stream diff --git a/pixi.toml b/pixi.toml index ebde23eda..793599df9 100644 --- a/pixi.toml +++ b/pixi.toml @@ -51,6 +51,7 @@ pyarrow-all = ">=14.0.1" # includes fix to CVE 2023-47248 aiohttp = "*" coverage = "*" flake8 = "*" +hypothesis = "*" ipython = "*" ldap3 = "*" matplotlib = "*" diff --git a/pyproject.toml b/pyproject.toml index 5806c41a7..19bbf5377 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -263,6 +263,7 @@ test = [ "aiohttp", "coverage", "flake8", + "hypothesis", "importlib_resources;python_version < \"3.9\"", "ipython", "ldap3", diff --git a/tests/test_array.py b/tests/test_array.py index a1dd0783d..5a7c36aff 100644 --- a/tests/test_array.py +++ b/tests/test_array.py @@ -7,11 +7,13 @@ import httpx import numpy import pytest -from starlette.status import HTTP_400_BAD_REQUEST, HTTP_406_NOT_ACCEPTABLE +from starlette.status import HTTP_406_NOT_ACCEPTABLE, HTTP_422_UNPROCESSABLE_CONTENT from tiled.adapters.array import ArrayAdapter from tiled.adapters.mapping import MapAdapter -from tiled.client import Context, from_context +from tiled.client import Context, from_context, record_history +from tiled.client.array import ArrayClient +from tiled.ndslice import NDSlice from tiled.serialization.array import as_buffer from tiled.server.app import build_app @@ -43,6 +45,10 @@ cube_cases 
= { "tiny_cube": numpy.random.random((10, 10, 10)), "tiny_hypercube": numpy.random.random((10, 10, 10, 10, 10)), + "chunked": dask.array.from_array( + numpy.arange(1_200_000, dtype="uint64").reshape((10, 300, 400)), + chunks=(1, 300, 200), + ), } cube_tree = MapAdapter({k: ArrayAdapter.from_array(v) for k, v in cube_cases.items()}) inf_tree = MapAdapter( @@ -146,12 +152,16 @@ def strict_parse_constant(c): def test_block_validation(context): - "Verify that block must be fully specified." + "Verify that block is correctly specified." client = from_context(context, "dask")["cube"]["tiny_cube"] block_url = httpx.URL(client.item["links"]["block"]) # Malformed because it has only 2 dimensions, not 3. malformed_block_url = block_url.copy_with(params={"block": "0,0"}) - with fail_with_status_code(HTTP_400_BAD_REQUEST): + with fail_with_status_code(HTTP_422_UNPROCESSABLE_CONTENT): + client.context.http_client.get(malformed_block_url).raise_for_status() + # Malformed because it has 4 dimensions, not 3. 
+ malformed_block_url = block_url.copy_with(params={"block": "0,0,0,0"}) + with fail_with_status_code(HTTP_422_UNPROCESSABLE_CONTENT): client.context.http_client.get(malformed_block_url).raise_for_status() @@ -166,7 +176,59 @@ def test_dask(context): def test_array_format_shape_from_cube(context): client = from_context(context)["cube"] with fail_with_status_code(HTTP_406_NOT_ACCEPTABLE): - hyper_cube = client["tiny_hypercube"].export("test.png") # noqa: F841 + client["tiny_hypercube"].export("test.png") + + +@pytest.mark.parametrize( + "bytesize_limit, num_gets_expected", + [ + (None, 1), # Default, Entire array fits in one response + (300 * 400 * 8, 10), # Each frame fits in one response + (300 * 400 * 8 - 1, 20), # Just under the limit, each frame is split in half + (300 * 400 * 16, 5), # Two frames fit in one response + (300 * 100 * 8, 40), # Each chunk is split in half + ], +) +def test_request_chunking(context, bytesize_limit, num_gets_expected, monkeypatch): + # Try reading a (10, 300, 400) array with (1, 300, 200) chunks and count requests + bytesize_limit = bytesize_limit or ArrayClient.RESPONSE_BYTESIZE_LIMIT + monkeypatch.setattr(ArrayClient, "RESPONSE_BYTESIZE_LIMIT", bytesize_limit) + client = from_context(context)["cube/chunked"] + with record_history() as h: + arr = client.read() + num_gets = sum(1 for entry in h.requests if entry.method == "GET") + assert num_gets == num_gets_expected + assert all("/array/full" in req.url.path for req in h.requests) + numpy.testing.assert_equal(arr, cube_cases["chunked"]) + + +def test_request_slicing(context): + # One slice that requires data from all chunks + client = from_context(context)["cube/chunked"] + expected = cube_cases["chunked"][:, 42, 100:300] + with record_history() as h: + actual = client[:, 42, 100:300] + assert len(h.requests) == 1 + numpy.testing.assert_equal(actual, expected) + + +def test_request_empty_slice(context): + # When reading an entire array, `slice=` should not be requested + client = 
from_context(context)["cube/chunked"] + with record_history() as h: + client.read() + client.read(slice=None) + client.read(slice=()) + client.read(slice=Ellipsis) + client.read(slice=NDSlice()) + client[...] + client[()] + client[:, :, :] + client[:] + client[:, ..., :] + assert len(h.requests) == 10 + assert all("expected_shape" in req.url.params for req in h.requests) + assert all("slice" not in req.url.params for req in h.requests) def test_array_interface(context): @@ -198,7 +260,8 @@ def test_unparsable_nested_array_stringified(kind, context): client = from_context(context)["nested_arrays"][kind] assert " start: + step = data.draw(st.integers(1, 10), label="step") + elif stop < start: + step = data.draw(st.integers(-10, -1), label="step") + + # Preferred splits strictly inside slice and unique + preferred_splits = data.draw( + st.lists( + st.integers(min(start, stop), max(start, stop) - 1), + unique=True, + max_size=10, + ), + label="preferred_splits", + ) + + result = split_1d(start, stop, step, max_len, preferred_splits) + + # 1. Check that first and last boundaries match + assert result[0][0] == start + assert result[-1][1] == stop + + # 2. Check contiguous intervals + for (_, a_stop), (b_start, _) in zip(result, result[1:]): + assert a_stop == b_start + + # 3. Check grid alignment + for a, b in result: + assert (a - start) % step == 0 + if b != stop: + assert (b - start) % step == 0 + + # 4. Check max length constraint + for a, b in result: + assert len(range(a, b, step)) <= max_len + + # 5. Check step direction consistency + for a, b in result: + if step > 0: + assert b >= a + else: + assert b <= a + + # 6. 
Check no degenerate intervals + for a, b in result: + assert a != b + assert len(range(a, b, step)) > 0 + + +@st.composite +def nd_slice_strategy(draw): + ndim = draw(st.integers(1, 4)) + + starts = draw(st.lists(st.integers(0, 20), min_size=ndim, max_size=ndim)) + lengths = draw(st.lists(st.integers(0, 20), min_size=ndim, max_size=ndim)) + steps = draw(st.lists(st.integers(1, 5), min_size=ndim, max_size=ndim)) + reversed = draw(st.lists(st.booleans(), min_size=ndim, max_size=ndim)) + + slices, shape = [], [] + + for start, length, step, rev in zip(starts, lengths, steps, reversed): + if length == 0: + # Singleton dimension, use an integer index + slices.append(start) + shape.append(start + 1) + else: + stop = start + length * step + if rev: + slices.append(slice(stop, start, -step)) + else: + slices.append(slice(start, stop, step)) + shape.append(stop) + + shape = tuple(shape) + + return NDSlice(*slices).expand_for_shape(shape), shape + + +@st.composite +def pref_split_strategy(draw, slices): + pref = [] + + for sl in slices: + if isinstance(sl, int): + pref.append([]) # No splits for singleton dimensions + continue + + start, stop, step = sl.start, sl.stop, sl.step or 1 + grid = list(range(start + step, stop, step)) + + if grid: + splits = draw( + st.lists( + st.sampled_from(grid), + unique=True, + max_size=min(len(grid), 5), + ) + ) + else: + splits = [] + + pref.append(sorted(splits)) + + return pref + + +@given(data=st.data(), max_size=st.sampled_from([1, 2, 3, 5, 50, 100, 200])) +@settings(deadline=None) +def test_split_slice(data, max_size): + arr_slice, shape = data.draw(nd_slice_strategy(), label="nd_slice") + + if math.prod(arr_slice.shape_after_slice(shape)) == 0: + # Skip degenerate case where slice results in empty array + return + + pref_splits = data.draw(pref_split_strategy(arr_slice), label="pref_splits") + arr = np.arange(math.prod(shape)).reshape(shape) + slices = split_slice(arr_slice, max_size, pref_splits) + + # 1. 
Check reconstruction of the original slice from the pieces + darr = da.Array( + name="test", + dask={ + ("test",) + indx: (lambda x: arr[x], slc) for indx, slc in slices.items() + }, + dtype=arr.dtype, + chunks=slices_to_dask_chunks(slices, shape), + shape=arr_slice.shape_after_slice(shape), + ) + np.testing.assert_array_equal(darr.compute(), arr[arr_slice]) + + # 2. Check that each slice respects the max_size constraint + for slc in slices.values(): + slc_shape = slc.shape_after_slice(shape) + assert math.prod(slc_shape) <= max_size + + +@pytest.mark.parametrize( + "inputs, expected", + [ + ([NDSlice(1, 2)], NDSlice(1, 2)), # single slice + ([NDSlice(1, 2), NDSlice(1, 2)], NDSlice(1, 2)), # identical integers + ( + [NDSlice(0), NDSlice(1), NDSlice(2)], + NDSlice(slice(0, 3)), + ), # consecutive integers + ([NDSlice(2), NDSlice(0), NDSlice(1)], NDSlice(slice(0, 3))), # unordered + ([NDSlice(0), NDSlice(2)], None), # non-consecutive integers + ( + [NDSlice(slice(0, 5)), NDSlice(slice(0, 5))], + NDSlice(slice(0, 5)), + ), # identical slices + ( + [NDSlice(slice(0, 3)), NDSlice(slice(3, 6))], + NDSlice(slice(0, 6)), + ), # adjacent slices + ( + [NDSlice(slice(3, 6)), NDSlice(slice(0, 3))], + NDSlice(slice(0, 6)), + ), # unordered adjacent slices + ([NDSlice(slice(0, 3)), NDSlice(slice(4, 6))], None), # non-adjacent slices + ( + [NDSlice(slice(0, 4, 1)), NDSlice(slice(4, 8, 2))], + None, + ), # slice step mismatch + ( + [NDSlice(2), NDSlice(slice(3, 4))], + NDSlice(slice(2, 4)), + ), # mixed integer and slice + ( + [NDSlice(slice(5, 5)), NDSlice(slice(5, 5))], + NDSlice(slice(5, 5)), + ), # empty slice dimension + ([NDSlice(Ellipsis, 1), NDSlice(Ellipsis, 1)], NDSlice(Ellipsis, 1)), + ([NDSlice(Ellipsis, 0), NDSlice(Ellipsis, 1)], NDSlice(Ellipsis, slice(0, 2))), + ( + [NDSlice(Ellipsis, slice(None, 3)), NDSlice(Ellipsis, slice(3, 6))], + NDSlice(Ellipsis, slice(0, 6)), + ), + ([NDSlice(0, slice(0, 3)), NDSlice(0, slice(3, 6))], NDSlice(0, slice(0, 6))), + 
([NDSlice(slice(0, 3), 0), NDSlice(slice(3, 6), 0)], NDSlice(slice(0, 6), 0)), + ([NDSlice(0, 0), NDSlice(1, 1)], None), # too many differing dimensions + ([NDSlice(0), NDSlice(0, 1)], None), + ([NDSlice(slice(0, 2)), NDSlice(5)], None), + ([NDBlock(0), NDBlock(1)], NDBlock(slice(0, 2))), + ([NDSlice(0), NDBlock(1)], NDSlice(slice(0, 2))), + ([NDSlice(slice(None, 5)), NDSlice(slice(0, 5))], NDSlice(slice(None, 5))), + ([NDSlice(slice(None, 3)), NDSlice(slice(3, 6))], NDSlice(slice(0, 6))), + ([NDSlice(slice(3, None)), NDSlice(slice(3, None))], NDSlice(slice(3, None))), + ( + [NDSlice(slice(None, None)), NDSlice(slice(None, None))], + NDSlice(slice(None, None)), + ), + ( + [NDSlice(slice(None, None)), NDSlice(0)], + None, + ), # open slice cannot merge with integer + ([NDSlice(-2), NDSlice(-1)], NDSlice(slice(-2, 0))), # negative integers + ([NDSlice(slice(-5, -3)), NDSlice(slice(-3, -1))], NDSlice(slice(-5, -1))), + ([NDSlice(slice(-5, -3)), NDSlice(slice(-2, 0))], None), + ( + [NDSlice(slice(5, 0, -1)), NDSlice(slice(5, 0, -1))], + NDSlice(slice(5, 0, -1)), + ), # identical reverse slices allowed + ( + [NDSlice(slice(5, 0, -1)), NDSlice(slice(5, 0, -2))], + None, + ), # reverse slices with different step rejected + ( + [NDSlice(slice(5, 3, -1)), NDSlice(slice(3, 1, -1))], + NDSlice(slice(5, 1, -1)), + ), # adjacent reverse slices + ( + [NDSlice(slice(None, None)), NDSlice(slice(0, 0))], + None, + ), # open slice incompatible with explicit empty slice + ( + [NDSlice(slice(None, 3), 0), NDSlice(slice(3, 6), 0)], + NDSlice(slice(0, 6), 0), + ), + ], +) +def test_merge_slices(inputs, expected): + if expected is None: + with pytest.raises(ValueError): + merge_slices(*inputs) + else: + result = merge_slices(*inputs) + + assert type(result) is type(expected) + assert tuple(result) == tuple(expected) + + # Test that the result is the same regardless of the order of the inputs + if len(inputs) > 2: + for permuted_inputs in itertools.permutations(inputs): + assert 
merge_slices(*permuted_inputs) == expected + + +@pytest.mark.parametrize( + "slice_dict,shape,expected", + [ + ({(0, 0): NDSlice.from_numpy_str(":10, :5")}, (10, 5), ((10,), (5,))), + ( + { + (0,): NDSlice.from_numpy_str(":10"), + (1,): NDSlice.from_numpy_str("10:30"), + }, + (30,), + ((10, 20),), + ), + ( + { + (0, 0): NDSlice.from_numpy_str(":10, :5"), + (0, 1): NDSlice.from_numpy_str(":10, 5:12"), + (1, 0): NDSlice.from_numpy_str("10:18, :5"), + (1, 1): NDSlice.from_numpy_str("10:18, 5:12"), + }, + (18, 12), + ((10, 8), (5, 7)), + ), + ( + { + (0, 0, 0): NDSlice.from_numpy_str(":2, :3, :4"), + (0, 0, 1): NDSlice.from_numpy_str(":2, :3, 4:9"), + (1, 0, 0): NDSlice.from_numpy_str("2:8, :3, :4"), + (1, 0, 1): NDSlice.from_numpy_str("2:8, :3, 4:9"), + }, + (8, 3, 9), + ((2, 6), (3,), (4, 5)), + ), + ], +) +def test_dask_slices(slice_dict, shape, expected): + assert slices_to_dask_chunks(slice_dict, shape) == expected diff --git a/tiled/adapters/array.py b/tiled/adapters/array.py index fff66d052..ef4a818c3 100644 --- a/tiled/adapters/array.py +++ b/tiled/adapters/array.py @@ -8,10 +8,10 @@ from tiled.adapters.core import Adapter -from ..ndslice import NDSlice +from ..ndslice import NDBlock, NDSlice from ..structures.array import ArrayStructure from ..structures.core import Spec, StructureFamily -from ..type_aliases import JSON +from ..type_aliases import JSON, Chunks from .utils import force_reshape @@ -49,7 +49,7 @@ def from_array( array: NDArray[Any], *, shape: Optional[Tuple[int, ...]] = None, - chunks: Optional[Tuple[Tuple[int, ...], ...]] = None, + chunks: Optional[Chunks] = None, dims: Optional[Tuple[str, ...]] = None, metadata: Optional[JSON] = None, specs: Optional[List[Spec]] = None, @@ -106,12 +106,12 @@ def read( def read_block( self, - block: Tuple[int, ...], + block: NDBlock, slice: NDSlice = NDSlice(...), ) -> NDArray[Any]: # Slice the whole array to get this block. 
array = force_reshape(self._array, self._structure.shape) - slice_, _ = slice_and_shape_from_block_and_chunks(block, self._structure.chunks) + slice_ = block.slice_from_chunks(self._structure.chunks) # _array[...] requires an actual tuple, not just a subclass of tuple array = array[tuple(slice_)] # Slice within the block. @@ -119,19 +119,3 @@ def read_block( if isinstance(self._array, dask.array.Array): return array.compute() return array - - -def slice_and_shape_from_block_and_chunks( - block: Tuple[int, ...], chunks: Tuple[Tuple[int, ...], ...] -) -> tuple[NDSlice, NDSlice]: - """ - Given dask-like chunks and block id, return slice and shape of the block. - """ - slice_ = [] - shape = [] - for b, c in zip(block, chunks): - start = sum(c[:b]) - dim = c[b] - slice_.append(slice(start, start + dim)) - shape.append(dim) - return NDSlice(*slice_), NDSlice(*shape) diff --git a/tiled/adapters/sequence.py b/tiled/adapters/sequence.py index a3ca6ff1a..122363693 100644 --- a/tiled/adapters/sequence.py +++ b/tiled/adapters/sequence.py @@ -1,6 +1,6 @@ import builtins from abc import abstractmethod -from typing import Any, Iterable, List, Optional, Tuple, Union +from typing import Any, Iterable, List, Optional, Union import numpy as np from ndindex import ndindex @@ -8,9 +8,8 @@ from tiled.adapters.core import Adapter -from ..adapters.array import slice_and_shape_from_block_and_chunks from ..catalog.orm import Node -from ..ndslice import NDSlice +from ..ndslice import NDBlock, NDSlice from ..structures.array import ArrayStructure, BuiltinDtype from ..structures.core import Spec, StructureFamily from ..structures.data_source import DataSource @@ -136,14 +135,16 @@ def read( left_axis, *the_rest = slice # Could be int or slice (i, ...) or (slice(...), ...); the_rest is converted to a list if isinstance(left_axis, int): - # e.g. read(slice=(0, ....)) + # e.g. 
read(slice=(0, ....)), dimensionality is reduced by 1 arr = np.squeeze(self._load_from_files(left_axis), 0) elif left_axis is Ellipsis: - # Return all images + # Return all images; include any leading dimensions arr = self._load_from_files() - the_rest.insert(0, Ellipsis) # Include any leading dimensions + the_rest.insert(0, Ellipsis) elif isinstance(left_axis, builtins.slice): + # Include the first dimension when further subslicing arr = self.read(slice=left_axis) + the_rest.insert(0, builtins.slice(None)) sliced_shape = ndindex(left_axis).newshape(self.structure().shape) arr = force_reshape(arr, sliced_shape) @@ -153,22 +154,9 @@ def read( sliced_shape = ndindex(slice).newshape(self.structure().shape) return force_reshape(arr, sliced_shape) - def read_block( - self, block: Tuple[int, ...], slice: NDSlice = NDSlice(...) - ) -> NDArray[Any]: - """ - - Parameters - ---------- - block : - slice : - - Returns - ------- - - """ + def read_block(self, block: NDBlock, slice: NDSlice = NDSlice(...)) -> NDArray[Any]: if any(block[1:]): raise IndexError(block) - slice_, _ = slice_and_shape_from_block_and_chunks(block, self._structure.chunks) - arr = self.read(slice_[0]) + block_slice = block.slice_from_chunks(self._structure.chunks) + arr = self.read(block_slice[0]) return arr[slice] if slice else arr diff --git a/tiled/adapters/sparse.py b/tiled/adapters/sparse.py index b3726398a..71501b899 100644 --- a/tiled/adapters/sparse.py +++ b/tiled/adapters/sparse.py @@ -8,12 +8,11 @@ from tiled.adapters.core import Adapter -from ..ndslice import NDSlice +from ..ndslice import NDBlock, NDSlice from ..structures.array import BuiltinDtype from ..structures.core import Spec, StructureFamily from ..structures.sparse import COOStructure, SparseStructure -from ..type_aliases import JSON -from .array import slice_and_shape_from_block_and_chunks +from ..type_aliases import JSON, Chunks class SparseAdapter(Adapter[SparseStructure]): @@ -101,7 +100,7 @@ def from_global_ref( cls, blocks: 
Dict[Tuple[int, ...], Tuple[NDArray[Any], Any]], shape: Tuple[int, ...], - chunks: Tuple[Tuple[int, ...], ...], + chunks: Chunks, *, dims: Optional[Tuple[str, ...]] = None, metadata: Optional[JSON] = None, @@ -109,6 +108,7 @@ def from_global_ref( ) -> "COOAdapter": """ Construct from blocks with coords given in global reference frame. + Parameters ---------- blocks : @@ -155,6 +155,7 @@ def __init__( ) -> None: """ Construct from blocks with coords given in block-local reference frame. + Parameters ---------- blocks : @@ -165,36 +166,13 @@ def __init__( self.blocks = blocks super().__init__(structure, metadata=metadata, specs=specs) - def read_block( - self, block: Tuple[int, ...], slice: NDSlice = NDSlice(...) - ) -> sparse.COO: - """ - - Parameters - ---------- - block : - slice : - - Returns - ------- - - """ + def read_block(self, block: NDBlock, slice: NDSlice = NDSlice(...)) -> sparse.COO: coords, data = self.blocks[block] - _, shape = slice_and_shape_from_block_and_chunks(block, self._structure.chunks) + shape = block.shape_from_chunks(self._structure.chunks) arr = sparse.COO(data=data[:], coords=coords[:], shape=shape) return arr[slice] if slice else arr def read(self, slice: NDSlice = NDSlice(...)) -> sparse.COO: - """ - - Parameters - ---------- - slice : - - Returns - ------- - - """ all_coords = [] all_data = [] for block, (coords, data) in self.blocks.items(): diff --git a/tiled/adapters/sparse_blocks_parquet.py b/tiled/adapters/sparse_blocks_parquet.py index dfec0a81d..63e08eda7 100644 --- a/tiled/adapters/sparse_blocks_parquet.py +++ b/tiled/adapters/sparse_blocks_parquet.py @@ -4,7 +4,6 @@ from typing import Any, List, Optional, Tuple, Union from urllib.parse import quote_plus -import dask.base import dask.dataframe import numpy import pandas @@ -13,9 +12,8 @@ from tiled.adapters.core import Adapter -from ..adapters.array import slice_and_shape_from_block_and_chunks from ..catalog.orm import Node -from ..ndslice import NDSlice +from ..ndslice import 
NDBlock, NDSlice from ..storage import FileStorage, Storage from ..structures.core import Spec from ..structures.data_source import Asset, DataSource @@ -26,16 +24,6 @@ def load_block(uri: str) -> Tuple[List[int], Tuple[NDArray[Any], Any]]: - """ - - Parameters - ---------- - uri : - - Returns - ------- - - """ # TODO This can be done without pandas. # Better to use a plain I/O library. df = pandas.read_parquet(path_from_uri(uri)) @@ -80,17 +68,6 @@ def init_storage( data_source: DataSource[COOStructure], path_parts: List[str], ) -> DataSource[COOStructure]: - """ - - Parameters - ---------- - data_uri : - structure : - - Returns - ------- - - """ data_source = copy.deepcopy(data_source) # Do not mutate caller input. data_uri = storage.uri + "".join( f"/{quote_plus(segment)}" for segment in path_parts @@ -126,32 +103,12 @@ def write_block( data.to_parquet(path_from_uri(uri)) def write(self, data: Union[dask.dataframe.DataFrame, pandas.DataFrame]) -> None: - """ - - Parameters - ---------- - data : - - Returns - ------- - - """ if len(self.blocks) > 1: raise NotImplementedError uri = self.blocks[(0,) * len(self.structure().shape)] data.to_parquet(path_from_uri(uri)) def read(self, slice: NDSlice = NDSlice(...)) -> sparse.COO: - """ - - Parameters - ---------- - slice : - - Returns - ------- - - """ all_coords = [] all_data = [] for block, uri in self.blocks.items(): @@ -170,21 +127,8 @@ def read(self, slice: NDSlice = NDSlice(...)) -> sparse.COO: ) return arr[slice] if slice else arr - def read_block( - self, block: Tuple[int, ...], slice: NDSlice = NDSlice(...) 
- ) -> sparse.COO: - """ - - Parameters - ---------- - block : - slice : - - Returns - ------- - - """ + def read_block(self, block: NDBlock, slice: NDSlice = NDSlice(...)) -> sparse.COO: coords, data = load_block(self.blocks[block]) - _, shape = slice_and_shape_from_block_and_chunks(block, self._structure.chunks) + shape = block.shape_from_chunks(self._structure.chunks) arr = sparse.COO(data=data[:], coords=coords[:], shape=shape) return arr[slice] if slice else arr diff --git a/tiled/adapters/zarr.py b/tiled/adapters/zarr.py index cfd566aa2..3fc3b6251 100644 --- a/tiled/adapters/zarr.py +++ b/tiled/adapters/zarr.py @@ -17,7 +17,7 @@ from ..adapters.utils import IndexersMixin from ..catalog.orm import Node from ..iterviews import ItemsView, KeysView, ValuesView -from ..ndslice import NDSlice +from ..ndslice import NDBlock, NDSlice from ..storage import ( SUPPORTED_OBJECT_URI_SCHEMES, FileStorage, @@ -28,9 +28,9 @@ from ..structures.array import ArrayStructure from ..structures.core import Spec, StructureFamily from ..structures.data_source import Asset, DataSource -from ..type_aliases import JSON +from ..type_aliases import JSON, Chunks from ..utils import Conflicts, node_repr, path_from_uri -from .array import ArrayAdapter, slice_and_shape_from_block_and_chunks +from .array import ArrayAdapter ZARR_LIB_V2 = Version(version("zarr")) < Version("3") if ZARR_LIB_V2: @@ -111,29 +111,12 @@ def read( self, slice: NDSlice = NDSlice(...), ) -> NDArray[Any]: - """ - - Parameters - ---------- - slice : - - Returns - ------- - - """ arr = cast(NDArray, self._array[self._stencil()]) return arr[slice] - def read_block( - self, - block: Tuple[int, ...], - slice: NDSlice = NDSlice(...), - ) -> NDArray[Any]: - block_slice, _ = slice_and_shape_from_block_and_chunks( - block, self.structure().chunks - ) - # Slice the block out of the whole array, - # and optionally a sub-slice therein. 
+ def read_block(self, block: NDBlock, slice: NDSlice = NDSlice(...)) -> NDArray[Any]: + "Slice the block out of the whole array and optionally a sub-slice therein." + block_slice = block.slice_from_chunks(self.structure().chunks) return self._array[self._stencil()][block_slice][slice or ...] def write( @@ -145,14 +128,8 @@ def write( raise NotImplementedError self._array[self._stencil()] = data - def write_block( - self, - data: NDArray[Any], - block: Tuple[int, ...], - ) -> None: - block_slice, shape = slice_and_shape_from_block_and_chunks( - block, self.structure().chunks - ) + def write_block(self, data: NDArray[Any], block: NDBlock) -> None: + block_slice = block.slice_from_chunks(self.structure().chunks) self._array[block_slice] = data def patch( @@ -160,7 +137,7 @@ def patch( data: NDArray[Any], offset: Tuple[int, ...], extend: bool = False, - ) -> Tuple[Tuple[int, ...], Tuple[Tuple[int, ...], ...]]: + ) -> Tuple[Tuple[int, ...], Chunks]: """ Write data into a slice of the array, maybe extending it. 
diff --git a/tiled/catalog/adapter.py b/tiled/catalog/adapter.py index 6d192d460..5b88bbedb 100644 --- a/tiled/catalog/adapter.py +++ b/tiled/catalog/adapter.py @@ -1324,11 +1324,7 @@ async def write(self, media_type, deserializer, entry, body, persist=True): async def write_block( self, block, media_type, deserializer, entry, body, persist=True ): - from tiled.adapters.array import slice_and_shape_from_block_and_chunks - - _, shape = slice_and_shape_from_block_and_chunks( - block, entry.structure().chunks - ) + shape = block.shape_from_chunks(entry.structure().chunks) if self.context.streaming_cache: await self._stream(media_type, entry, body, shape, block=block) if not persist: diff --git a/tiled/client/array.py b/tiled/client/array.py index 0a3cb8b70..0d2bb23cb 100644 --- a/tiled/client/array.py +++ b/tiled/client/array.py @@ -1,5 +1,6 @@ import concurrent.futures import itertools +import math from typing import TYPE_CHECKING, Optional, Union from urllib.parse import parse_qs, urlparse @@ -9,6 +9,7 @@ import numpy from numpy.typing import NDArray +from ..ndslice import NDBlock, NDSlice, split_slice from ..structures.core import STRUCTURE_TYPES from .base import BaseClient from .utils import ( @@ -17,6 +19,7 @@ handle_error, params_from_slice, retry_context, + slices_to_dask_chunks, ) if TYPE_CHECKING: @@ -26,6 +29,12 @@ class _DaskArrayClient(BaseClient): "Client-side wrapper around an array-like that returns dask arrays" + # The limit on the expected size of the response body (before compression). + # This will be used to determine how to combine multiple requests when fetching + # data in blocks. If set to None, the client will not attempt to combine + # requests and will fetch each chunk separately as determined by the structure. 
+ RESPONSE_BYTESIZE_LIMIT = 100 * 1024 * 1024 # 100 MiB + def __init__(self, *args, item, **kwargs): super().__init__(*args, item=item, **kwargs) @@ -60,14 +69,13 @@ def ndim(self): return len(self.structure().shape) def __repr__(self): - structure = self.structure() attrs = { - "shape": structure.shape, - "chunks": chunks_repr(structure.chunks), - "dtype": structure.data_type.to_numpy_dtype(), + "shape": self.shape, + "chunks": chunks_repr(self.chunks), + "dtype": self.dtype, } - if structure.dims: - attrs["dims"] = structure.dims + if dims := self.structure().dims: + attrs["dims"] = dims return ( f"<{type(self).__name__}" + "".join(f" {k}={v}" for k, v in attrs.items()) @@ -77,104 +85,187 @@ def __repr__(self): def __array__(self, *args, **kwargs): return self.read().__array__(*args, **kwargs) - def _get_block(self, block, dtype, shape, slice=None): - """ - Fetch the actual data for one block in a chunked (dask) array. + def _get_block(self, block: NDBlock, block_slice: Optional[NDSlice] = None): + """Fetch the data for one chunk (block) in a chunked array. + + This private method is used internally by the client and requires the + `block` and `block_slice` arguments to be pre-cast as NDBlock and NDSlice + types, respectively. - See read_block() for a public version of this. This private version - enables more efficient multi-block access by requiring the caller to - pass in the structure (dtype, shape). + This method uses the `/array/block` endpoint to fetch one block of the array, + at a time. The block boundaries are determined by the structure of the array, + and usually correspond to the chunking of the array on the server side. + + See read_block() for a public version of this. + + Parameters + ---------- + block : NDBlock + The chunk index, e.g. (0, 0), (0, 1), (0, 2) .... for a 2D array + chunked into 3 blocks. + block_slice : NDSlice, optional + A slice within this block to return. 
""" + media_type = "application/octet-stream" - if slice is not None: - # TODO The server accepts a slice parameter but we'll need to write - # careful code here to determine what the new shape will be. - raise NotImplementedError( - "Slicing less than one block is not yet supported." - ) - if shape: + + # Determine the expected shape of the resulting array after slicing + exp_shape = [] + # Expand the block to convert for URL + block = block.expand_for_shape([len(dim) for dim in self.chunks]) + if shape := block.shape_from_chunks(self.chunks): + exp_shape = block_slice.shape_after_slice(shape) if block_slice else shape # Check for special case of shape with 0 in it. - if 0 in shape: + if 0 in exp_shape: # This is valid, and it has come up in the wild. An array with # 0 as one of the dimensions never contains data, so we can # short-circuit here without any further information from the # service. - return numpy.array([], dtype=dtype).reshape(shape) - expected_shape = ",".join(map(str, shape)) - else: - expected_shape = "scalar" + return numpy.array([], dtype=self.dtype).reshape(exp_shape) + url_path = self.item["links"]["block"] + params = { + **parse_qs(urlparse(url_path).query), + "block": block.to_numpy_str(), + "expected_shape": ",".join(map(str, exp_shape)) or "scalar", + } + params = params | ({"slice": block_slice.to_numpy_str()} if block_slice else {}) for attempt in retry_context(): with attempt: content = handle_error( self.context.http_client.get( url_path, headers={"Accept": media_type}, - params={ - **parse_qs(urlparse(url_path).query), - "block": ",".join(map(str, block)), - "expected_shape": expected_shape, - }, + params=params, ) ).read() - return numpy.frombuffer(content, dtype=dtype).reshape(shape) - def read_block(self, block, slice=None): + return numpy.frombuffer(content, dtype=self.dtype).reshape(exp_shape) + + def _get_slice(self, slice: NDSlice): + """Fetch the data for a slice of the full array + + This private method is used internally by 
the client and requires the + `slice` argument to be pre-cast as an NDSlice type. + + The request is made to the `/array/full` endpoint. + + See read() for a public version of this. + + Parameters + ---------- + slice : NDSlice + A slice of the array to return, pass NDSlice() to return the whole array. """ - Access the data for one block of this chunked (dask) array. + + media_type = "application/octet-stream" + + # Determine the expected shape of the resulting array after slicing + exp_shape = slice.shape_after_slice(self.shape) if slice else self.shape + + # Check for special case of shape with 0 in it. + if 0 in exp_shape: + # This is valid, and it has come up in the wild. An array with + # 0 as one of the dimensions never contains data, so we can + # short-circuit here without any further information from the service. + return numpy.array([], dtype=self.dtype).reshape(exp_shape) + + url_path = self.item["links"]["full"] + params = { + **parse_qs(urlparse(url_path).query), + "expected_shape": ",".join(map(str, exp_shape)) or "scalar", + } + params = params | ({"slice": slice.to_numpy_str()} if slice else {}) + for attempt in retry_context(): + with attempt: + content = handle_error( + self.context.http_client.get( + url_path, + headers={"Accept": media_type}, + params=params, + ) + ).read() + + return numpy.frombuffer(content, dtype=self.dtype).reshape(exp_shape) + + def read_block(self, block, slice=None): + """Access the data for one block of this chunked (dask) array. + + This method uses the `/array/block` endpoint to fetch one block of the array, + at a time. The block boundaries are determined by the structure of the array, + and usually correspond to the chunking of the array on the server side. Optionally, access only a slice *within* this block. 
""" - structure = self.structure() - chunks = structure.chunks - dtype = structure.data_type.to_numpy_dtype() + + block, block_slice = NDBlock(block), NDSlice(slice) + try: - shape = tuple(chunks[dim][i] for dim, i in enumerate(block)) + shape = block.shape_from_chunks(self.chunks) except IndexError: raise IndexError(f"Block index {block} out of range") + + exp_shape = block_slice.shape_after_slice(shape) if block_slice else shape dask_array = dask.array.from_delayed( - dask.delayed(self._get_block)(block, dtype, shape), dtype=dtype, shape=shape + dask.delayed(self._get_block)(block, block_slice), + dtype=self.dtype, + shape=exp_shape, ) - # TODO Make the request in _get_block include the slice so that we only - # fetch exactly the data that we want. This will require careful code - # to determine what the shape will be. - if slice is not None: - dask_array = dask_array[slice] return dask_array def read(self, slice=None): - """ - Access the entire array or a slice. + """Access the entire array or its slice The array will be internally chunked with dask. """ - structure = self.structure() - shape = structure.shape - dtype = structure.data_type.to_numpy_dtype() - # Build a client-side dask array whose chunks pull from a server-side - # dask array. + + # Determine the expected shape of the resulting array after slicing + if arr_slice := NDSlice(slice): + arr_slice = arr_slice.expand_for_shape(self.shape) # Remove "..." + exp_shape = arr_slice.shape_after_slice(self.shape) + total_bytes = math.prod(exp_shape) * self.dtype.itemsize + + # Check for special case of shape with 0 in it. + if 0 in exp_shape: + # This is valid, and it has come up in the wild. An array with + # 0 as one of the dimensions never contains data, so we can + # short-circuit here without any further information from the service. + return dask.array.array([], dtype=self.dtype).reshape(exp_shape) + + # If the expected response is small, fetch it in one go. 
+ if total_bytes < self.RESPONSE_BYTESIZE_LIMIT: + dask_array = dask.array.from_delayed( + dask.delayed(self._get_slice)(arr_slice), + dtype=self.dtype, + shape=exp_shape, + ) + return dask_array + + # The response is expected to be large, subslice it and recombine with dask + # Build chunk boundaries along each axis to find best candidate split points + chunk_bounds = tuple( + tuple(itertools.accumulate(axis_chunks, initial=0)) + for axis_chunks in self.chunks + ) + indexed_slices = split_slice( + arr_slice.expand_for_shape(self.shape), + max_size=self.RESPONSE_BYTESIZE_LIMIT // self.dtype.itemsize, + pref_splits=chunk_bounds, + ) + + # Build a client-side dask array whose chunks correspond to subsplits of the slice name = "remote-dask-array-" f"{self.uri}" - chunks = structure.chunks - # Count the number of blocks along each axis. - num_blocks = (range(len(n)) for n in chunks) - # Loop over each block index --- e.g. (0, 0), (0, 1), (0, 2) .... --- - # and build a dask task encoding the method for fetching its data from - # the server. 
dask_tasks = { - (name,) - + block: ( - self._get_block, - block, - dtype, - tuple(chunks[dim][i] for dim, i in enumerate(block)), - ) - for block in itertools.product(*num_blocks) + (name,) + indx: (self._get_slice, slc) + for indx, (slc) in indexed_slices.items() } dask_array = dask.array.Array( - dask=dask_tasks, name=name, chunks=chunks, dtype=dtype, shape=shape + name=name, + dask=dask_tasks, + dtype=self.dtype, + chunks=slices_to_dask_chunks(indexed_slices, self.shape), + shape=exp_shape, ) - if slice is not None: - dask_array = dask_array[slice] return dask_array def write(self, array, persist=True): diff --git a/tiled/client/utils.py b/tiled/client/utils.py index 3e73db23e..65f194f67 100644 --- a/tiled/client/utils.py +++ b/tiled/client/utils.py @@ -1,6 +1,7 @@ import builtins import os import uuid +from collections import defaultdict from collections.abc import Hashable from dataclasses import asdict from pathlib import Path @@ -14,6 +15,7 @@ import stamina.instrumentation from ..structures.core import Spec +from ..type_aliases import Chunks from ..utils import path_from_uri MSGPACK_MIME_TYPE = "application/x-msgpack" @@ -418,7 +420,7 @@ def get_asset_filepaths(node): return filepaths -def chunks_repr(chunks: tuple[tuple[int, ...], ...]) -> str: +def chunks_repr(chunks: Chunks) -> str: """A human-friendly representation of the chunks spec Avoids printing long line of repeated values when representing chunks @@ -455,3 +457,27 @@ def normalize_specs( spec = Spec(spec) normalized_specs.append(asdict(spec)) return normalized_specs + + +def slices_to_dask_chunks(slice_dict, shape): + """Convert a dictionary mapping into Dask-style chunk representation + + For example, a dictionary in the form {index_tuple: list[NDSlice]} is + converted into a tuple of tuples, one per axis. 
+ """ + + # Collect chunk sizes per axis, keyed by axis index + ndim = len(next(iter(slice_dict.keys()))) + axis_chunks = [defaultdict(int) for _ in range(ndim)] + for idx, slc in slice_dict.items(): + shp = slc.shape_after_slice(shape) + for ax in range(ndim): + axis_chunks[ax][idx[ax]] = shp[ax] + + # Convert to ordered tuples (sorted by chunk index) + dask_chunks = tuple( + tuple(size for _, size in sorted(axis_dict.items())) + for axis_dict in axis_chunks + ) + + return dask_chunks diff --git a/tiled/ndslice.py b/tiled/ndslice.py index 61b577a76..d26330931 100644 --- a/tiled/ndslice.py +++ b/tiled/ndslice.py @@ -1,17 +1,232 @@ import builtins +import itertools +import math import re from typing import Optional, Union -from fastapi import Query +from ndindex import ndindex -from .type_aliases import JSON, EllipsisType +from .type_aliases import JSON, Chunks, EllipsisType -# NOTE: The regex below is used by fastapi to parse the string representation of a slice -# and raise a 422 error if the string is not valid. -# It does not capture certain erroneous cases, such as ",,", for example. -DIM_REGEX = r"(?:(?:-?\d+)?:){0,2}(?:-?\d+)?" -SLICE_REGEX = rf"^{DIM_REGEX}(?:,{DIM_REGEX})*$" -# SLICE_REGEX = rf"^(?:!,){DIM_REGEX}(?:,(?:=[^,]){DIM_REGEX})*$" + +def is_ellipsis(arg) -> bool: + return isinstance(arg, EllipsisType) or (arg == Ellipsis) + + +def merge_slices(*args: Union["NDSlice", "NDBlock"]) -> Union["NDSlice", "NDBlock"]: + """Merge multiple NDSlices or NDBlocks + + Creates a single NDSlice or (NDBlock if all arguemnts are NDBlocks) if they are + contiguous and compatible. 
+ """ + if (len(args)) == 1: + return args[0] + + # Check that all arguments have the same dimension + if len(set(map(len, args))) != 1: + raise ValueError("All NDSlices must have the same number of dimensions") + + # Loop over dimensions and check if they can be merged + already_merged, result = False, [] + for dim in zip(*args): + # Ellipsis at the same position can be merged + if all(map(is_ellipsis, dim)): + result.append(Ellipsis) + continue + + # Integers must match exactly or be consecutive and have no gaps + if all(isinstance(d, int) for d in dim): + if len(vals := set(dim)) == 1: + # All integers are the same, keep this dimension as is + result.append(vals.pop()) + continue + elif vals == set(range(min(vals), max(vals) + 1)): + # Integers are consecutive, merge into a slice + if already_merged: + raise ValueError("Can not merge more than one dimension") + result.append(builtins.slice(min(vals), max(vals) + 1)) + already_merged = True + continue + else: + raise ValueError( + "Integer dimensions must match exactly " + "or be consecutive and have no gaps" + ) + + # Some dimensions could be integers and some are slices + if any(isinstance(d, int) for d in dim): + dim = tuple( + builtins.slice(d, d + 1) if isinstance(d, int) else d for d in dim + ) + + # Now we have only slices and can check for contiguity and compatibility + if all(isinstance(d, builtins.slice) for d in dim): + starts = set(d.start or 0 for d in dim) + stops = set(d.stop for d in dim) + + if len(set(d.step or 1 for d in dim)) != 1: + raise ValueError("Slice dimensions must have the same step") + + elif len(starts) == 1 and len(stops) == 1: + # All slices are the same, keep this dimension as is + result.append(dim[0]) + continue + + elif ( + (len(starts) == len(stops)) + and (len(first := starts - stops) == 1) + and (len(last := stops - starts) == 1) + ): + # Slices are consecutive, merge into a single slice + if already_merged: + raise ValueError("Can not merge more than one dimension") + 
result.append(builtins.slice(first.pop(), last.pop(), dim[0].step)) + already_merged = True + continue + + elif starts == stops: + # This is an empty dimension, keep it as is + val = starts.pop() + result.append(builtins.slice(val, val, dim[0].step)) + continue + + # If we got here, the slices are not compatible and can not be merged + raise ValueError("The slices are not compatible and can not be merged") + + _cls = NDBlock if all(isinstance(arg, NDBlock) for arg in args) else NDSlice + + return _cls(*result) + + +def split_1d(start, stop, step, max_len: int, pref_splits: Optional[list[int]] = None): + """Split a 1D slice into sub-slices that do not exceed max_len steps. + + Splits are chosen close to the ideal equal partition. If preferred + split points are provided, the closest one to the ideal point is used + as long as it does not violate the min/max constraints. + """ + + # Total number of steps and max steps per split + total_steps = math.ceil(abs(stop - start) / abs(step)) + + # Convert preferred points to index space + pref_indx = sorted( + (x - start) // step + for x in (pref_splits or []) + if x in range(start, stop, step) + ) + + result, crnt_indx, _pi = [], 0, 0 + while crnt_indx + max_len < total_steps: + # Compute ideal next index for equal partitioning of the remaining steps + steps_remained = total_steps - crnt_indx + num_splits = max(1, math.ceil(steps_remained / max_len)) + ideal_split_size = math.ceil(steps_remained / num_splits) + next_indx = crnt_indx + ideal_split_size + + # Check if there are preferred points between the current index and the max allowed + if pref_indx: + pref_best = None + while _pi < len(pref_indx) and pref_indx[_pi] <= crnt_indx + max_len: + if pref_indx[_pi] > crnt_indx: + pref_best = pref_best or pref_indx[_pi] + if abs(pref_indx[_pi] - next_indx) < abs(pref_best - next_indx): + pref_best = pref_indx[_pi] + _pi += 1 + next_indx = pref_best or next_indx + + result.append((start + crnt_indx * step, start + next_indx * 
step)) + crnt_indx = next_indx + + result.append((start + crnt_indx * step, stop)) + + return result + + +def split_slice( + arr_slice: "NDSlice", max_size: int, pref_splits: Optional[list[list[int]]] = None +) -> dict[tuple[int, ...], "NDSlice"]: + """Split an N-dimensional slice into smaller slices that do not exceed max_size + + Splits are chosen by iteratively subslicing along the most chunked or longest + dimension until the resulting slices are all under the max_size limit. + + Parameters + ---------- + arr_slice : NDSlice + The N-dimensional slice to split. The slice must be expanded to account for + the specific shape (i.e. not contain any ":" or "..."). Integer dimensions + are allowed. + max_size : int + The maximum allowed size (number of elements) for each resulting slice. + pref_splits : list of list of int, optional + Preferred split points for each dimension. + + Returns + ------- + dict[tuple[int, ...], NDSlice] + A dictionary mapping from index tuples to the corresponding NDSlice objects. + """ + + # Remove singleton dimensions and replace with slices for simplicity. 
Revert later + is_int_dim = [isinstance(s, int) for s in arr_slice] + arr_slice = arr_slice.unsqueeze() + ndim = len(arr_slice) + + # Make sure preferred split points align with the step grid and within the bounds + if pref_splits is not None: + pref_splits = [ + [ + x + for x in bnd + if x in range(slc.start, slc.stop, slc.step or 1) and x != slc.start + ] + for bnd, slc in zip(pref_splits, arr_slice) + ] + + # Start with the most chunked or longest dimension and subslice it + sorting_order = ( + [len(ps) for ps in pref_splits] + if pref_splits is not None + else [len(range(s.start, s.stop, s.step or 1)) for s in arr_slice] + ) + result = [[s] for s in arr_slice] + for d in sorted(range(ndim), key=lambda i: sorting_order[i], reverse=True): + # Find the size of largest block along all other dimensions, excluding d + max_other = math.prod( + [ + max(len(range(s.start, s.stop, s.step or 1)) for s in result[_d]) + for _d in range(ndim) + if _d != d + ] + ) + slc = result[d].pop() + + # Use maximum length along this dimension that keeps the slice under the limit + splits = split_1d( + slc.start, + slc.stop, + slc.step or 1, + max_len=max(1, int(max_size / max_other)), + pref_splits=pref_splits[d] if pref_splits is not None else None, + ) + result[d].extend([builtins.slice(a, b, slc.step) for a, b in splits]) + + # Check if we need further subslicing along other dimensions + max_crnt = max(len(range(s.start, s.stop, s.step or 1)) for s in result[d]) + if max_crnt * max_other <= max_size: + break + + # Replace (squeeze) any singleton slices if they were integers originally + result = [[res[0].start] if flag else res for res, flag in zip(result, is_int_dim)] + + # Form the dict of Cartesian products + keys = itertools.product( + *(range(len(x)) for x in result if not isinstance(x[0], int)) + ) + vals = itertools.product(*result) + + return {k: NDSlice(v) for k, v in zip(keys, vals)} class NDSlice(tuple): @@ -26,6 +241,8 @@ class NDSlice(tuple): """ def __new__(cls, *args: 
Union[int, builtins.slice, EllipsisType]) -> "NDSlice": + if len(args) == 1 and (isinstance(args[0], (tuple, list)) or args[0] is None): + return cls(*(args[0] or ())) if any(not isinstance(s, (int, builtins.slice, EllipsisType)) for s in args): raise TypeError( f"NDSlice expected int, slice or Ellipsis; got {args} instead. Use " @@ -39,7 +256,7 @@ def __new__(cls, *args: Union[int, builtins.slice, EllipsisType]) -> "NDSlice": return super().__new__(cls, tuple(args)) def __bool__(self) -> bool: - "NDSlice is considered empty if all slices are '...' or ':', i.e. returning entire array" + "NDSlice is considered empty if all slices are '...' or ':', returning entire array" full_slices = ( builtins.slice(None), builtins.slice(0, None), @@ -48,7 +265,7 @@ def __bool__(self) -> bool: Ellipsis, ) return bool(super()) and not all( - isinstance(s, EllipsisType) or s in full_slices for s in self + is_ellipsis(s) or s in full_slices for s in self ) @classmethod @@ -151,11 +368,6 @@ def from_numpy_str(cls, arg: str) -> "NDSlice": raise ValueError(f'Invalid slice part: "{part}"') return cls(*result) - @classmethod - def from_query(cls, slice: str = Query("", pattern=SLICE_REGEX)) -> "NDSlice": - """Parse and convert a query parameter representation of a slice""" - return cls.from_numpy_str(slice) - def to_numpy_str(self) -> str: "Convert NDSlice to a numpy-style string representation" result = [] @@ -174,3 +386,61 @@ def to_numpy_str(self) -> str: else: raise ValueError("Unprocessable entry in NDSlice, %s", s) return ",".join(result) + + def is_valid_for_shape(self, shape: tuple[int, ...]) -> bool: + "Check if this NDSlice is valid for an array of the given shape" + return (not self) or ndindex(self).isvalid(shape) + + def expand_for_shape(self, shape: tuple[int, ...]) -> "NDSlice": + "Expand this NDSlice (remove ':' or '...') for an array of the given shape" + return self.__class__(*ndindex(self).expand(shape).raw) + + def shape_after_slice(self, shape: tuple[int, ...]) -> 
tuple[int, ...]: + "Calculate the shape after applying NDSlice to an array of the given shape" + return ndindex(self).newshape(shape) if self else shape + + def unsqueeze(self) -> "NDSlice": + "Convert all integer dims to slices of length 1 to preserve the dimensionality" + return self.__class__( + *(builtins.slice(s, s + 1, 1) if isinstance(s, int) else s for s in self) + ) + + +class NDBlock(NDSlice): + """A slice used to specify a block index, i.e. a slice over the chunks of an array. + + A major requirement is for the sliced blocks/chunks to be contiguous, + i.e. the `step` parameter along each dimension, if specified, must be 1. + """ + + def __new__(cls, *args: Union[int, builtins.slice, EllipsisType]) -> "NDBlock": + inst = super().__new__(cls, *args) + + for s in inst: + if isinstance(s, builtins.slice) and s.step not in (None, 1): + raise ValueError( + f"NDBlock can only contain slices with step 1; got {s} instead." + ) + + return inst + + def shape_from_chunks(self, chunks: Chunks) -> tuple[int, ...]: + "Find the shape of the block of chunks for an array with the given chunks" + expanded = self.expand_for_shape(tuple(map(len, chunks))) + selected = tuple(ch[sl] for ch, sl in zip(chunks, expanded)) + return tuple(sum(ch) if isinstance(ch, tuple) else ch for ch in selected) + + def slice_from_chunks(self, chunks: Chunks) -> NDSlice: + "Find the slice over entire array with given chunks that selects this block" + expanded = self.expand_for_shape(tuple(map(len, chunks))) + slice_ = [] + for sl, ch in zip(expanded, chunks): + if isinstance(sl, int): + start = sum(ch[:sl]) + slice_.append(slice(start, start + ch[sl])) + elif isinstance(sl, builtins.slice): + start = sum(ch[: sl.start]) if sl.start is not None else None + stop = sum(ch[: sl.stop]) if sl.stop is not None else None + slice_.append(slice(start, stop)) + + return NDSlice(*slice_) diff --git a/tiled/server/dependencies.py b/tiled/server/dependencies.py index 81f9b8a53..a289eb632 100644 --- 
a/tiled/server/dependencies.py +++ b/tiled/server/dependencies.py @@ -2,10 +2,16 @@ import pydantic_settings from fastapi import HTTPException, Query, Request -from starlette.status import HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_410_GONE +from starlette.status import ( + HTTP_400_BAD_REQUEST, + HTTP_403_FORBIDDEN, + HTTP_404_NOT_FOUND, + HTTP_410_GONE, +) from ..access_control.protocols import AccessPolicy from ..adapters.protocols import AnyAdapter +from ..ndslice import NDBlock, NDSlice from ..structures.core import StructureFamily from ..type_aliases import AccessTags, Scopes from ..utils import BrokenLink @@ -13,6 +19,13 @@ from .schemas import Principal from .utils import filter_for_access, record_timing +# NOTE: The regex below is used by fastapi to parse the string representation of a slice +# and raise a 422 error if the string is not valid. +# It does not capture certain erroneous cases, such as ",,", for example. +# It does not support Ellipsis ("..."). +DIM_REGEX = r"(?:(?:-?\d+)?:){0,2}(?:-?\d+)?" +SLICE_REGEX = rf"^{DIM_REGEX}(?:,{DIM_REGEX})*$" + def get_root_tree(request: Request): return request.app.state.root_tree @@ -125,14 +138,22 @@ async def get_entry( ) -def block( - # Ellipsis as the "default" tells FastAPI to make this parameter required. - block: str = Query(..., pattern="^[0-9]*(,[0-9]+)*$"), -): - "Specify and parse a block index parameter." 
- if not block: - return () - return tuple(map(int, block.split(","))) +def parse_block_param(block: str = Query(..., pattern="^[0-9]*(,[0-9]+)*$")) -> NDBlock: + "Specify and parse a block index parameter" + try: + # Even though NDBlock can contain slices, we currently only support indexing + # with integers, and the server wouldn't accept slices in the query + return NDBlock.from_numpy_str(block) + except ValueError as e: + raise HTTPException(status_code=HTTP_400_BAD_REQUEST, detail=str(e)) + + +def parse_slice_param(slice: str = Query("", pattern=SLICE_REGEX)): + "Specify and parse a slice parameter" + try: + return NDSlice.from_numpy_str(slice) + except ValueError as e: + raise HTTPException(status_code=HTTP_400_BAD_REQUEST, detail=str(e)) def expected_shape( diff --git a/tiled/server/router.py b/tiled/server/router.py index fee73b651..d6d12db7d 100644 --- a/tiled/server/router.py +++ b/tiled/server/router.py @@ -36,6 +36,7 @@ HTTP_406_NOT_ACCEPTABLE, HTTP_410_GONE, HTTP_422_UNPROCESSABLE_CONTENT, + HTTP_500_INTERNAL_SERVER_ERROR, ) from tiled.adapters.protocols import AnyAdapter @@ -48,7 +49,7 @@ from .. 
import __version__ from ..links import links_for_node -from ..ndslice import NDSlice +from ..ndslice import NDBlock, NDSlice from ..stream_messages import ArrayPatch from ..structures.core import Spec, StructureFamily from ..type_aliases import AccessTags, Scopes @@ -82,11 +83,12 @@ resolve_media_type, ) from .dependencies import ( - block, expected_shape, get_entry, get_root_tree, offset_param, + parse_block_param, + parse_slice_param, patch_offset_param, patch_shape_param, shape_param, @@ -512,8 +514,8 @@ async def metadata( async def array_block( request: Request, path: str, - block=Depends(block), - slice=Depends(NDSlice.from_query), + block: NDBlock = Depends(parse_block_param), + slice: NDSlice = Depends(parse_slice_param), expected_shape=Depends(expected_shape), format: Optional[str] = None, filename: Optional[str] = None, @@ -540,24 +542,56 @@ async def array_block( {StructureFamily.array, StructureFamily.sparse}, getattr(request.app.state, "access_policy", None), ) - shape = entry.structure().shape - # Check that block dimensionality matches array dimensionality. + shape, chunks = entry.structure().shape, entry.structure().chunks ndim = len(shape) + + # Check that request block matches the chunks dimensionality + if block == () and ndim > 0: + raise HTTPException( + status_code=HTTP_422_UNPROCESSABLE_CONTENT, + detail=f"Requested scalar but shape is {shape}", + ) if len(block) != ndim: raise HTTPException( - status_code=HTTP_400_BAD_REQUEST, + status_code=HTTP_422_UNPROCESSABLE_CONTENT, detail=( f"Block parameter must have {ndim} comma-separated parameters, " f"corresponding to the dimensions of this {ndim}-dimensional array." ), ) - if block == (): + if not block.is_valid_for_shape(tuple(map(len, chunks))): + raise HTTPException( + status_code=HTTP_422_UNPROCESSABLE_CONTENT, + detail=( + "Block parameter does not match the chunks dimensionality. " + f"Expected {ndim} comma-separated integers, " + f"that index {chunks}, got {block}." 
+ ), + ) + + block_shape = block.shape_from_chunks(chunks) + if not slice.is_valid_for_shape(block_shape): + raise HTTPException( + status_code=HTTP_422_UNPROCESSABLE_CONTENT, + detail=( + f"Slice parameter {slice} is not valid for the shape {block_shape} " + "that would result from the requested block." + ), + ) + + # Check if resulting shape matches expected and raise before even loading the data + resulting_shape = slice.shape_after_slice(block_shape) + if (expected_shape is not None) and (expected_shape != resulting_shape): + raise HTTPException( + status_code=HTTP_422_UNPROCESSABLE_CONTENT, + detail=( + f"The expected shape {expected_shape} does not match the actual shape " + f"{resulting_shape} that would result from the requested block and slice." + ), + ) + + if ndim == 0: # Handle special case of numpy scalar. - if shape != (): - raise HTTPException( - status_code=HTTP_400_BAD_REQUEST, - detail=f"Requested scalar but shape is {entry.structure().shape}", - ) with record_timing(request.state.metrics, "read"): array = await ensure_awaitable(entry.read) else: @@ -566,12 +600,15 @@ async def array_block( array = await ensure_awaitable(entry.read_block, block, slice) except IndexError: raise HTTPException( - status_code=HTTP_400_BAD_REQUEST, detail="Block index out of range" + status_code=HTTP_500_INTERNAL_SERVER_ERROR, + detail="Block index out of range", ) + # Something is wrong with the shape of the data vs the value in the structure if (expected_shape is not None) and (expected_shape != array.shape): raise HTTPException( - status_code=HTTP_400_BAD_REQUEST, - detail=f"The expected_shape {expected_shape} does not match the actual shape {array.shape}", + status_code=HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"The shape expected from the structure {expected_shape} " + f"does not match the actual data shape {array.shape}", ) if array.nbytes > settings.response_bytesize_limit: raise HTTPException( @@ -595,7 +632,6 @@ async def array_block( filename=filename, ) 
except UnsupportedMediaTypes as err: - # raise HTTPException(status_code=406, detail=", ".join(err.supported)) raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail=err.args[0]) @router.get( @@ -604,7 +640,7 @@ async def array_block( async def array_full( path: str, request: Request, - slice=Depends(NDSlice.from_query), + slice: NDSlice = Depends(parse_slice_param), expected_shape=Depends(expected_shape), format: Optional[str] = None, filename: Optional[str] = None, @@ -640,15 +676,19 @@ async def array_full( with record_timing(request.state.metrics, "read"): array = await ensure_awaitable(entry.read, slice) if structure_family == StructureFamily.array: - array = numpy.asarray(array) # Force dask or PIMS or ... to do I/O. + # Force dask or PIMS or ... to do I/O. Ensure dtype is preserved. + array = numpy.asarray( + array, dtype=entry.structure().data_type.to_numpy_dtype() + ) except IndexError: raise HTTPException( status_code=HTTP_400_BAD_REQUEST, detail="Block index out of range" ) if (expected_shape is not None) and (expected_shape != array.shape): raise HTTPException( - status_code=HTTP_400_BAD_REQUEST, - detail=f"The expected_shape {expected_shape} does not match the actual shape {array.shape}", + status_code=HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"The shape expected from the structure {expected_shape} " + f"does not match the actual data shape {array.shape}", ) if array.nbytes > settings.response_bytesize_limit: raise HTTPException( @@ -1757,7 +1797,7 @@ async def put_array_full( async def put_array_block( request: Request, path: str, - block=Depends(block), + block: NDBlock = Depends(parse_block_param), persist: bool = Query(True, description="Persist data to storage"), principal: Optional[Principal] = Depends(get_current_principal), root_tree=Depends(get_root_tree), diff --git a/tiled/structures/array.py b/tiled/structures/array.py index abd931559..cf96d5b17 100644 --- a/tiled/structures/array.py +++ b/tiled/structures/array.py @@ -9,6 +9,8 @@ 
from tiled.structures.root import Structure +from ..type_aliases import Chunks + # from dtype.descr FieldDescr = Union[Tuple[str, str], Tuple[str, str, Tuple[int, ...]]] NumpyDescr = List[FieldDescr] @@ -230,7 +232,7 @@ def from_json(cls, structure: Mapping[str, Any]) -> "StructDtype": @dataclass class ArrayStructure(Structure): data_type: Union[BuiltinDtype, StructDtype] - chunks: Tuple[Tuple[int, ...], ...] # tuple-of-tuples-of-ints like ((3,), (3,)) + chunks: Chunks # tuple-of-tuples-of-ints like ((3,), (3,)) shape: Tuple[int, ...] # tuple of ints like (3, 3) dims: Optional[Tuple[str, ...]] = None # None or tuple of names like ("x", "y") resizable: Union[bool, Tuple[bool, ...]] = False diff --git a/tiled/structures/sparse.py b/tiled/structures/sparse.py index 2d5ea5e18..a36290e22 100644 --- a/tiled/structures/sparse.py +++ b/tiled/structures/sparse.py @@ -5,6 +5,7 @@ from tiled.structures.root import Structure +from ..type_aliases import Chunks from .array import BuiltinDtype, Endianness, Kind, StructDtype @@ -16,7 +17,7 @@ class SparseLayout(str, enum.Enum): @dataclass class COOStructure(Structure): - chunks: Tuple[Tuple[int, ...], ...] # tuple-of-tuples-of-ints like ((3,), (3,)) + chunks: Chunks # tuple-of-tuples-of-ints like ((3,), (3,)) shape: Tuple[int, ...] # tuple of ints like (3, 3) data_type: Optional[Union[BuiltinDtype, StructDtype]] = None coord_data_type: Optional[BuiltinDtype] = field( diff --git a/tiled/type_aliases.py b/tiled/type_aliases.py index 0f98e764c..61df73aae 100644 --- a/tiled/type_aliases.py +++ b/tiled/type_aliases.py @@ -16,6 +16,7 @@ Mapping, Sequence, Set, + Tuple, TypedDict, Union, ) @@ -32,6 +33,7 @@ Filters = List[Query] AccessBlob = Mapping[str, Any] AccessTags = Set[str] +Chunks = Tuple[Tuple[int, ...], ...] 
AppTask = Callable[[], Coroutine[None, None, Any]] """Async function to be run as part of the app's lifecycle""" diff --git a/web-frontend/package-lock.json b/web-frontend/package-lock.json index d8171cf8d..6c1d8de38 100644 --- a/web-frontend/package-lock.json +++ b/web-frontend/package-lock.json @@ -122,7 +122,6 @@ "resolved": "https://registry.npmjs.org/@babel/core/-/core-7.28.3.tgz", "integrity": "sha512-yDBHV9kQNcr2/sUr9jghVyz9C3Y5G2zUM2H2lo+9mKv4sFgbA8s8Z9t8D1jiTkGoO/NoIfKMyKWr4s6CN23ZwQ==", "license": "MIT", - "peer": true, "dependencies": { "@ampproject/remapping": "^2.2.0", "@babel/code-frame": "^7.27.1", @@ -522,7 +521,6 @@ } ], "license": "MIT", - "peer": true, "engines": { "node": ">=18" }, @@ -546,7 +544,6 @@ } ], "license": "MIT", - "peer": true, "engines": { "node": ">=18" } @@ -615,7 +612,6 @@ "resolved": "https://registry.npmjs.org/@emotion/react/-/react-11.14.0.tgz", "integrity": "sha512-O000MLDBDdk/EohJPFUqvnp4qnHeYkVP5B0xEG0D/L7cOKP9kefu2DXn8dj74cQfsEzUqh+sr1RzFqiL1o+PpA==", "license": "MIT", - "peer": true, "dependencies": { "@babel/runtime": "^7.18.3", "@emotion/babel-plugin": "^11.13.5", @@ -659,7 +655,6 @@ "resolved": "https://registry.npmjs.org/@emotion/styled/-/styled-11.14.1.tgz", "integrity": "sha512-qEEJt42DuToa3gurlH4Qqc1kVpNq8wO8cJtDzU46TjlzWjDlsVyevtYCRijVq3SrHsROS+gVQ8Fnea108GnKzw==", "license": "MIT", - "peer": true, "dependencies": { "@babel/runtime": "^7.18.3", "@emotion/babel-plugin": "^11.13.5", @@ -1410,7 +1405,6 @@ "resolved": "https://registry.npmjs.org/@mui/material/-/material-5.18.0.tgz", "integrity": "sha512-bbH/HaJZpFtXGvWg3TsBWG4eyt3gah3E7nCNU8GLyRjVoWcA91Vm/T+sjHfUcwgJSw9iLtucfHBoq+qW/T30aA==", "license": "MIT", - "peer": true, "dependencies": { "@babel/runtime": "^7.23.9", "@mui/core-downloads-tracker": "^5.18.0", @@ -2442,7 +2436,8 @@ "resolved": "https://registry.npmjs.org/@types/aria-query/-/aria-query-5.0.4.tgz", "integrity": 
"sha512-rfT93uj5s0PRL7EzccGMs3brplhcrghnDoV26NqKhCAS1hVo+WdNsPvE/yb6ilfr5hi2MEk6d5EWJTKdxg8jVw==", "dev": true, - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/@types/babel__core": { "version": "7.20.5", @@ -2637,7 +2632,6 @@ "integrity": "sha512-m5ObIqwsUp6BZzyiy4RdZpzWGub9bqLJMvZDD0QMXhxjqMHMENlj+SqF5QxoUwaQNFe+8kz8XM8ZQhqkQPTgMQ==", "devOptional": true, "license": "MIT", - "peer": true, "dependencies": { "undici-types": "~6.21.0" } @@ -2659,7 +2653,6 @@ "resolved": "https://registry.npmjs.org/@types/react/-/react-19.1.11.tgz", "integrity": "sha512-lr3jdBw/BGj49Eps7EvqlUaoeA0xpj3pc0RoJkHpYaCHkVK7i28dKyImLQb3JVlqs3aYSXf7qYuWOW/fgZnTXQ==", "license": "MIT", - "peer": true, "dependencies": { "csstype": "^3.0.2" } @@ -2681,7 +2674,6 @@ "integrity": "sha512-xG7xaBMJCpcK0RpN8jDbAACQo54ycO6h4dSSmgv8+fu6ZIAdANkx/WsawASUjVXYfy+J9AbUpRMNNEsXCDfDBQ==", "dev": true, "license": "MIT", - "peer": true, "peerDependencies": { "@types/react": "^19.0.0" } @@ -2953,6 +2945,7 @@ "integrity": "sha512-Cxwpt2SfTzTtXcfOlzGEee8O+c+MmUgGrNiBcXnuWxuFJHe6a5Hz7qwhwe5OgaSYI0IJvkLqWX1ASG+cJOkEiA==", "dev": true, "license": "MIT", + "peer": true, "engines": { "node": ">=10" }, @@ -3085,7 +3078,6 @@ } ], "license": "MIT", - "peer": true, "dependencies": { "caniuse-lite": "^1.0.30001735", "electron-to-chromium": "^1.5.204", @@ -3551,7 +3543,8 @@ "resolved": "https://registry.npmjs.org/dom-accessibility-api/-/dom-accessibility-api-0.5.16.tgz", "integrity": "sha512-X7BJ2yElsnOJ30pZF4uIIDfBEVgF4XEBxL9Bxhy6dnrm5hkzqmsWHGTiHqRiITNhMyFLyAiWndIJP7Z1NTteDg==", "dev": true, - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/dom-helpers": { "version": "5.2.1", @@ -4430,7 +4423,6 @@ "integrity": "sha512-Cvc9WUhxSMEo4McES3P7oK3QaXldCfNWp7pl2NNeiIFlCoLr3kfq9kb1fxftiwk1FLV7CvpvDfonxtzUDeSOPg==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "cssstyle": "^4.2.1", "data-urls": "^5.0.0", @@ -4558,6 +4550,7 @@ "integrity": 
"sha512-h5bgJWpxJNswbU7qCrV0tIKQCaS3blPDrqKWx+QxzuzL1zGUzij9XCWLrSLsJPu5t+eWA/ycetzYAO5IOMcWAQ==", "dev": true, "license": "MIT", + "peer": true, "bin": { "lz-string": "bin/bin.js" } @@ -4903,7 +4896,6 @@ "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "license": "MIT", - "peer": true, "engines": { "node": ">=12" }, @@ -4993,6 +4985,7 @@ "integrity": "sha512-Qb1gy5OrP5+zDf2Bvnzdl3jsTf1qXVMazbvCoKhtKqVs4/YK4ozX4gKQJJVyNe+cajNPn0KoC0MC3FUmaHWEmQ==", "dev": true, "license": "MIT", + "peer": true, "dependencies": { "ansi-regex": "^5.0.1", "ansi-styles": "^5.0.0", @@ -5007,7 +5000,8 @@ "resolved": "https://registry.npmjs.org/react-is/-/react-is-17.0.2.tgz", "integrity": "sha512-w2GsyukL62IJnlaff/nRegPQR94C/XXamvMWmSHRJ4y7Ts/4ocGRmTHvOs8PSE6pB3dWOrD/nueuU5sduBsQ4w==", "dev": true, - "license": "MIT" + "license": "MIT", + "peer": true }, "node_modules/prismjs": { "version": "1.30.0", @@ -5089,7 +5083,6 @@ "resolved": "https://registry.npmjs.org/react/-/react-19.1.1.tgz", "integrity": "sha512-w8nqGImo45dmMIfljjMwOGtbmC/mk4CMYhWIicdSflH91J9TyCyczcPFXJzrZ/ZXcgGRFeP6BU0BEJTw6tZdfQ==", "license": "MIT", - "peer": true, "engines": { "node": ">=0.10.0" } @@ -5099,7 +5092,6 @@ "resolved": "https://registry.npmjs.org/react-dom/-/react-dom-19.1.1.tgz", "integrity": "sha512-Dlq/5LAZgF0Gaz6yiqZCf6VCcZs1ghAJyrsu84Q/GT0gV+mCxbfmKNoGRKBYMJ8IEdGPqu49YWXD02GCknEDkw==", "license": "MIT", - "peer": true, "dependencies": { "scheduler": "^0.26.0" }, @@ -5919,7 +5911,6 @@ "resolved": "https://registry.npmjs.org/vite/-/vite-7.1.3.tgz", "integrity": "sha512-OOUi5zjkDxYrKhTV3V7iKsoS37VUM7v40+HuwEmcrsf11Cdx9y3DIr2Px6liIcZFwt3XSRpQvFpL3WVy7ApkGw==", "license": "MIT", - "peer": true, "dependencies": { "esbuild": "^0.25.0", "fdir": "^6.5.0", @@ -6054,7 +6045,6 @@ "integrity": 
"sha512-LUCP5ev3GURDysTWiP47wRRUpLKMOfPh+yKTx3kVIEiu5KOMeqzpnYNsKyOoVrULivR8tLcks4+lga33Whn90A==", "dev": true, "license": "MIT", - "peer": true, "dependencies": { "@types/chai": "^5.2.2", "@vitest/expect": "3.2.4",