diff --git a/lib/ClusterShell/Worker/EngineClient.py b/lib/ClusterShell/Worker/EngineClient.py index 74185e76..58f1f895 100644 --- a/lib/ClusterShell/Worker/EngineClient.py +++ b/lib/ClusterShell/Worker/EngineClient.py @@ -264,18 +264,23 @@ def _close(self, abort, timeout): Derived classes should implement. """ for sname in list(self.streams): - self._close_stream(sname) + # for engine-induced timeout with abort, do not actually abort + # the stream so we can flush buffers (original behavior) + self._close_stream(sname, abort and not timeout) self.invalidate() # set self._engine to None - def _close_stream(self, sname): + def _close_stream(self, sname, abort=False): """ Close specific stream by name (internal, called by engine). This method - is the regular way to close a stream flushing read buffers accordingly. + is the regular way to close a stream flushing read buffers accordingly, + unless we are closing the stream with the abort flag set to True. """ - self._flush_read(sname) # flush_read() is useful but may generate user events (ev_read) that # could lead to worker abort and then ev_close. Be careful there. + # If we are doing a close with abort, skip the flush (this is newer). + if not abort: + self._flush_read(sname) if sname in self.streams: del self.streams[sname] diff --git a/tests/StreamWorkerTest.py b/tests/StreamWorkerTest.py index ff0987b3..77e5cc0e 100644 --- a/tests/StreamWorkerTest.py +++ b/tests/StreamWorkerTest.py @@ -331,3 +331,41 @@ def ev_close(self, worker, timedout): self.run_worker(worker) self.assertEqual(hdlr.check_close, 1) + + def test_010_worker_abort_with_read_buffers(self): + """test StreamWorker abort() with read buffers""" + class TestH(EventHandler): + def __init__(self, testcase): + self.testcase = testcase + self.read_count = 0 + self.timer_called = False + self.worker = None + self.wfd1 = None + + def ev_timer(self, timer): + self.timer_called = True + self.worker.abort() + + def ev_read(self, worker, node, sname, msg): + self.read_count += 1 + os.write(self.wfd1, b"Some unterminated data line") + + # We want to test that a StreamWorker.abort() does not generate any + # additional ev_read events. This only works if timeout is not set. + # For a test with timeout, see test_004_timeout_on_open_stream(). + hdlr = TestH(self) + worker = StreamWorker(handler=hdlr) # no timeout + hdlr.worker = worker + # Create pipe stream + rfd1, wfd1 = os.pipe() + hdlr.wfd1 = wfd1 + worker.set_reader("pipe1", rfd1, closefd=False) + os.write(wfd1, b"Some terminated data line\n") + # TEST: Do not close wfd1 to simulate open stream + # We use an "external" timer to delay the abort() a bit + timer1 = task_self().timer(0.5, handler=hdlr) + self.run_worker(worker) + self.assertTrue(hdlr.timer_called) + self.assertEqual(hdlr.read_count, 1) # single line only + os.close(rfd1) + os.close(wfd1)