4
4
5
5
import pytest
6
6
from threading import Lock
7
-
8
- import threading
7
+ from multiprocessing .pool import ThreadPool
9
8
10
9
import dask .array as da
11
10
import dask .dataframe as dd
15
14
from dask .utils import tmpfile
16
15
from dask .local import get_sync
17
16
18
- from dask .dataframe .utils import assert_eq
17
+ from dask .dataframe .utils import assert_eq , is_categorical_dtype
19
18
20
19
21
20
####################
@@ -119,13 +118,14 @@ def test_from_array_with_record_dtype():
119
118
120
119
def test_from_bcolz_multiple_threads ():
121
120
bcolz = pytest .importorskip ('bcolz' )
121
+ pool = ThreadPool (processes = 5 )
122
122
123
- def check ():
123
+ def check (i ):
124
124
t = bcolz .ctable ([[1 , 2 , 3 ], [1. , 2. , 3. ], ['a' , 'b' , 'a' ]],
125
125
names = ['x' , 'y' , 'a' ])
126
126
d = dd .from_bcolz (t , chunksize = 2 )
127
127
assert d .npartitions == 2
128
- assert str (d .dtypes ['a' ]) == 'category'
128
+ assert is_categorical_dtype (d .dtypes ['a' ])
129
129
assert list (d .x .compute (get = get_sync )) == [1 , 2 , 3 ]
130
130
assert list (d .a .compute (get = get_sync )) == ['a' , 'b' , 'a' ]
131
131
@@ -139,14 +139,7 @@ def check():
139
139
assert (sorted (dd .from_bcolz (t , chunksize = 2 ).dask ) !=
140
140
sorted (dd .from_bcolz (t , chunksize = 3 ).dask ))
141
141
142
- threads = []
143
- for i in range (5 ):
144
- thread = threading .Thread (target = check )
145
- thread .start ()
146
- threads .append (thread )
147
-
148
- for thread in threads :
149
- thread .join ()
142
+ pool .map (check , range (5 ))
150
143
151
144
152
145
def test_from_bcolz ():
@@ -156,7 +149,7 @@ def test_from_bcolz():
156
149
names = ['x' , 'y' , 'a' ])
157
150
d = dd .from_bcolz (t , chunksize = 2 )
158
151
assert d .npartitions == 2
159
- assert str (d .dtypes ['a' ]) == 'category'
152
+ assert is_categorical_dtype (d .dtypes ['a' ])
160
153
assert list (d .x .compute (get = get_sync )) == [1 , 2 , 3 ]
161
154
assert list (d .a .compute (get = get_sync )) == ['a' , 'b' , 'a' ]
162
155
L = list (d .index .compute (get = get_sync ))
0 commit comments