7
7
8
8
import gzip
9
9
import io
10
+ import itertools
10
11
import random
11
12
import tempfile
12
13
from pathlib import Path
@@ -26,10 +27,11 @@ def test_threaded_read():
26
27
assert thread_data == data
27
28
28
29
29
- @pytest .mark .parametrize ("mode" , ["wb" , "wt" ])
30
- def test_threaded_write (mode ):
30
+ @pytest .mark .parametrize (["mode" , "threads" ],
31
+ itertools .product (["wb" , "wt" ], [1 , 3 ]))
32
+ def test_threaded_write (mode , threads ):
31
33
with tempfile .NamedTemporaryFile ("wb" , delete = False ) as tmp :
32
- with igzip_threaded .open (tmp , mode , threads = - 1 ) as out_file :
34
+ with igzip_threaded .open (tmp , mode , threads = threads ) as out_file :
33
35
gzip_open_mode = "rb" if "b" in mode else "rt"
34
36
with gzip .open (TEST_FILE , gzip_open_mode ) as in_file :
35
37
while True :
@@ -74,11 +76,13 @@ def test_threaded_read_error():
74
76
75
77
76
78
@pytest .mark .timeout (5 )
77
- def test_threaded_write_error ():
79
+ @pytest .mark .parametrize ("threads" , [1 , 3 ])
80
+ def test_threaded_write_error (threads ):
78
81
# parallel_deflate_and_crc method is called in a worker thread.
79
82
with pytest .raises (OverflowError ) as error :
80
83
with igzip_threaded .open (
81
- io .BytesIO (), "wb" , compresslevel = 3 ) as writer :
84
+ io .BytesIO (), "wb" , compresslevel = 3 , threads = threads
85
+ ) as writer :
82
86
writer .write (random .randbytes (1024 * 1024 * 50 ))
83
87
error .match ("Compressed output exceeds buffer size" )
84
88
@@ -92,8 +96,10 @@ def test_close_reader():
92
96
f .close ()
93
97
94
98
95
- def test_close_writer ():
96
- f = igzip_threaded ._ThreadedGzipWriter (io .BytesIO ())
99
+ @pytest .mark .parametrize ("threads" , [1 , 3 ])
100
+ def test_close_writer (threads ):
101
+ f = igzip_threaded ._ThreadedGzipWriter (
102
+ io .BytesIO (), threads = threads )
97
103
f .close ()
98
104
assert f .closed
99
105
# Make sure double closing does not raise errors
@@ -117,6 +123,13 @@ def test_writer_wrong_level():
117
123
error .match ("42" )
118
124
119
125
126
+ def test_writer_too_low_threads ():
127
+ with pytest .raises (ValueError ) as error :
128
+ igzip_threaded ._ThreadedGzipWriter (io .BytesIO (), threads = 0 )
129
+ error .match ("threads" )
130
+ error .match ("at least 1" )
131
+
132
+
120
133
def test_reader_read_after_close ():
121
134
with open (TEST_FILE , "rb" ) as test_f :
122
135
f = igzip_threaded ._ThreadedGzipReader (test_f )
@@ -126,8 +139,9 @@ def test_reader_read_after_close():
126
139
error .match ("closed" )
127
140
128
141
129
- def test_writer_write_after_close ():
130
- f = igzip_threaded ._ThreadedGzipWriter (io .BytesIO ())
142
+ @pytest .mark .parametrize ("threads" , [1 , 3 ])
143
+ def test_writer_write_after_close (threads ):
144
+ f = igzip_threaded ._ThreadedGzipWriter (io .BytesIO (), threads = threads )
131
145
f .close ()
132
146
with pytest .raises (ValueError ) as error :
133
147
f .write (b"abc" )
0 commit comments