Skip to content

Commit 4351981

Browse files
committed
Worker: make abort() safe to call on an already closing client
Fix the underlying EngineClient.abort() call so that abort() can safely be called on a worker that is already closing (or aborting). This fix (and API clarification) prepares for upcoming patches that abort gateway channels (#260).

Change-Id: Ic714dc71db1aa9a0d5b16607780891f019c10859
1 parent 76f8d57 commit 4351981

File tree

9 files changed

+86
-7
lines changed

9 files changed

+86
-7
lines changed

lib/ClusterShell/Worker/EngineClient.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,8 @@ def _close(self, abort, timeout):
266266
for sname in list(self.streams):
267267
self._close_stream(sname)
268268

269+
self.invalidate() # set self._engine to None
270+
269271
def _close_stream(self, sname):
270272
"""
271273
Close specific stream by name (internal, called by engine). This method
@@ -318,7 +320,8 @@ def _handle_write(self, sname):
318320
wfile = self.streams[sname]
319321
if not wfile.wbuf and wfile.eof:
320322
# remove stream from engine (not directly)
321-
self._engine.remove_stream(self, wfile)
323+
if self._engine:
324+
self._engine.remove_stream(self, wfile)
322325
elif len(wfile.wbuf) > 0:
323326
try:
324327
wcnt = os.write(wfile.fd, wfile.wbuf)
@@ -340,7 +343,8 @@ def _handle_write(self, sname):
340343
if wfile.eof and not wfile.wbuf:
341344
self.worker._on_written(self.key, wcnt, sname)
342345
# remove stream from engine (not directly)
343-
self._engine.remove_stream(self, wfile)
346+
if self._engine:
347+
self._engine.remove_stream(self, wfile)
344348
else:
345349
self._set_writing(sname)
346350
self.worker._on_written(self.key, wcnt, sname)
@@ -424,9 +428,15 @@ def _set_write_eof(self, sname):
424428
self._engine.remove_stream(self, wfile)
425429

426430
def abort(self):
427-
"""Abort processing any action by this client."""
428-
if self._engine:
429-
self._engine.remove(self, abort=True)
431+
"""Abort processing any action by this client.
432+
433+
Safe to call on an already closing or aborting client.
434+
"""
435+
engine = self._engine
436+
if engine:
437+
self.invalidate() # set self._engine to None
438+
engine.remove(self, abort=True)
439+
430440

431441
class EnginePort(EngineClient):
432442
"""
@@ -512,6 +522,7 @@ def _close(self, abort, timeout):
512522
self._msgq = None
513523
del self.streams['out']
514524
del self.streams['in']
525+
self.invalidate()
515526

516527
def _handle_read(self, sname):
517528
"""

lib/ClusterShell/Worker/Exec.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def _close(self, abort, timeout):
128128
prc = self.popen.wait()
129129

130130
self.streams.clear()
131+
self.invalidate()
131132

132133
if prc >= 0:
133134
self._on_nodeset_close(self.key, prc)

lib/ClusterShell/Worker/Pdsh.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def _close(self, abort, timeout):
9696
raise WorkerError("Cannot run pdsh (error %d)" % prc)
9797

9898
self.streams.clear()
99+
self.invalidate()
99100

100101
if timeout:
101102
assert abort, "abort flag not set on timeout"

lib/ClusterShell/Worker/Popen.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ def _close(self, abort, timeout):
7777
prc = self.popen.wait()
7878

7979
self.streams.clear()
80+
self.invalidate()
8081

8182
if prc >= 0: # filter valid rc
8283
self.rc = prc

lib/ClusterShell/Worker/Worker.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,10 @@ def read(self, node=None, sname='stdout'):
209209
# Base actions
210210

211211
def abort(self):
212-
"""Abort processing any action by this worker."""
212+
"""Abort processing any action by this worker.
213+
214+
Safe to call on an already closing or aborting worker.
215+
"""
213216
raise NotImplementedError("Derived classes must implement.")
214217

215218
def flush_buffers(self):
@@ -599,7 +602,10 @@ def _on_timeout(self, key):
599602
self.eh.ev_timeout(self)
600603

601604
def abort(self):
602-
"""Abort processing any action by this worker."""
605+
"""Abort processing any action by this worker.
606+
607+
Safe to call on an already closing or aborting worker.
608+
"""
603609
self.clients[0].abort()
604610

605611
def read(self, node=None, sname='stdout'):

tests/StreamWorkerTest.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ def ev_written(self, worker, node, sname, size):
219219
self.check_written += 1
220220
self.testcase.assertEqual(os.read(self.rfd, 1024), b"initial")
221221
worker.abort()
222+
worker.abort() # safe but no effect
222223

223224
rfd, wfd = os.pipe()
224225

@@ -248,6 +249,7 @@ def ev_written(self, worker, node, sname, size):
248249
self.check_written += 1
249250
self.testcase.assertEqual(os.read(self.rfd, 1024), b"initial")
250251
worker.abort()
252+
worker.abort() # safe but no effect
251253

252254
rfd, wfd = os.pipe()
253255

@@ -299,3 +301,31 @@ def ev_written(self, worker, node, sname, size):
299301
self.run_worker(worker)
300302
self.assertEqual(hdlr.check_hup, 1)
301303
self.assertEqual(hdlr.check_written, 1)
304+
305+
def test_009_worker_abort_on_close(self):
306+
"""test StreamWorker abort() on closing worker"""
307+
308+
class TestH(EventHandler):
309+
def __init__(self, testcase, rfd):
310+
self.testcase = testcase
311+
self.rfd = rfd
312+
self.check_close = 0
313+
314+
def ev_close(self, worker, timedout):
315+
self.check_close += 1
316+
self.testcase.assertFalse(timedout)
317+
os.close(self.rfd)
318+
worker.abort()
319+
worker.abort() # safe but no effect
320+
321+
rfd, wfd = os.pipe()
322+
323+
hdlr = TestH(self, rfd)
324+
worker = StreamWorker(handler=hdlr)
325+
326+
worker.set_writer("test", wfd) # closefd=True
327+
worker.write(b"initial", "test")
328+
worker.set_write_eof()
329+
330+
self.run_worker(worker)
331+
self.assertEqual(hdlr.check_close, 1)

tests/TaskDistantMixin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -612,6 +612,7 @@ def __init__(self, worker):
612612
self.testtimer = False
613613
def ev_timer(self, timer):
614614
self.ext_worker.abort()
615+
self.ext_worker.abort() # safe but no effect
615616
self.testtimer = True
616617

617618
aot = AbortOnTimer(task.shell("sleep 10", nodes=HOSTNAME))

tests/TaskLocalMixin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,7 @@ def __init__(self, worker):
827827
self.testtimer = False
828828
def ev_timer(self, timer):
829829
self.ext_worker.abort()
830+
self.ext_worker.abort() # safe but no effect
830831
self.testtimer = True
831832

832833
aot = AbortOnTimer(task.shell("sleep 10"))

tests/WorkerExecTest.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from TLib import HOSTNAME, make_temp_file, make_temp_filename, make_temp_dir
1010

11+
from ClusterShell.Event import EventHandler
1112
from ClusterShell.Worker.Exec import ExecWorker, WorkerError
1213
from ClusterShell.Task import task_self
1314

@@ -150,3 +151,29 @@ def test_rcopy_wrong_directory(self):
150151
stderr=True, reverse=True)
151152
finally:
152153
os.rmdir(dstbasedir)
154+
155+
def test_abort_on_read(self):
156+
"""test ExecWorker.abort() on read"""
157+
158+
class TestH(EventHandler):
159+
def ev_read(self, worker):
160+
worker.abort()
161+
worker.abort() # safe but no effect
162+
163+
self.execw(nodes='localhost', handler=TestH(),
164+
command="echo ok; tail -f /dev/null")
165+
self.assertEqual(task_self().max_retcode(), None)
166+
self.assertEqual(task_self().node_buffer('localhost'), b'ok')
167+
168+
def test_abort_on_close(self):
169+
"""test ExecWorker.abort() on close"""
170+
171+
class TestH(EventHandler):
172+
def ev_close(self, worker, timedout):
173+
worker.abort()
174+
worker.abort() # safe but no effect
175+
176+
self.execw(nodes='localhost', handler=TestH(),
177+
command="echo ok; sleep .1")
178+
self.assertEqual(task_self().max_retcode(), 0)
179+
self.assertEqual(task_self().node_buffer('localhost'), b'ok')

0 commit comments

Comments
 (0)