forked from fsspec/kerchunk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
591 lines (515 loc) · 18.6 KB
/
utils.py
File metadata and controls
591 lines (515 loc) · 18.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
import base64
import copy
import itertools
import fsspec.asyn
from typing import Any, cast
import warnings
import ujson
import fsspec.implementations.asyn_wrapper
import numpy as np
import zarr.storage
def dict_to_store(store_dict: dict):
    """Create a writable in-memory zarr store backed by ``store_dict``.

    The dictionary is handed to ``zarr.storage.MemoryStore`` as its backing
    mapping rather than being copied.
    """
    memory_store = zarr.storage.MemoryStore(store_dict=store_dict, read_only=False)
    return memory_store
def refs_as_fs(
    refs,
    fs=None,
    remote_protocol=None,
    remote_options=None,
    asynchronous=True,
    **kwargs,
):
    """Convert a reference set to an fsspec ``"reference"`` filesystem.

    Parameters
    ----------
    refs
        Reference set, passed as ``fo`` to the reference filesystem.
    fs
        Optional filesystem for the referenced data. If given, its
        ``asynchronous`` attribute is set to ``asynchronous`` (this mutates it).
    remote_protocol, remote_options
        Forwarded to the reference filesystem. When ``fs`` is None, a copy of
        ``remote_options`` with ``"asynchronous"`` set is used instead, so the
        caller's dict is not modified.
    asynchronous: bool
        Whether the created filesystem operates in async mode.
    **kwargs
        Extra arguments for the reference filesystem.
    """
    if fs is None:
        # build a fresh dict so the caller's options are left untouched
        remote_options = {**(remote_options or {}), "asynchronous": asynchronous}
    else:
        fs.asynchronous = asynchronous
    return fsspec.filesystem(
        "reference",
        fo=refs,
        fs=fs,
        remote_protocol=remote_protocol,
        remote_options=remote_options,
        asynchronous=asynchronous,
        **kwargs,
    )
def refs_as_store(
    refs, read_only=False, fs=None, remote_protocol=None, remote_options=None
):
    """Convert a reference set to a zarr store.

    Parameters
    ----------
    refs
        Reference set to expose.
    read_only: bool
        Passed through to the resulting zarr store.
    fs
        Optional filesystem for the referenced data (see ``refs_as_fs``).
    remote_protocol: str | None
        Protocol of the referenced data.
    remote_options: dict | None
        Options for the referenced data's filesystem. The caller's dict is not
        modified; ``"asynchronous": True`` is added to a copy.

    Returns
    -------
    zarr.storage.FsspecStore
    """
    # Copy instead of writing into the caller's dict: the original code set
    # remote_options["asynchronous"] in place whenever a non-empty dict was passed.
    remote_options = {**(remote_options or {}), "asynchronous": True}
    fss = refs_as_fs(
        refs,
        fs=fs,
        remote_protocol=remote_protocol,
        remote_options=remote_options,
    )
    return fs_as_store(fss, read_only=read_only)
def fs_as_store(fs: fsspec.asyn.AsyncFileSystem, read_only=False):
    """Wrap an fsspec filesystem as a zarr ``FsspecStore``.

    Parameters
    ----------
    fs: fsspec.AbstractFileSystem
        Filesystem to expose. If it is not async-capable (``fs.async_impl`` is
        falsy) it is wrapped in ``AsyncFileSystemWrapper``. The (possibly
        wrapped) filesystem gets ``asynchronous = True`` set on it.
    read_only: bool
        Passed through to ``zarr.storage.FsspecStore``.

    Returns
    -------
    zarr.storage.FsspecStore

    Raises
    ------
    ImportError
        If the installed fsspec does not provide ``AsyncFileSystemWrapper``.
    """
    if not fs.async_impl:
        # keep the try narrow: only the import can legitimately be missing
        try:
            from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
        except ImportError as err:
            raise ImportError(
                "Only fsspec>2024.10.0 supports the async filesystem wrapper "
                "required for working with reference filesystems. "
            ) from err
        fs = AsyncFileSystemWrapper(fs)
    fs.asynchronous = True
    return zarr.storage.FsspecStore(fs, read_only=read_only)
def class_factory(func):
    """Experimental uniform API across function-based file scanners.

    Returns a class whose instances capture ``url``, ``storage_options``,
    ``inline_threshold`` and any extra keyword arguments, and whose
    ``translate()`` calls ``func`` with them. The class borrows ``func``'s
    docstring and module.
    """

    class FunctionWrapper:
        __doc__ = func.__doc__
        __module__ = func.__module__

        def __init__(self, url, storage_options=None, inline_threshold=100, **kwargs):
            self.url = url
            self.storage_options = storage_options
            self.inline = inline_threshold
            self.kwargs = kwargs

        def translate(self):
            fixed = {
                "inline_threshold": self.inline,
                "storage_options": self.storage_options,
            }
            return func(self.url, **fixed, **self.kwargs)

        def __str__(self):
            target = f"{func.__module__}.{func.__qualname__}"
            return f"<Single file to zarr processor using {target}>"

        __repr__ = __str__

    return FunctionWrapper
def consolidate(refs):
    """Package raw references into a version-1 reference set.

    Values that expose ``to_bytes()`` are converted to bytes first. Bytes that
    decode as ASCII are stored as text. Other bytes are stored as ``"base64:"``
    followed by their base64 encoding. All remaining values are copied as-is.

    Returns
    -------
    dict
        ``{"version": 1, "refs": {...}}``
    """

    def _encode(value):
        if hasattr(value, "to_bytes"):
            value = value.to_bytes()
        if not isinstance(value, bytes):
            return value
        try:
            # decoding as ASCII doubles as the "is this plain text?" check
            return value.decode("ascii")
        except UnicodeDecodeError:
            return (b"base64:" + base64.b64encode(value)).decode()

    return {"version": 1, "refs": {key: _encode(value) for key, value in refs.items()}}
def rename_target(refs, renames):
    """Utility to change URLs in a reference set in a predictable way

    For reference sets including templates, this is more easily done by
    using template overrides at access time; but rewriting the references
    and saving a new file means not having to do that every time.

    Parameters
    ----------
    refs: dict
        Reference set
    renames: dict[str, str]
        Mapping from the old URL (including protocol, if this is how they appear
        in the original) to new URL. Only exact matches of a reference's URL
        (its first list element) are replaced.

    Returns
    -------
    dict: the altered reference set, in version-1 form (see ``consolidate``),
    which can be saved
    """
    # going through the reference filesystem yields normalised references
    normalised = refs_as_fs(refs).references
    renamed = {
        key: [renames[ref[0]], *ref[1:]]
        if isinstance(ref, list) and ref[0] in renames
        else ref
        for key, ref in normalised.items()
    }
    return consolidate(renamed)
def rename_target_files(
    url_in, renames, url_out=None, storage_options_in=None, storage_options_out=None
):
    """Perform URL renames on a reference set - read and write from JSON

    Parameters
    ----------
    url_in: str
        Original JSON reference set
    renames: dict
        URL renamings to perform (see ``rename_target``)
    url_out: str | None
        Where to write to. If None, overwrites original
    storage_options_in: dict | None
        passed to fsspec for opening url_in
    storage_options_out: dict | None
        passed to fsspec for opening url_out. If None, storage_options_in is used.

    Returns
    -------
    None
    """
    with fsspec.open(url_in, **(storage_options_in or {})) as src:
        original = ujson.load(src)
    renamed = rename_target(original, renames)

    destination = url_in if url_out is None else url_out
    options_out = (
        storage_options_in if storage_options_out is None else storage_options_out
    )
    with fsspec.open(destination, mode="wt", **(options_out or {})) as dst:
        ujson.dump(renamed, dst)
def _encode_for_JSON(store):
    """Make store JSON encodable, modifying it in place and returning it.

    List values (references) are left alone. Any other value that parses as
    JSON is re-serialised in minified form. Bytes are then decoded to text
    when possible, or stored as ``"base64:"`` plus their base64 encoding.
    """
    for key, value in store.copy().items():
        if isinstance(value, list):
            continue
        try:
            # minify JSON; values that are not JSON are kept unchanged
            value = ujson.dumps(ujson.loads(value))
        except (ValueError, TypeError):
            pass
        if not isinstance(value, bytes):
            store[key] = value
            continue
        try:
            store[key] = value.decode()
        except UnicodeDecodeError:
            store[key] = "base64:" + base64.b64encode(value).decode()
    return store
def encode_fill_value(v: Any, dtype: np.dtype, compressor: Any = None) -> Any:
    """Encode a fill value into a JSON-compatible form for zarr v2 metadata.

    Parameters
    ----------
    v
        The fill value; ``None`` is returned unchanged.
    dtype: np.dtype
        Array dtype; its ``kind`` selects the encoding.
    compressor
        Object with an ``encode`` method, required for structured (``"V"``)
        dtypes containing objects.

    Returns
    -------
    The fill value encoded by dtype kind:

    - floats become ``float``, or the strings ``"NaN"``/``"Infinity"``/``"-Infinity"``;
    - ints become ``int`` and bools become ``bool``;
    - complex values become a tuple of encoded (real, imag) parts;
    - bytes/void values become ASCII base64 text;
    - datetime/timedelta values become their int64 view;
    - anything else is returned as-is.

    Raises
    ------
    ValueError
        For object-containing void dtypes when ``compressor`` is None.
    """
    if v is None:
        return v

    kind = dtype.kind

    if kind == "V" and dtype.hasobject:
        if compressor is None:
            raise ValueError("missing compressor for object array")
        return str(base64.standard_b64encode(compressor.encode(v)), "ascii")

    if kind == "f":
        if np.isnan(v):
            return "NaN"
        if np.isposinf(v):
            return "Infinity"
        if np.isneginf(v):
            return "-Infinity"
        return float(v)

    if kind in "ui":
        return int(v)

    if kind == "b":
        return bool(v)

    if kind in "c":
        # real/imag parts are encoded with the component dtype of complex128
        parts = cast(np.complex128, np.dtype(complex).type())
        return (
            encode_fill_value(v.real, parts.real.dtype, compressor),
            encode_fill_value(v.imag, parts.imag.dtype, compressor),
        )

    if kind in "SV":
        return str(base64.standard_b64encode(v), "ascii")

    if kind in "mM":
        return int(v.view("i8"))

    # "U" strings and any other kinds pass through unchanged
    return v
def do_inline(store, threshold, remote_options=None, remote_protocol=None):
    """Replace short chunks with the value of that chunk and inline metadata

    The chunk may need encoding with base64 if not ascii, so actual
    length may be larger than threshold.

    A reference is fetched and inlined when either:

    1. it is a ``[url, offset, size]`` range with ``size < threshold``; or
    2. it is a whole-file ``[url]`` reference whose final path component
       starts with ``.z`` (a metadata file).

    Returns a copy of the normalised references with those entries replaced
    by their bytes (ASCII as-is, otherwise prefixed with ``b"base64:"``).
    """
    fs = refs_as_fs(
        store,
        remote_protocol=remote_protocol,
        remote_options=remote_options,
        asynchronous=False,
    )
    out = fs.references.copy()

    def _should_inline(ref):
        if not isinstance(ref, list):
            return False
        if len(ref) == 3:
            return ref[2] < threshold
        return (
            len(ref) == 1
            and isinstance(ref[0], str)
            and ref[0].split("/")[-1].startswith(".z")
        )

    wanted = [key for key, ref in out.items() if _should_inline(ref)]
    for key, data in fs.cat(wanted).items():
        if not data.isascii():
            data = b"base64:" + base64.b64encode(data)
        out[key] = data
    return out
def _inline_array(group, threshold, names, prefix=""):
    """Recursively rewrite qualifying arrays in ``group`` as single-chunk arrays.

    An array qualifies when ``threshold`` is truthy and its ``nbytes`` is below
    it, or when its dotted path (e.g. ``"grp.sub.arr"``) is in ``names``. A
    qualifying array is read fully, then recreated under the same name with
    ``chunks == shape`` (``overwrite=True``). The same dtype, fill value and
    attributes are kept. Sub-groups are visited recursively.
    """
    for name, member in group.members():
        dotted = f"{prefix}.{name}" if prefix else name

        if isinstance(member, zarr.Group):
            _inline_array(member, threshold=threshold, prefix=dotted, names=names)
            continue

        small_enough = threshold and member.nbytes < threshold
        if not (small_enough or dotted in names):
            continue

        # read everything before the array is overwritten
        attrs = dict(member.attrs)
        values = member[...]
        replacement = group.create_array(
            name=name,
            dtype=member.dtype,
            shape=member.shape,
            chunks=member.shape,
            fill_value=member.fill_value,
            overwrite=True,
        )
        replacement[...] = values
        replacement.attrs.update(attrs)
def inline_array(store, threshold=1000, names=None, remote_options=None):
    """Inline whole arrays by threshold or name, replace with a single metadata chunk

    Inlining whole arrays results in fewer keys. If the constituent keys were
    already inlined, this also results in a smaller file overall. Every
    qualifying array is rewritten as a single chunk, including arrays that
    already consist of one chunk.

    Parameters
    ----------
    store: dict/JSON file
        reference set
    threshold: int
        Size in bytes (of the uncompressed array, ``nbytes``) below which to
        inline. Set to 0 to prevent inlining by size
    names: list[str] | None
        If the array name (as a dotted full path) appears in this list, it will
        be inlined irrespective of the threshold size. Useful for coordinates.
    remote_options: dict | None
        Needed to fetch data, if the required keys are not already individually inlined
        in the data.

    Returns
    -------
    amended references set (simple style)
    """
    ref_fs = refs_as_fs(store, remote_options=remote_options or {})
    group = zarr.open_group(fs_as_store(ref_fs, read_only=False), zarr_format=2)
    _inline_array(group, threshold, names=names or [])
    return ref_fs.references
def subchunk(store, variable, factor):
    """
    Split uncompressed chunks into integer subchunks

    The split is applied to the first chunk axis whose length is not 1. If
    ``factor`` is a multiple of that axis' chunk length rather than a divisor,
    that axis is split completely (down to length 1). The remaining part of
    ``factor`` is then applied to the following axes.

    Parameters
    ----------
    store: dict
        reference set
    variable: str
        the named zarr variable (give as /-separated path if deep)
    factor: int
        the total number of chunks each input chunk turns into. Must be a
        positive integer that can be distributed exactly over the chunk axes
        as described above.

    Returns
    -------
    modified store (a deep copy; the input references are not changed)

    Raises
    ------
    ValueError
        If ``factor`` is not positive, the array is compressed, the factor
        cannot be applied exactly, or a chunk to be split is inlined data.
    """
    if factor < 1:
        raise ValueError("factor must be a positive integer")

    fs = fsspec.filesystem("reference", fo=store)
    store = fs.references
    meta_file = f"{variable}/.zarray"
    meta = ujson.loads(fs.cat(meta_file))
    if meta["compressor"] is not None:
        raise ValueError("Can only subchunk an uncompressed array")
    chunks_orig = meta["chunks"]
    chunk_new = []

    # plan
    total_factor = factor
    multi = None  # chunk length of the first axis that must be split completely
    axis = None  # axis split by the (remaining) factor
    for ind, this_chunk in enumerate(chunks_orig):
        if this_chunk == 1:
            chunk_new.append(1)
        elif this_chunk % factor == 0:
            chunk_new.extend([this_chunk // factor] + chunks_orig[ind + 1 :])
            axis = ind
            break
        elif factor % this_chunk == 0:
            chunk_new.append(1)
            if multi is None:
                multi = this_chunk
            factor //= this_chunk
        else:
            raise ValueError("Must subchunk by exact integer factor")

    if multi:
        # Split the first oversized axis down to length 1, then still apply the
        # rest of the requested factor (total_factor is a multiple of multi).
        # TODO: this reloads the referenceFS; *maybe* reuses it
        return subchunk(
            subchunk(store, variable, multi), variable, total_factor // multi
        )

    if axis is None:
        # no axis could be split: every chunk axis has length 1 (or none exist)
        if factor != 1:
            raise ValueError("Must subchunk by exact integer factor")
        axis = 0  # factor 1 maps every chunk to itself, so any axis works

    # execute
    meta["chunks"] = chunk_new
    store = copy.deepcopy(store)
    store[meta_file] = ujson.dumps(meta)

    for k, v in store.copy().items():
        if not k.startswith(f"{variable}/"):
            continue
        kpart = k[len(variable) + 1 :]
        if kpart.startswith(".z"):
            continue
        sep = "." if "." in kpart else "/"
        chunk_index = [int(_) for _ in kpart.split(sep)]
        if isinstance(v, (str, bytes)):
            # TODO: check this early, as some refs might have been edited already
            raise ValueError("Refusing to sub-chunk inlined data")
        if len(v) > 1:
            url, offset, size = v
        else:
            (url,) = v
            offset = 0
            size = fs.info(k)["size"]
        for subpart in range(factor):
            new_index = (
                chunk_index[:axis]
                + [chunk_index[axis] * factor + subpart]
                + chunk_index[axis + 1 :]
            )
            newpart = sep.join(str(_) for _ in new_index)
            newv = [url, offset + subpart * size // factor, size // factor]
            store[f"{variable}/{newpart}"] = newv
    return store
def dereference_archives(references, remote_options=None):
    """Directly point to uncompressed byte ranges in ZIP/TAR archives

    If a set of references have been made for files contained within ZIP or
    (uncompressed) TAR archives, the "zip://..." and "tar://..." URLs should
    be converted to byte ranges in the overall file.

    Parameters
    ----------
    references: dict
        a simple reference set. A ``{"version": 1, "refs": {...}}`` wrapper is
        unwrapped, and only the inner references are returned.
    remote_options: dict or None
        For opening the archives

    Returns
    -------
    dict
        A deep copy of the (unwrapped) references. Each
        ``"zip://<member>::<archive>"`` / ``"tar://<member>::<archive>"`` target
        is replaced by ``<archive>`` plus a byte offset/size. All other
        references (plain URLs, other chained URLs, inlined values) are left
        unchanged.

    Raises
    ------
    ValueError
        If a ZIP member uses a compression method other than STORED or DEFLATED.
    """
    import tarfile
    import zipfile

    if "version" in references and references["version"] == 1:
        references = references["refs"]

    def _is_archive_url(url):
        # only "tar://<member>::<archive>" / "zip://<member>::<archive>" qualify;
        # plain URLs without "::" used to crash the split below with IndexError
        return (
            isinstance(url, str) and url.startswith(("tar://", "zip://")) and "::" in url
        )

    target_files = {
        (ref[0].split("::", 1)[1], ref[0][:3])
        for ref in references.values()
        if isinstance(ref, list) and ref and _is_archive_url(ref[0])
    }

    # find all member file offsets in all archives
    offsets = {}
    for target, tar_or_zip in target_files:
        with fsspec.open(target, **(remote_options or {})) as tf:
            if tar_or_zip == "tar":
                with tarfile.TarFile(fileobj=tf) as tar:
                    offsets[target] = {
                        ti.name: {"offset": ti.offset_data, "size": ti.size, "comp": False}
                        for ti in tar.getmembers()
                        if ti.isfile()
                    }
            elif tar_or_zip == "zip":
                with zipfile.ZipFile(file=tf) as zf:
                    offsets[target] = {}
                    for zipinfo in zf.filelist:
                        if zipinfo.is_dir():
                            continue
                        # if uncompressed, include only the buffer. In compressed
                        # (DEFLATE), include also the header, and must use DeflateCodec
                        if zipinfo.compress_type == zipfile.ZIP_DEFLATED:
                            # TODO: find relevant .zarray and add filter directly
                            header = 0
                            warnings.warn(
                                "ZIP file contains compressed files, must use DeflateCodec"
                            )
                            tail = len(zipinfo.FileHeader())
                        elif zipinfo.compress_type == zipfile.ZIP_STORED:
                            header = len(zipinfo.FileHeader())
                            tail = 0
                        else:
                            comp = zipfile.compressor_names[zipinfo.compress_type]
                            raise ValueError(
                                f"ZIP compression method not supported: {comp}"
                            )
                        offsets[target][zipinfo.filename] = {
                            "offset": zipinfo.header_offset + header,
                            "size": zipinfo.compress_size + tail,
                            "comp": zipinfo.compress_type != zipfile.ZIP_STORED,
                        }

    # modify references
    mods = copy.deepcopy(references)
    for v in mods.values():
        if not (isinstance(v, list) and v and _is_archive_url(v[0])):
            continue
        member_url, target = v[0].split("::", 1)
        if target not in offsets:
            continue
        infile = member_url[6:]  # strip "zip://" or "tar://"
        detail = offsets[target][infile]
        # NOTE(review): compressed (DEFLATE) members are rewritten too; their
        # range starts at the local header (see the offset computation above).
        v[0] = target
        if len(v) == 1:
            v.append(detail["offset"])
            v.append(detail["size"])
        else:
            v[1] += detail["offset"]
    return mods
def _max_prefix(*strings):
# https://stackoverflow.com/a/6719272/3821154
def all_same(x):
return all(x[0] == y for y in x)
char_tuples = zip(*strings)
prefix_tuples = itertools.takewhile(all_same, char_tuples)
return "".join(x[0] for x in prefix_tuples)
def templateize(strings, min_length=10, template_name="u"):
    """Make prefix template for a set of strings

    Useful for condensing strings by extracting out a common prefix.
    If the common prefix is shorter than ``min_length``, the original
    strings are returned and the output templates are empty.

    Parameters
    ----------
    strings: List[str]
        inputs
    min_length: int
        Only perform the transform if the common prefix is at least this long.
    template_name: str
        The placeholder string, should be short.

    Returns
    -------
    templates: Dict[str, str], strings: List[str]
        For strings without literal braces,
        ``[s.format(**templates) for s in strings]`` recreates the original list.
    """
    prefix = _max_prefix(*strings)
    cut = len(prefix)
    if cut < min_length:
        return {}, strings
    placeholder = f"{{{template_name}}}"
    return {template_name: prefix}, [placeholder + s[cut:] for s in strings]
def translate_refs_serializable(refs: dict):
    """Translate a reference set to a serializable form, given that zarr
    v3 memory stores store data in buffers by default. This modifies the
    input dictionary in place, and returns a reference to it.

    Every value that is a ``zarr.core.buffer.cpu.Buffer`` is replaced by its
    ``to_bytes()`` output. The key of each converted entry also has a single
    leading slash removed, since that is not appropriate for zarr v3 keys.
    Non-buffer entries are left as they are.

    Parameters
    ----------
    refs: dict
        The reference set

    Returns
    -------
    dict
        A serializable form of the reference set
    """
    buffers = {
        key: value
        for key, value in refs.items()
        if isinstance(value, zarr.core.buffer.cpu.Buffer)
    }
    serialized = {
        key.removeprefix("/"): value.to_bytes() for key, value in buffers.items()
    }
    for key in buffers:
        del refs[key]
    refs.update(serialized)
    return refs