-
Notifications
You must be signed in to change notification settings - Fork 59
Expand file tree
/
Copy patharray.py
More file actions
286 lines (230 loc) · 10.1 KB
/
array.py
File metadata and controls
286 lines (230 loc) · 10.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import warnings
from typing import Any, Callable, Union
import numpy as np
from zarr.core.metadata.v3 import ArrayV3Metadata, RegularChunkGrid
from virtualizarr.manifests.array_api import (
MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS,
_isnan,
)
from virtualizarr.manifests.manifest import ChunkManifest
class ManifestArray:
"""
Virtualized array representation of the chunk data in a single Zarr Array.
Supports concatenation / stacking, but only if the two arrays to be concatenated have the same codecs.
Cannot be directly altered.
Implements subset of the array API standard such that it can be wrapped by xarray.
Doesn't store the zarr array name, zattrs or ARRAY_DIMENSIONS, as instead those can be stored on a wrapping xarray object.
"""
_manifest: ChunkManifest
_metadata: ArrayV3Metadata
def __init__(
self,
metadata: ArrayV3Metadata | dict,
chunkmanifest: dict | ChunkManifest,
) -> None:
"""
Create a ManifestArray directly from the .zarray information of a zarr array and the manifest of chunks.
Parameters
----------
metadata : dict or ArrayV3Metadata
chunkmanifest : dict or ChunkManifest
"""
if isinstance(metadata, ArrayV3Metadata):
_metadata = metadata
else:
# try unpacking the dict
_metadata = ArrayV3Metadata(**metadata)
if isinstance(chunkmanifest, ChunkManifest):
_chunkmanifest = chunkmanifest
elif isinstance(chunkmanifest, dict):
_chunkmanifest = ChunkManifest(entries=chunkmanifest)
else:
raise TypeError(
f"chunkmanifest arg must be of type ChunkManifest or dict, but got type {type(chunkmanifest)}"
)
# TODO check that the metadata shape and chunkmanifest shape are consistent with one another
# TODO also cover the special case of scalar arrays
self._metadata = _metadata
self._manifest = _chunkmanifest
@property
def manifest(self) -> ChunkManifest:
return self._manifest
@property
def metadata(self) -> ArrayV3Metadata:
return self._metadata
@property
def chunks(self) -> tuple[int, ...]:
"""
Individual chunk size by number of elements.
"""
if isinstance(self._metadata.chunk_grid, RegularChunkGrid):
return self._metadata.chunk_grid.chunk_shape
else:
raise NotImplementedError(
"Only RegularChunkGrid is currently supported for chunk size"
)
@property
def dtype(self) -> np.dtype:
dtype_str = self.metadata.data_type
return dtype_str.to_numpy()
@property
def shape(self) -> tuple[int, ...]:
"""
Array shape by number of elements along each dimension.
"""
return tuple(int(length) for length in list(self.metadata.shape))
@property
def ndim(self) -> int:
return len(self.shape)
@property
def size(self) -> int:
return int(np.prod(self.shape))
def __repr__(self) -> str:
return f"ManifestArray<shape={self.shape}, dtype={self.dtype}, chunks={self.chunks}>"
@property
def nbytes_virtual(self) -> int:
"""
Size required to hold these references in memory in bytes.
Note this is not the size of the referenced array if it were actually loaded into memory,
this is only the size of the pointers to the chunk locations.
If you were to load the data into memory it would be ~1e6x larger for 1MB chunks.
"""
# note: we don't name this method `.nbytes` as we don't want xarray's repr to use it
return self.manifest.nbytes
def __array_function__(self, func, types, args, kwargs) -> Any:
"""
Hook to teach this class what to do if np.concat etc. is called on it.
Use this instead of __array_namespace__ so that we don't make promises we can't keep.
"""
if func not in MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS:
return NotImplemented
# Note: this allows subclasses that don't override
# __array_function__ to handle ManifestArray objects
if not all(issubclass(t, ManifestArray) for t in types):
return NotImplemented
return MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS[func](*args, **kwargs)
# Everything beyond here is basically just to make this array class wrappable by xarray #
def __array_ufunc__(self, ufunc, method, *inputs, **kwargs) -> Any:
"""We have to define this in order to convince xarray that this class is a duckarray, even though we will never support ufuncs."""
if ufunc == np.isnan:
return _isnan(self.shape)
return NotImplemented
def __array__(self, dtype: np.typing.DTypeLike = None) -> np.ndarray:
raise NotImplementedError(
"ManifestArrays can't be converted into numpy arrays or pandas Index objects"
)
def __eq__( # type: ignore[override]
self,
other: Union[int, float, bool, np.ndarray, "ManifestArray"],
) -> np.ndarray:
"""
Element-wise equality checking.
Returns a numpy array of booleans.
"""
if isinstance(other, (int, float, bool, np.ndarray)):
# TODO what should this do when comparing against numpy arrays?
return np.full(shape=self.shape, fill_value=False, dtype=np.dtype(bool))
elif not isinstance(other, ManifestArray):
raise TypeError(
f"Cannot check equality between a ManifestArray and an object of type {type(other)}"
)
if self.shape != other.shape:
raise NotImplementedError("Unsure how to handle broadcasting like this")
if self.metadata != other.metadata:
return np.full(shape=self.shape, fill_value=False, dtype=np.dtype(bool))
else:
if self.manifest == other.manifest:
return np.full(shape=self.shape, fill_value=True, dtype=np.dtype(bool))
else:
# TODO this doesn't yet do what it should - it simply returns all False if any of the chunk entries are different.
# What it should do is return True for the locations where the chunk entries are the same.
warnings.warn(
"__eq__ currently is over-cautious, returning an array of all False if any of the chunk entries don't match.",
UserWarning,
)
# do chunk-wise comparison
equal_chunk_paths = self.manifest._paths == other.manifest._paths
equal_chunk_offsets = self.manifest._offsets == other.manifest._offsets
equal_chunk_lengths = self.manifest._lengths == other.manifest._lengths
equal_chunks = (
equal_chunk_paths & equal_chunk_offsets & equal_chunk_lengths
)
if not equal_chunks.all():
# TODO expand chunk-wise comparison into an element-wise result instead of just returning all False
return np.full(
shape=self.shape, fill_value=False, dtype=np.dtype(bool)
)
else:
raise RuntimeWarning("Should not be possible to get here")
def astype(self, dtype: np.dtype, /, *, copy: bool = True) -> "ManifestArray":
"""Cannot change the dtype, but needed because xarray will call this even when it's a no-op."""
if dtype != self.dtype:
raise NotImplementedError()
else:
return self
def __getitem__(
self,
key,
/,
) -> "ManifestArray":
"""
Only supports extremely limited indexing.
Only here because xarray will apparently attempt to index into its lazy indexing classes even if the operation would be a no-op anyway.
"""
from xarray.core.indexing import BasicIndexer
if isinstance(key, BasicIndexer):
indexer = key.tuple
else:
indexer = key
indexer = _possibly_expand_trailing_ellipsis(key, self.ndim)
if len(indexer) != self.ndim:
raise ValueError(
f"Invalid indexer for array with ndim={self.ndim}: {indexer}"
)
if all(
isinstance(axis_indexer, slice) and axis_indexer == slice(None)
for axis_indexer in indexer
):
# indexer is all slice(None)'s, so this is a no-op
return self
else:
raise NotImplementedError(f"Doesn't support slicing with {indexer}")
def rename_paths(
self,
new: str | Callable[[str], str],
) -> "ManifestArray":
"""
Rename paths to chunks in this array's manifest.
Accepts either a string, in which case this new path will be used for all chunks, or
a function which accepts the old path and returns the new path.
Parameters
----------
new
New path to use for all chunks, either as a string, or as a function which accepts and returns strings.
Returns
-------
ManifestArray
Examples
--------
Rename paths to reflect moving the referenced files from local storage to an S3 bucket.
>>> def local_to_s3_url(old_local_path: str) -> str:
... from pathlib import Path
...
... new_s3_bucket_url = "http://s3.amazonaws.com/my_bucket/"
...
... filename = Path(old_local_path).name
... return str(new_s3_bucket_url / filename)
>>> marr.rename_paths(local_to_s3_url)
See Also
--------
ChunkManifest.rename_paths
"""
renamed_manifest = self.manifest.rename_paths(new)
return ManifestArray(metadata=self.metadata, chunkmanifest=renamed_manifest)
def _possibly_expand_trailing_ellipsis(key, ndim: int):
if key[-1] == ...:
extra_slices_needed = ndim - (len(key) - 1)
*indexer, ellipsis = key
return tuple(tuple(indexer) + (slice(None),) * extra_slices_needed)
else:
return key