Skip to content

Commit 424a5b9

Browse files
committed
Combine queues and threads for threads equals 1, for greater efficiency
1 parent dff95a3 commit 424a5b9

File tree

1 file changed

+64
-21
lines changed

1 file changed

+64
-21
lines changed

src/isal/igzip_threaded.py

Lines changed: 64 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ class _ThreadedGzipWriter(io.RawIOBase):
192192

193193
The writer thread reads from output queues and uses the crc32_combine
194194
function to calculate the total crc. It also writes the compressed block.
195+
196+
When only one thread is requested, only the input queue is used and
197+
compressing and output is handled in one thread.
195198
"""
196199
def __init__(self,
197200
fp: BinaryIO,
@@ -208,20 +211,29 @@ def __init__(self,
208211
isal_zlib._ParallelCompress(buffersize=buffer_size,
209212
level=level) for _ in range(threads)
210213
]
211-
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
212-
queue.Queue(queue_size) for _ in range(threads)]
213-
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
214-
queue.Queue(queue_size) for _ in range(threads)]
215-
self.index = 0
214+
if threads > 1:
215+
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
216+
queue.Queue(queue_size) for _ in range(threads)]
217+
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
218+
queue.Queue(queue_size) for _ in range(threads)]
219+
self.output_worker = threading.Thread(target=self._write)
220+
self.compression_workers = [
221+
threading.Thread(target=self._compress, args=(i,))
222+
for i in range(threads)
223+
]
224+
elif threads == 1:
225+
self.input_queues = [queue.Queue(queue_size)]
226+
self.output_queues = []
227+
self.compression_workers = []
228+
self.output_worker = threading.Thread(
229+
target=self._compress_and_write)
230+
else:
231+
raise ValueError(f"threads should be 1 or greater, got {threads}")
216232
self.threads = threads
233+
self.index = 0
217234
self._crc = 0
218235
self.running = False
219236
self._size = 0
220-
self.output_worker = threading.Thread(target=self._write)
221-
self.compression_workers = [
222-
threading.Thread(target=self._compress, args=(i,))
223-
for i in range(threads)
224-
]
225237
self._closed = False
226238
self._write_gzip_header()
227239
self.start()
@@ -314,17 +326,9 @@ def _compress(self, index: int):
314326
try:
315327
compressed, crc = compressor.compress_and_crc(data, zdict)
316328
except Exception as e:
317-
with self.lock:
318-
self.exception = e
319-
# Abort everything and empty the queue
320-
in_queue.task_done()
321-
self.running = False
322-
while True:
323-
try:
324-
_ = in_queue.get(timeout=0.05)
325-
in_queue.task_done()
326-
except queue.Empty:
327-
return
329+
in_queue.task_done()
330+
self._set_error_and_empty_queue(e, in_queue)
331+
return
328332
data_length = len(data)
329333
out_queue.put((compressed, crc, data_length))
330334
in_queue.task_done()
@@ -352,5 +356,44 @@ def _write(self):
352356
output_queue.task_done()
353357
index += 1
354358

359+
def _compress_and_write(self):
360+
if not self.threads == 1:
361+
raise SystemError("Compress_and_write is for one thread only")
362+
fp = self.raw
363+
total_crc = 0
364+
size = 0
365+
in_queue = self.input_queues[0]
366+
compressor = self.compressors[0]
367+
while True:
368+
try:
369+
data, zdict = in_queue.get(timeout=0.05)
370+
except queue.Empty:
371+
if not self.running:
372+
return
373+
continue
374+
try:
375+
compressed, crc = compressor.compress_and_crc(data, zdict)
376+
except Exception as e:
377+
in_queue.task_done()
378+
self._set_error_and_empty_queue(e, in_queue)
379+
return
380+
data_length = len(data)
381+
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
382+
size += data_length
383+
fp.write(compressed)
384+
in_queue.task_done()
385+
386+
def _set_error_and_empty_queue(self, error, q):
387+
with self.lock:
388+
self.exception = error
389+
# Abort everything and empty the queue
390+
self.running = False
391+
while True:
392+
try:
393+
_ = q.get(timeout=0.05)
394+
q.task_done()
395+
except queue.Empty:
396+
return
397+
355398
def writable(self) -> bool:
356399
return True

0 commit comments

Comments
 (0)