Skip to content

Commit d547257

Browse files
authored
EngineClient: properly handle abort flag on close (#591)
StreamWorker.abort() calls Engine.remove() with abort set to True but this flag was never actually used. The problem is that EngineClient._close_stream() is then called and flushes any read buffers, which can lead to unexpected read events and messing with the user's code logic (StreamWorker is used by the Gateway and this issue was discovered there). With this patch, we ensure that worker.abort() actually aborts all engine events. Just a last note that we keep the current behavior when both abort and timeout are set to True in Engine.remove()/EngineClient._close(). In that case, timeout=True prevails as it is Engine-induced and we consider we can flush any read buffers (so potentially generating a few ev_read events).
1 parent 014a201 commit d547257

File tree

2 files changed

+47
-4
lines changed

2 files changed

+47
-4
lines changed

lib/ClusterShell/Worker/EngineClient.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,18 +264,23 @@ def _close(self, abort, timeout):
264264
Derived classes should implement.
265265
"""
266266
for sname in list(self.streams):
267-
self._close_stream(sname)
267+
# for engine-induced timeout with abort, do not actually abort
268+
# the stream so we can flush buffers (original behavior)
269+
self._close_stream(sname, abort and not timeout)
268270

269271
self.invalidate() # set self._engine to None
270272

271-
def _close_stream(self, sname):
273+
def _close_stream(self, sname, abort=False):
272274
"""
273275
Close specific stream by name (internal, called by engine). This method
274-
is the regular way to close a stream flushing read buffers accordingly.
276+
is the regular way to close a stream flushing read buffers accordingly,
277+
unless we are closing the stream with the abort flag set to True.
275278
"""
276-
self._flush_read(sname)
277279
# flush_read() is useful but may generate user events (ev_read) that
278280
# could lead to worker abort and then ev_close. Be careful there.
281+
# If we are doing a close with abort, skip the flush (this is newer).
282+
if not abort:
283+
self._flush_read(sname)
279284
if sname in self.streams:
280285
del self.streams[sname]
281286

tests/StreamWorkerTest.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,3 +331,41 @@ def ev_close(self, worker, timedout):
331331

332332
self.run_worker(worker)
333333
self.assertEqual(hdlr.check_close, 1)
334+
335+
def test_010_worker_abort_with_read_buffers(self):
336+
"""test StreamWorker abort() with read buffers"""
337+
class TestH(EventHandler):
338+
def __init__(self, testcase):
339+
self.testcase = testcase
340+
self.read_count = 0
341+
self.timer_called = False
342+
self.worker = None
343+
self.wfd1 = None
344+
345+
def ev_timer(self, timer):
346+
self.timer_called = True
347+
self.worker.abort()
348+
349+
def ev_read(self, worker, node, sname, msg):
350+
self.read_count += 1
351+
os.write(self.wfd1, b"Some unterminated data line")
352+
353+
# We want to test that a StreamWorker.abort() does not generate any
354+
# additional ev_read events. This only works if timeout is not set.
355+
# For a test with timeout, see test_004_timeout_on_open_stream().
356+
hdlr = TestH(self)
357+
worker = StreamWorker(handler=hdlr) # no timeout
358+
hdlr.worker = worker
359+
# Create pipe stream
360+
rfd1, wfd1 = os.pipe()
361+
hdlr.wfd1 = wfd1
362+
worker.set_reader("pipe1", rfd1, closefd=False)
363+
os.write(wfd1, b"Some terminated data line\n")
364+
# TEST: Do not close wfd1 to simulate open stream
365+
# We use an "external" timer to delay the abort() a bit
366+
timer1 = task_self().timer(0.5, handler=hdlr)
367+
self.run_worker(worker)
368+
self.assertTrue(hdlr.timer_called)
369+
self.assertEqual(hdlr.read_count, 1) # single line only
370+
os.close(rfd1)
371+
os.close(wfd1)

0 commit comments

Comments
 (0)