Skip to content

Commit 71b7d61

Browse files
authored
Merge pull request #162 from pycompression/specialcaseonethread
Write a specialized threaded compressor and special case writing with one thread
2 parents 8c5980d + e8fd360 commit 71b7d61

File tree

5 files changed

+327
-52
lines changed

5 files changed

+327
-52
lines changed

CHANGELOG.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,14 @@ Changelog
77
.. This document is user facing. Please word the changes in such a way
88
.. that users understand how the changes affect the new version.
99
10+
version 1.5.0-dev
11+
-----------------
12+
+ Make a special case for threads==1 in ``igzip_threaded.open`` for writing
13+
files. This now combines the writing and compression thread for less
14+
overhead.
15+
+ Write a specialized function for compressing blocks in a threaded fashion.
16+
This function maximizes time spent outside the GIL.
17+
1018
version 1.4.1
1119
-----------------
1220
+ Fix several errors related to unclosed files and buffers.

src/isal/igzip_threaded.py

Lines changed: 94 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020

2121

2222
def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
23-
encoding=None, errors=None, newline=None, *, threads=1):
23+
encoding=None, errors=None, newline=None, *, threads=1,
24+
block_size=1024 * 1024):
2425
"""
2526
Utilize threads to read and write gzip objects and escape the GIL.
2627
Comparable to gzip.open. This method is only usable for streamed reading
@@ -39,6 +40,8 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
3940
:param threads: If 0 will defer to igzip.open, if < 0 will use all threads
4041
available to the system. Reading gzip can only
4142
use one thread.
43+
:param block_size: Determines how large the blocks in the read/write
44+
queues are for threaded reading and writing.
4245
:return: An io.BufferedReader, io.BufferedWriter, or io.TextIOWrapper,
4346
depending on the mode.
4447
"""
@@ -61,21 +64,31 @@ def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
6164
else:
6265
raise TypeError("filename must be a str or bytes object, or a file")
6366
if "r" in mode:
64-
gzip_file = io.BufferedReader(_ThreadedGzipReader(binary_file))
67+
gzip_file = io.BufferedReader(
68+
_ThreadedGzipReader(binary_file, block_size=block_size))
6569
else:
70+
# Deflating random data results in an output a little larger than the
71+
# input. Making the output buffer 10% larger is sufficient overkill.
72+
compress_buffer_size = block_size + max(
73+
block_size // 10, 500)
6674
gzip_file = io.BufferedWriter(
67-
_ThreadedGzipWriter(binary_file, compresslevel, threads),
68-
buffer_size=1024 * 1024
75+
_ThreadedGzipWriter(
76+
fp=binary_file,
77+
buffer_size=compress_buffer_size,
78+
level=compresslevel,
79+
threads=threads
80+
),
81+
buffer_size=block_size
6982
)
7083
if "t" in mode:
7184
return io.TextIOWrapper(gzip_file, encoding, errors, newline)
7285
return gzip_file
7386

7487

7588
class _ThreadedGzipReader(io.RawIOBase):
76-
def __init__(self, fp, queue_size=4, block_size=8 * 1024 * 1024):
89+
def __init__(self, fp, queue_size=2, block_size=1024 * 1024):
7790
self.raw = fp
78-
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * 1024 * 1024)
91+
self.fileobj = igzip._IGzipReader(fp, buffersize=8 * block_size)
7992
self.pos = 0
8093
self.read_file = False
8194
self.queue = queue.Queue(queue_size)
@@ -179,35 +192,49 @@ class _ThreadedGzipWriter(io.RawIOBase):
179192
180193
The writer thread reads from output queues and uses the crc32_combine
181194
function to calculate the total crc. It also writes the compressed block.
195+
196+
When only one thread is requested, only the input queue is used and
197+
compressing and output is handled in one thread.
182198
"""
183199
def __init__(self,
184200
fp: BinaryIO,
185201
level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
186202
threads: int = 1,
187-
queue_size: int = 2):
188-
if level < 0 or level > 3:
189-
raise ValueError(
190-
f"Invalid compression level, "
191-
f"level should be between 0 and 3: {level}")
203+
queue_size: int = 1,
204+
buffer_size: int = 1024 * 1024,
205+
):
192206
self.lock = threading.Lock()
193207
self.exception: Optional[Exception] = None
194208
self.raw = fp
195209
self.level = level
196210
self.previous_block = b""
197-
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
198-
queue.Queue(queue_size) for _ in range(threads)]
199-
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
200-
queue.Queue(queue_size) for _ in range(threads)]
201-
self.index = 0
211+
self.compressors: List[isal_zlib._ParallelCompress] = [
212+
isal_zlib._ParallelCompress(buffersize=buffer_size,
213+
level=level) for _ in range(threads)
214+
]
215+
if threads > 1:
216+
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
217+
queue.Queue(queue_size) for _ in range(threads)]
218+
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
219+
queue.Queue(queue_size) for _ in range(threads)]
220+
self.output_worker = threading.Thread(target=self._write)
221+
self.compression_workers = [
222+
threading.Thread(target=self._compress, args=(i,))
223+
for i in range(threads)
224+
]
225+
elif threads == 1:
226+
self.input_queues = [queue.Queue(queue_size)]
227+
self.output_queues = []
228+
self.compression_workers = []
229+
self.output_worker = threading.Thread(
230+
target=self._compress_and_write)
231+
else:
232+
raise ValueError(f"threads should be at least 1, got {threads}")
202233
self.threads = threads
234+
self.index = 0
203235
self._crc = 0
204236
self.running = False
205237
self._size = 0
206-
self.output_worker = threading.Thread(target=self._write)
207-
self.compression_workers = [
208-
threading.Thread(target=self._compress, args=(i,))
209-
for i in range(threads)
210-
]
211238
self._closed = False
212239
self._write_gzip_header()
213240
self.start()
@@ -289,6 +316,7 @@ def closed(self) -> bool:
289316
def _compress(self, index: int):
290317
in_queue = self.input_queues[index]
291318
out_queue = self.output_queues[index]
319+
compressor: isal_zlib._ParallelCompress = self.compressors[index]
292320
while True:
293321
try:
294322
data, zdict = in_queue.get(timeout=0.05)
@@ -297,23 +325,11 @@ def _compress(self, index: int):
297325
return
298326
continue
299327
try:
300-
compressor = isal_zlib.compressobj(
301-
self.level, wbits=-15, zdict=zdict)
302-
compressed = compressor.compress(data) + compressor.flush(
303-
isal_zlib.Z_SYNC_FLUSH)
304-
crc = isal_zlib.crc32(data)
328+
compressed, crc = compressor.compress_and_crc(data, zdict)
305329
except Exception as e:
306-
with self.lock:
307-
self.exception = e
308-
# Abort everything and empty the queue
309-
in_queue.task_done()
310-
self.running = False
311-
while True:
312-
try:
313-
_ = in_queue.get(timeout=0.05)
314-
in_queue.task_done()
315-
except queue.Empty:
316-
return
330+
in_queue.task_done()
331+
self._set_error_and_empty_queue(e, in_queue)
332+
return
317333
data_length = len(data)
318334
out_queue.put((compressed, crc, data_length))
319335
in_queue.task_done()
@@ -341,5 +357,46 @@ def _write(self):
341357
output_queue.task_done()
342358
index += 1
343359

360+
def _compress_and_write(self):
    """
    Worker loop for the single-thread case: compress and write in one thread.

    Takes ``(data, zdict)`` items from the only input queue, compresses
    each block with the only compressor, folds the block CRC into a running
    total and writes the compressed bytes to ``self.raw``. When the queue
    is empty and ``self.running`` is false, the accumulated CRC and size
    are stored in ``self._crc`` and ``self._size`` and the loop returns.

    Any exception raised while handling a block -- during compression, CRC
    combination or the write to the underlying file -- is passed to
    ``_set_error_and_empty_queue`` after the block has been marked done.
    Previously only compression errors were handled this way; a failing
    ``fp.write`` ended the thread without calling ``task_done()``, so code
    waiting on the queue's task accounting (e.g. ``Queue.join()``) could
    block indefinitely and the error was never recorded on the writer.

    :raises SystemError: if the writer was not configured with exactly one
                         thread.
    """
    if self.threads != 1:
        raise SystemError("Compress_and_write is for one thread only")
    fp = self.raw
    total_crc = 0
    size = 0
    in_queue = self.input_queues[0]
    compressor = self.compressors[0]
    while True:
        try:
            # Short timeout so the loop regularly re-checks self.running.
            data, zdict = in_queue.get(timeout=0.05)
        except queue.Empty:
            if not self.running:
                # Publish the totals only once all queued blocks are done.
                self._crc = total_crc
                self._size = size
                return
            continue
        try:
            compressed, crc = compressor.compress_and_crc(data, zdict)
            data_length = len(data)
            total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
            size += data_length
            fp.write(compressed)
        except Exception as e:
            # Mark the failed block as done first so the queue's unfinished
            # task count stays balanced, then record the error and drain
            # whatever is still queued.
            in_queue.task_done()
            self._set_error_and_empty_queue(e, in_queue)
            return
        in_queue.task_done()
388+
389+
def _set_error_and_empty_queue(self, error, q):
    """
    Record *error* on the writer, stop the workers and discard queued work.

    :param error: The exception to store in ``self.exception``.
    :param q: The queue to drain. Every item taken from it is immediately
              marked as done.
    """
    # Only the assignment of the exception is guarded by the lock.
    with self.lock:
        self.exception = error
    # Abort everything: worker loops return once they see this flag with an
    # empty queue.
    self.running = False
    draining = True
    while draining:
        try:
            q.get(timeout=0.05)
        except queue.Empty:
            # Nothing arrived within the timeout: the queue is empty.
            draining = False
        else:
            q.task_done()
400+
344401
def writable(self) -> bool:
345402
return True

src/isal/isal_zlib.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ def adler32(__data, __value: int = 1) -> int: ...
3838
def crc32(__data, __value: int = 0) -> int: ...
3939
def crc32_combine(__crc1: int, __crc2: int, __crc2_length: int) -> int: ...
4040

41+
# Private compressor object used by the threaded gzip writer, which creates
# one instance per compression thread.
class _ParallelCompress:
42+
# buffersize: size of the output buffer for compressed data (the writer
# passes a value somewhat larger than its block size); level: compression
# level.
def __init__(self, buffersize: int, level: int): ...
43+
# Compresses one block of data with the given zdict and returns a
# (compressed_bytes, crc) tuple. The writer feeds the crc to crc32_combine
# together with len(data), so it is presumably the CRC32 of the
# uncompressed block -- confirm against the C implementation.
def compress_and_crc(self, __data, __zdict) -> typing.Tuple[bytes, int]: ...
44+
4145
def compress(__data,
4246
level: int = ISAL_DEFAULT_COMPRESSION,
4347
wbits: int = MAX_WBITS) -> bytes: ...

0 commit comments

Comments
 (0)