Skip to content

Commit d39b08f

Browse files
committed
implement group synchronization; refactor array synchronization
1 parent dc18335 commit d39b08f

File tree

8 files changed

+267
-268
lines changed

8 files changed

+267
-268
lines changed

zarr/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import atexit
66

77

8-
from zarr.core import Array, SynchronizedArray
8+
from zarr.core import Array
99
from zarr.creation import create, array, empty, zeros, ones, full, open, \
1010
empty_like, zeros_like, ones_like, full_like, open_like, open_array
1111
from zarr.storage import DictStore, DirectoryStore, ZipStore, init_array, \

zarr/attrs.py

Lines changed: 23 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010

1111
class Attributes(MutableMapping):
1212

13-
def __init__(self, store, key='.zattrs', readonly=False):
13+
def __init__(self, store, key='.zattrs', readonly=False,
14+
synchronizer=None):
1415
self.store = store
1516
self.key = key
1617
self.readonly = readonly
18+
self.synchronizer = synchronizer
1719

1820
def __contains__(self, x):
1921
return x in self.asdict()
@@ -25,26 +27,37 @@ def _put(self, d):
2527
s = json.dumps(d, indent=4, sort_keys=True, ensure_ascii=True)
2628
self.store[self.key] = s.encode('ascii')
2729

28-
def __setitem__(self, key, value):
30+
def _write_op(self, f, *args, **kwargs):
2931

30-
# guard conditions
32+
# guard condition
3133
if self.readonly:
3234
raise ReadOnlyError('attributes are read-only')
3335

36+
# synchronization
37+
if self.synchronizer is None:
38+
return f(*args, **kwargs)
39+
else:
40+
with self.synchronizer[self.key]:
41+
return f(*args, **kwargs)
42+
43+
def __setitem__(self, item, value):
44+
self._write_op(self._setitem_nosync, item, value)
45+
46+
def _setitem_nosync(self, item, value):
47+
3448
# load existing data
3549
d = self.asdict()
3650

3751
# set key value
38-
d[key] = value
52+
d[item] = value
3953

4054
# _put modified data
4155
self._put(d)
4256

43-
def __delitem__(self, key):
57+
def __delitem__(self, item):
58+
self._write_op(self._delitem_nosync, item)
4459

45-
# guard conditions
46-
if self.readonly:
47-
raise ReadOnlyError('attributes are read-only')
60+
def _delitem_nosync(self, key):
4861

4962
# load existing data
5063
d = self.asdict()
@@ -63,10 +76,9 @@ def asdict(self):
6376

6477
def update(self, *args, **kwargs):
6578
# override to provide update in a single write
79+
self._write_op(self._update_nosync, *args, **kwargs)
6680

67-
# guard conditions
68-
if self.readonly:
69-
raise ReadOnlyError('mapping is read-only')
81+
def _update_nosync(self, *args, **kwargs):
7082

7183
# load existing data
7284
d = self.asdict()
@@ -82,32 +94,3 @@ def __iter__(self):
8294

8395
def __len__(self):
8496
return len(self.asdict())
85-
86-
def keys(self):
87-
return self.asdict().keys()
88-
89-
def values(self):
90-
return self.asdict().values()
91-
92-
def items(self):
93-
return self.asdict().items()
94-
95-
96-
class SynchronizedAttributes(Attributes):
97-
98-
def __init__(self, store, synchronizer, key='.zattrs', readonly=False):
99-
super(SynchronizedAttributes, self).__init__(store, key=key,
100-
readonly=readonly)
101-
self.synchronizer = synchronizer
102-
103-
def __setitem__(self, key, value):
104-
with self.synchronizer[self.key]:
105-
super(SynchronizedAttributes, self).__setitem__(key, value)
106-
107-
def __delitem__(self, key):
108-
with self.synchronizer[self.key]:
109-
super(SynchronizedAttributes, self).__delitem__(key)
110-
111-
def update(self, *args, **kwargs):
112-
with self.synchronizer[self.key]:
113-
super(SynchronizedAttributes, self).update(*args, **kwargs)

zarr/core.py

Lines changed: 56 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
normalize_storage_path
1414
from zarr.storage import array_meta_key, attrs_key, listdir, getsize
1515
from zarr.meta import decode_array_metadata, encode_array_metadata
16-
from zarr.attrs import Attributes, SynchronizedAttributes
16+
from zarr.attrs import Attributes
1717
from zarr.errors import ReadOnlyError
1818
from zarr.compat import reduce
1919

@@ -32,6 +32,8 @@ class Array(object):
3232
chunk_store : MutableMapping, optional
3333
Separate storage for chunks. If not provided, `store` will be used
3434
for storage of both chunks and metadata.
35+
synchronizer : object, optional
36+
Array synchronizer.
3537
3638
Attributes
3739
----------
@@ -47,6 +49,7 @@ class Array(object):
4749
compression_opts
4850
fill_value
4951
order
52+
synchronizer
5053
attrs
5154
size
5255
itemsize
@@ -64,7 +67,8 @@ class Array(object):
6467
6568
""" # flake8: noqa
6669

67-
def __init__(self, store, path=None, readonly=False, chunk_store=None):
70+
def __init__(self, store, path=None, readonly=False, chunk_store=None,
71+
synchronizer=None):
6872
# N.B., expect at this point store is fully initialized with all
6973
# configuration metadata fully specified and normalized
7074

@@ -79,6 +83,7 @@ def __init__(self, store, path=None, readonly=False, chunk_store=None):
7983
self._chunk_store = store
8084
else:
8185
self._chunk_store = chunk_store
86+
self._synchronizer = synchronizer
8287

8388
# initialize metadata
8489
try:
@@ -101,9 +106,10 @@ def __init__(self, store, path=None, readonly=False, chunk_store=None):
101106

102107
# initialize attributes
103108
akey = self._key_prefix + attrs_key
104-
self._attrs = Attributes(store, key=akey, readonly=readonly)
109+
self._attrs = Attributes(store, key=akey, readonly=readonly,
110+
synchronizer=synchronizer)
105111

106-
def flush_metadata(self):
112+
def _flush_metadata(self):
107113
meta = dict(shape=self._shape, chunks=self._chunks, dtype=self._dtype,
108114
compression=self._compression,
109115
compression_opts=self._compression_opts,
@@ -183,6 +189,11 @@ def order(self):
183189
chunks of the array."""
184190
return self._order
185191

192+
@property
193+
def synchronizer(self):
194+
"""TODO doc me"""
195+
return self._synchronizer
196+
186197
@property
187198
def attrs(self):
188199
"""A MutableMapping containing user-defined attributes. Note that
@@ -563,6 +574,20 @@ def _chunk_setitem(self, cidx, key, value):
563574
564575
"""
565576

577+
# synchronization
578+
if self._synchronizer is None:
579+
self._chunk_setitem_nosync(cidx, key, value)
580+
else:
581+
# synchronize on the chunk
582+
ckey = self._chunk_key(cidx)
583+
with self._synchronizer[ckey]:
584+
self._chunk_setitem_nosync(cidx, key, value)
585+
586+
def _chunk_setitem_nosync(self, cidx, key, value):
587+
588+
# obtain key for chunk storage
589+
ckey = self._chunk_key(cidx)
590+
566591
if is_total_slice(key, self._chunks):
567592

568593
# optimisation: we are completely replacing the chunk, so no need
@@ -589,7 +614,6 @@ def _chunk_setitem(self, cidx, key, value):
589614
try:
590615

591616
# obtain compressed data for chunk
592-
ckey = self._chunk_key(cidx)
593617
cdata = self._chunk_store[ckey]
594618

595619
except KeyError:
@@ -614,7 +638,6 @@ def _chunk_setitem(self, cidx, key, value):
614638
cdata = self._compressor.compress(chunk)
615639

616640
# store
617-
ckey = self._chunk_key(cidx)
618641
self._chunk_store[ckey] = cdata
619642

620643
def _chunk_key(self, cidx):
@@ -644,14 +667,34 @@ def __repr__(self):
644667
r += '\n chunk_store: %s.%s' % \
645668
(type(self._chunk_store).__module__,
646669
type(self._chunk_store).__name__)
670+
if self._synchronizer is not None:
671+
r += ('\n synchronizer: %s.%s' %
672+
(type(self._synchronizer).__module__,
673+
type(self._synchronizer).__name__))
647674
return r
648675

649676
def __getstate__(self):
650-
return self._store, self._path, self._readonly, self._chunk_store
677+
return self._store, self._path, self._readonly, self._chunk_store, \
678+
self._synchronizer
651679

652680
def __setstate__(self, state):
653681
self.__init__(*state)
654682

683+
def _write_op(self, f, *args, **kwargs):
684+
685+
# guard condition
686+
if self._readonly:
687+
raise ReadOnlyError('array is read-only')
688+
689+
# synchronization
690+
if self._synchronizer is None:
691+
return f(*args, **kwargs)
692+
else:
693+
# synchronize on the array
694+
mkey = self._key_prefix + array_meta_key
695+
with self._synchronizer[mkey]:
696+
return f(*args, **kwargs)
697+
655698
def resize(self, *args):
656699
"""Change the shape of the array by growing or shrinking one or more
657700
dimensions.
@@ -687,16 +730,9 @@ def resize(self, *args):
687730
688731
""" # flake8: noqa
689732

690-
# guard conditions
691-
if self._readonly:
692-
raise ReadOnlyError('array is read-only')
693-
694-
self._resize(*args)
733+
return self._write_op(self._resize_nosync, *args)
695734

696-
def _resize(self, *args):
697-
698-
# N.B., private implementation to avoid need for re-entrant lock on
699-
# SynchronizedArray.append().
735+
def _resize_nosync(self, *args):
700736

701737
# normalize new shape argument
702738
old_shape = self._shape
@@ -718,7 +754,7 @@ def _resize(self, *args):
718754

719755
# update metadata
720756
self._shape = new_shape
721-
self.flush_metadata()
757+
self._flush_metadata()
722758

723759
def append(self, data, axis=0):
724760
"""Append `data` to `axis`.
@@ -760,10 +796,9 @@ def append(self, data, axis=0):
760796
store: builtins.dict
761797
762798
"""
799+
return self._write_op(self._append_nosync, data, axis=axis)
763800

764-
# guard conditions
765-
if self._readonly:
766-
raise ReadOnlyError('array is read-only')
801+
def _append_nosync(self, data, axis=0):
767802

768803
# ensure data is array-like
769804
if not hasattr(data, 'shape') or not hasattr(data, 'dtype'):
@@ -787,7 +822,7 @@ def append(self, data, axis=0):
787822
)
788823

789824
# resize
790-
self._resize(new_shape)
825+
self._resize_nosync(new_shape)
791826

792827
# store data
793828
# noinspection PyTypeChecker
@@ -796,88 +831,3 @@ def append(self, data, axis=0):
796831
for i in range(len(self._shape))
797832
)
798833
self[append_selection] = data
799-
800-
801-
class SynchronizedArray(Array):
802-
"""Instantiate a synchronized array.
803-
804-
Parameters
805-
----------
806-
store : MutableMapping
807-
Array store, already initialized.
808-
synchronizer : object
809-
Array synchronizer.
810-
readonly : bool, optional
811-
True if array should be protected against modification.
812-
path : string, optional
813-
Path under which array is stored.
814-
chunk_store : MutableMapping, optional
815-
Separate storage for chunks. If not provided, `store` will be used
816-
for storage of both chunks and metadata.
817-
818-
Examples
819-
--------
820-
>>> import zarr
821-
>>> store = dict()
822-
>>> zarr.init_array(store, shape=1000, chunks=100)
823-
>>> synchronizer = zarr.ThreadSynchronizer()
824-
>>> z = zarr.SynchronizedArray(store, synchronizer)
825-
>>> z
826-
zarr.core.SynchronizedArray((1000,), float64, chunks=(100,), order=C)
827-
compression: blosc; compression_opts: {'clevel': 5, 'cname': 'lz4', 'shuffle': 1}
828-
nbytes: 7.8K; nbytes_stored: 285; ratio: 28.1; initialized: 0/10
829-
store: builtins.dict; synchronizer: zarr.sync.ThreadSynchronizer
830-
831-
Notes
832-
-----
833-
TODO review
834-
835-
Only writing data to the array via the __setitem__() method and
836-
modification of user attributes are synchronized. Neither append() nor
837-
resize() are synchronized.
838-
839-
Writing to the array is synchronized at the chunk level. I.e.,
840-
the array supports concurrent write operations via the __setitem__()
841-
method, but these will only exclude each other if they both require
842-
modification of the same chunk.
843-
844-
""" # flake8: noqa
845-
846-
def __init__(self, store, synchronizer, readonly=False, path=None,
847-
chunk_store=None):
848-
super(SynchronizedArray, self).__init__(store, readonly=readonly,
849-
path=path,
850-
chunk_store=chunk_store)
851-
self._synchronizer = synchronizer
852-
akey = self._key_prefix + attrs_key
853-
self._attrs = SynchronizedAttributes(store, synchronizer, key=akey,
854-
readonly=readonly)
855-
856-
def __repr__(self):
857-
r = super(SynchronizedArray, self).__repr__()
858-
r += ('\n synchronizer: %s.%s' %
859-
(type(self._synchronizer).__module__,
860-
type(self._synchronizer).__name__))
861-
return r
862-
863-
def __getstate__(self):
864-
return self._store, self._synchronizer, self._readonly, self._path, \
865-
self._chunk_store
866-
867-
def __setstate__(self, state):
868-
self.__init__(*state)
869-
870-
def _chunk_setitem(self, cidx, key, value):
871-
ckey = self._chunk_key(cidx)
872-
with self._synchronizer[ckey]:
873-
super(SynchronizedArray, self)._chunk_setitem(cidx, key, value)
874-
875-
def resize(self, *args):
876-
mkey = self._key_prefix + array_meta_key
877-
with self._synchronizer[mkey]:
878-
super(SynchronizedArray, self).resize(*args)
879-
880-
def append(self, *args, **kwargs):
881-
mkey = self._key_prefix + array_meta_key
882-
with self._synchronizer[mkey]:
883-
super(SynchronizedArray, self).append(*args, **kwargs)

0 commit comments

Comments
 (0)