Skip to content

Commit 2f18f18

Browse files
committed
add synchronization for persistent arrays
1 parent 8a3c838 commit 2f18f18

File tree

7 files changed

+63
-21
lines changed

7 files changed

+63
-21
lines changed

README.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ Create a persistent array (data saved to disk)::
123123
>>> z = zarr.open(path, shape=(10000, 1000), dtype='i4', chunks=(1000, 100))
124124
>>> z[:] = np.arange(10000000, dtype='i4').reshape(10000, 1000)
125125
>>> z
126-
zarr.ext.PersistentArray((10000, 1000), int32, chunks=(1000, 100))
126+
zarr.ext.SynchronizedPersistentArray((10000, 1000), int32, chunks=(1000, 100))
127127
cname: blosclz; clevel: 5; shuffle: 1 (BYTESHUFFLE)
128128
nbytes: 38.1M; cbytes: 2.0M; ratio: 19.3; initialized: 100/100
129129
mode: a; path: example.zarr

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@
7474
'setuptools>18.0',
7575
'setuptools-scm>1.5.4'
7676
],
77+
install_requires=[
78+
'fasteners',
79+
],
7780
ext_modules=ext_modules,
7881
package_dir={'': '.'},
7982
packages=['zarr', 'zarr.tests'],

zarr/core.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ def full(shape, chunks, fill_value, dtype=None, cname=None, clevel=None,
166166

167167

168168
def array(data, chunks=None, dtype=None, cname=None, clevel=None,
169-
shuffle=None, synchronized=True, fill_value=None):
169+
shuffle=None, fill_value=None, synchronized=True):
170170
"""Create an array filled with `data`.
171171
172172
Parameters
@@ -185,11 +185,11 @@ def array(data, chunks=None, dtype=None, cname=None, clevel=None,
185185
shuffle : int, optional
186186
Shuffle filter, 0 means no shuffle, 1 means byte shuffle, 2 means
187187
bit shuffle.
188+
fill_value : object
189+
Default value to use for uninitialised portions of the array.
188190
synchronized : bool, optional
189191
If True, each chunk will be protected with a lock to prevent data
190192
collision during write operations.
191-
fill_value : object
192-
Default value to use for uninitialised portions of the array.
193193
194194
Returns
195195
-------
@@ -233,8 +233,9 @@ def array(data, chunks=None, dtype=None, cname=None, clevel=None,
233233
return z
234234

235235

236+
# noinspection PyShadowingBuiltins
236237
def open(path, mode='a', shape=None, chunks=None, dtype=None, cname=None,
237-
clevel=None, shuffle=None, fill_value=None):
238+
clevel=None, shuffle=None, fill_value=None, synchronized=True):
238239
"""Open a persistent array.
239240
240241
Parameters
@@ -262,16 +263,20 @@ def open(path, mode='a', shape=None, chunks=None, dtype=None, cname=None,
262263
bit shuffle.
263264
fill_value : object
264265
Default value to use for uninitialised portions of the array.
266+
synchronized : bool, optional
267+
If True, each chunk will be protected with a lock to prevent data
268+
collision during write operations.
265269
266270
Returns
267271
-------
268272
z : zarr Array
269273
270274
"""
271275

272-
# TODO synchronized option
273-
274-
cls = _ext.PersistentArray
276+
if synchronized:
277+
cls = _ext.SynchronizedPersistentArray
278+
else:
279+
cls = _ext.PersistentArray
275280
return cls(path=path, mode=mode, shape=shape, chunks=chunks, dtype=dtype,
276281
cname=cname, clevel=clevel, shuffle=shuffle,
277282
fill_value=fill_value)

zarr/ext.pxd

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ cdef class PersistentChunk(BaseChunk):
3737

3838

3939
cdef class SynchronizedPersistentChunk(PersistentChunk):
40-
# TODO
41-
pass
40+
cdef object _thread_lock
41+
cdef object _file_lock
4242

4343

4444
cdef class BaseArray:
@@ -73,7 +73,6 @@ cdef class PersistentArray(BaseArray):
7373

7474

7575
cdef class SynchronizedPersistentArray(PersistentArray):
76-
# TODO
7776
pass
7877

7978

zarr/ext.pyx

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import tempfile
1919
from collections import namedtuple
2020

2121

22+
import fasteners
2223
from zarr import util as _util
2324
from zarr import defaults
2425

@@ -574,13 +575,15 @@ cdef class PersistentChunk(BaseChunk):
574575
# noinspection PyAbstractClass
575576
cdef class SynchronizedPersistentChunk(PersistentChunk):
576577

577-
def __cinit__(self, **kargs):
578-
# TODO
579-
pass
578+
def __cinit__(self, **kwargs):
579+
lock_path = self._path + '.lock'
580+
self._thread_lock = RLock()
581+
self._file_lock = fasteners.InterProcessLock(lock_path)
580582

581583
def __setitem__(self, key, value):
582-
# TODO
583-
pass
584+
with self._thread_lock:
585+
with self._file_lock:
586+
super(SynchronizedPersistentChunk, self).__setitem__(key, value)
584587

585588

586589
###############################################################################
@@ -1303,9 +1306,18 @@ cdef class PersistentArray(BaseArray):
13031306
yield self._cdata[cidx]
13041307

13051308

1309+
# noinspection PyAbstractClass
13061310
cdef class SynchronizedPersistentArray(PersistentArray):
1307-
# TODO
1308-
pass
1311+
1312+
cdef BaseChunk create_chunk(self, tuple cidx):
1313+
chunk_filename = '.'.join(map(str, cidx)) + defaults.datasuffix
1314+
chunk_path = os.path.join(self._path, defaults.datapath,
1315+
chunk_filename)
1316+
return SynchronizedPersistentChunk(
1317+
path=chunk_path, shape=self._chunks, dtype=self._dtype,
1318+
cname=self._cname, clevel=self._clevel, shuffle=self._shuffle,
1319+
fill_value=self._fill_value
1320+
)
13091321

13101322

13111323
cdef class LazyArray(BaseArray):

zarr/tests/test_array.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
from nose.tools import eq_ as eq, assert_false, assert_true, assert_raises
1111
import numpy as np
1212
from numpy.testing import assert_array_equal
13-
from zarr.ext import Array, SynchronizedArray, PersistentArray
13+
from zarr.ext import Array, SynchronizedArray, PersistentArray, \
14+
SynchronizedPersistentArray
1415
from zarr import defaults
1516

1617

@@ -363,3 +364,15 @@ def test_persistence_2d(self):
363364
a = np.arange(10000).reshape((1000, 10))
364365
chunks = (100, 2)
365366
self._test_persistence(a, chunks=chunks)
367+
368+
369+
class TestSynchronizedPersistentArray(TestPersistentArray):
370+
371+
def create_array(self, **kwargs):
372+
path = kwargs.get('path', tempfile.mktemp())
373+
kwargs['path'] = path
374+
# tidy up
375+
atexit.register(
376+
lambda: shutil.rmtree(path) if os.path.exists(path) else None
377+
)
378+
return SynchronizedPersistentArray(**kwargs)

zarr/tests/test_chunk.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
from nose.tools import eq_ as eq, assert_false, assert_true, assert_raises
1010
import numpy as np
1111
from numpy.testing import assert_array_equal
12-
from zarr.ext import Chunk, PersistentChunk, SynchronizedChunk
12+
from zarr.ext import Chunk, PersistentChunk, SynchronizedChunk, \
13+
SynchronizedPersistentChunk
1314
from zarr import defaults
1415

1516

@@ -223,4 +224,13 @@ def test_persistence(self):
223224
.reshape(100, -1))
224225

225226

226-
# TODO test persistent synchronized chunks
227+
class TestSynchronizedPersistentChunk(TestPersistentChunk):
228+
229+
def create_chunk(self, **kwargs):
230+
path = kwargs.get('path', tempfile.mktemp())
231+
kwargs['path'] = path
232+
# tidy up
233+
atexit.register(
234+
lambda: os.remove(path) if os.path.exists(path) else None
235+
)
236+
return SynchronizedPersistentChunk(**kwargs)

0 commit comments

Comments
 (0)