Skip to content

Commit d134fa0

Browse files
committed
Tree: implement TreeWorker.abort() (#229)
Now that the TreeWorker code allows us to abort commands per gateway channel, we can implement a more general TreeWorker.abort() method. This method aborts all direct workers and also commands handled via gateways. Closes #229.
1 parent 93e7ee8 commit d134fa0

File tree

4 files changed

+176
-6
lines changed

4 files changed

+176
-6
lines changed

lib/ClusterShell/Communication.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,6 @@ def ev_start(self, worker):
218218
def ev_read(self, worker, node, sname, msg):
219219
"""channel has data to read"""
220220
# sname can be either SNAME_READER or self.SNAME_ERROR
221-
222221
if sname == self.SNAME_ERROR:
223222
if self.initiator:
224223
self.recv(StdErrMessage(node, msg))

lib/ClusterShell/Propagation.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ def recv(self, msg):
251251
"""process incoming messages"""
252252
self.logger.debug("recv: %s", msg)
253253
if msg.type == EndMessage.ident:
254-
#??#self.ptree.notify_close()
255254
self.logger.debug("got EndMessage; closing")
256255
self._close()
257256
elif msg.type == StdErrMessage.ident and msg.srcid == 0:

lib/ClusterShell/Worker/Tree.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,14 @@ def _on_remote_node_msgline(self, node, msg, sname, gateway):
429429

430430
def _on_remote_node_close(self, node, rc, gateway):
431431
"""remote node closing with return code"""
432-
DistantWorker._on_node_close(self, node, rc)
433432
self.logger.debug("_on_remote_node_close %s %s via gw %s rc=%s", node,
434433
self._close_count, gateway, rc)
435434

435+
# this must be done first to avoid recursion via event handlers
436+
self.gwtargets[str(gateway)].remove(node)
437+
438+
DistantWorker._on_node_close(self, node, rc)
439+
436440
# finalize rcopy: extract tar data
437441
if self.source and self.reverse:
438442
if node in self._rcopy_bufs:
@@ -458,7 +462,6 @@ def _on_remote_node_close(self, node, rc, gateway):
458462
else:
459463
self.logger.debug("no rcopy buffer received from %s", node)
460464

461-
self.gwtargets[str(gateway)].remove(node)
462465
self._close_count += 1
463466
self._check_fini(gateway)
464467

@@ -578,8 +581,11 @@ def _gateway_abort(self, gateway):
578581

579582
def abort(self):
580583
"""Abort processing any action by this worker."""
581-
# Not yet supported by TreeWorker
582-
raise NotImplementedError("see github issue #229")
584+
self.logger.debug("abort %s" % self)
585+
for worker in self.workers:
586+
worker.abort()
587+
for gateway in self.gwtargets.copy():
588+
self._gateway_abort(gateway)
583589

584590

585591
# TreeWorker's former name (deprecated as of 1.8)

tests/TreeWorkerTest.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,172 @@ def test_tree_worker_name_compat(self):
547547
"""test TreeWorker former name (WorkerTree)"""
548548
self.assertEqual(TreeWorker, WorkerTree)
549549

550+
def test_tree_run_abort_on_start(self):
551+
"""test tree run abort on ev_start"""
552+
class TEventAbortOnStartHandler(TEventHandler):
553+
"""Test Event Abort On Start Handler"""
554+
555+
def __init__(self, testcase):
556+
TEventHandler.__init__(self)
557+
self.testcase = testcase
558+
559+
def ev_start(self, worker):
560+
TEventHandler.ev_start(self, worker)
561+
worker.abort()
562+
563+
def ev_hup(self, worker, node, rc):
564+
TEventHandler.ev_hup(self, worker, node, rc)
565+
self.testcase.assertEqual(rc, os.EX_PROTOCOL)
566+
567+
teh = TEventAbortOnStartHandler(self)
568+
self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh)
569+
self.assertEqual(teh.ev_start_cnt, 1)
570+
#self.assertEqual(teh.ev_pickup_cnt, 0) # XXX to be improved
571+
self.assertEqual(teh.ev_read_cnt, 0)
572+
self.assertEqual(teh.ev_written_cnt, 0)
573+
self.assertEqual(teh.ev_hup_cnt, 1)
574+
self.assertEqual(teh.ev_timedout_cnt, 0)
575+
self.assertEqual(teh.ev_close_cnt, 1)
576+
self.assertEqual(teh.last_read, None)
577+
578+
def test_tree_run_abort_on_pickup(self):
579+
"""test tree run abort on ev_pickup"""
580+
class TEventAbortOnPickupHandler(TEventHandler):
581+
"""Test Event Abort On Pickup Handler"""
582+
583+
def __init__(self, testcase):
584+
TEventHandler.__init__(self)
585+
self.testcase = testcase
586+
587+
def ev_pickup(self, worker, node):
588+
TEventHandler.ev_pickup(self, worker, node)
589+
worker.abort()
590+
591+
def ev_hup(self, worker, node, rc):
592+
TEventHandler.ev_hup(self, worker, node, rc)
593+
self.testcase.assertEqual(rc, os.EX_PROTOCOL)
594+
595+
teh = TEventAbortOnPickupHandler(self)
596+
self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh)
597+
self.assertEqual(teh.ev_start_cnt, 1)
598+
self.assertEqual(teh.ev_pickup_cnt, 1)
599+
self.assertEqual(teh.ev_read_cnt, 0)
600+
self.assertEqual(teh.ev_written_cnt, 0)
601+
self.assertEqual(teh.ev_hup_cnt, 1)
602+
self.assertEqual(teh.ev_timedout_cnt, 0)
603+
self.assertEqual(teh.ev_close_cnt, 1)
604+
self.assertEqual(teh.last_read, None)
605+
606+
def test_tree_run_abort_on_read(self):
607+
"""test tree run abort on ev_read"""
608+
class TEventAbortOnReadHandler(TEventHandler):
609+
"""Test Event Abort On Start Handler"""
610+
611+
def __init__(self, testcase):
612+
TEventHandler.__init__(self)
613+
self.testcase = testcase
614+
615+
def ev_read(self, worker, node, sname, msg):
616+
TEventHandler.ev_read(self, worker, node, sname, msg)
617+
worker.abort()
618+
619+
def ev_hup(self, worker, node, rc):
620+
TEventHandler.ev_hup(self, worker, node, rc)
621+
self.testcase.assertEqual(rc, os.EX_PROTOCOL)
622+
623+
teh = TEventAbortOnReadHandler(self)
624+
self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh)
625+
self.assertEqual(teh.ev_start_cnt, 1)
626+
self.assertEqual(teh.ev_pickup_cnt, 1)
627+
self.assertEqual(teh.ev_read_cnt, 1)
628+
self.assertEqual(teh.ev_written_cnt, 0)
629+
self.assertEqual(teh.ev_hup_cnt, 1)
630+
self.assertEqual(teh.ev_timedout_cnt, 0)
631+
self.assertEqual(teh.ev_close_cnt, 1)
632+
self.assertEqual(teh.last_read, b'Lorem Ipsum')
633+
634+
def test_tree_run_abort_on_hup(self):
635+
"""test tree run abort on ev_hup"""
636+
class TEventAbortOnHupHandler(TEventHandler):
637+
"""Test Event Abort On Hup Handler"""
638+
639+
def __init__(self, testcase):
640+
TEventHandler.__init__(self)
641+
self.testcase = testcase
642+
643+
def ev_hup(self, worker, node, rc):
644+
TEventHandler.ev_hup(self, worker, node, rc)
645+
worker.abort()
646+
647+
teh = TEventAbortOnHupHandler(self)
648+
self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh)
649+
self.assertEqual(teh.ev_start_cnt, 1)
650+
self.assertEqual(teh.ev_pickup_cnt, 1)
651+
self.assertEqual(teh.ev_read_cnt, 1)
652+
self.assertEqual(teh.ev_written_cnt, 0)
653+
self.assertEqual(teh.ev_hup_cnt, 1)
654+
self.assertEqual(teh.ev_timedout_cnt, 0)
655+
self.assertEqual(teh.ev_close_cnt, 1)
656+
self.assertEqual(teh.last_read, b'Lorem Ipsum')
657+
658+
def test_tree_run_abort_on_close(self):
659+
"""test tree run abort on ev_close"""
660+
class TEventAbortOnCloseHandler(TEventHandler):
661+
"""Test Event Abort On Close Handler"""
662+
663+
def __init__(self, testcase):
664+
TEventHandler.__init__(self)
665+
self.testcase = testcase
666+
667+
def ev_close(self, worker, timedout):
668+
TEventHandler.ev_close(self, worker, timedout)
669+
self.testcase.assertEqual(type(worker), TreeWorker)
670+
worker.abort()
671+
672+
teh = TEventAbortOnCloseHandler(self)
673+
self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh)
674+
self.assertEqual(teh.ev_start_cnt, 1)
675+
self.assertEqual(teh.ev_pickup_cnt, 1)
676+
self.assertEqual(teh.ev_read_cnt, 1)
677+
self.assertEqual(teh.ev_written_cnt, 0)
678+
self.assertEqual(teh.ev_hup_cnt, 1)
679+
self.assertEqual(teh.ev_timedout_cnt, 0)
680+
self.assertEqual(teh.ev_close_cnt, 1)
681+
self.assertEqual(teh.last_read, b'Lorem Ipsum')
682+
683+
def test_tree_run_abort_on_timer(self):
684+
"""test tree run abort on timer"""
685+
class TEventAbortOnTimerHandler(TEventHandler):
686+
"""Test Event Abort On Timer Handler"""
687+
688+
def __init__(self, testcase):
689+
TEventHandler.__init__(self)
690+
self.testcase = testcase
691+
self.worker = None
692+
693+
def ev_timer(self, timer):
694+
self.worker.abort()
695+
696+
def ev_hup(self, worker, node, rc):
697+
TEventHandler.ev_hup(self, worker, node, rc)
698+
self.testcase.assertEqual(rc, os.EX_PROTOCOL)
699+
700+
# Test abort from a timer's event handler
701+
teh = TEventAbortOnTimerHandler(self)
702+
# channel might take some time to set up; hard to time it
703+
# we play it safe here and don't expect anything to read
704+
teh.worker = self.task.shell('sleep 10; echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh)
705+
timer1 = self.task.timer(3, handler=teh)
706+
self.task.run()
707+
self.assertEqual(teh.ev_start_cnt, 1)
708+
self.assertEqual(teh.ev_pickup_cnt, 1)
709+
self.assertEqual(teh.ev_read_cnt, 0)
710+
self.assertEqual(teh.ev_written_cnt, 0)
711+
self.assertEqual(teh.ev_hup_cnt, 1)
712+
self.assertEqual(teh.ev_timedout_cnt, 0)
713+
self.assertEqual(teh.ev_close_cnt, 1)
714+
self.assertEqual(teh.last_read, None)
715+
550716

551717
@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'")
552718
class TreeWorkerGW2Test(TreeWorkerTestBase):

0 commit comments

Comments
 (0)