Commit c58bca4

Merge pull request #630 from martindurant/fsspec_w
Fsspec writing
2 parents: 480d21a + 2198438, commit c58bca4

5 files changed: +124 −38 lines changed

docs/tutorial.rst

Lines changed: 33 additions & 2 deletions
@@ -854,13 +854,44 @@ please raise an issue on the GitHub issue tracker with any profiling data you
 can provide, as there may be opportunities to optimise further either within
 Zarr or within the mapping interface to the storage.

+IO with ``fsspec``
+~~~~~~~~~~~~~~~~~~
+
+As of version 2.5, zarr supports passing URLs directly to `fsspec`_,
+and having it create the "mapping" instance automatically. This means, that
+for all of the backend storage implementations `supported by fsspec`_,
+you can skip importing and configuring the storage explicitly.
+For example::
+
+    >>> g = zarr.open_group("s3://zarr-demo/store", storage_options={'anon': True})  # doctest: +SKIP
+    >>> g['foo/bar/baz'][:].tobytes()  # doctest: +SKIP
+    b'Hello from the cloud!'
+
+The provision of the protocol specifier "s3://" will select the correct backend.
+Notice the kwargs ``storage_options``, used to pass parameters to that backend.
+
+As of version 2.6, write mode and complex URLs are also supported, such as::
+
+    >>> g = zarr.open_group("simplecache::s3://zarr-demo/store",
+    ...                     storage_options={"s3": {'anon': True}})  # doctest: +SKIP
+    >>> g['foo/bar/baz'][:].tobytes()  # downloads target file  # doctest: +SKIP
+    b'Hello from the cloud!'
+    >>> g['foo/bar/baz'][:].tobytes()  # uses cached file  # doctest: +SKIP
+    b'Hello from the cloud!'
+
+The second invocation here will be much faster. Note that the ``storage_options``
+have become more complex here, to account for the two parts of the supplied
+URL.
+
+.. _fsspec: https://filesystem-spec.readthedocs.io/en/latest/
+
+.. _supported by fsspec: https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
+
 .. _tutorial_copy:

 Consolidating metadata
 ~~~~~~~~~~~~~~~~~~~~~~

-(This is an experimental feature.)
-
 Since there is a significant overhead for every connection to a cloud object
 store such as S3, the pattern described in the previous section may incur
 significant latency while scanning the metadata of the array hierarchy, even
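
The tutorial text added above shows read access and caching; a minimal sketch of the write mode it mentions (not part of this diff, using fsspec's built-in in-memory filesystem so it needs no credentials; the URL and array name are illustrative):

    import numpy as np
    import zarr

    # open a group through an fsspec URL in write mode (zarr >= 2.6)
    g = zarr.open_group("memory://example.zarr", mode='w')
    z = g.create_dataset("data", shape=(4, 4), chunks=(2, 2), dtype='i4')
    z[:] = np.arange(16).reshape(4, 4)   # chunks are written through the fsspec mapping
    print(g["data"][:])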

zarr/convenience.py

Lines changed: 2 additions & 2 deletions
@@ -1165,12 +1165,12 @@ def open_consolidated(store, metadata_key='.zmetadata', mode='r+', **kwargs):
     from .storage import ConsolidatedMetadataStore

     # normalize parameters
-    store = normalize_store_arg(store)
+    store = normalize_store_arg(store, storage_options=kwargs.get("storage_options", None))
     if mode not in {'r', 'r+'}:
         raise ValueError("invalid mode, expected either 'r' or 'r+'; found {!r}"
                          .format(mode))

-    # setup metadata sotre
+    # setup metadata store
     meta_store = ConsolidatedMetadataStore(store, metadata_key=metadata_key)

     # pass through
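
This change forwards ``storage_options`` to ``normalize_store_arg`` so consolidated metadata can be opened over an fsspec URL. A hedged sketch of the call pattern this is meant to support (bucket, path and options are illustrative; the target must already hold a ``.zmetadata`` key written by ``consolidate_metadata``):

    import zarr

    # open a consolidated hierarchy on a remote store without configuring it by hand
    g = zarr.open_consolidated("s3://my-bucket/data.zarr", mode='r',
                               storage_options={"anon": True})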

zarr/core.py

Lines changed: 52 additions & 24 deletions
@@ -1535,25 +1535,48 @@ def _set_selection(self, indexer, value, fields=None):
            check_array_shape('value', value, sel_shape)

        # iterate over chunks in range
-        for chunk_coords, chunk_selection, out_selection in indexer:
+        if not hasattr(self.store, "setitems") or self._synchronizer is not None:
+            # iterative approach
+            for chunk_coords, chunk_selection, out_selection in indexer:

-            # extract data to store
-            if sel_shape == ():
-                chunk_value = value
-            elif is_scalar(value, self._dtype):
-                chunk_value = value
-            else:
-                chunk_value = value[out_selection]
-                # handle missing singleton dimensions
-                if indexer.drop_axes:
-                    item = [slice(None)] * self.ndim
-                    for a in indexer.drop_axes:
-                        item[a] = np.newaxis
-                    item = tuple(item)
-                    chunk_value = chunk_value[item]
-
-            # put data
-            self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
+                # extract data to store
+                if sel_shape == ():
+                    chunk_value = value
+                elif is_scalar(value, self._dtype):
+                    chunk_value = value
+                else:
+                    chunk_value = value[out_selection]
+                    # handle missing singleton dimensions
+                    if indexer.drop_axes:
+                        item = [slice(None)] * self.ndim
+                        for a in indexer.drop_axes:
+                            item[a] = np.newaxis
+                        item = tuple(item)
+                        chunk_value = chunk_value[item]
+
+                # put data
+                self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
+        else:
+            lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
+            chunk_values = []
+            for out_selection in lout_selection:
+                if sel_shape == ():
+                    chunk_values.append(value)
+                elif is_scalar(value, self._dtype):
+                    chunk_values.append(value)
+                else:
+                    cv = value[out_selection]
+                    # handle missing singleton dimensions
+                    if indexer.drop_axes:  # pragma: no cover
+                        item = [slice(None)] * self.ndim
+                        for a in indexer.drop_axes:
+                            item[a] = np.newaxis
+                        item = tuple(item)
+                        cv = chunk_value[item]
+                    chunk_values.append(cv)
+
+            self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values,
+                                 fields=fields)

     def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
                        out_is_ndarray, fields, out_selection):
@@ -1677,6 +1700,12 @@ def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
                        fill_value = self._fill_value
                    out[out_select] = fill_value

+    def _chunk_setitems(self, lchunk_coords, lchunk_selection, values, fields=None):
+        ckeys = [self._chunk_key(co) for co in lchunk_coords]
+        cdatas = [self._process_for_setitem(key, sel, val, fields=fields)
+                  for key, sel, val in zip(ckeys, lchunk_selection, values)]
+        self.chunk_store.setitems({k: v for k, v in zip(ckeys, cdatas)})
+
     def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
         """Replace part or whole of a chunk.

@@ -1704,10 +1733,12 @@ def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
                                       fields=fields)

     def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
-
-        # obtain key for chunk storage
         ckey = self._chunk_key(chunk_coords)
+        cdata = self._process_for_setitem(ckey, chunk_selection, value, fields=fields)
+        # store
+        self.chunk_store[ckey] = cdata

+    def _process_for_setitem(self, ckey, chunk_selection, value, fields=None):
         if is_total_slice(chunk_selection, self._chunks) and not fields:
             # totally replace chunk

@@ -1762,10 +1793,7 @@ def _chunk_setitem_nosync(self, chunk_coords, chunk_selection, value, fields=None):
                chunk[chunk_selection] = value

         # encode chunk
-        cdata = self._encode_chunk(chunk)
-
-        # store
-        self.chunk_store[ckey] = cdata
+        return self._encode_chunk(chunk)

     def _chunk_key(self, chunk_coords):
         return self._key_prefix + '.'.join(map(str, chunk_coords))
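
The ``_set_selection`` change above batches all encoded chunks of a write into a single ``setitems()`` call whenever the store exposes that method and no synchronizer is configured, instead of storing chunks one by one. A minimal sketch (an illustration, not code from this diff; the class name is hypothetical) of a store opting in to that batched path:

    import numpy as np
    import zarr

    class BatchedDictStore(dict):
        # providing setitems() makes zarr hand over all encoded chunks of a
        # write at once, e.g. so a real store can upload them concurrently
        def setitems(self, values):
            self.update(values)

    store = BatchedDictStore()
    z = zarr.zeros((4, 4), chunks=(2, 2), dtype='i4', store=store)
    z[:] = np.arange(16).reshape(4, 4)   # routed through _chunk_setitems -> store.setitems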

zarr/storage.py

Lines changed: 9 additions & 3 deletions
@@ -1021,14 +1021,14 @@ def __init__(self, url, normalize_keys=True, key_separator='.',
                  exceptions=(KeyError, PermissionError, IOError),
                  **storage_options):
         import fsspec
-        self.path = url
         self.normalize_keys = normalize_keys
         self.key_separator = key_separator
         self.map = fsspec.get_mapper(url, **storage_options)
         self.fs = self.map.fs  # for direct operations
+        self.path = self.fs._strip_protocol(url)
         self.mode = mode
         self.exceptions = exceptions
-        if self.fs.exists(url) and not self.fs.isdir(url):
+        if self.fs.exists(self.path) and not self.fs.isdir(self.path):
             raise FSPathExistNotDir(url)

     def _normalize_key(self, key):
@@ -1049,16 +1049,22 @@ def __getitem__(self, key):
         except self.exceptions as e:
             raise KeyError(key) from e

+    def setitems(self, values):
+        if self.mode == 'r':
+            raise ReadOnlyError()
+        values = {self._normalize_key(key): val for key, val in values.items()}
+        self.map.setitems(values)
+
     def __setitem__(self, key, value):
         if self.mode == 'r':
             raise ReadOnlyError()
         key = self._normalize_key(key)
         path = self.dir_path(key)
-        value = ensure_contiguous_ndarray(value)
         try:
             if self.fs.isdir(path):
                 self.fs.rm(path, recursive=True)
             self.map[key] = value
+            self.fs.invalidate_cache(self.fs._parent(path))
         except self.exceptions as e:
             raise KeyError(key) from e
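
``FSStore.setitems`` above mirrors ``__setitem__``: it refuses writes in read-only mode, normalizes the keys, and hands the whole batch to the fsspec mapper. An illustrative sketch exercising it directly against fsspec's in-memory filesystem (URL and keys are made up; assumes the default mode permits writes):

    from zarr.storage import FSStore

    store = FSStore("memory://example")
    store.setitems({"0.0": b"\x00" * 16, "0.1": b"\x01" * 16})
    print(sorted(store))   # the two chunk-like keys are now in the mapping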

zarr/tests/test_storage.py

Lines changed: 28 additions & 7 deletions
@@ -966,17 +966,37 @@ def test_s3_complex(self):
                              fill_value=-1, chunks=(1, 1, 1))
         expected[0] = 0
         expected[3] = 3
+        expected[6, 6, 6] = 6
+        a[6, 6, 6] = 6
         a[:4] = expected[:4]

-        a = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
+        b = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
                              dtype=[('foo', 'S3'), ('bar', 'i4')],
                              fill_value=(b"b", 1))
-        a[:4] = (b"aaa", 2)
-        g = zarr.open_group("s3://test/out.zarr", mode='r',
-                            storage_options=self.s3so)
+        b[:4] = (b"aaa", 2)
+        g2 = zarr.open_group("s3://test/out.zarr", mode='r',
+                             storage_options=self.s3so)
+
+        assert (g2.data[:] == expected).all()
+
+        a[:] = 5  # write with scalar
+        assert (a[:] == 5).all()
+
+        assert g2.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
+        with pytest.raises(PermissionError):
+            g2.data[:] = 5
+
+        with pytest.raises(PermissionError):
+            g2.store.setitems({})
+
+        with pytest.raises(PermissionError):
+            # even though overwrite=True, store is read-only, so fails
+            g2.create_dataset("data", shape=(8, 8, 8), mode='w',
+                              fill_value=-1, chunks=(1, 1, 1), overwrite=True)

-        assert (g.data[:] == expected).all()
-        assert g.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
+        a = g.create_dataset("data", shape=(8, 8, 8), mode='w',
+                             fill_value=-1, chunks=(1, 1, 1), overwrite=True)
+        assert (a[:] == -np.ones((8, 8, 8))).all()


 @pytest.fixture()
@@ -997,7 +1017,8 @@ def s3(request):

     port = 5555
     endpoint_uri = 'http://127.0.0.1:%s/' % port
-    proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port))
+    proc = subprocess.Popen(shlex.split("moto_server s3 -p %s" % port),
+                            stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL)

     timeout = 5
     while timeout > 0:
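
For reference, a hedged sketch of how a test might point zarr at the mock S3 endpoint started by this fixture (the bucket must first be created against the endpoint; names and values are illustrative, matching the ``s3so`` options used in the test above):

    import zarr

    s3so = {"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555/"}}
    g = zarr.open_group("s3://test/out.zarr", mode='w', storage_options=s3so)
    a = g.create_dataset("data", shape=(8, 8, 8), chunks=(1, 1, 1), fill_value=-1)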
