@@ -101,6 +101,7 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
101
101
self .worker = threading .Thread (target = self ._decompress )
102
102
self ._closed = False
103
103
self .running = True
104
+ self ._calling_thread = threading .current_thread ()
104
105
self .worker .start ()
105
106
106
107
def _check_closed (self , msg = None ):
@@ -110,15 +111,15 @@ def _check_closed(self, msg=None):
110
111
def _decompress (self ):
111
112
block_size = self .block_size
112
113
block_queue = self .queue
113
- while self .running :
114
+ while self .running and self . _calling_thread . is_alive () :
114
115
try :
115
116
data = self .fileobj .read (block_size )
116
117
except Exception as e :
117
118
self .exception = e
118
119
return
119
120
if not data :
120
121
return
121
- while self .running :
122
+ while self .running and self . _calling_thread . is_alive () :
122
123
try :
123
124
block_queue .put (data , timeout = 0.05 )
124
125
break
@@ -215,6 +216,7 @@ def __init__(self,
215
216
if "b" not in mode :
216
217
mode += "b"
217
218
self .lock = threading .Lock ()
219
+ self ._calling_thread = threading .current_thread ()
218
220
self .exception : Optional [Exception ] = None
219
221
self .level = level
220
222
self .previous_block = b""
@@ -349,7 +351,7 @@ def _compress(self, index: int):
349
351
in_queue = self .input_queues [index ]
350
352
out_queue = self .output_queues [index ]
351
353
compressor : zlib_ng ._ParallelCompress = self .compressors [index ]
352
- while True :
354
+ while self . _calling_thread . is_alive () :
353
355
try :
354
356
data , zdict = in_queue .get (timeout = 0.05 )
355
357
except queue .Empty :
@@ -372,7 +374,7 @@ def _write(self):
372
374
fp = self .raw
373
375
total_crc = 0
374
376
size = 0
375
- while True :
377
+ while self . _calling_thread . is_alive () :
376
378
out_index = index % self .threads
377
379
output_queue = output_queues [out_index ]
378
380
try :
@@ -397,7 +399,7 @@ def _compress_and_write(self):
397
399
size = 0
398
400
in_queue = self .input_queues [0 ]
399
401
compressor = self .compressors [0 ]
400
- while True :
402
+ while self . _calling_thread . is_alive () :
401
403
try :
402
404
data , zdict = in_queue .get (timeout = 0.05 )
403
405
except queue .Empty :
0 commit comments