From ea30d6f93f3dde7c773b1807b2444a845e641bcf Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 09:13:48 +0200 Subject: [PATCH 1/2] Reproduce faulty flushing behavior --- tests/test_igzip_threaded.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/tests/test_igzip_threaded.py b/tests/test_igzip_threaded.py index d211f81f..41c61bfe 100644 --- a/tests/test_igzip_threaded.py +++ b/tests/test_igzip_threaded.py @@ -239,3 +239,19 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads): ) f.write("raise Exception('Error')\n") subprocess.run([sys.executable, str(program)]) + + +@pytest.mark.parametrize("threads", [1, 2]) +def test_flush(tmp_path, threads): + test_file = tmp_path / "output.gz" + with igzip_threaded.open(test_file, "wb", threads=threads) as f: + f.write(b"1") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"1" + f.write(b"2") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"12" + f.write(b"3") + f.flush() + assert gzip.decompress(test_file.read_bytes()) == b"123" + assert gzip.decompress(test_file.read_bytes()) == b"123" From a670f984970c52f47babc8ba656311f81a150ea5 Mon Sep 17 00:00:00 2001 From: Ruben Vorderman Date: Wed, 25 Sep 2024 09:18:04 +0200 Subject: [PATCH 2/2] Make threaded writer streams flushable --- CHANGELOG.rst | 2 ++ src/isal/igzip_threaded.py | 49 +++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index fe22d86e..cdc01206 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -9,6 +9,8 @@ Changelog version 1.7.1-dev ----------------- ++ Fix a bug where flushing files when writing in threaded mode did not work + properly. + Prevent threaded opening from blocking python exit when an error is thrown in the calling thread. diff --git a/src/isal/igzip_threaded.py b/src/isal/igzip_threaded.py index da076e9b..7f1c94fc 100644 --- a/src/isal/igzip_threaded.py +++ b/src/isal/igzip_threaded.py @@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF, gzip_file = io.BufferedReader( _ThreadedGzipReader(filename, block_size=block_size)) else: - gzip_file = io.BufferedWriter( + gzip_file = FlushableBufferedWriter( _ThreadedGzipWriter( filename, mode.replace("t", "b"), @@ -167,6 +167,12 @@ def closed(self) -> bool: return self._closed +class FlushableBufferedWriter(io.BufferedWriter): + def flush(self): + super().flush() + self.raw.flush() + + class _ThreadedGzipWriter(io.RawIOBase): """ Write a gzip file using multiple threads. @@ -310,7 +316,7 @@ def write(self, b) -> int: self.input_queues[worker_index].put((data, zdict)) return len(data) - def flush(self): + def _end_gzip_stream(self): self._check_closed() # Wait for all data to be compressed for in_q in self.input_queues: @@ -318,22 +324,27 @@ def flush(self): # Wait for all data to be written for out_q in self.output_queues: out_q.join() + # Write an empty deflate block with a lost block marker. + self.raw.write(isal_zlib.compress(b"", wbits=-15)) + trailer = struct.pack(" None: if self._closed: return - self.flush() + self._end_gzip_stream() self.stop() if self.exception: self.raw.close() self._closed = True raise self.exception - # Write an empty deflate block with a lost block marker. - self.raw.write(isal_zlib.compress(b"", wbits=-15)) - trailer = struct.pack("