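# Test concurrent use of zlib compression and decompression objects shared
# across multiple threads.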
import itertools
import unittest

from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently

zlib = import_helper.import_module("zlib")

from test.test_zlib import HAMLET_SCENE


NTHREADS = 10


@threading_helper.requires_working_threading()
class TestZlib(unittest.TestCase):
    def test_compressor(self):
        comp = zlib.compressobj()

        # The first compress() call emits the zlib header
        header = comp.compress(HAMLET_SCENE)
        self.assertGreater(len(header), 0)

        def worker():
            # Subsequent compress() calls should return empty bytes because
            # the compressor buffers the data internally
            data = comp.compress(HAMLET_SCENE)
            self.assertEqual(data, b"")

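        # The initial compress() above supplied the first of NTHREADS inputs,
        # so only NTHREADS - 1 additional worker threads are started here.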
        run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
        full_compressed = header + comp.flush()
        decompressed = zlib.decompress(full_compressed)
        # The decompressed data should be HAMLET_SCENE repeated NTHREADS times
        self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)

    def test_decompressor_concurrent_attribute_reads(self):
        input_data = HAMLET_SCENE * NTHREADS
        compressed = zlib.compress(input_data)

        decomp = zlib.decompressobj()
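        # Use a small max_length so decompression proceeds in many small steps,
        # giving the attribute-reading threads time to observe intermediate state.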
        decomp_size_per_loop = len(input_data) // 1000
        decompressed_parts = []

        def decomp_worker():
            # Decompress in chunks, which updates eof, unused_data, unconsumed_tail
            decompressed_parts.append(
                decomp.decompress(compressed, decomp_size_per_loop)
            )
            while decomp.unconsumed_tail:
                decompressed_parts.append(
                    decomp.decompress(decomp.unconsumed_tail, decomp_size_per_loop)
                )

        def decomp_attr_reader():
            # Read attributes concurrently while another thread decompresses
            for _ in range(1000):
                _ = decomp.unused_data
                _ = decomp.unconsumed_tail
                _ = decomp.eof

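        # Shared counter used to assign roles: the first thread to call next()
        # becomes the decompressing thread, the rest only read attributes.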
        counter = itertools.count()

        def worker():
            # First thread decompresses, others read attributes
            if next(counter) == 0:
                decomp_worker()
            else:
                decomp_attr_reader()

        run_concurrently(worker_func=worker, nthreads=NTHREADS)

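        # The single decompressing thread consumed all of the compressed input;
        # check the final stream state and the reassembled output.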
        self.assertTrue(decomp.eof)
        self.assertEqual(decomp.unused_data, b"")
        decompressed = b"".join(decompressed_parts)
        self.assertEqual(decompressed, HAMLET_SCENE * NTHREADS)


if __name__ == "__main__":
    unittest.main()