20
20
from zarr .sync import ThreadSynchronizer , ProcessSynchronizer
21
21
from zarr .core import Array
22
22
from zarr .attrs import Attributes
23
- from zarr .storage import init_array , TempStore , init_group , atexit_rmtree
23
+ from zarr .storage import init_array , DirectoryStore , init_group , atexit_rmtree
24
24
from zarr .compat import PY2
25
25
from zarr .codecs import Zlib
26
26
from zarr .hierarchy import Group
@@ -79,31 +79,33 @@ def _set_arange(arg):
79
79
class MixinArraySyncTests (object ):
80
80
81
81
def test_parallel_setitem (self ):
82
- n = 99
82
+ n = 200
83
83
84
84
# setup
85
85
arr = self .create_array (shape = n * 1000 , chunks = 999 , dtype = 'i4' )
86
86
arr [:] = 0
87
87
pool = self .create_pool ()
88
88
89
89
# parallel setitem
90
- results = pool .map_async (_set_arange , zip ([arr ] * n , range (n )))
91
- print (results .get (20 ))
92
-
90
+ future = pool .map_async (_set_arange , zip ([arr ] * n , range (n )))
91
+ results = future .get (30 )
92
+ print (results )
93
+ eq (list (range (n )), sorted (results ))
93
94
assert_array_equal (np .arange (n * 1000 ), arr [:])
94
95
95
96
def test_parallel_append (self ):
96
- n = 99
97
+ n = 200
97
98
98
99
# setup
99
100
arr = self .create_array (shape = 1000 , chunks = 999 , dtype = 'i4' )
100
101
arr [:] = 0
101
102
pool = self .create_pool ()
102
103
103
104
# parallel append
104
- results = pool .map_async (_append , zip ([arr ] * n , range (n )))
105
- print (results .get (20 ))
106
-
105
+ future = pool .map_async (_append , zip ([arr ] * n , range (n )))
106
+ results = future .get (30 )
107
+ print (results )
108
+ eq ([((i + 2 )* 1000 ,) for i in range (n )], sorted (results ))
107
109
eq (((n + 1 )* 1000 ,), arr .shape )
108
110
109
111
@@ -138,7 +140,7 @@ def create_pool(self):
138
140
class TestArrayWithProcessSynchronizer (TestArray , MixinArraySyncTests ):
139
141
140
142
def create_array (self , read_only = False , ** kwargs ):
141
- store = TempStore ( )
143
+ store = DirectoryStore ( TemporaryDirectory (). name )
142
144
init_array (store , ** kwargs )
143
145
synchronizer = ProcessSynchronizer (TemporaryDirectory ().name )
144
146
return Array (store , synchronizer = synchronizer ,
@@ -153,7 +155,7 @@ def test_repr(self):
153
155
expect = """Array((100,), float32, chunks=(10,), order=C)
154
156
nbytes: 400; nbytes_stored: 245; ratio: 1.6; initialized: 0/10
155
157
compressor: Zlib(level=1)
156
- store: TempStore ; synchronizer: ProcessSynchronizer
158
+ store: DirectoryStore ; synchronizer: ProcessSynchronizer
157
159
"""
158
160
actual = repr (z )
159
161
for l1 , l2 in zip (expect .split ('\n ' ), actual .split ('\n ' )):
@@ -185,10 +187,10 @@ def test_parallel_create_group(self):
185
187
pool = self .create_pool ()
186
188
187
189
# parallel create group
188
- n = 1000
190
+ n = 100
189
191
results = pool .map_async (
190
192
_create_group , zip ([g ] * n , [str (i ) for i in range (n )]))
191
- print (results .get (20 ))
193
+ print (results .get (30 ))
192
194
193
195
eq (n , len (g ))
194
196
@@ -199,10 +201,10 @@ def test_parallel_require_group(self):
199
201
pool = self .create_pool ()
200
202
201
203
# parallel require group
202
- n = 1000
204
+ n = 100
203
205
results = pool .map_async (
204
206
_require_group , zip ([g ] * n , [str (i // 10 ) for i in range (n )]))
205
- print (results .get (20 ))
207
+ print (results .get (30 ))
206
208
207
209
eq (n // 10 , len (g ))
208
210
@@ -240,12 +242,12 @@ def test_synchronizer_property(self):
240
242
class TestGroupWithProcessSynchronizer (TestGroup , MixinGroupSyncTests ):
241
243
242
244
def create_store (self ):
243
- return TempStore ( ), None
245
+ return DirectoryStore ( TemporaryDirectory (). name ), None
244
246
245
247
def create_group (self , store = None , path = None , read_only = False ,
246
248
chunk_store = None , synchronizer = None ):
247
249
if store is None :
248
- store = TempStore ( )
250
+ store = DirectoryStore ( TemporaryDirectory (). name )
249
251
chunk_store = None
250
252
init_group (store , path = path , chunk_store = chunk_store )
251
253
synchronizer = ProcessSynchronizer (TemporaryDirectory ().name )
@@ -261,7 +263,7 @@ def test_group_repr(self):
261
263
if not PY2 :
262
264
g = self .create_group ()
263
265
expect = 'Group(/, 0)\n ' \
264
- ' store: TempStore ; synchronizer: ProcessSynchronizer'
266
+ ' store: DirectoryStore ; synchronizer: ProcessSynchronizer'
265
267
actual = repr (g )
266
268
for l1 , l2 in zip (expect .split ('\n ' ), actual .split ('\n ' )):
267
269
eq (l1 , l2 )
0 commit comments