Skip to content

Commit a070545

Browse files
committed
Implement threadedwriter further
1 parent bd880bc commit a070545

File tree

1 file changed

+64
-1
lines changed

1 file changed

+64
-1
lines changed

src/isal/igzip_threaded.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import io
22
import queue
3+
import struct
34
import threading
45
import typing
56

@@ -86,7 +87,7 @@ def close(self) -> None:
8687

8788

8889
class ThreadedWriter(io.RawIOBase):
89-
def __init__(self, fp, level: int=isal_zlib.ISAL_DEFAULT_COMPRESSION,
90+
def __init__(self, fp: typing.BinaryIO, level: int=isal_zlib.ISAL_DEFAULT_COMPRESSION,
9091
threads: int=1,
9192
queue_size=2):
9293
self.raw = fp
@@ -100,8 +101,46 @@ def __init__(self, fp, level: int=isal_zlib.ISAL_DEFAULT_COMPRESSION,
100101
self._crc = None
101102
self.running = False
102103
self._size = None
104+
self.crc_worker = threading.Thread(target=self._calculate_crc)
105+
self.output_worker = threading.Thread(target=self.write)
106+
self.compression_workers = [
107+
threading.Thread(target=self._compress, args=(i,))
108+
for i in range(threads)
109+
]
110+
self._closed = False
111+
self._write_gzip_header()
112+
self.start()
113+
114+
def _write_gzip_header(self):
115+
"""Simple gzip header. Only xfl flag is set according to level."""
116+
magic1 = 0x1f
117+
magic2 = 0x8b
118+
method = 0x08
119+
flags = 0
120+
mtime = 0
121+
os = 0xff
122+
xfl = 4 if self.level == 0 else 0
123+
self.raw.write(struct.pack(
124+
"BBBBIBB", magic1,magic2, method, flags, mtime, os, xfl))
125+
126+
def start(self):
127+
self.running = True
128+
self.crc_worker.start()
129+
self.output_worker.start()
130+
for worker in self.compression_workers:
131+
worker.start()
132+
133+
def stop_immediately(self):
134+
"""Stop, but do not care for remaining work"""
135+
self.running = False
136+
self.crc_worker.join()
137+
self.output_worker.join()
138+
for worker in self.compression_workers:
139+
worker.join()
103140

104141
def write(self, b) -> int:
142+
if self._closed:
143+
raise IOError("Can not write closed file")
105144
index = self.index
106145
data = bytes(b)
107146
zdict = memoryview(self.previous_block)[:-DEFLATE_WINDOW_SIZE]
@@ -112,6 +151,30 @@ def write(self, b) -> int:
112151
self.input_queues[worker_index].put((data, zdict))
113152
return len(data)
114153

154+
def flush(self):
155+
if self._closed:
156+
raise IOError("Can not write closed file")
157+
# Wait for all data to be compressed
158+
for in_q in self.input_queues:
159+
in_q.join()
160+
# Wait for all data to be written
161+
for out_q in self.output_queues:
162+
out_q.join()
163+
self.raw.flush()
164+
165+
def close(self) -> None:
166+
self.flush()
167+
self.crc_queue.join()
168+
self.stop_immediately()
169+
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
170+
self.raw.write(trailer)
171+
self.raw.flush()
172+
self.raw.close()
173+
self._closed = True
174+
175+
def closed(self) -> bool:
176+
return self._closed
177+
115178
def _calculate_crc(self):
116179
crc = isal_zlib.crc32(b"")
117180
size = 0

0 commit comments

Comments
 (0)