|
25 | 25 | import gzip
|
26 | 26 | import io
|
27 | 27 | import os
|
| 28 | +import re |
28 | 29 | import shutil
|
29 | 30 | import sys
|
30 | 31 | import tempfile
|
| 32 | +import zlib |
31 | 33 | from pathlib import Path
|
32 | 34 |
|
33 | 35 | from isal import igzip
|
@@ -145,3 +147,70 @@ def test_compress_infile_stdout(capsysbinary, tmp_path):
|
145 | 147 | out, err = capsysbinary.readouterr()
|
146 | 148 | assert gzip.decompress(out) == DATA
|
147 | 149 | assert err == b''
|
| 150 | + |
| 151 | + |
| 152 | +def test_decompress(): |
| 153 | + assert igzip.decompress(COMPRESSED_DATA) == DATA |
| 154 | + |
| 155 | + |
| 156 | +def test_decompress_concatenated(): |
| 157 | + assert igzip.decompress(COMPRESSED_DATA + COMPRESSED_DATA) == DATA + DATA |
| 158 | + |
| 159 | + |
| 160 | +def test_decompress_concatenated_with_nulls(): |
| 161 | + data = COMPRESSED_DATA + b"\x00\00\x00" + COMPRESSED_DATA |
| 162 | + assert igzip.decompress(data) == DATA + DATA |
| 163 | + |
| 164 | + |
| 165 | +def test_decompress_missing_trailer(): |
| 166 | + with pytest.raises(EOFError) as error: |
| 167 | + igzip.decompress(COMPRESSED_DATA[:-8]) |
| 168 | + error.match("Compressed file ended before the end-of-stream marker was " |
| 169 | + "reached") |
| 170 | + |
| 171 | + |
| 172 | +def test_decompress_truncated_trailer(): |
| 173 | + with pytest.raises(EOFError) as error: |
| 174 | + igzip.decompress(COMPRESSED_DATA[:-4]) |
| 175 | + error.match("Compressed file ended before the end-of-stream marker was " |
| 176 | + "reached") |
| 177 | + |
| 178 | + |
| 179 | +def test_decompress_incorrect_length(): |
| 180 | + fake_length = 27890 |
| 181 | + # Assure our test is not bogus |
| 182 | + assert fake_length != len(DATA) |
| 183 | + incorrect_length_trailer = fake_length.to_bytes(4, "little", signed=False) |
| 184 | + corrupted_data = COMPRESSED_DATA[:-4] + incorrect_length_trailer |
| 185 | + with pytest.raises(igzip.BadGzipFile) as error: |
| 186 | + igzip.decompress(corrupted_data) |
| 187 | + error.match("Incorrect length of data produced") |
| 188 | + |
| 189 | + |
| 190 | +def test_decompress_incorrect_checksum(): |
| 191 | + # Create a wrong checksum by using a non-default seed. |
| 192 | + wrong_checksum = zlib.crc32(DATA, 50) |
| 193 | + wrong_crc_bytes = wrong_checksum.to_bytes(4, "little", signed=False) |
| 194 | + corrupted_data = (COMPRESSED_DATA[:-8] + |
| 195 | + wrong_crc_bytes + |
| 196 | + COMPRESSED_DATA[-4:]) |
| 197 | + with pytest.raises(igzip.BadGzipFile) as error: |
| 198 | + igzip.decompress(corrupted_data) |
| 199 | + error.match("CRC check failed") |
| 200 | + |
| 201 | + |
| 202 | +def test_decompress_not_a_gzip(): |
| 203 | + with pytest.raises(igzip.BadGzipFile) as error: |
| 204 | + igzip.decompress(b"This is not a gzip data stream.") |
| 205 | + assert error.match(re.escape("Not a gzipped file (b'Th')")) |
| 206 | + |
| 207 | + |
| 208 | +def test_decompress_unknown_compression_method(): |
| 209 | + corrupted_data = COMPRESSED_DATA[:2] + b'\x09' + COMPRESSED_DATA[3:] |
| 210 | + with pytest.raises(igzip.BadGzipFile) as error: |
| 211 | + igzip.decompress(corrupted_data) |
| 212 | + assert error.match("Unknown compression method") |
| 213 | + |
| 214 | + |
| 215 | +def test_decompress_empty(): |
| 216 | + assert igzip.decompress(b"") == b"" |
0 commit comments