21
21
import multiprocessing
22
22
import operator
23
23
import os
24
+ import json
24
25
import re
25
26
import shutil
26
27
import sys
@@ -949,14 +950,32 @@ def atexit_rmglob(path,
949
950
class FSStore (MutableMapping ):
950
951
951
952
def __init__(self, url, normalize_keys=True, key_separator='.',
             mode='w', consolidated=False, metadata_key='.zmetadata',
             **storage_options):
    """Open an fsspec-backed store rooted at ``url``.

    Parameters
    ----------
    url : str
        Location handed to ``fsspec.get_mapper``; also kept as ``self.path``.
    normalize_keys : bool
        Stored on the instance as ``normalize_keys``.
    key_separator : str
        Stored on the instance as ``key_separator``.
    mode : str
        Access mode, stored on the instance as ``mode``.
    consolidated : bool
        When true, the JSON object stored under ``metadata_key`` is loaded
        into ``self.meta`` and its format marker is checked.
    metadata_key : str
        Key of the consolidated metadata object.
    **storage_options
        Forwarded unchanged to ``fsspec.get_mapper``.

    Raises
    ------
    MetadataError
        If ``consolidated`` is true and the stored
        ``zarr_consolidated_format`` marker is not ``1``.  In read mode a
        missing marker is treated the same way.
    """
    import fsspec

    self.path = url
    self.normalize_keys = normalize_keys
    self.key_separator = key_separator
    self.map = fsspec.get_mapper(url, **storage_options)
    self.fs = self.map.fs  # for direct operations
    self.mode = mode
    # TODO: should warn if consolidated and write mode?
    # A path that exists but is not a directory cannot hold a store.
    exists_but_not_dir = self.fs.exists(url) and not self.fs.isdir(url)
    if exists_but_not_dir:
        err_fspath_exists_notdir(url)
    self.consolidated = consolidated
    self.metadata_key = metadata_key
    if not consolidated:
        return

    # A missing metadata object is read as an empty JSON document.
    raw_meta = self.map.get(metadata_key, b"{}")
    self.meta = json.loads(raw_meta.decode())
    has_marker = 'zarr_consolidated_format' in self.meta
    if mode != 'r' and not has_marker:
        # Writable store without a marker yet: stamp the supported format.
        self.meta['zarr_consolidated_format'] = 1
        return
    consolidated_format = self.meta.get('zarr_consolidated_format')
    if consolidated_format != 1:
        raise MetadataError('unsupported zarr consolidated metadata format: %s' %
                            consolidated_format)
975
+
976
@staticmethod
def _is_meta(key):
    """Tell whether ``key`` names a metadata document.

    Only the last ``/``-separated component of ``key`` is inspected; it is
    compared against the module-level names ``attrs_key``,
    ``group_meta_key`` and ``array_meta_key``.
    """
    leaf = key.rsplit('/', 1)[-1]
    return leaf in (attrs_key, group_meta_key, array_meta_key)
960
979
961
980
def _normalize_key (self , key ):
962
981
key = normalize_storage_path (key ).lstrip ('/' )
@@ -966,12 +985,17 @@ def _normalize_key(self, key):
966
985
return key .lower () if self .normalize_keys else key
967
986
968
987
def __getitem__ (self , key ):
988
+ if self .consolidated and self ._is_meta (key ):
989
+ return self .meta [key ]
969
990
key = self ._normalize_key (key )
970
991
return self .map [key ]
971
992
972
993
def __setitem__ (self , key , value ):
973
994
if self .mode == 'r' :
974
- raise PermissionError
995
+ err_read_only ()
996
+ if self .consolidated and self ._is_meta (key ):
997
+ self .meta [key ] = value .decode ()
998
+ self .map [self .metadata_key ] = json .dumps (self .meta ).encode ()
975
999
key = self ._normalize_key (key )
976
1000
path = self .dir_path (key )
977
1001
value = ensure_contiguous_ndarray (value )
@@ -984,7 +1008,10 @@ def __setitem__(self, key, value):
984
1008
985
1009
def __delitem__ (self , key ):
986
1010
if self .mode == 'r' :
987
- raise PermissionError
1011
+ err_read_only ()
1012
+ if self .consolidated and self ._is_meta (key ):
1013
+ del self .meta [key ]
1014
+ self .map [self .metadata_key ] = json .dumps (self .meta ).encode ()
988
1015
key = self ._normalize_key (key )
989
1016
path = self .dir_path (key )
990
1017
if self .fs .isdir (path ):
@@ -993,6 +1020,8 @@ def __delitem__(self, key):
993
1020
del self .map [key ]
994
1021
995
1022
def __contains__ (self , key ):
1023
+ if self .consolidated and self ._is_meta (key ):
1024
+ return key in self .meta
996
1025
key = self ._normalize_key (key )
997
1026
return key in self .map
998
1027
@@ -1022,6 +1051,8 @@ def listdir(self, path=None):
1022
1051
return []
1023
1052
1024
1053
def rmdir (self , path = None ):
1054
+ if self .mode == 'r' :
1055
+ err_read_only ()
1025
1056
store_path = self .dir_path (path )
1026
1057
if self .fs .isdir (store_path ):
1027
1058
self .fs .rm (store_path , recursive = True )
@@ -1031,6 +1062,10 @@ def getsize(self, path=None):
1031
1062
return self .fs .du (store_path , True , True )
1032
1063
1033
1064
def clear(self):
    """Remove every entry from the store.

    In read-only mode (``mode == 'r'``) ``err_read_only`` is called before
    anything is touched.  For a consolidated store the in-memory metadata
    cache is reset as well.
    """
    if self.mode == 'r':
        err_read_only()
    if self.consolidated:
        # Reset the cache but keep the format marker that __init__ sets
        # and validates.  With an empty dict, metadata written after
        # clear() would be persisted without the marker and the store
        # would then be rejected when reopened read-only.
        self.meta = {'zarr_consolidated_format': 1}
    self.map.clear()
1035
1070
1036
1071
0 commit comments