Skip to content

Commit c971e96

Browse files
committed
parallel tests
1 parent 7c571fd commit c971e96

File tree

4 files changed

+170
-82
lines changed

4 files changed

+170
-82
lines changed

docs/release.rst

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,9 @@ Release notes
77
-----
88

99
Various minor improvements, including: ``Group`` objects support member access
10-
via dot notation (``__getattr__``); fix metadata caching for ``Array.shape``
11-
property and derivatives; add ``Array.ndim`` property; fix ``Array.__array__``
12-
method arguments;
13-
10+
via dot notation (``__getattr__``); fixed metadata caching for ``Array.shape``
11+
property and derivatives; added ``Array.ndim`` property; fixed
12+
``Array.__array__`` method arguments; fixed bug in pickling ``Array`` state.
1413

1514
.. _release_2.1.0:
1615

zarr/storage.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def contains_group(store, path=None):
6161
def _rmdir_from_keys(store, path=None):
6262
# assume path already normalized
6363
prefix = _path_to_prefix(path)
64-
for key in set(store.keys()):
64+
for key in list(store.keys()):
6565
if key.startswith(prefix):
6666
del store[key]
6767

@@ -81,7 +81,7 @@ def _listdir_from_keys(store, path=None):
8181
# assume path already normalized
8282
prefix = _path_to_prefix(path)
8383
children = set()
84-
for key in store.keys():
84+
for key in list(store.keys()):
8585
if key.startswith(prefix) and len(key) > len(prefix):
8686
suffix = key[len(prefix):]
8787
child = suffix.split('/')[0]
@@ -718,9 +718,9 @@ def getsize(self, path=None):
718718
err_path_not_found(path)
719719

720720

721-
def _atexit_rmtree(path,
722-
isdir=os.path.isdir,
723-
rmtree=shutil.rmtree): # pragma: no cover
721+
def atexit_rmtree(path,
722+
isdir=os.path.isdir,
723+
rmtree=shutil.rmtree): # pragma: no cover
724724
"""Ensure directory removal at interpreter exit."""
725725
if isdir(path):
726726
rmtree(path)
@@ -731,7 +731,7 @@ class TempStore(DirectoryStore):
731731

732732
def __init__(self, suffix='', prefix='zarr', dir=None):
733733
path = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
734-
atexit.register(_atexit_rmtree, path)
734+
atexit.register(atexit_rmtree, path)
735735
super(TempStore, self).__init__(path)
736736

737737

zarr/tests/test_hierarchy.py

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
from zarr.errors import PermissionError
2323
from zarr.creation import open_array
2424
from zarr.compat import PY2
25-
from zarr.sync import ThreadSynchronizer, ProcessSynchronizer
2625
from zarr.codecs import Zlib
2726

2827

@@ -752,60 +751,6 @@ def test_chunk_store(self):
752751
eq(expect, actual)
753752

754753

755-
class TestGroupWithThreadSynchronizer(TestGroup):
756-
757-
def create_group(self, store=None, path=None, read_only=False,
758-
chunk_store=None, synchronizer=None):
759-
if store is None:
760-
store, chunk_store = self.create_store()
761-
init_group(store, path=path, chunk_store=chunk_store)
762-
synchronizer = ThreadSynchronizer()
763-
g = Group(store, path=path, read_only=read_only,
764-
chunk_store=chunk_store, synchronizer=synchronizer)
765-
return g
766-
767-
def test_group_repr(self):
768-
if not PY2:
769-
g = self.create_group()
770-
expect = 'Group(/, 0)\n' \
771-
' store: dict; synchronizer: ThreadSynchronizer'
772-
actual = repr(g)
773-
for l1, l2 in zip(expect.split('\n'), actual.split('\n')):
774-
eq(l1, l2)
775-
776-
def test_synchronizer_property(self):
777-
g = self.create_group()
778-
assert_is_instance(g.synchronizer, ThreadSynchronizer)
779-
780-
781-
class TestGroupWithProcessSynchronizer(TestGroup):
782-
783-
def create_group(self, store=None, path=None, read_only=False,
784-
chunk_store=None, synchronizer=None):
785-
if store is None:
786-
store, chunk_store = self.create_store()
787-
init_group(store, path=path, chunk_store=chunk_store)
788-
sync_path = tempfile.mkdtemp()
789-
atexit.register(shutil.rmtree, sync_path)
790-
synchronizer = ProcessSynchronizer(sync_path)
791-
g = Group(store, path=path, read_only=read_only,
792-
chunk_store=chunk_store, synchronizer=synchronizer)
793-
return g
794-
795-
def test_group_repr(self):
796-
if not PY2:
797-
g = self.create_group()
798-
expect = 'Group(/, 0)\n' \
799-
' store: dict; synchronizer: ProcessSynchronizer'
800-
actual = repr(g)
801-
for l1, l2 in zip(expect.split('\n'), actual.split('\n')):
802-
eq(l1, l2)
803-
804-
def test_synchronizer_property(self):
805-
g = self.create_group()
806-
assert_is_instance(g.synchronizer, ProcessSynchronizer)
807-
808-
809754
def test_group():
810755
# test the group() convenience function
811756

zarr/tests/test_sync.py

Lines changed: 161 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,38 @@
66
import shutil
77
from multiprocessing.pool import ThreadPool, Pool as ProcessPool
88
from multiprocessing import cpu_count
9-
import os
109
import tempfile
1110

1211

1312
import numpy as np
14-
from nose.tools import eq_ as eq
13+
from nose.tools import eq_ as eq, assert_is_instance
14+
from numpy.testing import assert_array_equal
1515

1616

1717
from zarr.tests.test_attrs import TestAttributes
1818
from zarr.tests.test_core import TestArray
19+
from zarr.tests.test_hierarchy import TestGroup
1920
from zarr.sync import ThreadSynchronizer, ProcessSynchronizer
2021
from zarr.core import Array
2122
from zarr.attrs import Attributes
22-
from zarr.storage import init_array, TempStore
23+
from zarr.storage import init_array, TempStore, init_group, atexit_rmtree
2324
from zarr.compat import PY2
2425
from zarr.codecs import Zlib
26+
from zarr.hierarchy import Group
2527

2628

27-
class TestThreadSynchronizedAttributes(TestAttributes):
29+
if PY2:
30+
31+
class TemporaryDirectory(object):
32+
def __init__(self):
33+
self.name = tempfile.mkdtemp()
34+
atexit.register(atexit_rmtree, self.name)
35+
36+
else:
37+
from tempfile import TemporaryDirectory
38+
39+
40+
class TestAttributesWithThreadSynchronizer(TestAttributes):
2841

2942
def init_attributes(self, store, read_only=False):
3043
key = 'attrs'
@@ -34,7 +47,7 @@ def init_attributes(self, store, read_only=False):
3447
read_only=read_only)
3548

3649

37-
class TestProcessSynchronizedAttributes(TestAttributes):
50+
class TestAttributesProcessSynchronizer(TestAttributes):
3851

3952
def init_attributes(self, store, read_only=False):
4053
key = 'attrs'
@@ -46,7 +59,7 @@ def init_attributes(self, store, read_only=False):
4659
read_only=read_only)
4760

4861

49-
def _append_data(arg):
62+
def _append(arg):
5063
z, i = arg
5164
import numpy as np
5265
x = np.empty(1000, dtype='i4')
@@ -55,22 +68,46 @@ def _append_data(arg):
5568
return z.shape
5669

5770

71+
def _set_arange(arg):
72+
z, i = arg
73+
import numpy as np
74+
x = np.arange(i*1000, (i*1000)+1000, 1)
75+
z[i*1000:(i*1000)+1000] = x
76+
return i
77+
78+
5879
class MixinArraySyncTests(object):
5980

81+
def test_parallel_setitem(self):
82+
n = 99
83+
84+
# setup
85+
arr = self.create_array(shape=n * 1000, chunks=999, dtype='i4')
86+
arr[:] = 0
87+
pool = self.create_pool()
88+
89+
# parallel setitem
90+
results = pool.map_async(_set_arange, zip([arr] * n, range(n)))
91+
print(results.get())
92+
93+
assert_array_equal(np.arange(n * 1000), arr[:])
94+
6095
def test_parallel_append(self):
96+
n = 99
6197

6298
# setup
63-
arr = self.create_array(shape=1000, chunks=100, dtype='i4')
99+
arr = self.create_array(shape=1000, chunks=999, dtype='i4')
64100
arr[:] = 0
65-
pool = self.create_pool(cpu_count())
101+
pool = self.create_pool()
66102

67-
results = pool.map_async(_append_data, zip([arr] * 39, range(1, 40, 1)))
103+
# parallel append
104+
results = pool.map_async(_append, zip([arr] * n, range(n)))
68105
print(results.get())
69106

70-
eq((40000,), arr.shape)
107+
eq(((n+1)*1000,), arr.shape)
71108

72109

73-
class TestThreadSynchronizedArray(TestArray, MixinArraySyncTests):
110+
class TestArrayWithThreadSynchronizer(TestArray, MixinArraySyncTests):
74111

75112
def create_array(self, read_only=False, **kwargs):
76113
store = dict()
@@ -93,17 +130,17 @@ def test_repr(self):
93130
for l1, l2 in zip(expect.split('\n'), actual.split('\n')):
94131
eq(l1, l2)
95132

96-
def create_pool(self, size):
97-
pool = ThreadPool(size)
133+
def create_pool(self):
134+
pool = ThreadPool(cpu_count())
98135
return pool
99136

100137

101-
class TestProcessSynchronizedArray(TestArray, MixinArraySyncTests):
138+
class TestArrayWithProcessSynchronizer(TestArray, MixinArraySyncTests):
102139

103140
def create_array(self, read_only=False, **kwargs):
104141
store = TempStore()
105142
init_array(store, **kwargs)
106-
synchronizer = ProcessSynchronizer(tempfile.TemporaryDirectory().name)
143+
synchronizer = ProcessSynchronizer(TemporaryDirectory().name)
107144
return Array(store, synchronizer=synchronizer,
108145
read_only=read_only, cache_metadata=False)
109146

@@ -122,6 +159,113 @@ def test_repr(self):
122159
for l1, l2 in zip(expect.split('\n'), actual.split('\n')):
123160
eq(l1, l2)
124161

125-
def create_pool(self, size):
126-
pool = ProcessPool(size)
162+
def create_pool(self):
163+
pool = ProcessPool(cpu_count())
127164
return pool
165+
166+
167+
def _create_group(arg):
168+
g, name = arg
169+
h = g.create_group(name)
170+
return h.name
171+
172+
173+
def _require_group(arg):
174+
g, name = arg
175+
h = g.require_group(name)
176+
return h.name
177+
178+
179+
class MixinGroupSyncTests(object):
180+
181+
def test_parallel_create_group(self):
182+
183+
# setup
184+
g = self.create_group()
185+
pool = self.create_pool()
186+
187+
# parallel create group
188+
n = 1000
189+
results = pool.map_async(
190+
_create_group, zip([g] * n, [str(i) for i in range(n)]))
191+
print(results.get())
192+
193+
eq(n, len(g))
194+
195+
def test_parallel_require_group(self):
196+
197+
# setup
198+
g = self.create_group()
199+
pool = self.create_pool()
200+
201+
# parallel require group
202+
n = 1000
203+
results = pool.map_async(
204+
_require_group, zip([g] * n, [str(i//10) for i in range(n)]))
205+
print(results.get())
206+
207+
eq(n//10, len(g))
208+
209+
210+
class TestGroupWithThreadSynchronizer(TestGroup, MixinGroupSyncTests):
211+
212+
def create_group(self, store=None, path=None, read_only=False,
213+
chunk_store=None, synchronizer=None):
214+
if store is None:
215+
store, chunk_store = self.create_store()
216+
init_group(store, path=path, chunk_store=chunk_store)
217+
synchronizer = ThreadSynchronizer()
218+
g = Group(store, path=path, read_only=read_only,
219+
chunk_store=chunk_store, synchronizer=synchronizer)
220+
return g
221+
222+
def create_pool(self):
223+
pool = ThreadPool(cpu_count())
224+
return pool
225+
226+
def test_group_repr(self):
227+
if not PY2:
228+
g = self.create_group()
229+
expect = 'Group(/, 0)\n' \
230+
' store: dict; synchronizer: ThreadSynchronizer'
231+
actual = repr(g)
232+
for l1, l2 in zip(expect.split('\n'), actual.split('\n')):
233+
eq(l1, l2)
234+
235+
def test_synchronizer_property(self):
236+
g = self.create_group()
237+
assert_is_instance(g.synchronizer, ThreadSynchronizer)
238+
239+
240+
class TestGroupWithProcessSynchronizer(TestGroup, MixinGroupSyncTests):
241+
242+
def create_store(self):
243+
return TempStore(), None
244+
245+
def create_group(self, store=None, path=None, read_only=False,
246+
chunk_store=None, synchronizer=None):
247+
if store is None:
248+
store = TempStore()
249+
chunk_store = None
250+
init_group(store, path=path, chunk_store=chunk_store)
251+
synchronizer = ProcessSynchronizer(TemporaryDirectory().name)
252+
g = Group(store, path=path, read_only=read_only,
253+
synchronizer=synchronizer, chunk_store=chunk_store)
254+
return g
255+
256+
def create_pool(self):
257+
pool = ProcessPool(cpu_count())
258+
return pool
259+
260+
def test_group_repr(self):
261+
if not PY2:
262+
g = self.create_group()
263+
expect = 'Group(/, 0)\n' \
264+
' store: TempStore; synchronizer: ProcessSynchronizer'
265+
actual = repr(g)
266+
for l1, l2 in zip(expect.split('\n'), actual.split('\n')):
267+
eq(l1, l2)
268+
269+
def test_synchronizer_property(self):
270+
g = self.create_group()
271+
assert_is_instance(g.synchronizer, ProcessSynchronizer)

0 commit comments

Comments
 (0)