Skip to content

Commit 610db34

Browse files
authored
Merge pull request #606 from martindurant/fs_multiget
use getitems with FSStore for concurrent reading
2 parents e3cdd1a + 82d33d7 commit 610db34

File tree

5 files changed

+125
-46
lines changed

5 files changed

+125
-46
lines changed

docs/release.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@ Next release
3131
documentation.
3232
By :user:`Josh Moore <joshmoore>`; :issue:`571`.
3333

34+
* Added support for generic URL opening by ``fsspec``, where the URLs have the
35+
form "protocol://[server]/path" or can be chained URls with "::" separators.
36+
The additional argument ``storage_options`` is passed to the backend, see
37+
the ``fsspec`` docs.
38+
By :user:`Martin Durant <martindurant>`; :issue:`546`
39+
40+
* Added support for fetching multiple items via ``getitems`` method of a
41+
store, if it exists. This allows for concurrent fetching of data blocks
42+
from stores that implement this; presently HTTP, S3, GCS. Currently only
43+
applies to reading.
44+
By :user:`Martin Durant <martindurant>`; :issue:`606`
3445

3546
.. _release_2.4.0:
3647

requirements_dev_optional.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ pytest-cov==2.7.1
1818
pytest-doctestplus==0.4.0
1919
pytest-remotedata==0.3.2
2020
h5py==2.10.0
21-
s3fs==0.5.0; python_version > '3.6'
21+
s3fs==0.5.1; python_version > '3.6'
22+
fsspec==0.8.3; python_version > '3.6'
2223
moto>=1.3.14; python_version > '3.6'
2324
flask

zarr/core.py

Lines changed: 85 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,11 +1020,18 @@ def _get_selection(self, indexer, out=None, fields=None):
10201020
check_array_shape('out', out, out_shape)
10211021

10221022
# iterate over chunks
1023-
for chunk_coords, chunk_selection, out_selection in indexer:
1023+
if not hasattr(self.chunk_store, "getitems"):
1024+
# sequentially get one key at a time from storage
1025+
for chunk_coords, chunk_selection, out_selection in indexer:
10241026

1025-
# load chunk selection into output array
1026-
self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
1027-
drop_axes=indexer.drop_axes, fields=fields)
1027+
# load chunk selection into output array
1028+
self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
1029+
drop_axes=indexer.drop_axes, fields=fields)
1030+
else:
1031+
# allow storage to get multiple items at once
1032+
lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
1033+
self._chunk_getitems(lchunk_coords, lchunk_selection, out, lout_selection,
1034+
drop_axes=indexer.drop_axes, fields=fields)
10281035

10291036
if out.shape:
10301037
return out
@@ -1548,6 +1555,52 @@ def _set_selection(self, indexer, value, fields=None):
15481555
# put data
15491556
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
15501557

1558+
def _process_chunk(self, out, cdata, chunk_selection, drop_axes,
                   out_is_ndarray, fields, out_selection):
    """Decode raw chunk bytes from storage and write them into ``out``.

    Fast path: when the selection covers an entire chunk and the
    destination region is contiguous and writeable, decompress (or copy)
    straight into the output buffer, skipping the intermediate chunk
    array entirely.
    """
    fast_path_ok = (
        out_is_ndarray
        and not fields
        and is_contiguous_selection(out_selection)
        and is_total_slice(chunk_selection, self._chunks)
        and not self._filters
        and self._dtype != object
    )
    if fast_path_ok:
        dest = out[out_selection]
        # destination layout must match the array's chunk memory order
        contiguous = ((self._order == 'C' and dest.flags.c_contiguous) or
                      (self._order == 'F' and dest.flags.f_contiguous))
        if dest.flags.writeable and contiguous:
            if self._compressor:
                # decompress directly into the destination buffer
                self._compressor.decode(cdata, dest)
            else:
                raw = ensure_ndarray(cdata).view(self._dtype)
                np.copyto(dest, raw.reshape(self._chunks, order=self._order))
            return

    # general path: decode the whole chunk, then copy the selected region
    chunk = self._decode_chunk(cdata)
    if fields:
        chunk = chunk[fields]
    selected = chunk[chunk_selection]
    if drop_axes:
        selected = np.squeeze(selected, axis=drop_axes)
    out[out_selection] = selected
15511604
def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
15521605
drop_axes=None, fields=None):
15531606
"""Obtain part or whole of a chunk.
@@ -1568,15 +1621,14 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
15681621
TODO
15691622
15701623
"""
1571-
1572-
assert len(chunk_coords) == len(self._cdata_shape)
1573-
15741624
out_is_ndarray = True
15751625
try:
15761626
out = ensure_ndarray(out)
15771627
except TypeError:
15781628
out_is_ndarray = False
15791629

1630+
assert len(chunk_coords) == len(self._cdata_shape)
1631+
15801632
# obtain key for chunk
15811633
ckey = self._chunk_key(chunk_coords)
15821634

@@ -1594,48 +1646,36 @@ def _chunk_getitem(self, chunk_coords, chunk_selection, out, out_selection,
15941646
out[out_selection] = fill_value
15951647

15961648
else:
1649+
self._process_chunk(out, cdata, chunk_selection, drop_axes,
1650+
out_is_ndarray, fields, out_selection)
15971651

1598-
if (out_is_ndarray and
1599-
not fields and
1600-
is_contiguous_selection(out_selection) and
1601-
is_total_slice(chunk_selection, self._chunks) and
1602-
not self._filters and
1603-
self._dtype != object):
1604-
1605-
dest = out[out_selection]
1606-
write_direct = (
1607-
dest.flags.writeable and (
1608-
(self._order == 'C' and dest.flags.c_contiguous) or
1609-
(self._order == 'F' and dest.flags.f_contiguous)
1610-
)
1611-
)
1612-
1613-
if write_direct:
1652+
def _chunk_getitems(self, lchunk_coords, lchunk_selection, out, lout_selection,
                    drop_axes=None, fields=None):
    """Batch variant of ``_chunk_getitem``.

    Called when the store exposes ``getitems``, letting the storage
    layer fetch all requested chunk keys in one call (and decide how to
    do so, e.g. concurrently). Keys absent from the store are treated
    as uninitialized chunks and are filled with the fill value, if one
    is set.
    """
    out_is_ndarray = True
    try:
        out = ensure_ndarray(out)
    except TypeError:  # pragma: no cover
        out_is_ndarray = False

    ckeys = [self._chunk_key(coords) for coords in lchunk_coords]
    # one bulk request to storage; missing keys are simply omitted
    cdatas = self.chunk_store.getitems(ckeys)
    for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
        try:
            cdata = cdatas[ckey]
        except KeyError:
            # chunk not initialized: write the fill value, if any
            if self._fill_value is not None:
                fill_value = self._fill_value[fields] if fields else self._fill_value
                out[out_select] = fill_value
        else:
            self._process_chunk(out, cdata, chunk_select, drop_axes,
                                out_is_ndarray, fields, out_select)
16391679

16401680
def _chunk_setitem(self, chunk_coords, chunk_selection, value, fields=None):
16411681
"""Replace part or whole of a chunk.

zarr/storage.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1038,6 +1038,10 @@ def _normalize_key(self, key):
10381038
key = '/'.join(bits + [end.replace('.', self.key_separator)])
10391039
return key.lower() if self.normalize_keys else key
10401040

1041+
def getitems(self, keys):
    """Fetch the values for multiple keys in a single (possibly
    concurrent) request, silently omitting any keys that are missing."""
    normalized = [self._normalize_key(key) for key in keys]
    return self.map.getitems(normalized, on_error="omit")
1044+
10411045
def __getitem__(self, key):
10421046
key = self._normalize_key(key)
10431047
try:

zarr/tests/test_storage.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,29 @@ def test_s3(self):
955955

956956
assert g.data[:].tolist() == [0, 1, 2, 3, 0, 0, 0, 0]
957957

958+
@pytest.mark.usefixtures("s3")
def test_s3_complex(self):
    import zarr

    # write a partially-initialized 3-d array plus a structured array
    g = zarr.open_group("s3://test/out.zarr", mode='w',
                        storage_options=self.s3so)
    expected = np.empty((8, 8, 8), dtype='int64')
    expected[:] = -1
    expected[0] = 0
    expected[3] = 3
    data = g.create_dataset("data", shape=(8, 8, 8),
                            fill_value=-1, chunks=(1, 1, 1))
    data[:4] = expected[:4]

    structured = g.create_dataset("data_f", shape=(8, ), chunks=(1,),
                                  dtype=[('foo', 'S3'), ('bar', 'i4')],
                                  fill_value=(b"b", 1))
    structured[:4] = (b"aaa", 2)

    # reopen read-only and verify the round-trip, including fill values
    g = zarr.open_group("s3://test/out.zarr", mode='r',
                        storage_options=self.s3so)
    assert (g.data[:] == expected).all()
    assert g.data_f['foo'].tolist() == [b"aaa"] * 4 + [b"b"] * 4
980+
958981

959982
@pytest.fixture()
960983
def s3(request):

0 commit comments

Comments
 (0)