From cd0322eb13c4860058b0dcaddbab36dea3ed27e4 Mon Sep 17 00:00:00 2001 From: Tom White Date: Mon, 1 Sep 2025 15:03:18 +0100 Subject: [PATCH 1/2] Allow an arbitrary Zarr store to be used for intermediate storage --- cubed/core/ops.py | 10 +++++----- cubed/core/plan.py | 19 ++++++++----------- cubed/spec.py | 10 ++++++++++ cubed/tests/test_store.py | 28 ++++++++++++++++++++++++++++ 4 files changed, 51 insertions(+), 16 deletions(-) create mode 100644 cubed/tests/test_store.py diff --git a/cubed/core/ops.py b/cubed/core/ops.py index 46f6ffef..2bb43f79 100644 --- a/cubed/core/ops.py +++ b/cubed/core/ops.py @@ -17,7 +17,7 @@ from cubed.backend_array_api import IS_IMMUTABLE_ARRAY, numpy_array_to_backend_array from cubed.backend_array_api import namespace as nxp from cubed.core.array import CoreArray, check_array_specs, compute, gensym -from cubed.core.plan import Plan, context_dir_path +from cubed.core.plan import Plan, intermediate_store from cubed.primitive.blockwise import blockwise as primitive_blockwise from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise from cubed.primitive.memory import get_buffer_copies @@ -315,7 +315,7 @@ def blockwise( spec = check_array_specs(arrays) buffer_copies = get_buffer_copies(spec) if target_store is None: - target_store = context_dir_path(spec=spec) + target_store = intermediate_store(spec=spec) op = primitive_blockwise( func, out_ind, @@ -469,14 +469,14 @@ def _general_blockwise( if isinstance(target_stores, list): # multiple outputs name = [gensym() for _ in range(len(target_stores))] target_stores = [ - ts if ts is not None else context_dir_path(spec=spec) + ts if ts is not None else intermediate_store(spec=spec) for ts in target_stores ] target_names = name else: # single output name = gensym() if target_stores is None: - target_stores = [context_dir_path(spec=spec)] + target_stores = [intermediate_store(spec=spec)] target_names = [name] op = primitive_general_blockwise( @@ -903,7 +903,7 @@ def 
rechunk(x, chunks, *, target_store=None, min_mem=None, use_new_impl=True): name = gensym() spec = x.spec if target_store is None: - target_store = context_dir_path(spec=spec) + target_store = intermediate_store(spec=spec) name_int = f"{name}-int" ops = primitive_rechunk( x._zarray, diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 55de62e6..3a6a042f 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -558,7 +558,14 @@ def arrays_to_plan(*arrays): return plans[0].arrays_to_plan(*arrays) -def context_dir_path(spec=None): +def intermediate_store(spec=None): + """Return a file path or a store object that is used for storing + intermediate data. + + By default returns a temporary file path, which may be local or remote. + """ + if spec.intermediate_store is not None: + return spec.intermediate_store work_dir = spec.work_dir if spec is not None else None if work_dir is None: work_dir = tempfile.gettempdir() @@ -567,16 +574,6 @@ def context_dir_path(spec=None): return context_dir -def new_temp_path(name, suffix=".zarr", spec=None): - """Return a string path for a temporary file path, which may be local or remote. - - Note that this function does not create the file or any directories (and they - may never be created, if for example the file doesn't need to be materialized). 
- """ - context_dir = context_dir_path(spec) - return join_path(context_dir, f"{name}{suffix}") - - def create_zarr_array(lazy_zarr_array, *, config=None): """Stage function for create.""" lazy_zarr_array.create(mode="a") diff --git a/cubed/spec.py b/cubed/spec.py index 96d930fd..d777fd21 100644 --- a/cubed/spec.py +++ b/cubed/spec.py @@ -6,6 +6,7 @@ from cubed.runtime.create import create_executor from cubed.runtime.types import Executor +from cubed.types import T_Store from cubed.utils import convert_to_bytes @@ -22,6 +23,9 @@ def __init__( executor_options: Optional[dict] = None, storage_options: Union[dict, None] = None, zarr_compressor: Union[dict, str, None] = "default", + intermediate_store: Union[ + T_Store, None + ] = None, # TODO: doc, repr, eq (+position?) ): """ Specify resources available to run a computation. @@ -65,6 +69,7 @@ def __init__( self._executor_options = executor_options self._storage_options = storage_options self._zarr_compressor = zarr_compressor + self._intermediate_store = intermediate_store @property def work_dir(self) -> Optional[str]: @@ -118,6 +123,11 @@ def zarr_compressor(self) -> Union[dict, str, None]: """The compressor used by Zarr for intermediate data.""" return self._zarr_compressor + @property + def intermediate_store(self) -> Union[dict, str, None]: + """The Zarr store for intermediate data. 
Takes precedence over 'work_dir'.""" + return self._intermediate_store + def __repr__(self) -> str: return ( f"cubed.Spec(work_dir={self._work_dir}, allowed_mem={self._allowed_mem}, " diff --git a/cubed/tests/test_store.py b/cubed/tests/test_store.py new file mode 100644 index 00000000..a08908ab --- /dev/null +++ b/cubed/tests/test_store.py @@ -0,0 +1,28 @@ +import numpy as np +import pytest +import zarr +from numpy.testing import assert_array_equal + +import cubed +import cubed.array_api as xp + +ZARR_PYTHON_V2 = zarr.__version__[0] == "2" + + +@pytest.mark.skipif( + ZARR_PYTHON_V2, + reason="setting an arbitrary Zarr store is not supported for Zarr Python v2", +) +def test_arbitrary_zarr_store(): + store = zarr.storage.MemoryStore() + spec = cubed.Spec(intermediate_store=store, allowed_mem="100kB") + a = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec) + b = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec) + c = xp.add(a, b) + assert_array_equal(c, np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]])) + + # check store was used + z = zarr.open_group(store) + array_keys = list(z.array_keys()) + assert len(array_keys) == 1 + assert array_keys[0].startswith("array-") From f806ba197281b9d3b0f413cacb2b92c71045d87c Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 11 Sep 2025 17:22:21 +0100 Subject: [PATCH 2/2] Don't run test_arbitrary_zarr_store on tensorstore --- cubed/tests/test_store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cubed/tests/test_store.py b/cubed/tests/test_store.py index a08908ab..189d1539 100644 --- a/cubed/tests/test_store.py +++ b/cubed/tests/test_store.py @@ -5,13 +5,14 @@ import cubed import cubed.array_api as xp +from cubed.storage.backend import backend_storage_name ZARR_PYTHON_V2 = zarr.__version__[0] == "2" @pytest.mark.skipif( - ZARR_PYTHON_V2, - reason="setting an arbitrary Zarr store is not supported for Zarr Python v2", + ZARR_PYTHON_V2 or backend_storage_name() == 
"tensorstore", + reason="setting an arbitrary Zarr store is not supported for Zarr Python v2, or tensorstore", ) def test_arbitrary_zarr_store(): store = zarr.storage.MemoryStore()