Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cubed/core/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from cubed.backend_array_api import IS_IMMUTABLE_ARRAY, numpy_array_to_backend_array
from cubed.backend_array_api import namespace as nxp
from cubed.core.array import CoreArray, check_array_specs, compute, gensym
from cubed.core.plan import Plan, context_dir_path
from cubed.core.plan import Plan, intermediate_store
from cubed.primitive.blockwise import blockwise as primitive_blockwise
from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise
from cubed.primitive.memory import get_buffer_copies
Expand Down Expand Up @@ -315,7 +315,7 @@ def blockwise(
spec = check_array_specs(arrays)
buffer_copies = get_buffer_copies(spec)
if target_store is None:
target_store = context_dir_path(spec=spec)
target_store = intermediate_store(spec=spec)
op = primitive_blockwise(
func,
out_ind,
Expand Down Expand Up @@ -469,14 +469,14 @@ def _general_blockwise(
if isinstance(target_stores, list): # multiple outputs
name = [gensym() for _ in range(len(target_stores))]
target_stores = [
ts if ts is not None else context_dir_path(spec=spec)
ts if ts is not None else intermediate_store(spec=spec)
for ts in target_stores
]
target_names = name
else: # single output
name = gensym()
if target_stores is None:
target_stores = [context_dir_path(spec=spec)]
target_stores = [intermediate_store(spec=spec)]
target_names = [name]

op = primitive_general_blockwise(
Expand Down Expand Up @@ -903,7 +903,7 @@ def rechunk(x, chunks, *, target_store=None, min_mem=None, use_new_impl=True):
name = gensym()
spec = x.spec
if target_store is None:
target_store = context_dir_path(spec=spec)
target_store = intermediate_store(spec=spec)
name_int = f"{name}-int"
ops = primitive_rechunk(
x._zarray,
Expand Down
19 changes: 8 additions & 11 deletions cubed/core/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,14 @@ def arrays_to_plan(*arrays):
return plans[0].arrays_to_plan(*arrays)


def context_dir_path(spec=None):
def intermediate_store(spec=None):
    """Return a file path or a store object that is used for storing
    intermediate data.

    When ``spec`` carries an explicit ``intermediate_store``, that object is
    returned unchanged. Otherwise the location is based on ``spec.work_dir``,
    falling back to the system temporary directory when there is no spec or
    no work directory. By default this is a temporary file path, which may be
    local or remote.
    """
    # ``spec`` defaults to None, so guard before reading attributes from it.
    if spec is not None and spec.intermediate_store is not None:
        return spec.intermediate_store
    work_dir = spec.work_dir if spec is not None else None
    if work_dir is None:
        work_dir = tempfile.gettempdir()
Expand All @@ -567,16 +574,6 @@ def context_dir_path(spec=None):
return context_dir


def new_temp_path(name, suffix=".zarr", spec=None):
    """Build the string path of a temporary file, which may be local or remote.

    The path is the context directory for ``spec`` joined with ``name``
    followed by ``suffix``.

    Nothing is created by this function: neither the file nor any of its
    parent directories. They may never exist at all, for instance when the
    file never needs to be materialized.
    """
    base_dir = context_dir_path(spec)
    file_name = f"{name}{suffix}"
    return join_path(base_dir, file_name)


def create_zarr_array(lazy_zarr_array, *, config=None):
"""Stage function for create."""
lazy_zarr_array.create(mode="a")
Expand Down
10 changes: 10 additions & 0 deletions cubed/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from cubed.runtime.create import create_executor
from cubed.runtime.types import Executor
from cubed.types import T_Store
from cubed.utils import convert_to_bytes


Expand All @@ -22,6 +23,9 @@ def __init__(
executor_options: Optional[dict] = None,
storage_options: Union[dict, None] = None,
zarr_compressor: Union[dict, str, None] = "default",
intermediate_store: Union[
T_Store, None
] = None, # TODO: doc, repr, eq (+position?)
):
"""
Specify resources available to run a computation.
Expand Down Expand Up @@ -65,6 +69,7 @@ def __init__(
self._executor_options = executor_options
self._storage_options = storage_options
self._zarr_compressor = zarr_compressor
self._intermediate_store = intermediate_store

@property
def work_dir(self) -> Optional[str]:
Expand Down Expand Up @@ -118,6 +123,11 @@ def zarr_compressor(self) -> Union[dict, str, None]:
"""The compressor used by Zarr for intermediate data."""
return self._zarr_compressor

@property
def intermediate_store(self) -> Union[T_Store, None]:
    """The Zarr store for intermediate data. Takes precedence over 'work_dir'.

    This is the value passed to the constructor as ``intermediate_store``;
    it is ``None`` when no store was supplied.
    """
    return self._intermediate_store

def __repr__(self) -> str:
return (
f"cubed.Spec(work_dir={self._work_dir}, allowed_mem={self._allowed_mem}, "
Expand Down
29 changes: 29 additions & 0 deletions cubed/tests/test_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import numpy as np
import pytest
import zarr
from numpy.testing import assert_array_equal

import cubed
import cubed.array_api as xp
from cubed.storage.backend import backend_storage_name

# True when the first character of the installed zarr version string is "2",
# i.e. Zarr Python v2 is in use; used below to skip tests that need a newer Zarr.
ZARR_PYTHON_V2 = zarr.__version__[0] == "2"


@pytest.mark.skipif(
    ZARR_PYTHON_V2 or backend_storage_name() == "tensorstore",
    reason="setting an arbitrary Zarr store is not supported for Zarr Python v2, or tensorstore",
)
def test_arbitrary_zarr_store():
    """Check that a store given via ``Spec(intermediate_store=...)`` is used.

    Adds two small chunked arrays, verifies the result, then inspects the
    in-memory store to confirm exactly one intermediate array was written.
    """
    memory_store = zarr.storage.MemoryStore()
    spec = cubed.Spec(intermediate_store=memory_store, allowed_mem="100kB")

    lhs = xp.asarray([[1, 2, 3], [4, 5, 6], [7, 8, 9]], chunks=(2, 2), spec=spec)
    ones = xp.asarray([[1, 1, 1], [1, 1, 1], [1, 1, 1]], chunks=(2, 2), spec=spec)
    total = xp.add(lhs, ones)

    expected = np.array([[2, 3, 4], [5, 6, 7], [8, 9, 10]])
    assert_array_equal(total, expected)

    # The computation should have written its single output array to the store.
    root = zarr.open_group(memory_store)
    stored_names = list(root.array_keys())
    assert len(stored_names) == 1
    assert stored_names[0].startswith("array-")
Loading