Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/ClusterShell/Worker/EngineClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,23 @@ def _close(self, abort, timeout):
Derived classes should implement.
"""
for sname in list(self.streams):
self._close_stream(sname)
# for engine-induced timeout with abort, do not actually abort
# the stream so we can flush buffers (original behavior)
self._close_stream(sname, abort and not timeout)

self.invalidate() # set self._engine to None

def _close_stream(self, sname):
def _close_stream(self, sname, abort=False):
"""
Close specific stream by name (internal, called by engine). This method
is the regular way to close a stream flushing read buffers accordingly.
is the regular way to close a stream flushing read buffers accordingly,
unless we are closing the stream with the abort flag set to True.
"""
self._flush_read(sname)
# flush_read() is useful but may generate user events (ev_read) that
# could lead to worker abort and then ev_close. Be careful there.
# If we are doing a close with abort, skip the flush (this is newer).
if not abort:
self._flush_read(sname)
if sname in self.streams:
del self.streams[sname]

Expand Down
38 changes: 38 additions & 0 deletions tests/StreamWorkerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,41 @@ def ev_close(self, worker, timedout):

self.run_worker(worker)
self.assertEqual(hdlr.check_close, 1)

def test_010_worker_abort_with_read_buffers(self):
"""test StreamWorker abort() with read buffers"""
class TestH(EventHandler):
def __init__(self, testcase):
self.testcase = testcase
self.read_count = 0
self.timer_called = False
self.worker = None
self.wfd1 = None

def ev_timer(self, timer):
self.timer_called = True
self.worker.abort()

def ev_read(self, worker, node, sname, msg):
self.read_count += 1
os.write(self.wfd1, b"Some unterminated data line")

# We want to test that a StreamWorker.abort() does not generate any
# additional ev_read events. This only works if timeout is not set.
# For a test with timeout, see test_004_timeout_on_open_stream().
hdlr = TestH(self)
worker = StreamWorker(handler=hdlr) # no timeout
hdlr.worker = worker
# Create pipe stream
rfd1, wfd1 = os.pipe()
hdlr.wfd1 = wfd1
worker.set_reader("pipe1", rfd1, closefd=False)
os.write(wfd1, b"Some terminated data line\n")
# TEST: Do not close wfd1 to simulate open stream
# We use an "external" timer to delay the abort() a bit
timer1 = task_self().timer(0.5, handler=hdlr)
self.run_worker(worker)
self.assertTrue(hdlr.timer_called)
self.assertEqual(hdlr.read_count, 1) # single line only
os.close(rfd1)
os.close(wfd1)
Loading