Skip to content

Commit 10531ee

Browse files
committed
refactor synchronization for simplicity
1 parent 4b4930c commit 10531ee

File tree

6 files changed

+138
-131
lines changed

6 files changed

+138
-131
lines changed

zarr/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@
55
import atexit
66

77

8-
from zarr.core import Array
8+
from zarr.core import Array, SynchronizedArray
99
from zarr.creation import create, array, empty, zeros, ones, full, open, \
1010
empty_like, zeros_like, ones_like, full_like, open_like, open_array
1111
from zarr.storage import DictStore, DirectoryStore, ZipStore, init_array, \
1212
init_group, init_store
1313
from zarr.hierarchy import group, open_group, Group
14-
from zarr.sync import ThreadSynchronizer, ProcessSynchronizer, \
15-
SynchronizedArray
14+
from zarr.sync import ThreadSynchronizer, ProcessSynchronizer
1615
from zarr.version import version as __version__
1716

1817

zarr/attrs.py

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
class Attributes(MutableMapping):
1212

13-
def __init__(self, store, key='attrs', readonly=False):
13+
def __init__(self, store, key='.zattrs', readonly=False):
1414
self.store = store
1515
self.key = key
1616
self.readonly = readonly
@@ -21,12 +21,7 @@ def __contains__(self, x):
2121
def __getitem__(self, item):
2222
return self.asdict()[item]
2323

24-
def put(self, d):
25-
26-
# guard conditions
27-
if self.readonly:
28-
raise ReadOnlyError('attributes are read-only')
29-
24+
def _put(self, d):
3025
s = json.dumps(d, indent=4, sort_keys=True, ensure_ascii=True)
3126
self.store[self.key] = s.encode('ascii')
3227

@@ -42,8 +37,8 @@ def __setitem__(self, key, value):
4237
# set key value
4338
d[key] = value
4439

45-
# put modified data
46-
self.put(d)
40+
# _put modified data
41+
self._put(d)
4742

4843
def __delitem__(self, key):
4944

@@ -57,8 +52,8 @@ def __delitem__(self, key):
5752
# delete key value
5853
del d[key]
5954

60-
# put modified data
61-
self.put(d)
55+
# _put modified data
56+
self._put(d)
6257

6358
def asdict(self):
6459
if self.key in self.store:
@@ -79,8 +74,8 @@ def update(self, *args, **kwargs):
7974
# update
8075
d.update(*args, **kwargs)
8176

82-
# put modified data
83-
self.put(d)
77+
# _put modified data
78+
self._put(d)
8479

8580
def __iter__(self):
8681
return iter(self.asdict())
@@ -96,3 +91,23 @@ def values(self):
9691

9792
def items(self):
9893
return self.asdict().items()
94+
95+
96+
class SynchronizedAttributes(Attributes):
97+
98+
def __init__(self, store, synchronizer, key='.zattrs', readonly=False):
99+
super(SynchronizedAttributes, self).__init__(store, key=key,
100+
readonly=readonly)
101+
self.synchronizer = synchronizer
102+
103+
def __setitem__(self, key, value):
104+
with self.synchronizer[self.key]:
105+
super(SynchronizedAttributes, self).__setitem__(key, value)
106+
107+
def __delitem__(self, key):
108+
with self.synchronizer[self.key]:
109+
super(SynchronizedAttributes, self).__delitem__(key)
110+
111+
def update(self, *args, **kwargs):
112+
with self.synchronizer[self.key]:
113+
super(SynchronizedAttributes, self).update(*args, **kwargs)

zarr/core.py

Lines changed: 98 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
normalize_storage_path
1414
from zarr.storage import array_meta_key, attrs_key, listdir, getsize
1515
from zarr.meta import decode_array_metadata, encode_array_metadata
16-
from zarr.attrs import Attributes
16+
from zarr.attrs import Attributes, SynchronizedAttributes
1717
from zarr.errors import ReadOnlyError
1818
from zarr.compat import reduce
1919

@@ -514,7 +514,7 @@ def _chunk_getitem(self, cidx, item, dest):
514514
try:
515515

516516
# obtain compressed data for chunk
517-
ckey = self._ckey(cidx)
517+
ckey = self._chunk_key(cidx)
518518
cdata = self._chunk_store[ckey]
519519

520520
except KeyError:
@@ -589,7 +589,7 @@ def _chunk_setitem(self, cidx, key, value):
589589
try:
590590

591591
# obtain compressed data for chunk
592-
ckey = self._ckey(cidx)
592+
ckey = self._chunk_key(cidx)
593593
cdata = self._chunk_store[ckey]
594594

595595
except KeyError:
@@ -614,10 +614,10 @@ def _chunk_setitem(self, cidx, key, value):
614614
cdata = self._compressor.compress(chunk)
615615

616616
# store
617-
ckey = self._ckey(cidx)
617+
ckey = self._chunk_key(cidx)
618618
self._chunk_store[ckey] = cdata
619619

620-
def _ckey(self, cidx):
620+
def _chunk_key(self, cidx):
621621
return self._key_prefix + '.'.join(map(str, cidx))
622622

623623
def __repr__(self):
@@ -691,6 +691,13 @@ def resize(self, *args):
691691
if self._readonly:
692692
raise ReadOnlyError('array is read-only')
693693

694+
self._resize(*args)
695+
696+
def _resize(self, *args):
697+
698+
# N.B., private implementation to avoid need for re-entrant lock on
699+
# SynchronizedArray.append().
700+
694701
# normalize new shape argument
695702
old_shape = self._shape
696703
new_shape = normalize_resize_args(old_shape, *args)
@@ -780,7 +787,7 @@ def append(self, data, axis=0):
780787
)
781788

782789
# resize
783-
self.resize(new_shape)
790+
self._resize(new_shape)
784791

785792
# store data
786793
# noinspection PyTypeChecker
@@ -789,3 +796,88 @@ def append(self, data, axis=0):
789796
for i in range(len(self._shape))
790797
)
791798
self[append_selection] = data
799+
800+
801+
class SynchronizedArray(Array):
802+
"""Instantiate a synchronized array.
803+
804+
Parameters
805+
----------
806+
store : MutableMapping
807+
Array store, already initialized.
808+
synchronizer : object
809+
Array synchronizer.
810+
readonly : bool, optional
811+
True if array should be protected against modification.
812+
path : string, optional
813+
Path under which array is stored.
814+
chunk_store : MutableMapping, optional
815+
Separate storage for chunks. If not provided, `store` will be used
816+
for storage of both chunks and metadata.
817+
818+
Examples
819+
--------
820+
>>> import zarr
821+
>>> store = dict()
822+
>>> zarr.init_array(store, shape=1000, chunks=100)
823+
>>> synchronizer = zarr.ThreadSynchronizer()
824+
>>> z = zarr.SynchronizedArray(store, synchronizer)
825+
>>> z
826+
zarr.sync.SynchronizedArray((1000,), float64, chunks=(100,), order=C)
827+
compression: blosc; compression_opts: {'clevel': 5, 'cname': 'lz4', 'shuffle': 1}
828+
nbytes: 7.8K; nbytes_stored: 285; ratio: 28.1; initialized: 0/10
829+
store: builtins.dict; synchronizer: zarr.sync.ThreadSynchronizer
830+
831+
Notes
832+
-----
833+
TODO review
834+
835+
Only writing data to the array via the __setitem__() method and
836+
modification of user attributes are synchronized. Neither append() nor
837+
resize() are synchronized.
838+
839+
Writing to the array is synchronized at the chunk level. I.e.,
840+
the array supports concurrent write operations via the __setitem__()
841+
method, but these will only exclude each other if they both require
842+
modification of the same chunk.
843+
844+
""" # flake8: noqa
845+
846+
def __init__(self, store, synchronizer, readonly=False, path=None,
847+
chunk_store=None):
848+
super(SynchronizedArray, self).__init__(store, readonly=readonly,
849+
path=path,
850+
chunk_store=chunk_store)
851+
self.synchronizer = synchronizer
852+
akey = self._key_prefix + attrs_key
853+
self._attrs = SynchronizedAttributes(store, synchronizer, key=akey,
854+
readonly=readonly)
855+
856+
def __repr__(self):
857+
r = super(SynchronizedArray, self).__repr__()
858+
r += ('; synchronizer: %s.%s' %
859+
(type(self.synchronizer).__module__,
860+
type(self.synchronizer).__name__))
861+
return r
862+
863+
def __getstate__(self):
864+
return self._store, self.synchronizer, self._readonly, self._path, \
865+
self._chunk_store
866+
867+
def __setstate__(self, state):
868+
self.__init__(*state)
869+
870+
def _chunk_setitem(self, cidx, key, value):
871+
ckey = self._chunk_key(cidx)
872+
with self.synchronizer[ckey]:
873+
super(SynchronizedArray, self)._chunk_setitem(cidx, key, value)
874+
875+
def resize(self, *args):
876+
mkey = self._key_prefix + array_meta_key
877+
with self.synchronizer[mkey]:
878+
super(SynchronizedArray, self).resize(*args)
879+
880+
def append(self, *args, **kwargs):
881+
mkey = self._key_prefix + array_meta_key
882+
with self.synchronizer[mkey]:
883+
super(SynchronizedArray, self).append(*args, **kwargs)

zarr/sync.py

Lines changed: 7 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,18 @@ class ThreadSynchronizer(object):
1818

1919
def __init__(self):
2020
self.mutex = Lock()
21-
self.attrs_lock = Lock()
22-
self.chunk_locks = defaultdict(Lock)
21+
self.locks = defaultdict(Lock)
2322

24-
def chunk_lock(self, ckey):
23+
def __getitem__(self, item):
2524
with self.mutex:
26-
lock = self.chunk_locks[ckey]
27-
return lock
25+
return self.locks[item]
2826

2927
def __getstate__(self):
3028
return dict()
3129

3230
def __setstate__(self, d):
3331
# reinitialize from scratch
34-
self.mutex = Lock()
35-
self.attrs_lock = Lock()
36-
self.chunk_locks = defaultdict(Lock)
32+
self.__init__()
3733

3834

3935
class ProcessSynchronizer(object):
@@ -51,104 +47,10 @@ class ProcessSynchronizer(object):
5147
def __init__(self, path):
5248
self.path = path
5349

54-
@property
55-
def attrs_lock(self):
56-
return fasteners.InterProcessLock(
57-
os.path.join(self.path, 'attrs.lock')
58-
)
59-
60-
def chunk_lock(self, ckey):
50+
def __getitem__(self, item):
6151
lock = fasteners.InterProcessLock(
62-
os.path.join(self.path, '%s.lock' % ckey)
52+
os.path.join(self.path, '%s.lock' % item)
6353
)
6454
return lock
6555

66-
67-
class SynchronizedArray(Array):
68-
"""Instantiate a synchronized array.
69-
70-
Parameters
71-
----------
72-
store : MutableMapping
73-
Array store, already initialized.
74-
synchronizer : object
75-
Array synchronizer.
76-
readonly : bool, optional
77-
True if array should be protected against modification.
78-
path : string, optional
79-
Path under which array is stored.
80-
81-
Examples
82-
--------
83-
>>> import zarr
84-
>>> store = dict()
85-
>>> zarr.init_array(store, shape=1000, chunks=100)
86-
>>> synchronizer = zarr.ThreadSynchronizer()
87-
>>> z = zarr.SynchronizedArray(store, synchronizer)
88-
>>> z
89-
zarr.sync.SynchronizedArray((1000,), float64, chunks=(100,), order=C)
90-
compression: blosc; compression_opts: {'clevel': 5, 'cname': 'lz4', 'shuffle': 1}
91-
nbytes: 7.8K; nbytes_stored: 285; ratio: 28.1; initialized: 0/10
92-
store: builtins.dict; synchronizer: zarr.sync.ThreadSynchronizer
93-
94-
Notes
95-
-----
96-
Only writing data to the array via the __setitem__() method and
97-
modification of user attributes are synchronized. Neither append() nor
98-
resize() are synchronized.
99-
100-
Writing to the array is synchronized at the chunk level. I.e.,
101-
the array supports concurrent write operations via the __setitem__()
102-
method, but these will only exclude each other if they both require
103-
modification of the same chunk.
104-
105-
""" # flake8: noqa
106-
107-
def __init__(self, store, synchronizer, readonly=False, path=None,
108-
chunk_store=None):
109-
super(SynchronizedArray, self).__init__(store, readonly=readonly,
110-
path=path,
111-
chunk_store=chunk_store)
112-
self.synchronizer = synchronizer
113-
akey = self._key_prefix + attrs_key
114-
self._attrs = SynchronizedAttributes(store, synchronizer, key=akey,
115-
readonly=readonly)
116-
117-
def _chunk_setitem(self, cidx, key, value):
118-
ckey = self._ckey(cidx)
119-
with self.synchronizer.chunk_lock(ckey):
120-
super(SynchronizedArray, self)._chunk_setitem(cidx, key, value)
121-
122-
def __repr__(self):
123-
r = super(SynchronizedArray, self).__repr__()
124-
r += ('; synchronizer: %s.%s' %
125-
(type(self.synchronizer).__module__,
126-
type(self.synchronizer).__name__))
127-
return r
128-
129-
def __getstate__(self):
130-
return self._store, self.synchronizer, self._readonly, self._path, \
131-
self._chunk_store
132-
133-
def __setstate__(self, state):
134-
self.__init__(*state)
135-
136-
137-
class SynchronizedAttributes(Attributes):
138-
139-
def __init__(self, store, synchronizer, key='attrs', readonly=False):
140-
super(SynchronizedAttributes, self).__init__(store, key=key,
141-
readonly=readonly)
142-
self.synchronizer = synchronizer
143-
144-
def __setitem__(self, key, value):
145-
with self.synchronizer.attrs_lock:
146-
super(SynchronizedAttributes, self).__setitem__(key, value)
147-
148-
def __delitem__(self, key):
149-
with self.synchronizer.attrs_lock:
150-
super(SynchronizedAttributes, self).__delitem__(key)
151-
152-
def update(self, *args, **kwargs):
153-
with self.synchronizer.attrs_lock:
154-
super(SynchronizedAttributes, self).update(*args, **kwargs)
56+
# pickling and unpickling should be handled automatically

zarr/tests/test_attrs.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,5 +86,3 @@ def test_readonly(self):
8686
del a['foo']
8787
with assert_raises(ReadOnlyError):
8888
a.update(foo='quux')
89-
with assert_raises(ReadOnlyError):
90-
a.put(dict())

0 commit comments

Comments
 (0)