Skip to content

Commit 1e8933d

Browse files
committed
Ensure errors are propagated to the main thread
1 parent 4961eae commit 1e8933d

File tree

1 file changed

+36
-12
lines changed

1 file changed

+36
-12
lines changed

src/isal/igzip_threaded.py

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ def __init__(self,
150150
level: int = isal_zlib.ISAL_DEFAULT_COMPRESSION,
151151
threads: int = 1,
152152
queue_size: int = 2):
153+
self.lock = threading.Lock()
154+
self.exception = None
153155
self.raw = fp
154156
self.level = level
155157
self.previous_block = b""
@@ -189,14 +191,17 @@ def start(self):
189191
for worker in self.compression_workers:
190192
worker.start()
191193

192-
def stop_immediately(self):
194+
def stop(self):
193195
"""Stop, but do not care for remaining work"""
194196
self.running = False
195-
self.output_worker.join()
196197
for worker in self.compression_workers:
197198
worker.join()
199+
self.output_worker.join()
198200

199201
def write(self, b) -> int:
202+
with self.lock:
203+
if self.exception:
204+
raise self.exception
200205
if self._closed:
201206
raise IOError("Can not write closed file")
202207
index = self.index
@@ -221,7 +226,9 @@ def flush(self):
221226

222227
def close(self) -> None:
223228
self.flush()
224-
self.stop_immediately()
229+
self.stop()
230+
if self.exception:
231+
raise self.exception
225232
# Write an empty deflate block with a lost block marker.
226233
self.raw.write(isal_zlib.compress(b"", wbits=-15))
227234
trailer = struct.pack("<II", self._crc, self._size & 0xFFFFFFFF)
@@ -237,16 +244,31 @@ def closed(self) -> bool:
237244
def _compress(self, index: int):
238245
in_queue = self.input_queues[index]
239246
out_queue = self.output_queues[index]
240-
while self.running:
247+
while True:
241248
try:
242249
data, zdict = in_queue.get(timeout=0.05)
243250
except queue.Empty:
251+
if not self.running:
252+
return
244253
continue
245-
compressor = isal_zlib.compressobj(
246-
self.level, wbits=-15, zdict=zdict)
247-
compressed = compressor.compress(data) + compressor.flush(
248-
isal_zlib.Z_SYNC_FLUSH)
249-
crc = isal_zlib.crc32(data)
254+
try:
255+
compressor = isal_zlib.compressobj(
256+
self.level, wbits=-15, zdict=zdict)
257+
compressed = compressor.compress(data) + compressor.flush(
258+
isal_zlib.Z_SYNC_FLUSH)
259+
crc = isal_zlib.crc32(data)
260+
except Exception as e:
261+
with self.lock:
262+
self.exception = e
263+
# Abort everything and empty the queue
264+
in_queue.task_done()
265+
self.running = False
266+
while True:
267+
try:
268+
_ = in_queue.get(timeout=0.05)
269+
in_queue.task_done()
270+
except queue.Empty:
271+
return
250272
data_length = len(data)
251273
out_queue.put((compressed, crc, data_length))
252274
in_queue.task_done()
@@ -257,20 +279,22 @@ def _write(self):
257279
fp = self.raw
258280
total_crc = 0
259281
size = 0
260-
while self.running:
282+
while True:
261283
out_index = index % self.threads
262284
output_queue = output_queues[out_index]
263285
try:
264286
compressed, crc, data_length = output_queue.get(timeout=0.05)
265287
except queue.Empty:
288+
if not self.running:
289+
self._crc = total_crc
290+
self._size = size
291+
return
266292
continue
267293
total_crc = isal_zlib.crc32_combine(total_crc, crc, data_length)
268294
size += data_length
269295
fp.write(compressed)
270296
output_queue.task_done()
271297
index += 1
272-
self._crc = total_crc
273-
self._size = size
274298

275299
def writable(self) -> bool:
276300
return True

0 commit comments

Comments
 (0)