|
1 | 1 | import itertools |
2 | 2 |
|
3 | 3 | import numpy as np |
| 4 | +import subprocess |
4 | 5 | import pytest |
5 | 6 |
|
6 | 7 | try: |
@@ -96,22 +97,66 @@ def test_streaming_decompression(): |
96 | 97 | # Test input frames with unknown frame content size |
97 | 98 | codec = Zstd() |
98 | 99 |
|
| 100 | + # If the zstd command line interface is available, check the bytes |
| 101 | + cli = zstd_cli_available() |
| 102 | + |
99 | 103 | # Encode bytes directly that were the result of streaming compression |
100 | 104 | bytes_val = b'(\xb5/\xfd\x00Xa\x00\x00Hello World!' |
101 | 105 | dec = codec.decode(bytes_val) |
102 | | - assert dec == b'Hello World!' |
| 106 | + dec_expected = b'Hello World!' |
| 107 | + assert dec == dec_expected |
| 108 | + if cli: |
| 109 | + assert bytes_val == generate_zstd_streaming_bytes(dec_expected) |
| 110 | + assert dec_expected == generate_zstd_streaming_bytes(bytes_val, decompress=True) |
103 | 111 |
|
104 | 112 | # Two consecutive frames given as input |
105 | 113 | bytes2 = bytes(bytearray(bytes_val * 2)) |
106 | 114 | dec2 = codec.decode(bytes2) |
107 | | - assert dec2 == b'Hello World!Hello World!' |
| 115 | + dec2_expected = b'Hello World!Hello World!' |
| 116 | + assert dec2 == dec2_expected |
| 117 | + if cli: |
| 118 | + assert dec2_expected == generate_zstd_streaming_bytes(bytes2, decompress=True) |
108 | 119 |
|
109 | 120 | # Single long frame that decompresses to a large output |
110 | 121 | bytes3 = b'(\xb5/\xfd\x00X$\x02\x00\xa4\x03ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz\x01\x00:\xfc\xdfs\x05\x05L\x00\x00\x08s\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08k\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08c\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08[\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08S\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08K\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08C\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08u\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08m\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08e\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08]\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08U\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08M\x01\x00\xfc\xff9\x10\x02M\x00\x00\x08E\x01\x00\xfc\x7f\x1d\x08\x01' |
111 | 122 | dec3 = codec.decode(bytes3) |
112 | | - assert dec3 == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 |
| 123 | + dec3_expected = b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 |
| 124 | + assert dec3 == dec3_expected |
| 125 | + if cli: |
| 126 | + assert bytes3 == generate_zstd_streaming_bytes(dec3_expected) |
| 127 | + assert dec3_expected == generate_zstd_streaming_bytes(bytes3, decompress=True) |
113 | 128 |
|
114 | 129 | # Garbage input results in an error |
115 | 130 | bytes4 = bytes(bytearray([0, 0, 0, 0, 0, 0, 0, 0])) |
116 | 131 | with pytest.raises(RuntimeError, match='Zstd decompression error: invalid input data'): |
117 | 132 | codec.decode(bytes4) |
| 133 | + |
| 134 | +def generate_zstd_streaming_bytes(input: bytes, *, decompress: bool =False) -> bytes: |
| 135 | + """ |
| 136 | + Use the zstd command line interface to compress or decompress bytes in streaming mode. |
| 137 | + """ |
| 138 | + if decompress: |
| 139 | + args = ["-d"] |
| 140 | + else: |
| 141 | + args = [] |
| 142 | + |
| 143 | + p = subprocess.run( |
| 144 | + ["zstd", "--no-check", *args], |
| 145 | + input=input, |
| 146 | + capture_output=True |
| 147 | + ) |
| 148 | + return p.stdout |
| 149 | + |
| 150 | +def view_zstd_streaming_bytes(): |
| 151 | + bytes_val = generate_zstd_streaming_bytes(b"Hello world!") |
| 152 | + print(f" bytes_val = {bytes_val}") |
| 153 | + |
| 154 | + bytes3 = generate_zstd_streaming_bytes(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz" * 1024 * 32) |
| 155 | + print(f" bytes3 = {bytes3}") |
| 156 | + |
| 157 | +def zstd_cli_available() -> bool: |
| 158 | + return not subprocess.run( |
| 159 | + ["zstd", "-V"], |
| 160 | + stdout=subprocess.DEVNULL, |
| 161 | + stderr=subprocess.DEVNULL |
| 162 | + ).returncode |
0 commit comments