From da3175123a4be5da73c3f0e92a4ca64a717aafde Mon Sep 17 00:00:00 2001 From: Stephane Thiell Date: Mon, 4 Aug 2025 10:51:59 -0700 Subject: [PATCH] Tree: invoke _ev_routing() on relaunch and add tests EventHandler._ev_routing() has been defined since CS 1.6 or so, but was never called. It can be used by the user program to get notified for tree related propagation events. For now it is relatively simple and we only generate "reroute" events when a gateway is failing and commands are tentatively redistributed to other gateways. Note that with this change, the parents nodes need to be upgraded first, otherwise they won't understand the new RoutingMessage coming from the gateways. --- lib/ClusterShell/Communication.py | 14 ++++++ lib/ClusterShell/Gateway.py | 10 +++- lib/ClusterShell/Propagation.py | 26 +++++----- lib/ClusterShell/Worker/Tree.py | 14 ++++-- tests/TreeWorkerTest.py | 81 ++++++++++++++++++++++++++++++- 5 files changed, 125 insertions(+), 20 deletions(-) diff --git a/lib/ClusterShell/Communication.py b/lib/ClusterShell/Communication.py index 70290257..d817f0ba 100644 --- a/lib/ClusterShell/Communication.py +++ b/lib/ClusterShell/Communication.py @@ -139,6 +139,7 @@ def _draft_new(self, attributes): StdErrMessage.ident: StdErrMessage, RetcodeMessage.ident: RetcodeMessage, TimeoutMessage.ident: TimeoutMessage, + RoutingMessage.ident: RoutingMessage, } try: msg_type = attributes['type'] @@ -457,6 +458,19 @@ def __init__(self, nodes='', srcid=0): self.attr.update({'nodes': str}) self.nodes = nodes +class RoutingMessage(RoutedMessageBase): + """container message for routing notification""" + ident = 'RTR' + + def __init__(self, event='', gateway='', targets='', srcid=0): + """ + """ + RoutedMessageBase.__init__(self, srcid) + self.attr.update({'event': str, 'gateway': str, 'targets': str}) + self.event = event + self.gateway = gateway + self.targets = targets + class StartMessage(Message): """message indicating the start of a channel communication""" ident = 'CHA' diff --git a/lib/ClusterShell/Gateway.py b/lib/ClusterShell/Gateway.py index 94379e06..c025a3f7 100755 --- a/lib/ClusterShell/Gateway.py +++ b/lib/ClusterShell/Gateway.py @@ -40,7 +40,7 @@ from ClusterShell.Communication import Channel, ConfigurationMessage, \ ControlMessage, ACKMessage, ErrorMessage, StartMessage, EndMessage, \ StdOutMessage, StdErrMessage, RetcodeMessage, TimeoutMessage, \ - MessageProcessingError + RoutingMessage, MessageProcessingError def _gw_print_debug(task, line): @@ -148,6 +148,14 @@ def ev_close(self, worker, timedout): self.ev_timer(None) self.timer.invalidate() + def _ev_routing(self, worker, arg): + """ + Routing event (private). Called to indicate that a (meta)worker has just + updated one of its route path. + """ + self.logger.debug("TreeWorkerResponder: ev_routing arg=%s", arg) + self.gwchan.send(RoutingMessage(**arg, srcid=self.srcwkr)) + class GatewayChannel(Channel): """high level logic for gateways""" diff --git a/lib/ClusterShell/Propagation.py b/lib/ClusterShell/Propagation.py index a53ae010..7c3f07dc 100644 --- a/lib/ClusterShell/Propagation.py +++ b/lib/ClusterShell/Propagation.py @@ -29,12 +29,12 @@ from ClusterShell.Defaults import DEFAULTS from ClusterShell.NodeSet import NodeSet -from ClusterShell.Communication import Channel -from ClusterShell.Communication import ControlMessage, StdOutMessage -from ClusterShell.Communication import StdErrMessage, RetcodeMessage -from ClusterShell.Communication import StartMessage, EndMessage -from ClusterShell.Communication import RoutedMessageBase, ErrorMessage -from ClusterShell.Communication import ConfigurationMessage, TimeoutMessage +from ClusterShell.Communication import (Channel, ControlMessage, StdOutMessage, + StdErrMessage, RetcodeMessage, + StartMessage, EndMessage, + RoutedMessageBase, ErrorMessage, + ConfigurationMessage, TimeoutMessage, + RoutingMessage) from ClusterShell.Topology import TopologyError @@ -376,19 +376,17 @@ def recv_ctl(self, msg): self.logger.debug("TimeoutMessage for %s", msg.nodes) for node in NodeSet(msg.nodes): metaworker._on_remote_node_timeout(node, self.gateway) + elif msg.type == RoutingMessage.ident: + self.logger.debug("RoutingMessage for %s (gw %s)", msg.targets, + msg.gateway) + metaworker._on_routing_event({ "event": msg.event, + "gateway": msg.gateway, + "targets": msg.targets }) elif msg.type == ErrorMessage.ident: # tree runtime error, could generate a new event later raise TopologyError("%s: %s" % (self.gateway, msg.reason)) else: self.logger.debug("recv_ctl: unhandled msg %s", msg) - """ - return - if self.ptree.upchannel is not None: - self.logger.debug("_state_gather ->upchan %s" % msg) - self.ptree.upchannel.send(msg) # send to according event handler passed by shell() - else: - assert False - """ def ev_hup(self, worker, node, rc): """Channel command is closing""" diff --git a/lib/ClusterShell/Worker/Tree.py b/lib/ClusterShell/Worker/Tree.py index ddb42873..da130696 100644 --- a/lib/ClusterShell/Worker/Tree.py +++ b/lib/ClusterShell/Worker/Tree.py @@ -394,13 +394,15 @@ def _relaunch(self, previous_gateway): the relaunch is going to be performed using gateways (that's a feature). """ targets = NodeSet.fromlist(self.gwtargets[previous_gateway]) - self.logger.debug("_relaunch on targets %s from previous_gateway %s", + self.logger.debug("_relaunch on targets %s from previous gateway %s", targets, previous_gateway) - self.gwtargets[previous_gateway].difference_update(targets) - self._check_fini(previous_gateway) self._target_count -= len(targets) + if self.eh is not None: + self.eh._ev_routing(self, { "event": "reroute", + "gateway": previous_gateway, + "targets": targets }) self._launch(targets) def _engine_clients(self): @@ -486,10 +488,14 @@ def _on_node_timeout(self, node): self._close_count += 1 self._has_timeout = True + def _on_routing_event(self, arg): + self.logger.debug("_on_routing_event %s", arg) + self.eh._ev_routing(self, arg) + def _check_ini(self): self.logger.debug("TreeWorker: _check_ini (%d, %d)", self._start_count, self._child_count) - if self.eh and self._start_count >= self._child_count: + if self.eh is not None and self._start_count >= self._child_count: # this part is called once self.eh.ev_start(self) # Blindly generate pickup events: this could maybe be improved, for diff --git a/tests/TreeWorkerTest.py b/tests/TreeWorkerTest.py index 469e1e79..cb10ab56 100644 --- a/tests/TreeWorkerTest.py +++ b/tests/TreeWorkerTest.py @@ -9,6 +9,9 @@ Host your_hostname localhost 127.0.0.* StrictHostKeyChecking no LogLevel ERROR + +The hostname mock command available in tests/bin needs to be +used on the remote nodes (tests/bin added to PATH in .bashrc). """ import logging @@ -18,6 +21,8 @@ import warnings from ClusterShell.NodeSet import NodeSet +from ClusterShell.Propagation import RouteResolvingError +from ClusterShell.Event import EventHandler from ClusterShell.Task import task_self, task_terminate, task_wait from ClusterShell.Task import Task, task_cleanup from ClusterShell.Topology import TopologyGraph @@ -36,7 +41,7 @@ NODE_FOREIGN = '127.0.0.5' -class TEventHandlerBase(object): +class TEventHandlerBase(EventHandler): """Base Test class for EventHandler""" def __init__(self): @@ -103,6 +108,16 @@ def ev_close(self, worker, timedout): if timedout: self.ev_timedout_cnt += 1 +class TRoutingEventHandler(TEventHandler): + """Test Routing Event Handler""" + + def __init__(self): + TEventHandler.__init__(self) + self.routing_events = [] + + def _ev_routing(self, worker, arg): + self.routing_events.append((worker, arg)) + class TreeWorkerTestBase(unittest.TestCase): """ @@ -348,6 +363,38 @@ def test_tree_run_event(self): self.assertEqual(teh.ev_close_cnt, 1) self.assertEqual(teh.last_read, b'Lorem Ipsum') + def test_tree_run_event_multiple(self): + """test multiple tree runs with EventHandler (1.8+)""" + # Test for GH#566 + teh = TEventHandler() + self.task.run('echo Lorem Ipsum Unum', nodes=NODE_DISTANT, handler=teh) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 1) + self.assertEqual(teh.ev_read_cnt, 1) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 1) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum Unum') + self.task.run('echo Lorem Ipsum Duo', nodes=NODE_DISTANT, handler=teh) + self.assertEqual(teh.ev_start_cnt, 2) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 2) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 2) + self.assertEqual(teh.last_read, b'Lorem Ipsum Duo') + self.task.run('echo Lorem Ipsum Tres', nodes=NODE_DISTANT, handler=teh) + self.assertEqual(teh.ev_start_cnt, 3) + self.assertEqual(teh.ev_pickup_cnt, 3) + self.assertEqual(teh.ev_read_cnt, 3) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 3) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 3) + self.assertEqual(teh.last_read, b'Lorem Ipsum Tres') + def test_tree_run_event_timeout(self): """test tree run with EventHandler (1.8+) with timeout""" teh = TEventHandler() @@ -713,6 +760,17 @@ def ev_hup(self, worker, node, rc): self.assertEqual(teh.ev_close_cnt, 1) self.assertEqual(teh.last_read, None) + def test_tree_gateway_bogus_single(self): + """test tree run with bogus single gateway""" + # Part of GH#566 + teh = TEventHandler() + os.environ['CLUSTERSHELL_GW_PYTHON_EXECUTABLE'] = '/test/bogus' + try: + self.assertRaises(RouteResolvingError, self.task.run, 'echo Lorem Ipsum', + nodes=NODE_DISTANT, handler=teh) + finally: + del os.environ['CLUSTERSHELL_GW_PYTHON_EXECUTABLE'] + @unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") class TreeWorkerGW2Test(TreeWorkerTestBase): @@ -858,3 +916,24 @@ def test_tree_run_gw2f1_write_distant(self): def test_tree_run_gw2f1_write_distant2_mt(self): """test tree run with write(), 1/2 gateways, distant 2 targets, separate thread""" self._tree_run_write(NODE_DISTANT2, separate_thread=True) + + def test_tree_run_gw2f1_reroute(self): + """test tree run with reroute event, 1/2 gateways""" + teh = TRoutingEventHandler() + self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh) + self.assertEqual(len(teh.routing_events), 1) + worker, arg = teh.routing_events[0] + self.assertEqual(worker.command, "echo Lorem Ipsum") + self.assertEqual(arg["event"], "reroute") + self.assertIn(arg["targets"], NodeSet(NODE_DISTANT2)) + # event handler checks + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + # read_cnt += 1 for gateway error on stderr (so currently not fully + # transparent to the user) + self.assertEqual(teh.ev_read_cnt, 3) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum')