diff --git a/lib/ClusterShell/Communication.py b/lib/ClusterShell/Communication.py index 70290257..d817f0ba 100644 --- a/lib/ClusterShell/Communication.py +++ b/lib/ClusterShell/Communication.py @@ -139,6 +139,7 @@ def _draft_new(self, attributes): StdErrMessage.ident: StdErrMessage, RetcodeMessage.ident: RetcodeMessage, TimeoutMessage.ident: TimeoutMessage, + RoutingMessage.ident: RoutingMessage, } try: msg_type = attributes['type'] @@ -457,6 +458,19 @@ def __init__(self, nodes='', srcid=0): self.attr.update({'nodes': str}) self.nodes = nodes +class RoutingMessage(RoutedMessageBase): + """container message for routing notification""" + ident = 'RTR' + + def __init__(self, event='', gateway='', targets='', srcid=0): + """ + """ + RoutedMessageBase.__init__(self, srcid) + self.attr.update({'event': str, 'gateway': str, 'targets': str}) + self.event = event + self.gateway = gateway + self.targets = targets + class StartMessage(Message): """message indicating the start of a channel communication""" ident = 'CHA' diff --git a/lib/ClusterShell/Gateway.py b/lib/ClusterShell/Gateway.py index 94379e06..c025a3f7 100755 --- a/lib/ClusterShell/Gateway.py +++ b/lib/ClusterShell/Gateway.py @@ -40,7 +40,7 @@ from ClusterShell.Communication import Channel, ConfigurationMessage, \ ControlMessage, ACKMessage, ErrorMessage, StartMessage, EndMessage, \ StdOutMessage, StdErrMessage, RetcodeMessage, TimeoutMessage, \ - MessageProcessingError + RoutingMessage, MessageProcessingError def _gw_print_debug(task, line): @@ -148,6 +148,14 @@ def ev_close(self, worker, timedout): self.ev_timer(None) self.timer.invalidate() + def _ev_routing(self, worker, arg): + """ + Routing event (private). Called to indicate that a (meta)worker has just + updated one of its route path. + """ + self.logger.debug("TreeWorkerResponder: ev_routing arg=%s", arg) + self.gwchan.send(RoutingMessage(**arg, srcid=self.srcwkr)) + class GatewayChannel(Channel): """high level logic for gateways""" diff --git a/lib/ClusterShell/Propagation.py b/lib/ClusterShell/Propagation.py index a53ae010..7c3f07dc 100644 --- a/lib/ClusterShell/Propagation.py +++ b/lib/ClusterShell/Propagation.py @@ -29,12 +29,12 @@ from ClusterShell.Defaults import DEFAULTS from ClusterShell.NodeSet import NodeSet -from ClusterShell.Communication import Channel -from ClusterShell.Communication import ControlMessage, StdOutMessage -from ClusterShell.Communication import StdErrMessage, RetcodeMessage -from ClusterShell.Communication import StartMessage, EndMessage -from ClusterShell.Communication import RoutedMessageBase, ErrorMessage -from ClusterShell.Communication import ConfigurationMessage, TimeoutMessage +from ClusterShell.Communication import (Channel, ControlMessage, StdOutMessage, + StdErrMessage, RetcodeMessage, + StartMessage, EndMessage, + RoutedMessageBase, ErrorMessage, + ConfigurationMessage, TimeoutMessage, + RoutingMessage) from ClusterShell.Topology import TopologyError @@ -376,19 +376,17 @@ def recv_ctl(self, msg): self.logger.debug("TimeoutMessage for %s", msg.nodes) for node in NodeSet(msg.nodes): metaworker._on_remote_node_timeout(node, self.gateway) + elif msg.type == RoutingMessage.ident: + self.logger.debug("RoutingMessage for %s (gw %s)", msg.targets, + msg.gateway) + metaworker._on_routing_event({ "event": msg.event, + "gateway": msg.gateway, + "targets": msg.targets }) elif msg.type == ErrorMessage.ident: # tree runtime error, could generate a new event later raise TopologyError("%s: %s" % (self.gateway, msg.reason)) else: self.logger.debug("recv_ctl: unhandled msg %s", msg) - """ - return - if self.ptree.upchannel is not None: - self.logger.debug("_state_gather ->upchan %s" % msg) - self.ptree.upchannel.send(msg) # send to according event handler passed by shell() - else: - assert False - """ def ev_hup(self, worker, node, rc): """Channel command is closing""" diff --git a/lib/ClusterShell/Worker/Tree.py b/lib/ClusterShell/Worker/Tree.py index ddb42873..da130696 100644 --- a/lib/ClusterShell/Worker/Tree.py +++ b/lib/ClusterShell/Worker/Tree.py @@ -394,13 +394,15 @@ def _relaunch(self, previous_gateway): the relaunch is going to be performed using gateways (that's a feature). """ targets = NodeSet.fromlist(self.gwtargets[previous_gateway]) - self.logger.debug("_relaunch on targets %s from previous_gateway %s", + self.logger.debug("_relaunch on targets %s from previous gateway %s", targets, previous_gateway) - self.gwtargets[previous_gateway].difference_update(targets) - self._check_fini(previous_gateway) self._target_count -= len(targets) + if self.eh is not None: + self.eh._ev_routing(self, { "event": "reroute", + "gateway": previous_gateway, + "targets": targets }) self._launch(targets) def _engine_clients(self): @@ -486,10 +488,14 @@ def _on_node_timeout(self, node): self._close_count += 1 self._has_timeout = True + def _on_routing_event(self, arg): + self.logger.debug("_on_routing_event %s", arg) + self.eh._ev_routing(self, arg) + def _check_ini(self): self.logger.debug("TreeWorker: _check_ini (%d, %d)", self._start_count, self._child_count) - if self.eh and self._start_count >= self._child_count: + if self.eh is not None and self._start_count >= self._child_count: # this part is called once self.eh.ev_start(self) # Blindly generate pickup events: this could maybe be improved, for diff --git a/tests/TreeWorkerTest.py b/tests/TreeWorkerTest.py index 469e1e79..cb10ab56 100644 --- a/tests/TreeWorkerTest.py +++ b/tests/TreeWorkerTest.py @@ -9,6 +9,9 @@ Host your_hostname localhost 127.0.0.* StrictHostKeyChecking no LogLevel ERROR + +The hostname mock command available in tests/bin needs to be +used on the remote nodes (tests/bin added to PATH in .bashrc). """ import logging @@ -18,6 +21,8 @@ import warnings from ClusterShell.NodeSet import NodeSet +from ClusterShell.Propagation import RouteResolvingError +from ClusterShell.Event import EventHandler from ClusterShell.Task import task_self, task_terminate, task_wait from ClusterShell.Task import Task, task_cleanup from ClusterShell.Topology import TopologyGraph @@ -36,7 +41,7 @@ NODE_FOREIGN = '127.0.0.5' -class TEventHandlerBase(object): +class TEventHandlerBase(EventHandler): """Base Test class for EventHandler""" def __init__(self): @@ -103,6 +108,16 @@ def ev_close(self, worker, timedout): if timedout: self.ev_timedout_cnt += 1 +class TRoutingEventHandler(TEventHandler): + """Test Routing Event Handler""" + + def __init__(self): + TEventHandler.__init__(self) + self.routing_events = [] + + def _ev_routing(self, worker, arg): + self.routing_events.append((worker, arg)) + class TreeWorkerTestBase(unittest.TestCase): """ @@ -348,6 +363,38 @@ def test_tree_run_event(self): self.assertEqual(teh.ev_close_cnt, 1) self.assertEqual(teh.last_read, b'Lorem Ipsum') + def test_tree_run_event_multiple(self): + """test multiple tree runs with EventHandler (1.8+)""" + # Test for GH#566 + teh = TEventHandler() + self.task.run('echo Lorem Ipsum Unum', nodes=NODE_DISTANT, handler=teh) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 1) + self.assertEqual(teh.ev_read_cnt, 1) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 1) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum Unum') + self.task.run('echo Lorem Ipsum Duo', nodes=NODE_DISTANT, handler=teh) + self.assertEqual(teh.ev_start_cnt, 2) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 2) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 2) + self.assertEqual(teh.last_read, b'Lorem Ipsum Duo') + self.task.run('echo Lorem Ipsum Tres', nodes=NODE_DISTANT, handler=teh) + self.assertEqual(teh.ev_start_cnt, 3) + self.assertEqual(teh.ev_pickup_cnt, 3) + self.assertEqual(teh.ev_read_cnt, 3) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 3) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 3) + self.assertEqual(teh.last_read, b'Lorem Ipsum Tres') + def test_tree_run_event_timeout(self): """test tree run with EventHandler (1.8+) with timeout""" teh = TEventHandler() @@ -713,6 +760,17 @@ def ev_hup(self, worker, node, rc): self.assertEqual(teh.ev_close_cnt, 1) self.assertEqual(teh.last_read, None) + def test_tree_gateway_bogus_single(self): + """test tree run with bogus single gateway""" + # Part of GH#566 + teh = TEventHandler() + os.environ['CLUSTERSHELL_GW_PYTHON_EXECUTABLE'] = '/test/bogus' + try: + self.assertRaises(RouteResolvingError, self.task.run, 'echo Lorem Ipsum', + nodes=NODE_DISTANT, handler=teh) + finally: + del os.environ['CLUSTERSHELL_GW_PYTHON_EXECUTABLE'] + @unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") class TreeWorkerGW2Test(TreeWorkerTestBase): @@ -858,3 +916,24 @@ def test_tree_run_gw2f1_write_distant(self): def test_tree_run_gw2f1_write_distant2_mt(self): """test tree run with write(), 1/2 gateways, distant 2 targets, separate thread""" self._tree_run_write(NODE_DISTANT2, separate_thread=True) + + def test_tree_run_gw2f1_reroute(self): + """test tree run with reroute event, 1/2 gateways""" + teh = TRoutingEventHandler() + self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh) + self.assertEqual(len(teh.routing_events), 1) + worker, arg = teh.routing_events[0] + self.assertEqual(worker.command, "echo Lorem Ipsum") + self.assertEqual(arg["event"], "reroute") + self.assertIn(arg["targets"], NodeSet(NODE_DISTANT2)) + # event handler checks + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + # read_cnt += 1 for gateway error on stderr (so currently not fully + # transparent to the user) + self.assertEqual(teh.ev_read_cnt, 3) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum')