Skip to content

Commit c5985de

Browse files
Support EOF in read write stream
This allows clean exiting of the runner when used with a ReadWriteStream
1 parent ba19182 commit c5985de

File tree

1 file changed

+17
-5
lines changed

1 file changed

+17
-5
lines changed

runner/precise_runner/runner.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import time
1818
from subprocess import PIPE, Popen
19-
from threading import Thread, Event
19+
from threading import Thread, Event, current_thread
2020

2121

2222
class Engine(object):
@@ -82,6 +82,7 @@ def __init__(self, s=b'', chop_samples=-1):
8282
self.buffer = s
8383
self.write_event = Event()
8484
self.chop_samples = chop_samples
85+
self.eof = False
8586

8687
def __len__(self):
8788
return len(self.buffer)
@@ -95,14 +96,18 @@ def read(self, n=-1, timeout=None):
9596
return_time = 1e10 if timeout is None else (
9697
timeout + time.time()
9798
)
98-
while len(self.buffer) < n:
99+
while len(self.buffer) < n and not self.eof:
99100
self.write_event.clear()
100101
if not self.write_event.wait(return_time - time.time()):
101102
return b''
102103
chunk = self.buffer[:n]
103104
self.buffer = self.buffer[n:]
104105
return chunk
105106

107+
def close(self):
108+
self.write_event.set()
109+
self.eof = True
110+
106111
def write(self, s):
107112
self.buffer += s
108113
self.write_event.set()
@@ -210,10 +215,12 @@ def start(self):
210215
def stop(self):
211216
"""Stop listening and close stream"""
212217
if self.thread:
213-
self.running = False
214218
if isinstance(self.stream, ReadWriteStream):
215-
self.stream.write(b'\0' * self.chunk_size)
216-
self.thread.join()
219+
self.stream.close()
220+
else:
221+
self.running = False
222+
if current_thread() is not self.thread:
223+
self.thread.join()
217224
self.thread = None
218225

219226
self.engine.stop()
@@ -234,6 +241,11 @@ def _handle_predictions(self):
234241
while self.running:
235242
chunk = self.stream.read(self.chunk_size)
236243

244+
if len(chunk) < self.chunk_size: # EOF
245+
self.stop()
246+
self.running = False
247+
return
248+
237249
if self.is_paused:
238250
continue
239251

0 commit comments

Comments
 (0)