Skip to content

Commit c9fdb90

Browse files
committed
Revert "Start threads only after calling read and write on ThreadedGzip"
This reverts commit 1c3f210.
1 parent 6b942fb commit c9fdb90

File tree

2 files changed

+20
-30
lines changed

2 files changed

+20
-30
lines changed

src/zlib_ng/gzip_ng_threaded.py

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,11 @@ def __init__(self, filename, queue_size=2, block_size=1024 * 1024):
9898
self.exception = None
9999
self.buffer = io.BytesIO()
100100
self.block_size = block_size
101-
self.worker = threading.Thread(target=self._decompress)
101+
# Using a daemon thread prevents programs freezing on error.
102+
self.worker = threading.Thread(target=self._decompress, daemon=True)
102103
self._closed = False
103-
self.running = False
104+
self.running = True
105+
self.worker.start()
104106

105107
def _check_closed(self, msg=None):
106108
if self._closed:
@@ -124,19 +126,8 @@ def _decompress(self):
124126
except queue.Full:
125127
pass
126128

127-
def _start(self):
128-
if not self.running:
129-
self.running = True
130-
self.worker.start()
131-
132-
def _stop(self):
133-
if self.running:
134-
self.running = False
135-
self.worker.join()
136-
137129
def readinto(self, b):
138130
self._check_closed()
139-
self._start()
140131
result = self.buffer.readinto(b)
141132
if result == 0:
142133
while True:
@@ -164,7 +155,8 @@ def tell(self) -> int:
164155
def close(self) -> None:
165156
if self._closed:
166157
return
167-
self._stop()
158+
self.running = False
159+
self.worker.join()
168160
self.fileobj.close()
169161
if self.closefd:
170162
self.raw.close()
@@ -240,17 +232,18 @@ def __init__(self,
240232
queue.Queue(queue_size) for _ in range(threads)]
241233
self.output_queues: List[queue.Queue[Tuple[bytes, int, int]]] = [
242234
queue.Queue(queue_size) for _ in range(threads)]
243-
self.output_worker = threading.Thread(target=self._write)
235+
# Using daemon threads prevents a program freezing on error.
236+
self.output_worker = threading.Thread(target=self._write, daemon=True)
244237
self.compression_workers = [
245-
threading.Thread(target=self._compress, args=(i,))
238+
threading.Thread(target=self._compress, args=(i,), daemon=True)
246239
for i in range(threads)
247240
]
248241
elif threads == 1:
249242
self.input_queues = [queue.Queue(queue_size)]
250243
self.output_queues = []
251244
self.compression_workers = []
252245
self.output_worker = threading.Thread(
253-
target=self._compress_and_write)
246+
target=self._compress_and_write, daemon=True)
254247
else:
255248
raise ValueError(f"threads should be at least 1, got {threads}")
256249
self.threads = threads
@@ -261,6 +254,7 @@ def __init__(self,
261254
self.raw, self.closefd = open_as_binary_stream(filename, mode)
262255
self._closed = False
263256
self._write_gzip_header()
257+
self.start()
264258

265259
def _check_closed(self, msg=None):
266260
if self._closed:
@@ -283,24 +277,21 @@ def _write_gzip_header(self):
283277
self.raw.write(struct.pack(
284278
"BBBBIBB", magic1, magic2, method, flags, mtime, os, xfl))
285279

286-
def _start(self):
287-
if not self.running:
288-
self.running = True
289-
self.output_worker.start()
290-
for worker in self.compression_workers:
291-
worker.start()
280+
def start(self):
281+
self.running = True
282+
self.output_worker.start()
283+
for worker in self.compression_workers:
284+
worker.start()
292285

293286
def stop(self):
294287
"""Stop, but do not care for remaining work"""
295-
if self.running:
296-
self.running = False
297-
for worker in self.compression_workers:
298-
worker.join()
299-
self.output_worker.join()
288+
self.running = False
289+
for worker in self.compression_workers:
290+
worker.join()
291+
self.output_worker.join()
300292

301293
def write(self, b) -> int:
302294
self._check_closed()
303-
self._start()
304295
with self.lock:
305296
if self.exception:
306297
raise self.exception

tests/test_gzip_ng_threaded.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ def test_threaded_write_error(threads):
105105
threads=threads, block_size=8 * 1024)
106106
# Bypass the write method which should not allow blocks larger than
107107
# block_size.
108-
f._start()
109108
f.input_queues[0].put((os.urandom(1024 * 64), b""))
110109
with pytest.raises(OverflowError) as error:
111110
f.close()

0 commit comments

Comments
 (0)