From 0214ac79ccf9b7c2daf6f42244cec9398210c895 Mon Sep 17 00:00:00 2001 From: Stephane Thiell Date: Tue, 5 Aug 2025 19:58:04 -0700 Subject: [PATCH] EngineClient: properly handle abort flag on close StreamWorker.abort() calls Engine.remove() with abort set to True but this flag was never actually used. The problem is that EngineClient._close_stream() is then called and flushes any read buffers, which can lead to unexpected read events and interfere with the user's code logic (StreamWorker is used by the Gateway and this issue was discovered there). With this patch, we ensure that worker.abort() actually aborts all engine events. One last note: we keep the current behavior when both abort and timeout are set to True in Engine.remove()/EngineClient._close(). In that case, timeout=True prevails as it is Engine-induced and we consider we can flush any read buffers (so potentially generating a few ev_read events). --- lib/ClusterShell/Worker/EngineClient.py | 13 ++++++--- tests/StreamWorkerTest.py | 38 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/lib/ClusterShell/Worker/EngineClient.py b/lib/ClusterShell/Worker/EngineClient.py index 74185e76..58f1f895 100644 --- a/lib/ClusterShell/Worker/EngineClient.py +++ b/lib/ClusterShell/Worker/EngineClient.py @@ -264,18 +264,23 @@ def _close(self, abort, timeout): Derived classes should implement. """ for sname in list(self.streams): - self._close_stream(sname) + # for engine-induced timeout with abort, do not actually abort + # the stream so we can flush buffers (original behavior) + self._close_stream(sname, abort and not timeout) self.invalidate() # set self._engine to None - def _close_stream(self, sname): + def _close_stream(self, sname, abort=False): """ Close specific stream by name (internal, called by engine). This method - is the regular way to close a stream flushing read buffers accordingly. 
+ is the regular way to close a stream flushing read buffers accordingly, + unless we are closing the stream with the abort flag set to True. """ - self._flush_read(sname) # flush_read() is useful but may generate user events (ev_read) that # could lead to worker abort and then ev_close. Be careful there. + # If we are doing a close with abort, skip the flush (this is newer). + if not abort: + self._flush_read(sname) if sname in self.streams: del self.streams[sname] diff --git a/tests/StreamWorkerTest.py b/tests/StreamWorkerTest.py index ff0987b3..77e5cc0e 100644 --- a/tests/StreamWorkerTest.py +++ b/tests/StreamWorkerTest.py @@ -331,3 +331,41 @@ def ev_close(self, worker, timedout): self.run_worker(worker) self.assertEqual(hdlr.check_close, 1) + + def test_010_worker_abort_with_read_buffers(self): + """test StreamWorker abort() with read buffers""" + class TestH(EventHandler): + def __init__(self, testcase): + self.testcase = testcase + self.read_count = 0 + self.timer_called = False + self.worker = None + self.wfd1 = None + + def ev_timer(self, timer): + self.timer_called = True + self.worker.abort() + + def ev_read(self, worker, node, sname, msg): + self.read_count += 1 + os.write(self.wfd1, b"Some unterminated data line") + + # We want to test that a StreamWorker.abort() does not generate any + # additional ev_read events. This only works if timeout is not set. + # For a test with timeout, see test_004_timeout_on_open_stream(). 
+ hdlr = TestH(self) + worker = StreamWorker(handler=hdlr) # no timeout + hdlr.worker = worker + # Create pipe stream + rfd1, wfd1 = os.pipe() + hdlr.wfd1 = wfd1 + worker.set_reader("pipe1", rfd1, closefd=False) + os.write(wfd1, b"Some terminated data line\n") + # TEST: Do not close wfd1 to simulate open stream + # We use an "external" timer to delay the abort() a bit + timer1 = task_self().timer(0.5, handler=hdlr) + self.run_worker(worker) + self.assertTrue(hdlr.timer_called) + self.assertEqual(hdlr.read_count, 1) # single line only + os.close(rfd1) + os.close(wfd1)