15
15
import re
16
16
import shutil
17
17
import struct
18
+ import subprocess
18
19
import sys
19
20
import tempfile
20
21
import zlib
28
29
DATA = b'This is a simple test with igzip'
29
30
COMPRESSED_DATA = gzip .compress (DATA )
30
31
TEST_FILE = str ((Path (__file__ ).parent / "data" / "test.fastq.gz" ))
32
+ PYPY = sys .implementation .name == "pypy"
33
+
34
+
35
+ def run_isal_igzip (* args , stdin = None ):
36
+ """Calling isal.igzip externally seems to solve some issues on PyPy where
37
+ files would not be written properly when igzip.main() was called. This is
38
+ probably due to some out of order execution that PyPy tries to pull.
39
+ Running the process externally is detrimental to the coverage report,
40
+ so this is only done for PyPy."""
41
+ process = subprocess .Popen (["python" , "-m" , "isal.igzip" , * args ],
42
+ stdout = subprocess .PIPE ,
43
+ stderr = subprocess .PIPE ,
44
+ stdin = subprocess .PIPE )
45
+
46
+ return process .communicate (stdin )
31
47
32
48
33
49
def test_wrong_compresslevel_igzipfile ():
@@ -112,10 +128,13 @@ def test_decompress_infile_outfile(tmp_path, capsysbinary):
112
128
def test_compress_infile_outfile (tmp_path , capsysbinary ):
113
129
test_file = tmp_path / "test"
114
130
test_file .write_bytes (DATA )
115
- sys .argv = ['' , str (test_file )]
116
- igzip .main ()
131
+ if PYPY :
132
+ out , err = run_isal_igzip (str (test_file ))
133
+ else :
134
+ sys .argv = ['' , str (test_file )]
135
+ igzip .main ()
136
+ out , err = capsysbinary .readouterr ()
117
137
out_file = test_file .with_suffix (".gz" )
118
- out , err = capsysbinary .readouterr ()
119
138
assert err == b''
120
139
assert out == b''
121
140
assert out_file .exists ()
@@ -176,9 +195,13 @@ def test_compress_infile_out_file(tmp_path, capsysbinary):
176
195
test = tmp_path / "test"
177
196
test .write_bytes (DATA )
178
197
out_file = tmp_path / "compressed.gz"
179
- sys .argv = ['' , '-o' , str (out_file ), str (test )]
180
- igzip .main ()
181
- out , err = capsysbinary .readouterr ()
198
+ args = ['-o' , str (out_file ), str (test )]
199
+ if PYPY :
200
+ out , err = run_isal_igzip (* args )
201
+ else :
202
+ sys .argv = ['' , * args ]
203
+ igzip .main ()
204
+ out , err = capsysbinary .readouterr ()
182
205
assert gzip .decompress (out_file .read_bytes ()) == DATA
183
206
assert err == b''
184
207
assert out == b''
@@ -189,9 +212,13 @@ def test_compress_infile_out_file_force(tmp_path, capsysbinary):
189
212
test .write_bytes (DATA )
190
213
out_file = tmp_path / "compressed.gz"
191
214
out_file .touch ()
192
- sys .argv = ['' , '-f' , '-o' , str (out_file ), str (test )]
193
- igzip .main ()
194
- out , err = capsysbinary .readouterr ()
215
+ args = ['-f' , '-o' , str (out_file ), str (test )]
216
+ if PYPY :
217
+ out , err = run_isal_igzip (* args )
218
+ else :
219
+ sys .argv = ['' , * args ]
220
+ igzip .main ()
221
+ out , err = capsysbinary .readouterr ()
195
222
assert gzip .decompress (out_file .read_bytes ()) == DATA
196
223
assert err == b''
197
224
assert out == b''
@@ -234,23 +261,30 @@ def test_compress_infile_out_file_inmplicit_name_prompt_accept(
234
261
test .write_bytes (DATA )
235
262
out_file = tmp_path / "test.gz"
236
263
out_file .touch ()
237
- sys .argv = ['' , str (test )]
238
- mock_stdin = io .BytesIO (b"y" )
239
- sys .stdin = io .TextIOWrapper (mock_stdin )
240
- igzip .main ()
241
- out , err = capsysbinary .readouterr ()
242
- assert gzip .decompress (out_file .read_bytes ()) == DATA
264
+ if PYPY :
265
+ out , err = run_isal_igzip (str (test ), stdin = b"y\n " )
266
+ else :
267
+ sys .argv = ['' , str (test )]
268
+ mock_stdin = io .BytesIO (b"y" )
269
+ sys .stdin = io .TextIOWrapper (mock_stdin )
270
+ igzip .main ()
271
+ out , err = capsysbinary .readouterr ()
243
272
assert b"already exists; do you wish to overwrite" in out
244
273
assert err == b""
274
+ assert gzip .decompress (out_file .read_bytes ()) == DATA
245
275
246
276
247
277
def test_compress_infile_out_file_no_name (tmp_path , capsysbinary ):
248
278
test = tmp_path / "test"
249
279
test .write_bytes (DATA )
250
280
out_file = tmp_path / "compressed.gz"
251
- sys .argv = ['' , '-n' , '-o' , str (out_file ), str (test )]
252
- igzip .main ()
253
- out , err = capsysbinary .readouterr ()
281
+ args = ['-n' , '-o' , str (out_file ), str (test )]
282
+ if PYPY :
283
+ out , err = run_isal_igzip (* args )
284
+ else :
285
+ sys .argv = ['' , '-n' , '-o' , str (out_file ), str (test )]
286
+ igzip .main ()
287
+ out , err = capsysbinary .readouterr ()
254
288
output = out_file .read_bytes ()
255
289
assert gzip .decompress (output ) == DATA
256
290
assert err == b''
0 commit comments