Skip to content

Commit baa189f

Browse files
authored
Merge pull request #213 from pycompression/fixflush
Fix flushing behaviour on threaded writing streams
2 parents 5461a6e + a670f98 commit baa189f

File tree

3 files changed

+43
-24
lines changed

3 files changed

+43
-24
lines changed

CHANGELOG.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Changelog
99
1010
version 1.7.1-dev
1111
-----------------
12+
+ Fix a bug where flushing files when writing in threaded mode did not work
13+
properly.
1214
+ Prevent threaded opening from blocking python exit when an error is thrown
1315
in the calling thread.
1416

src/isal/igzip_threaded.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
6060
gzip_file = io.BufferedReader(
6161
_ThreadedGzipReader(filename, block_size=block_size))
6262
else:
63-
gzip_file = io.BufferedWriter(
63+
gzip_file = FlushableBufferedWriter(
6464
_ThreadedGzipWriter(
6565
filename,
6666
mode.replace("t", "b"),
@@ -167,6 +167,12 @@ def closed(self) -> bool:
167167
return self._closed
168168

169169

170+
class FlushableBufferedWriter(io.BufferedWriter):
171+
def flush(self):
172+
super().flush()
173+
self.raw.flush()
174+
175+
170176
class _ThreadedGzipWriter(io.RawIOBase):
171177
"""
172178
Write a gzip file using multiple threads.
@@ -310,30 +316,35 @@ def write(self, b) -> int:
310316
self.input_queues[worker_index].put((data, zdict))
311317
return len(data)
312318

313-
def flush(self):
319+
def _end_gzip_stream(self):
314320
self._check_closed()
315321
# Wait for all data to be compressed
316322
for in_q in self.input_queues:
317323
in_q.join()
318324
# Wait for all data to be written
319325
for out_q in self.output_queues:
320326
out_q.join()
327+
# Write an empty deflate block with a lost block marker.
328+
self.raw.write(isal_zlib.compress(b"", wbits=-15))
329+
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
330+
self.raw.write(trailer)
331+
self._crc = 0
332+
self._size = 0
321333
self.raw.flush()
322334

335+
def flush(self):
336+
self._end_gzip_stream()
337+
self._write_gzip_header()
338+
323339
def close(self) -> None:
324340
if self._closed:
325341
return
326-
self.flush()
342+
self._end_gzip_stream()
327343
self.stop()
328344
if self.exception:
329345
self.raw.close()
330346
self._closed = True
331347
raise self.exception
332-
# Write an empty deflate block with a lost block marker.
333-
self.raw.write(isal_zlib.compress(b"", wbits=-15))
334-
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
335-
self.raw.write(trailer)
336-
self.raw.flush()
337348
if self.closefd:
338349
self.raw.close()
339350
self._closed = True
@@ -366,41 +377,31 @@ def _compress(self, index: int):
366377
def _write(self):
367378
index = 0
368379
output_queues = self.output_queues
369-
fp = self.raw
370-
total_crc = 0
371-
size = 0
372380
while True:
373381
out_index = index % self.threads
374382
output_queue = output_queues[out_index]
375383
try:
376384
compressed, crc, data_length = output_queue.get(timeout=0.05)
377385
except queue.Empty:
378386
if not (self.running and self._calling_thread.is_alive()):
379-
self._crc = total_crc
380-
self._size = size
381387
return
382388
continue
383-
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
384-
size += data_length
385-
fp.write(compressed)
389+
self._crc = isal_zlib.crc32_combine(self._crc, crc, data_length)
390+
self._size += data_length
391+
self.raw.write(compressed)
386392
output_queue.task_done()
387393
index += 1
388394

389395
def _compress_and_write(self):
390396
if not self.threads == 1:
391397
raise SystemError("Compress_and_write is for one thread only")
392-
fp = self.raw
393-
total_crc = 0
394-
size = 0
395398
in_queue = self.input_queues[0]
396399
compressor = self.compressors[0]
397400
while True:
398401
try:
399402
data, zdict = in_queue.get(timeout=0.05)
400403
except queue.Empty:
401404
if not (self.running and self._calling_thread.is_alive()):
402-
self._crc = total_crc
403-
self._size = size
404405
return
405406
continue
406407
try:
@@ -410,9 +411,9 @@ def _compress_and_write(self):
410411
self._set_error_and_empty_queue(e, in_queue)
411412
return
412413
data_length = len(data)
413-
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
414-
size += data_length
415-
fp.write(compressed)
414+
self._crc = isal_zlib.crc32_combine(self._crc, crc, data_length)
415+
self._size += data_length
416+
self.raw.write(compressed)
416417
in_queue.task_done()
417418

418419
def _set_error_and_empty_queue(self, error, q):

tests/test_igzip_threaded.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,3 +239,19 @@ def test_threaded_program_can_exit_on_error(tmp_path, mode, threads):
239239
)
240240
f.write("raise Exception('Error')\n")
241241
subprocess.run([sys.executable, str(program)])
242+
243+
244+
@pytest.mark.parametrize("threads", [1, 2])
245+
def test_flush(tmp_path, threads):
246+
test_file = tmp_path / "output.gz"
247+
with igzip_threaded.open(test_file, "wb", threads=threads) as f:
248+
f.write(b"1")
249+
f.flush()
250+
assert gzip.decompress(test_file.read_bytes()) == b"1"
251+
f.write(b"2")
252+
f.flush()
253+
assert gzip.decompress(test_file.read_bytes()) == b"12"
254+
f.write(b"3")
255+
f.flush()
256+
assert gzip.decompress(test_file.read_bytes()) == b"123"
257+
assert gzip.decompress(test_file.read_bytes()) == b"123"

0 commit comments

Comments
 (0)