@@ -108,6 +108,12 @@ def close(self) -> None:
108
108
self .worker .join ()
109
109
self .fileobj .close ()
110
110
111
+ def __enter__ (self ):
112
+ return self
113
+
114
+ def __exit__ (self , exc_type , exc_val , exc_tb ):
115
+ self .close ()
116
+
111
117
112
118
class ThreadedGzipWriter (io .RawIOBase ):
113
119
def __init__ (self ,
@@ -130,7 +136,7 @@ def __init__(self,
130
136
self .running = False
131
137
self ._size = 0
132
138
self .crc_worker = threading .Thread (target = self ._calculate_crc )
133
- self .output_worker = threading .Thread (target = self .write )
139
+ self .output_worker = threading .Thread (target = self ._write )
134
140
self .compression_workers = [
135
141
threading .Thread (target = self ._compress , args = (i ,))
136
142
for i in range (threads )
@@ -194,6 +200,8 @@ def close(self) -> None:
194
200
self .flush ()
195
201
self .crc_queue .join ()
196
202
self .stop_immediately ()
203
+ # Write an empty deflate block with a lost block marker.
204
+ self .raw .write (isal_zlib .compress (b"" , wbits = - 15 ))
197
205
trailer = struct .pack ("<II" , self ._crc , self ._size & 0xFFFFFFFF )
198
206
self .raw .write (trailer )
199
207
self .raw .flush ()
@@ -238,11 +246,21 @@ def _write(self):
238
246
output_queues = self .output_queues
239
247
fp = self .raw
240
248
while self .running :
241
- output_queue = output_queues [index ]
249
+ out_index = index % self .threads
250
+ output_queue = output_queues [out_index ]
242
251
try :
243
252
data = output_queue .get (timeout = 0.05 )
244
253
except queue .Empty :
245
254
continue
246
255
fp .write (data )
247
256
output_queue .task_done ()
248
257
index += 1
258
+
259
+ def writable (self ) -> bool :
260
+ return True
261
+
262
+ def __enter__ (self ):
263
+ return self
264
+
265
+ def __exit__ (self , exc_type , exc_val , exc_tb ):
266
+ self .close ()
0 commit comments