Skip to content

Commit bb6b905

Browse files
authored
Merge pull request #546 from martindurant/fsspec
Add FSStore
2 parents 3ee5a5a + 7ad67e1 commit bb6b905

File tree

7 files changed

+313
-13
lines changed

7 files changed

+313
-13
lines changed

docs/api/storage.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ Storage (``zarr.storage``)
3535

3636
.. autoclass:: ABSStore
3737

38+
.. autoclass:: FSStore
39+
3840
.. autoclass:: ConsolidatedMetadataStore
3941

4042
.. autofunction:: init_array

requirements_dev_optional.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,6 @@ pytest-cov==2.7.1
1818
pytest-doctestplus==0.4.0
1919
pytest-remotedata==0.3.2
2020
h5py==2.10.0
21-
s3fs==0.3.4; python_version > '3.0'
21+
s3fs==0.5.0; python_version > '3.6'
22+
moto>=1.3.14; python_version > '3.6'
23+
flask

zarr/creation.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from zarr.n5 import N5Store
1111
from zarr.storage import (DirectoryStore, ZipStore, contains_array,
1212
contains_group, default_compressor, init_array,
13-
normalize_storage_path)
13+
normalize_storage_path, FSStore)
1414

1515

1616
def create(shape, chunks=True, dtype=None, compressor='default',
@@ -127,12 +127,16 @@ def create(shape, chunks=True, dtype=None, compressor='default',
127127
return z
128128

129129

130-
def normalize_store_arg(store, clobber=False, default=dict):
130+
def normalize_store_arg(store, clobber=False, default=dict, storage_options=None):
131131
if store is None:
132132
return default()
133133
elif isinstance(store, str):
134+
mode = 'w' if clobber else 'r'
135+
if "://" in store or "::" in store:
136+
return FSStore(store, mode=mode, **(storage_options or {}))
137+
elif storage_options:
138+
raise ValueError("storage_options passed with non-fsspec path")
134139
if store.endswith('.zip'):
135-
mode = 'w' if clobber else 'r'
136140
return ZipStore(store, mode=mode)
137141
elif store.endswith('.n5'):
138142
return N5Store(store)
@@ -353,7 +357,8 @@ def array(data, **kwargs):
353357
def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
354358
compressor='default', fill_value=0, order='C', synchronizer=None,
355359
filters=None, cache_metadata=True, cache_attrs=True, path=None,
356-
object_codec=None, chunk_store=None, **kwargs):
360+
object_codec=None, chunk_store=None, storage_options=None,
361+
**kwargs):
357362
"""Open an array using file-mode-like semantics.
358363
359364
Parameters
@@ -399,6 +404,9 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
399404
A codec to encode object arrays, only needed if dtype=object.
400405
chunk_store : MutableMapping or string, optional
401406
Store or path to directory in file system or name of zip file.
407+
storage_options : dict
408+
If using an fsspec URL to create the store, these will be passed to
409+
the backend implementation. Ignored otherwise.
402410
403411
Returns
404412
-------
@@ -435,9 +443,10 @@ def open_array(store=None, mode='a', shape=None, chunks=True, dtype=None,
435443

436444
# handle polymorphic store arg
437445
clobber = mode == 'w'
438-
store = normalize_store_arg(store, clobber=clobber)
446+
store = normalize_store_arg(store, clobber=clobber, storage_options=storage_options)
439447
if chunk_store is not None:
440-
chunk_store = normalize_store_arg(chunk_store, clobber=clobber)
448+
chunk_store = normalize_store_arg(chunk_store, clobber=clobber,
449+
storage_options=storage_options)
441450
path = normalize_storage_path(path)
442451

443452
# API compatibility with h5py

zarr/hierarchy.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,8 +1032,9 @@ def move(self, source, dest):
10321032
self._write_op(self._move_nosync, source, dest)
10331033

10341034

1035-
def _normalize_store_arg(store, clobber=False):
1036-
return normalize_store_arg(store, clobber=clobber, default=MemoryStore)
1035+
def _normalize_store_arg(store, clobber=False, storage_options=None):
1036+
return normalize_store_arg(store, clobber=clobber, default=MemoryStore,
1037+
storage_options=storage_options)
10371038

10381039

10391040
def group(store=None, overwrite=False, chunk_store=None,
@@ -1095,7 +1096,7 @@ def group(store=None, overwrite=False, chunk_store=None,
10951096

10961097

10971098
def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=None,
1098-
chunk_store=None):
1099+
chunk_store=None, storage_options=None):
10991100
"""Open a group using file-mode-like semantics.
11001101
11011102
Parameters
@@ -1117,6 +1118,9 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N
11171118
Group path within store.
11181119
chunk_store : MutableMapping or string, optional
11191120
Store or path to directory in file system or name of zip file.
1121+
storage_options : dict
1122+
If using an fsspec URL to create the store, these will be passed to
1123+
the backend implementation. Ignored otherwise.
11201124
11211125
Returns
11221126
-------
@@ -1139,9 +1143,11 @@ def open_group(store=None, mode='a', cache_attrs=True, synchronizer=None, path=N
11391143
"""
11401144

11411145
# handle polymorphic store arg
1142-
store = _normalize_store_arg(store)
1146+
clobber = mode != 'r'
1147+
store = _normalize_store_arg(store, clobber=clobber, storage_options=storage_options)
11431148
if chunk_store is not None:
1144-
chunk_store = _normalize_store_arg(chunk_store)
1149+
chunk_store = _normalize_store_arg(chunk_store, clobber=clobber,
1150+
storage_options=storage_options)
11451151
path = normalize_storage_path(path)
11461152

11471153
# ensure store is initialized

zarr/storage.py

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,128 @@ def atexit_rmglob(path,
937937
rmtree(p)
938938

939939

940+
class FSStore(MutableMapping):
941+
"""Wraps an fsspec.FSMap to give access to arbitrary filesystems
942+
943+
Requires that ``fsspec`` is installed, as well as any additional
944+
requirements for the protocol chosen.
945+
946+
Parameters
947+
----------
948+
url : str
949+
The destination to map. Should include protocol and path,
950+
like "s3://bucket/root"
951+
normalize_keys : bool
952+
key_separator : str
953+
Character to use when constructing the target path strings
954+
for data keys
955+
mode : str
956+
"w" for writable, "r" for read-only
957+
exceptions : list of Exception subclasses
958+
When accessing data, any of these exceptions will be treated
959+
as a missing key
960+
storage_options : passed to the fsspec implementation
961+
"""
962+
963+
def __init__(self, url, normalize_keys=True, key_separator='.',
964+
mode='w',
965+
exceptions=(KeyError, PermissionError, IOError),
966+
**storage_options):
967+
import fsspec
968+
self.path = url
969+
self.normalize_keys = normalize_keys
970+
self.key_separator = key_separator
971+
self.map = fsspec.get_mapper(url, **storage_options)
972+
self.fs = self.map.fs # for direct operations
973+
self.mode = mode
974+
self.exceptions = exceptions
975+
if self.fs.exists(url) and not self.fs.isdir(url):
976+
err_fspath_exists_notdir(url)
977+
978+
def _normalize_key(self, key):
979+
key = normalize_storage_path(key).lstrip('/')
980+
if key:
981+
*bits, end = key.split('/')
982+
key = '/'.join(bits + [end.replace('.', self.key_separator)])
983+
return key.lower() if self.normalize_keys else key
984+
985+
def __getitem__(self, key):
986+
key = self._normalize_key(key)
987+
try:
988+
return self.map[key]
989+
except self.exceptions as e:
990+
raise KeyError(key) from e
991+
992+
def __setitem__(self, key, value):
993+
if self.mode == 'r':
994+
err_read_only()
995+
key = self._normalize_key(key)
996+
path = self.dir_path(key)
997+
value = ensure_contiguous_ndarray(value)
998+
try:
999+
if self.fs.isdir(path):
1000+
self.fs.rm(path, recursive=True)
1001+
self.map[key] = value
1002+
except self.exceptions as e:
1003+
raise KeyError(key) from e
1004+
1005+
def __delitem__(self, key):
1006+
if self.mode == 'r':
1007+
err_read_only()
1008+
key = self._normalize_key(key)
1009+
path = self.dir_path(key)
1010+
if self.fs.isdir(path):
1011+
self.fs.rm(path, recursive=True)
1012+
else:
1013+
del self.map[key]
1014+
1015+
def __contains__(self, key):
1016+
key = self._normalize_key(key)
1017+
return key in self.map
1018+
1019+
def __eq__(self, other):
1020+
return (type(self) == type(other) and self.map == other.map
1021+
and self.mode == other.mode)
1022+
1023+
def keys(self):
1024+
return iter(self.map)
1025+
1026+
def __iter__(self):
1027+
return self.keys()
1028+
1029+
def __len__(self):
1030+
return len(list(self.keys()))
1031+
1032+
def dir_path(self, path=None):
1033+
store_path = normalize_storage_path(path)
1034+
return self.map._key_to_str(store_path)
1035+
1036+
def listdir(self, path=None):
1037+
dir_path = self.dir_path(path)
1038+
try:
1039+
out = sorted(p.rstrip('/').rsplit('/', 1)[-1]
1040+
for p in self.fs.ls(dir_path, detail=False))
1041+
return out
1042+
except IOError:
1043+
return []
1044+
1045+
def rmdir(self, path=None):
1046+
if self.mode == 'r':
1047+
err_read_only()
1048+
store_path = self.dir_path(path)
1049+
if self.fs.isdir(store_path):
1050+
self.fs.rm(store_path, recursive=True)
1051+
1052+
def getsize(self, path=None):
1053+
store_path = self.dir_path(path)
1054+
return self.fs.du(store_path, True, True)
1055+
1056+
def clear(self):
1057+
if self.mode == 'r':
1058+
err_read_only()
1059+
self.map.clear()
1060+
1061+
9401062
class TempStore(DirectoryStore):
9411063
"""Directory store using a temporary directory for storage.
9421064

0 commit comments

Comments
 (0)