Skip to content

Commit 35bca63

Browse files
authored
Separate store and array name for intermediate storage (#785)
* Use array name for path within store * Use array name for path within store for rechunk * Fix mypy
1 parent 30be4d5 commit 35bca63

File tree

6 files changed

+57
-35
lines changed

6 files changed

+57
-35
lines changed

cubed/core/ops.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@
1717
from cubed.backend_array_api import IS_IMMUTABLE_ARRAY, numpy_array_to_backend_array
1818
from cubed.backend_array_api import namespace as nxp
1919
from cubed.core.array import CoreArray, check_array_specs, compute, gensym
20-
from cubed.core.plan import Plan, new_temp_path
20+
from cubed.core.plan import Plan, context_dir_path
2121
from cubed.primitive.blockwise import blockwise as primitive_blockwise
2222
from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise
2323
from cubed.primitive.memory import get_buffer_copies
2424
from cubed.primitive.rechunk import rechunk as primitive_rechunk
2525
from cubed.spec import spec_from_config
2626
from cubed.storage.backend import open_backend_array
27+
from cubed.storage.zarr import lazy_zarr_array
2728
from cubed.types import T_RegularChunks, T_Shape
2829
from cubed.utils import (
2930
array_memory,
@@ -157,6 +158,14 @@ def store(sources: Union["Array", Sequence["Array"]], targets, executor=None, **
157158
for source, target in zip(sources, targets):
158159
identity = lambda a: a
159160
ind = tuple(range(source.ndim))
161+
162+
if target is not None and not isinstance(target, zarr.Array):
163+
target = lazy_zarr_array(
164+
target,
165+
shape=source.shape,
166+
dtype=source.dtype,
167+
chunks=source.chunksize,
168+
)
160169
array = blockwise(
161170
identity,
162171
ind,
@@ -192,6 +201,14 @@ def to_zarr(x: "Array", store, path=None, executor=None, **kwargs):
192201
# by map fusion (if it was produced with a blockwise operation).
193202
identity = lambda a: a
194203
ind = tuple(range(x.ndim))
204+
if store is not None and not isinstance(store, zarr.Array):
205+
store = lazy_zarr_array(
206+
store,
207+
shape=x.shape,
208+
dtype=x.dtype,
209+
chunks=x.chunksize,
210+
path=path,
211+
)
195212
out = blockwise(
196213
identity,
197214
ind,
@@ -200,7 +217,6 @@ def to_zarr(x: "Array", store, path=None, executor=None, **kwargs):
200217
dtype=x.dtype,
201218
align_arrays=False,
202219
target_store=store,
203-
target_path=path,
204220
)
205221
out.compute(executor=executor, _return_in_memory_array=False, **kwargs)
206222

@@ -298,7 +314,7 @@ def blockwise(
298314
spec = check_array_specs(arrays)
299315
buffer_copies = get_buffer_copies(spec)
300316
if target_store is None:
301-
target_store = new_temp_path(name=name, spec=spec)
317+
target_store = context_dir_path(spec=spec)
302318
op = primitive_blockwise(
303319
func,
304320
out_ind,
@@ -452,14 +468,14 @@ def _general_blockwise(
452468
if isinstance(target_stores, list): # multiple outputs
453469
name = [gensym() for _ in range(len(target_stores))]
454470
target_stores = [
455-
ts if ts is not None else new_temp_path(name=n, spec=spec)
456-
for n, ts in zip(name, target_stores)
471+
ts if ts is not None else context_dir_path(spec=spec)
472+
for ts in target_stores
457473
]
458474
target_names = name
459475
else: # single output
460476
name = gensym()
461477
if target_stores is None:
462-
target_stores = [new_temp_path(name=name, spec=spec)]
478+
target_stores = [context_dir_path(spec=spec)]
463479
target_names = [name]
464480

465481
op = primitive_general_blockwise(
@@ -886,18 +902,17 @@ def rechunk(x, chunks, *, target_store=None, min_mem=None, use_new_impl=True):
886902
name = gensym()
887903
spec = x.spec
888904
if target_store is None:
889-
target_store = new_temp_path(name=name, spec=spec)
905+
target_store = context_dir_path(spec=spec)
890906
name_int = f"{name}-int"
891-
temp_store = new_temp_path(name=name_int, spec=spec)
892907
ops = primitive_rechunk(
893908
x._zarray,
894-
source_array_name=name,
909+
source_array_name=x.name,
895910
int_array_name=name_int,
911+
target_array_name=name,
896912
target_chunks=target_chunks,
897913
allowed_mem=spec.allowed_mem,
898914
reserved_mem=spec.reserved_mem,
899915
target_store=target_store,
900-
temp_store=temp_store,
901916
storage_options=spec.storage_options,
902917
)
903918

cubed/core/plan.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -557,17 +557,22 @@ def arrays_to_plan(*arrays):
557557
return plans[0].arrays_to_plan(*arrays)
558558

559559

560+
def context_dir_path(spec=None):
561+
work_dir = spec.work_dir if spec is not None else None
562+
if work_dir is None:
563+
work_dir = tempfile.gettempdir()
564+
context_dir = join_path(work_dir, CONTEXT_ID)
565+
delete_on_exit(context_dir)
566+
return context_dir
567+
568+
560569
def new_temp_path(name, suffix=".zarr", spec=None):
561570
"""Return a string path for a temporary file path, which may be local or remote.
562571
563572
Note that this function does not create the file or any directories (and they
564573
may never be created, if for example the file doesn't need to be materialized).
565574
"""
566-
work_dir = spec.work_dir if spec is not None else None
567-
if work_dir is None:
568-
work_dir = tempfile.gettempdir()
569-
context_dir = join_path(work_dir, CONTEXT_ID)
570-
delete_on_exit(context_dir)
575+
context_dir = context_dir_path(spec)
571576
return join_path(context_dir, f"{name}{suffix}")
572577

573578

cubed/primitive/blockwise.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,15 +365,15 @@ def general_blockwise(
365365
f"All outputs must have matching number of blocks in each dimension. Chunks specified: {chunkss}"
366366
)
367367
ta: Union[zarr.Array, LazyZarrArray]
368-
if isinstance(target_store, zarr.Array):
368+
if isinstance(target_store, (zarr.Array, LazyZarrArray)):
369369
ta = target_store
370370
else:
371371
ta = lazy_zarr_array(
372372
target_store,
373373
shapes[i],
374374
dtype=dtypes[i],
375375
chunks=target_chunks_ or chunksize,
376-
path=target_paths[i] if target_paths is not None else None,
376+
path=target_names[i], # use array name for path within store
377377
storage_options=storage_options,
378378
compressor=compressor,
379379
)

cubed/primitive/rechunk.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ def rechunk(
2424
source: T_ZarrArray,
2525
source_array_name: str,
2626
int_array_name: str,
27+
target_array_name: str,
2728
target_chunks: T_RegularChunks,
2829
allowed_mem: int,
2930
reserved_mem: int,
3031
target_store: T_Store,
31-
temp_store: Optional[T_Store] = None,
3232
storage_options: Optional[Dict[str, Any]] = None,
3333
) -> List[PrimitiveOperation]:
3434
"""Change the chunking of an array, without changing its shape or dtype.
@@ -44,8 +44,6 @@ def rechunk(
4444
The memory reserved on a worker for non-data use when running a task, in bytes
4545
target_store : str
4646
Path to output Zarr store.
47-
temp_store : str, optional
48-
Path to temporary store for intermediate data.
4947
5048
Returns
5149
-------
@@ -61,10 +59,11 @@ def rechunk(
6159

6260
read_proxy, int_proxy, write_proxy = _setup_array_rechunk(
6361
source_array=source,
62+
int_array_name=int_array_name,
63+
target_array_name=target_array_name,
6464
target_chunks=target_chunks,
6565
max_mem=rechunker_max_mem,
6666
target_store=target_store,
67-
temp_store=temp_store,
6867
storage_options=storage_options,
6968
)
7069

@@ -118,10 +117,11 @@ def rechunk(
118117
# from rechunker, but simpler since it only has to handle Zarr arrays
119118
def _setup_array_rechunk(
120119
source_array: T_ZarrArray,
120+
int_array_name: str,
121+
target_array_name: str,
121122
target_chunks: T_RegularChunks,
122123
max_mem: int,
123124
target_store: T_Store,
124-
temp_store: Optional[T_Store] = None,
125125
storage_options: Optional[Dict[str, Any]] = None,
126126
) -> Tuple[CubedArrayProxy, Optional[CubedArrayProxy], CubedArrayProxy]:
127127
shape = source_array.shape
@@ -148,17 +148,21 @@ def _setup_array_rechunk(
148148
shape,
149149
dtype,
150150
chunks=target_chunks,
151+
path=target_array_name, # use array name for path within store
151152
storage_options=storage_options,
152153
)
153154

154155
if read_chunks == write_chunks:
155156
int_array = None
156157
else:
157158
# do intermediate store
158-
if temp_store is None:
159-
raise ValueError("A temporary store location must be provided.")
160159
int_array = lazy_zarr_array(
161-
temp_store, shape, dtype, chunks=int_chunks, storage_options=storage_options
160+
target_store,
161+
shape,
162+
dtype,
163+
chunks=int_chunks,
164+
path=int_array_name, # use array name for path within store
165+
storage_options=storage_options,
162166
)
163167

164168
read_proxy = CubedArrayProxy(source_array, read_chunks)

cubed/tests/primitive/test_blockwise.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def test_blockwise(tmp_path, executor, reserved_mem):
6767

6868
execute_pipeline(op.pipeline, executor=executor)
6969

70-
res = open_backend_array(target_store, mode="r")
70+
res = open_backend_array(target_store, mode="r", path="target")
7171
assert_array_equal(res[:], np.outer([0, 1, 2], [10, 50, 100]))
7272

7373

@@ -132,7 +132,7 @@ def test_blockwise_with_args(tmp_path, executor):
132132

133133
execute_pipeline(op.pipeline, executor=executor)
134134

135-
res = open_backend_array(target_store, mode="r")
135+
res = open_backend_array(target_store, mode="r", path="target")
136136
assert_array_equal(
137137
res[:], np.transpose(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), axes=(1, 0))
138138
)
@@ -225,7 +225,7 @@ def key_function(out_key):
225225

226226
execute_pipeline(op.pipeline, executor=executor)
227227

228-
res = open_backend_array(target_store, mode="r")
228+
res = open_backend_array(target_store, mode="r", path="target")
229229
assert_array_equal(res[:], np.arange(20))
230230

231231

@@ -284,10 +284,10 @@ def block_function(out_key):
284284

285285
input = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
286286

287-
res1 = open_backend_array(target_store1, mode="r")
287+
res1 = open_backend_array(target_store1, mode="r", path="target1")
288288
assert_array_equal(res1[:], np.sqrt(input))
289289

290-
res2 = open_backend_array(target_store2, mode="r")
290+
res2 = open_backend_array(target_store2, mode="r", path="target2")
291291
assert_array_equal(res2[:], -np.sqrt(input))
292292

293293

cubed/tests/primitive/test_rechunk.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,17 +70,16 @@ def test_rechunk(
7070
):
7171
source = zarr.ones(shape, chunks=source_chunks, store=tmp_path / "source.zarr")
7272
target_store = tmp_path / "target.zarr"
73-
temp_store = tmp_path / "temp.zarr"
7473

7574
ops = rechunk(
7675
source,
7776
source_array_name="source-array",
7877
int_array_name="int-array",
78+
target_array_name="target-array",
7979
target_chunks=target_chunks,
8080
allowed_mem=allowed_mem,
8181
reserved_mem=reserved_mem,
8282
target_store=target_store,
83-
temp_store=temp_store,
8483
)
8584

8685
assert len(ops) == len(expected_num_tasks)
@@ -103,7 +102,7 @@ def test_rechunk(
103102
for op in ops:
104103
execute_pipeline(op.pipeline, executor=executor)
105104

106-
res = open_backend_array(target_store, mode="r")
105+
res = open_backend_array(target_store, mode="r", path="target-array")
107106
assert_array_equal(res[:], np.ones(shape))
108107
assert res.chunks == target_chunks
109108

@@ -112,7 +111,6 @@ def test_rechunk_allowed_mem_exceeded(tmp_path):
112111
source = zarr.ones((4, 4), chunks=(2, 2), store=tmp_path / "source.zarr")
113112
allowed_mem = 16
114113
target_store = tmp_path / "target.zarr"
115-
temp_store = tmp_path / "temp.zarr"
116114

117115
# cubed's allowed_mem is reduced by a factor of 4 for rechunker's max_mem from 16 to 4
118116
with pytest.raises(
@@ -122,9 +120,9 @@ def test_rechunk_allowed_mem_exceeded(tmp_path):
122120
source,
123121
source_array_name="source-array",
124122
int_array_name="int-array",
123+
target_array_name="target-array",
125124
target_chunks=(4, 1),
126125
allowed_mem=allowed_mem,
127126
reserved_mem=0,
128127
target_store=target_store,
129-
temp_store=temp_store,
130128
)

0 commit comments

Comments
 (0)