4
4
import atexit
5
5
import json
6
6
import shutil
7
+ from multiprocessing .pool import ThreadPool , Pool as ProcessPool
8
+ from multiprocessing import cpu_count
9
+ import os
10
+ import tempfile
7
11
8
12
13
+ import numpy as np
9
14
from nose .tools import eq_ as eq
10
15
11
16
14
19
from zarr .sync import ThreadSynchronizer , ProcessSynchronizer
15
20
from zarr .core import Array
16
21
from zarr .attrs import Attributes
17
- from zarr .storage import init_array
22
+ from zarr .storage import init_array , TempStore
18
23
from zarr .compat import PY2
19
24
from zarr .codecs import Zlib
20
25
@@ -41,15 +46,36 @@ def init_attributes(self, store, read_only=False):
41
46
read_only = read_only )
42
47
43
48
44
- class TestThreadSynchronizedArray ( TestArray ):
49
+ class MixinArraySyncTests ( object ):
45
50
46
- def create_array (self , store = None , path = None , read_only = False ,
47
- chunk_store = None , ** kwargs ):
48
- if store is None :
49
- store = dict ()
50
- init_array (store , path = path , chunk_store = chunk_store , ** kwargs )
51
- return Array (store , path = path , synchronizer = ThreadSynchronizer (),
52
- read_only = read_only , chunk_store = chunk_store )
51
+ def test_parallel_append (self ):
52
+
53
+ # setup
54
+ arr = self .create_array (shape = 1000 , chunks = 100 , dtype = 'i4' )
55
+ arr [:] = 0
56
+ pool = self .create_pool (cpu_count ())
57
+
58
+ def f (i ):
59
+ x = np .empty (1000 , dtype = 'i4' )
60
+ x [:] = i
61
+ arr .append (x )
62
+
63
+ pool .map_async (f , range (1 , 40 , 1 ))
64
+
65
+ pool .close ()
66
+ pool .join ()
67
+ pool .terminate ()
68
+
69
+ eq ((40000 ,), arr .shape )
70
+
71
+
72
+ class TestThreadSynchronizedArray (TestArray , MixinArraySyncTests ):
73
+
74
+ def create_array (self , read_only = False , ** kwargs ):
75
+ store = dict ()
76
+ init_array (store , ** kwargs )
77
+ return Array (store , synchronizer = ThreadSynchronizer (),
78
+ read_only = read_only )
53
79
54
80
def test_repr (self ):
55
81
if not PY2 :
@@ -66,20 +92,19 @@ def test_repr(self):
66
92
for l1 , l2 in zip (expect .split ('\n ' ), actual .split ('\n ' )):
67
93
eq (l1 , l2 )
68
94
95
+ def create_pool (self , size ):
96
+ pool = ThreadPool (size )
97
+ return pool
69
98
70
- class TestProcessSynchronizedArray (TestArray ):
71
99
72
- def create_array (self , store = None , path = None , read_only = False ,
73
- chunk_store = None , ** kwargs ):
74
- if store is None :
75
- store = dict ()
76
- init_array (store , path = path , chunk_store = chunk_store , ** kwargs )
77
- sync_path = mkdtemp ()
78
- atexit .register (shutil .rmtree , sync_path )
79
- synchronizer = ProcessSynchronizer (sync_path )
80
- return Array (store , path = path , synchronizer = synchronizer ,
81
- read_only = read_only , chunk_store = chunk_store ,
82
- cache_metadata = False )
100
+ class TestProcessSynchronizedArray (TestArray , MixinArraySyncTests ):
101
+
102
+ def create_array (self , read_only = False , ** kwargs ):
103
+ store = TempStore ()
104
+ init_array (store , ** kwargs )
105
+ synchronizer = ProcessSynchronizer (tempfile .TemporaryDirectory ().name )
106
+ return Array (store , synchronizer = synchronizer ,
107
+ read_only = read_only , cache_metadata = False )
83
108
84
109
def test_repr (self ):
85
110
if not PY2 :
@@ -90,8 +115,12 @@ def test_repr(self):
90
115
expect = """Array((100,), float32, chunks=(10,), order=C)
91
116
nbytes: 400; nbytes_stored: 245; ratio: 1.6; initialized: 0/10
92
117
compressor: Zlib(level=1)
93
- store: dict ; synchronizer: ProcessSynchronizer
118
+ store: TempStore ; synchronizer: ProcessSynchronizer
94
119
"""
95
120
actual = repr (z )
96
121
for l1 , l2 in zip (expect .split ('\n ' ), actual .split ('\n ' )):
97
122
eq (l1 , l2 )
123
+
124
+ def create_pool (self , size ):
125
+ pool = ProcessPool (size )
126
+ return pool
0 commit comments