Skip to content

Commit 02b8634

Browse files
committed
Use CRC combination for better threading
1 parent 8c9b8ee commit 02b8634

File tree

1 file changed

+12
-25
lines changed

1 file changed

+12
-25
lines changed

src/isal/igzip_threaded.py

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -124,18 +124,15 @@ def __init__(self,
124124
self.raw = fp
125125
self.level = level
126126
self.previous_block = b""
127-
self.crc_queue: queue.Queue[bytes] = queue.Queue(
128-
maxsize=threads * queue_size)
129127
self.input_queues: List[queue.Queue[Tuple[bytes, memoryview]]] = [
130128
queue.Queue(queue_size) for _ in range(threads)]
131-
self.output_queues: List[queue.Queue[bytes]] = [
129+
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
132130
queue.Queue(queue_size) for _ in range(threads)]
133131
self.index = 0
134132
self.threads = threads
135133
self._crc = 0
136134
self.running = False
137135
self._size = 0
138-
self.crc_worker = threading.Thread(target=self._calculate_crc)
139136
self.output_worker = threading.Thread(target=self._write)
140137
self.compression_workers = [
141138
threading.Thread(target=self._compress, args=(i,))
@@ -159,15 +156,13 @@ def _write_gzip_header(self):
159156

160157
def start(self):
161158
self.running = True
162-
self.crc_worker.start()
163159
self.output_worker.start()
164160
for worker in self.compression_workers:
165161
worker.start()
166162

167163
def stop_immediately(self):
168164
"""Stop, but do not care for remaining work"""
169165
self.running = False
170-
self.crc_worker.join()
171166
self.output_worker.join()
172167
for worker in self.compression_workers:
173168
worker.join()
@@ -181,7 +176,6 @@ def write(self, b) -> int:
181176
self.previous_block = data
182177
self.index += 1
183178
worker_index = index % self.threads
184-
self.crc_queue.put(data)
185179
self.input_queues[worker_index].put((data, zdict))
186180
return len(data)
187181

@@ -198,7 +192,6 @@ def flush(self):
198192

199193
def close(self) -> None:
200194
self.flush()
201-
self.crc_queue.join()
202195
self.stop_immediately()
203196
# Write an empty deflate block with a last block marker.
204197
self.raw.write(isal_zlib.compress(b"", wbits=-15))
@@ -212,20 +205,6 @@ def close(self) -> None:
212205
def closed(self) -> bool:
213206
return self._closed
214207

215-
def _calculate_crc(self):
216-
crc = isal_zlib.crc32(b"")
217-
size = 0
218-
while self.running:
219-
try:
220-
data = self.crc_queue.get(timeout=0.05)
221-
except queue.Empty:
222-
continue
223-
crc = isal_zlib.crc32(data, crc)
224-
size += len(data)
225-
self.crc_queue.task_done()
226-
self._crc = crc
227-
self._size = size
228-
229208
def _compress(self, index: int):
230209
in_queue = self.input_queues[index]
231210
out_queue = self.output_queues[index]
@@ -238,23 +217,31 @@ def _compress(self, index: int):
238217
self.level, wbits=-15, zdict=zdict)
239218
compressed = compressor.compress(data) + compressor.flush(
240219
isal_zlib.Z_SYNC_FLUSH)
241-
out_queue.put(compressed)
220+
crc = isal_zlib.crc32(data)
221+
data_length = len(data)
222+
out_queue.put((compressed, crc, data_length))
242223
in_queue.task_done()
243224

244225
def _write(self):
245226
index = 0
246227
output_queues = self.output_queues
247228
fp = self.raw
229+
total_crc = 0
230+
size = 0
248231
while self.running:
249232
out_index = index % self.threads
250233
output_queue = output_queues[out_index]
251234
try:
252-
data = output_queue.get(timeout=0.05)
235+
compressed, crc, data_length = output_queue.get(timeout=0.05)
253236
except queue.Empty:
254237
continue
255-
fp.write(data)
238+
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
239+
size += data_length
240+
fp.write(compressed)
256241
output_queue.task_done()
257242
index += 1
243+
self._crc = total_crc
244+
self._size = size
258245

259246
def writable(self) -> bool:
260247
return True

0 commit comments

Comments
 (0)