Skip to content

Commit bd880bc

Browse files
committed
Start on threaded writer implementation
1 parent 9487e56 commit bd880bc

File tree

1 file changed

+72
-1
lines changed

1 file changed

+72
-1
lines changed

src/isal/igzip_threaded.py

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import io
22
import queue
33
import threading
4+
import typing
45

5-
from . import igzip
6+
from . import igzip, isal_zlib
67

8+
# 2 ** 15 = 32768 bytes (32 KiB). ThreadedWriter.write() slices the previously
# written block by this amount to build the ``zdict`` handed to the compressor,
# which is created with wbits=-15 (i.e. a 2 ** 15 byte window).
DEFLATE_WINDOW_SIZE = 2 ** 15
79

810
def open(filename, mode="rb", compresslevel=igzip._COMPRESS_LEVEL_TRADEOFF,
911
encoding=None, errors=None, newline=None, *, threads=-1):
@@ -81,3 +83,72 @@ def close(self) -> None:
8183
self.running = False
8284
self.worker.join()
8385
self.fileobj.close()
86+
87+
88+
class ThreadedWriter(io.RawIOBase):
    """Raw writer that fans written blocks out to per-thread compress queues.

    Every call to :meth:`write` is treated as one block. Blocks are handed
    round-robin to ``threads`` input queues; ``_compress`` turns a block into
    a raw DEFLATE chunk (``wbits=-15``) terminated by a ``Z_SYNC_FLUSH``,
    ``_write`` collects the chunks in the same round-robin order and writes
    them to ``fp``, and ``_calculate_crc`` keeps a running CRC-32 and byte
    count of the uncompressed data.

    NOTE(review): in the code visible here ``running`` is initialised to
    ``False`` and nothing sets it to ``True`` or starts the worker threads;
    ``_calculate_crc``, ``_compress`` and ``_write`` are written as thread
    targets that loop while ``running`` is true. No gzip header or trailer is
    written by this class as shown.
    """

    def __init__(self, fp, level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
                 threads: int = 1,
                 queue_size=2):
        """Set up the queues and bookkeeping state.

        :param fp: object with a ``write(bytes)`` method that receives the
                   compressed output.
        :param level: compression level passed to ``isal_zlib.compressobj``.
        :param threads: number of compression workers, i.e. the number of
                        input/output queue pairs.
        :param queue_size: maximum number of pending items per worker queue.
        """
        super().__init__()
        self.raw = fp
        self.level = level
        # Last block passed to write(); its tail seeds the next block's
        # compression dictionary.
        self.previous_block = b""
        # One shared CRC queue, sized so it can hold as many blocks as all
        # the worker input queues combined.
        self.crc_queue = queue.Queue(maxsize=threads * queue_size)
        self.input_queues = [queue.Queue(queue_size) for _ in range(threads)]
        self.output_queues = [queue.Queue(queue_size) for _ in range(threads)]
        # Number of blocks written so far; selects the worker round-robin.
        self.index = 0
        self.threads = threads
        # Filled in by _calculate_crc() once its loop exits.
        self._crc = None
        self.running = False
        self._size = None

    def write(self, b) -> int:
        """Queue *b* as one block for checksumming and compression.

        Blocks when the target queues are full. Returns the number of bytes
        accepted, which is always ``len(b)``.
        """
        index = self.index
        data = bytes(b)
        # The dictionary has to be the *tail* of the previous block (the data
        # that immediately precedes this block in the stream), limited to one
        # window. Slicing an empty or short block this way is safe and simply
        # yields whatever is available.
        zdict = memoryview(self.previous_block)[-DEFLATE_WINDOW_SIZE:]
        self.previous_block = data
        self.index += 1
        worker_index = index % self.threads
        self.crc_queue.put(data)
        self.input_queues[worker_index].put((data, zdict))
        return len(data)

    def _calculate_crc(self):
        """Consume ``crc_queue`` while running, accumulating CRC-32 and size.

        The results are stored in ``_crc`` and ``_size`` only after
        ``running`` has become false and the loop has exited.
        """
        crc = isal_zlib.crc32(b"")
        size = 0
        while self.running:
            try:
                # Short timeout so the loop re-checks ``running`` regularly.
                data = self.crc_queue.get(timeout=0.05)
            except queue.Empty:
                continue
            crc = isal_zlib.crc32(data, crc)
            size += len(data)
            self.crc_queue.task_done()
        self._crc = crc
        self._size = size

    def _compress(self, index: int):
        """Compress blocks from input queue *index* into output queue *index*.

        :param index: worker number; selects the input/output queue pair.
        """
        in_queue = self.input_queues[index]
        out_queue = self.output_queues[index]
        while self.running:
            try:
                data, zdict = in_queue.get(timeout=0.05)
            except queue.Empty:
                continue
            # A fresh raw-deflate compressor per block, primed with the tail
            # of the preceding block. The sync flush is intended to end the
            # chunk on a byte boundary so chunks can be concatenated.
            compressor = isal_zlib.compressobj(
                self.level, wbits=-15, zdict=zdict)
            compressed = (compressor.compress(data)
                          + compressor.flush(isal_zlib.Z_SYNC_FLUSH))
            out_queue.put(compressed)
            in_queue.task_done()

    def _write(self):
        """Write compressed chunks to ``raw`` in the order they were queued.

        The output queues are visited in the same round-robin order that
        :meth:`write` used to distribute the blocks, so the chunks reach the
        file in their original sequence.
        """
        index = 0
        threads = self.threads
        output_queues = self.output_queues
        fp = self.raw
        while self.running:
            output_queue = output_queues[index]
            try:
                data = output_queue.get(timeout=0.05)
            except queue.Empty:
                # Keep waiting on the same queue: the next chunk in stream
                # order can only come from this worker.
                continue
            fp.write(data)
            output_queue.task_done()
            # Wrap around so the index always addresses an existing queue.
            index = (index + 1) % threads

0 commit comments

Comments
 (0)