32
32
33
33
from isal import igzip
34
34
35
- import pytest
36
-
37
35
data1 = b""" int length=DEFAULTALLOC, err = Z_OK;
38
36
PyObject *RetVal;
39
37
int flushmode = Z_FINISH;
46
44
/* See http://www.winimage.com/zLibDll for Windows */
47
45
"""
48
46
49
-
50
47
TEMPDIR = os .path .abspath (tempfile .mkdtemp (suffix = '-gzdir' ))
51
48
52
49
@@ -76,10 +73,10 @@ def tearDown(self):
76
73
class TestGzip (BaseTest ):
77
74
def write_and_read_back (self , data , mode = 'b' ):
78
75
b_data = bytes (data )
79
- with igzip .IGzipFile (self .filename , 'w' + mode ) as f :
80
- l = f .write (data )
81
- self .assertEqual (l , len (b_data ))
82
- with igzip .IGzipFile (self .filename , 'r' + mode ) as f :
76
+ with igzip .IGzipFile (self .filename , 'w' + mode ) as f :
77
+ length = f .write (data )
78
+ self .assertEqual (length , len (b_data ))
79
+ with igzip .IGzipFile (self .filename , 'r' + mode ) as f :
83
80
self .assertEqual (f .read (), b_data )
84
81
85
82
def test_write (self ):
@@ -114,7 +111,7 @@ def test_write_read_with_pathlike_file(self):
114
111
def test_write_memoryview (self ):
115
112
self .write_and_read_back (memoryview (data1 * 50 ))
116
113
m = memoryview (bytes (range (256 )))
117
- data = m .cast ('B' , shape = [8 ,8 , 4 ])
114
+ data = m .cast ('B' , shape = [8 , 8 , 4 ])
118
115
self .write_and_read_back (data )
119
116
120
117
def test_write_bytearray (self ):
@@ -141,7 +138,7 @@ def test_read(self):
141
138
# Try reading.
142
139
with igzip .IGzipFile (self .filename , 'r' ) as f :
143
140
d = f .read ()
144
- self .assertEqual (d , data1 * 50 )
141
+ self .assertEqual (d , data1 * 50 )
145
142
146
143
def test_read1 (self ):
147
144
self .test_write ()
@@ -202,7 +199,7 @@ def test_append(self):
202
199
203
200
with igzip .IGzipFile (self .filename , 'rb' ) as f :
204
201
d = f .read ()
205
- self .assertEqual (d , (data1 * 50 ) + (data2 * 15 ))
202
+ self .assertEqual (d , (data1 * 50 ) + (data2 * 15 ))
206
203
207
204
def test_many_append (self ):
208
205
# Bug #1074261 was triggered when reading a file that contained
@@ -211,7 +208,7 @@ def test_many_append(self):
211
208
with igzip .IGzipFile (self .filename , 'wb' ) as f :
212
209
f .write (b'a' )
213
210
for i in range (0 , 200 ):
214
- with igzip .IGzipFile (self .filename , "ab" ) as f : # append
211
+ with igzip .IGzipFile (self .filename , "ab" ) as f : # append
215
212
f .write (b'a' )
216
213
217
214
# Try reading the file
@@ -220,8 +217,9 @@ def test_many_append(self):
220
217
while 1 :
221
218
ztxt = zgfile .read (8192 )
222
219
contents += ztxt
223
- if not ztxt : break
224
- self .assertEqual (contents , b'a' * 201 )
220
+ if not ztxt :
221
+ break
222
+ self .assertEqual (contents , b'a' * 201 )
225
223
226
224
def test_exclusive_write (self ):
227
225
with igzip .IGzipFile (self .filename , 'xb' ) as f :
@@ -250,7 +248,8 @@ def test_readline(self):
250
248
line_length = 0
251
249
while 1 :
252
250
L = f .readline (line_length )
253
- if not L and line_length != 0 : break
251
+ if not L and line_length != 0 :
252
+ break
254
253
self .assertTrue (len (L ) <= line_length )
255
254
line_length = (line_length + 1 ) % 50
256
255
@@ -264,7 +263,8 @@ def test_readlines(self):
264
263
with igzip .IGzipFile (self .filename , 'rb' ) as f :
265
264
while 1 :
266
265
L = f .readlines (150 )
267
- if L == []: break
266
+ if L == []:
267
+ break
268
268
269
269
def test_seek_read (self ):
270
270
self .test_write ()
@@ -274,10 +274,11 @@ def test_seek_read(self):
274
274
while 1 :
275
275
oldpos = f .tell ()
276
276
line1 = f .readline ()
277
- if not line1 : break
277
+ if not line1 :
278
+ break
278
279
newpos = f .tell ()
279
280
f .seek (oldpos ) # negative seek
280
- if len (line1 )> 10 :
281
+ if len (line1 ) > 10 :
281
282
amount = 10
282
283
else :
283
284
amount = len (line1 )
@@ -324,7 +325,7 @@ def test_paddedfile_getattr(self):
324
325
325
326
def test_mtime (self ):
326
327
mtime = 123456789
327
- with igzip .IGzipFile (self .filename , 'w' , mtime = mtime ) as fWrite :
328
+ with igzip .IGzipFile (self .filename , 'w' , mtime = mtime ) as fWrite :
328
329
fWrite .write (data1 )
329
330
with igzip .IGzipFile (self .filename ) as fRead :
330
331
self .assertTrue (hasattr (fRead , 'mtime' ))
@@ -336,22 +337,22 @@ def test_mtime(self):
336
337
def test_metadata (self ):
337
338
mtime = 123456789
338
339
339
- with igzip .IGzipFile (self .filename , 'w' , mtime = mtime ) as fWrite :
340
+ with igzip .IGzipFile (self .filename , 'w' , mtime = mtime ) as fWrite :
340
341
fWrite .write (data1 )
341
342
342
343
with open (self .filename , 'rb' ) as fRead :
343
344
# see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
344
345
345
346
idBytes = fRead .read (2 )
346
- self .assertEqual (idBytes , b'\x1f \x8b ' ) # igzip ID
347
+ self .assertEqual (idBytes , b'\x1f \x8b ' ) # igzip ID
347
348
348
349
cmByte = fRead .read (1 )
349
- self .assertEqual (cmByte , b'\x08 ' ) # deflate
350
+ self .assertEqual (cmByte , b'\x08 ' ) # deflate
350
351
351
352
try :
352
353
expectedname = os .path .basename (self .filename ).encode (
353
354
'Latin-1' ) + b'\x00 '
354
- expectedflags = b'\x08 ' # only the FNAME flag is set
355
+ expectedflags = b'\x08 ' # only the FNAME flag is set
355
356
except UnicodeEncodeError :
356
357
expectedname = b''
357
358
expectedflags = b'\x00 '
@@ -360,26 +361,28 @@ def test_metadata(self):
360
361
self .assertEqual (flagsByte , expectedflags )
361
362
362
363
mtimeBytes = fRead .read (4 )
363
- self .assertEqual (mtimeBytes , struct .pack ('<i' , mtime )) # little-endian
364
+ self .assertEqual (mtimeBytes ,
365
+ struct .pack ('<i' , mtime )) # little-endian
364
366
365
367
xflByte = fRead .read (1 )
366
- self .assertEqual (xflByte , b'\x02 ' ) # maximum compression
368
+ self .assertEqual (xflByte , b'\x02 ' ) # maximum compression
367
369
368
370
osByte = fRead .read (1 )
369
- self .assertEqual (osByte , b'\xff ' ) # OS "unknown" (OS-independent)
371
+ self .assertEqual (osByte , b'\xff ' ) # OS "unknown" (OS-independent)
370
372
371
- # Since the FNAME flag is set, the zero-terminated filename follows.
372
- # RFC 1952 specifies that this is the name of the input file, if any.
373
- # However, the igzip module defaults to storing the name of the output
374
- # file in this field.
373
+ # Since the FNAME flag is set, the zero-terminated filename
374
+ # follows. RFC 1952 specifies that this is the name of the input
375
+ # file, if any. However, the gzip module defaults to storing the
376
+ # name of the output file in this field.
375
377
nameBytes = fRead .read (len (expectedname ))
376
378
self .assertEqual (nameBytes , expectedname )
377
379
378
380
# Since no other flags were set, the header ends here.
379
- # Rather than process the compressed data, let's seek to the trailer.
381
+ # Rather than process the compressed data, let's seek to the
382
+ # trailer.
380
383
fRead .seek (os .stat (self .filename ).st_size - 8 )
381
384
382
- crc32Bytes = fRead .read (4 ) # CRC32 of uncompressed data [data1]
385
+ crc32Bytes = fRead .read (4 ) # CRC32 of uncompressed data [data1]
383
386
self .assertEqual (crc32Bytes , b'\xaf \xd7 d\x83 ' )
384
387
385
388
isizeBytes = fRead .read (4 )
@@ -401,11 +404,12 @@ def test_compresslevel_metadata(self):
401
404
402
405
for (name , level , expectedXflByte ) in cases :
403
406
major , minor , _ , _ , _ = sys .version_info
404
- if major == 3 and minor <= 7 or major < 3 :
407
+ if major == 3 and minor <= 7 or major < 3 :
405
408
# Specific xfl bytes introduced in 3.7
406
409
expectedXflByte = b'\x02 '
407
410
with self .subTest (name ):
408
- fWrite = igzip .IGzipFile (self .filename , 'w' , compresslevel = level )
411
+ fWrite = igzip .IGzipFile (self .filename , 'w' ,
412
+ compresslevel = level )
409
413
with fWrite :
410
414
fWrite .write (data1 )
411
415
with open (self .filename , 'rb' ) as fRead :
@@ -428,7 +432,7 @@ def test_with_open(self):
428
432
self .fail ("__enter__ on a closed file didn't raise an exception" )
429
433
try :
430
434
with igzip .IGzipFile (self .filename , "wb" ) as f :
431
- 1 / 0
435
+ 1 / 0
432
436
except ZeroDivisionError :
433
437
pass
434
438
else :
@@ -510,7 +514,7 @@ def test_fileobj_from_fdopen(self):
510
514
# fileobj created with os.fdopen().
511
515
fd = os .open (self .filename , os .O_WRONLY | os .O_CREAT )
512
516
with os .fdopen (fd , "wb" ) as f :
513
- with igzip .IGzipFile (fileobj = f , mode = "w" ) as g :
517
+ with igzip .IGzipFile (fileobj = f , mode = "w" ):
514
518
pass
515
519
516
520
def test_fileobj_mode (self ):
@@ -567,7 +571,7 @@ def test_decompress_limited(self):
567
571
self .assertEqual (decomp .read (1 ), b'\0 ' )
568
572
max_decomp = 1 + io .DEFAULT_BUFFER_SIZE
569
573
self .assertLessEqual (decomp ._buffer .raw .tell (), max_decomp ,
570
- "Excessive amount of data was decompressed" )
574
+ "Excessive amount of data was decompressed" )
571
575
572
576
# Testing compress/decompress shortcut functions
573
577
@@ -576,7 +580,8 @@ def test_compress(self):
576
580
for args in [(), (1 ,), (2 ,), (3 ,), (0 ,)]:
577
581
datac = igzip .compress (data , * args )
578
582
self .assertEqual (type (datac ), bytes )
579
- with igzip .IGzipFile (fileobj = io .BytesIO (datac ), mode = "rb" ) as f :
583
+ with igzip .IGzipFile (fileobj = io .BytesIO (datac ),
584
+ mode = "rb" ) as f :
580
585
self .assertEqual (f .read (), data )
581
586
582
587
def test_compress_mtime (self ):
@@ -586,8 +591,9 @@ def test_compress_mtime(self):
586
591
with self .subTest (data = data , args = args ):
587
592
datac = igzip .compress (data , * args , mtime = mtime )
588
593
self .assertEqual (type (datac ), bytes )
589
- with igzip .IGzipFile (fileobj = io .BytesIO (datac ), mode = "rb" ) as f :
590
- f .read (1 ) # to set mtime attribute
594
+ with igzip .IGzipFile (fileobj = io .BytesIO (datac ),
595
+ mode = "rb" ) as f :
596
+ f .read (1 ) # to set mtime attribute
591
597
self .assertEqual (f .mtime , mtime )
592
598
593
599
def test_decompress (self ):
@@ -601,7 +607,7 @@ def test_decompress(self):
601
607
self .assertEqual (igzip .decompress (datac ), data )
602
608
603
609
def test_read_truncated (self ):
604
- data = data1 * 50
610
+ data = data1 * 50
605
611
# Drop the CRC (4 bytes) and file size (4 bytes).
606
612
truncated = igzip .compress (data )[:- 8 ]
607
613
with igzip .IGzipFile (fileobj = io .BytesIO (truncated )) as f :
@@ -629,6 +635,7 @@ def test_prepend_error(self):
629
635
with igzip .open (self .filename , "rb" ) as f :
630
636
f ._buffer .raw ._fp .prepend ()
631
637
638
+
632
639
class TestOpen (BaseTest ):
633
640
def test_binary_modes (self ):
634
641
uncompressed = data1 * 50
@@ -752,8 +759,8 @@ def test_encoding_error_handler(self):
752
759
# Test with non-default encoding error handler.
753
760
with igzip .open (self .filename , "wb" ) as f :
754
761
f .write (b"foo\xff bar" )
755
- with igzip .open (self .filename , "rt" , encoding = "ascii" , errors = "ignore" ) \
756
- as f :
762
+ with igzip .open (self .filename , "rt" , encoding = "ascii" ,
763
+ errors = "ignore" ) as f :
757
764
self .assertEqual (f .read (), "foobar" )
758
765
759
766
def test_newline (self ):
@@ -774,7 +781,9 @@ def wrapper(*args, **kwargs):
774
781
return function (* args , ** kwargs )
775
782
finally :
776
783
shutil .rmtree (directory )
784
+
777
785
return wrapper
786
+
778
787
return decorator
779
788
780
789
@@ -811,7 +820,8 @@ def test_decompress_infile_outfile(self):
811
820
self .assertEqual (err , b'' )
812
821
813
822
def test_decompress_infile_outfile_error (self ):
814
- rc , out , err = assert_python_ok ('-m' , 'isal.igzip' , '-d' , 'thisisatest.out' )
823
+ rc , out , err = assert_python_ok ('-m' , 'isal.igzip' , '-d' ,
824
+ 'thisisatest.out' )
815
825
self .assertIn (b"filename doesn't end in .gz:" , out )
816
826
self .assertEqual (rc , 0 )
817
827
self .assertEqual (err , b'' )
@@ -851,7 +861,9 @@ def test_compress_infile_outfile(self):
851
861
with open (local_testigzip , 'wb' ) as fp :
852
862
fp .write (self .data )
853
863
854
- rc , out , err = assert_python_ok ('-m' , 'isal.igzip' , compress_level , local_testigzip )
864
+ rc , out , err = assert_python_ok ('-m' , 'isal.igzip' ,
865
+ compress_level ,
866
+ local_testigzip )
855
867
856
868
self .assertTrue (os .path .exists (igzipname ))
857
869
self .assertEqual (out , b'' )
@@ -860,13 +872,20 @@ def test_compress_infile_outfile(self):
860
872
self .assertFalse (os .path .exists (igzipname ))
861
873
862
874
def test_compress_fast_best_are_exclusive (self ):
863
- rc , out , err = assert_python_failure ('-m' , 'isal.igzip' , '--fast' , '--best' )
864
- self .assertIn (b"error: argument -3/--best: not allowed with argument -0/--fast" , err )
875
+ rc , out , err = assert_python_failure ('-m' , 'isal.igzip' , '--fast' ,
876
+ '--best' )
877
+ self .assertIn (
878
+ b"error: argument -3/--best: not allowed with argument -0/--fast" ,
879
+ err )
865
880
self .assertEqual (out , b'' )
866
881
867
882
def test_decompress_cannot_have_flags_compression (self ):
868
- rc , out , err = assert_python_failure ('-m' , 'isal.igzip' , '--fast' , '-d' )
869
- self .assertIn (b'error: argument -d/--decompress: not allowed with argument -0/--fast' , err )
883
+ rc , out , err = assert_python_failure ('-m' , 'isal.igzip' , '--fast' ,
884
+ '-d' )
885
+ self .assertIn (
886
+ b'error: argument -d/--decompress: not allowed with argument '
887
+ b'-0/--fast' ,
888
+ err )
870
889
self .assertEqual (out , b'' )
871
890
872
891
0 commit comments