Skip to content

Commit 3453a7a

Browse files
committed
Fully featured threaded open method
1 parent a070545 commit 3453a7a

File tree

2 files changed

+51
-20
lines changed

2 files changed

+51
-20
lines changed

src/isal/igzip_threaded.py

Lines changed: 49 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -1,23 +1,46 @@
11
import io
2+
import multiprocessing
3+
import os
24
import queue
35
import struct
46
import threading
5-
import typing
7+
from typing import BinaryIO, List, Tuple
68

79
from . import igzip, isal_zlib
810

911
DEFLATE_WINDOW_SIZE = 2 ** 15
1012

13+
1114
def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
1215
encoding=None, errors=None, newline=None, *, threads=-1):
13-
if threads == 0 or "w" in mode:
16+
if threads == 0:
1417
return igzip.open(filename, mode, compresslevel, encoding, errors,
1518
newline)
16-
if hasattr(filename, "read"):
17-
fp = filename
19+
elif threads < 0:
20+
try:
21+
threads = len(os.sched_getaffinity(0))
22+
except: # noqa: E722
23+
try:
24+
threads = multiprocessing.cpu_count()
25+
except: # noqa: E722
26+
threads = 1
27+
open_mode = mode.replace("t", "b")
28+
if isinstance(filename, (str, bytes)) or hasattr(filename, "__fspath__"):
29+
binary_file = io.open(filename, open_mode)
30+
elif hasattr(filename, "read") or hasattr(filename, "write"):
31+
binary_file = filename
32+
else:
33+
raise TypeError("filename must be a str or bytes object, or a file")
34+
if "r" in mode:
35+
gzip_file = io.BufferedReader(ThreadedGzipReader(binary_file))
1836
else:
19-
fp = io.open(filename, "rb")
20-
return io.BufferedReader(ThreadedGzipReader(fp))
37+
gzip_file = io.BufferedWriter(
38+
ThreadedGzipWriter(binary_file, compresslevel, threads),
39+
buffer_size=128 * 1024
40+
)
41+
if "t" in mode:
42+
return io.TextIOWrapper(gzip_file, encoding, errors, newline)
43+
return gzip_file
2144

2245

2346
class ThreadedGzipReader(io.RawIOBase):
@@ -86,21 +109,26 @@ def close(self) -> None:
86109
self.fileobj.close()
87110

88111

89-
class ThreadedWriter(io.RawIOBase):
90-
def __init__(self, fp: typing.BinaryIO, level: int=isal_zlib.ISAL_DEFAULT_COMPRESSION,
91-
threads: int=1,
92-
queue_size=2):
112+
class ThreadedGzipWriter(io.RawIOBase):
113+
def __init__(self,
114+
fp: BinaryIO,
115+
level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
116+
threads: int = 1,
117+
queue_size: int = 2):
93118
self.raw = fp
94119
self.level = level
95120
self.previous_block = b""
96-
self.crc_queue = queue.Queue(maxsize=threads * queue_size)
97-
self.input_queues = [queue.Queue(queue_size) for _ in range(threads)]
98-
self.output_queues = [queue.Queue(queue_size) for _ in range(threads)]
121+
self.crc_queue: queue.Queue[bytes] = queue.Queue(
122+
maxsize=threads * queue_size)
123+
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
124+
queue.Queue(queue_size) for _ in range(threads)]
125+
self.output_queues: List[queue.Queue[bytes]] = [
126+
queue.Queue(queue_size) for _ in range(threads)]
99127
self.index = 0
100128
self.threads = threads
101-
self._crc = None
129+
self._crc = 0
102130
self.running = False
103-
self._size = None
131+
self._size = 0
104132
self.crc_worker = threading.Thread(target=self._calculate_crc)
105133
self.output_worker = threading.Thread(target=self.write)
106134
self.compression_workers = [
@@ -121,7 +149,7 @@ def _write_gzip_header(self):
121149
os = 0xff
122150
xfl = 4 if self.level == 0 else 0
123151
self.raw.write(struct.pack(
124-
"BBBBIBB", magic1,magic2, method, flags, mtime, os, xfl))
152+
"BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))
125153

126154
def start(self):
127155
self.running = True
@@ -172,6 +200,7 @@ def close(self) -> None:
172200
self.raw.close()
173201
self._closed = True
174202

203+
@property
175204
def closed(self) -> bool:
176205
return self._closed
177206

@@ -197,8 +226,10 @@ def _compress(self, index: int):
197226
data, zdict = in_queue.get(timeout=0.05)
198227
except queue.Empty:
199228
continue
200-
compressor = isal_zlib.compressobj(self.level, wbits=-15, zdict=zdict)
201-
compressed = compressor.compress(data) + compressor.flush(isal_zlib.Z_SYNC_FLUSH)
229+
compressor = isal_zlib.compressobj(
230+
self.level, wbits=-15, zdict=zdict)
231+
compressed = compressor.compress(data) + compressor.flush(
232+
isal_zlib.Z_SYNC_FLUSH)
202233
out_queue.put(compressed)
203234
in_queue.task_done()
204235

tests/test_igzip_threaded.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -9,10 +9,10 @@
99
import io
1010
from pathlib import Path
1111

12-
import pytest
13-
1412
from isal import igzip_threaded
1513

14+
import pytest
15+
1616
TEST_FILE = str((Path(__file__).parent / "data" / "test.fastq.gz"))
1717

1818

0 commit comments

Comments
 (0)