Skip to content

Commit f903a68

Browse files
committed
Tree: invoke _ev_routing() on relaunch and add tests
EventHandler._ev_routing() has been defined since CS 1.6 or so, but was never called. It can be used by the user program to get notified of tree-related propagation events. For now it is relatively simple: we only generate "reroute" events when a gateway is failing and commands are tentatively redistributed to other gateways. Note that with this change, the parent nodes need to be upgraded first, otherwise they won't understand the new RoutingMessage coming from the gateways.
1 parent d134fa0 commit f903a68

File tree

5 files changed

+126
-12
lines changed

5 files changed

+126
-12
lines changed

lib/ClusterShell/Communication.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ def _draft_new(self, attributes):
139139
StdErrMessage.ident: StdErrMessage,
140140
RetcodeMessage.ident: RetcodeMessage,
141141
TimeoutMessage.ident: TimeoutMessage,
142+
RoutingMessage.ident: RoutingMessage,
142143
}
143144
try:
144145
msg_type = attributes['type']
@@ -457,6 +458,19 @@ def __init__(self, nodes='', srcid=0):
457458
self.attr.update({'nodes': str})
458459
self.nodes = nodes
459460

461+
class RoutingMessage(RoutedMessageBase):
    """Routed message carrying a routing notification."""
    ident = 'RTR'

    def __init__(self, event='', gateway='', targets='', srcid=0):
        """
        Build a routing notification.

        event, gateway and targets are stored as attributes of the same
        name and declared as str fields in self.attr; srcid is handed
        over to RoutedMessageBase.__init__().
        """
        RoutedMessageBase.__init__(self, srcid)
        # the three payload fields are all declared with the str type
        self.attr.update(dict.fromkeys(('event', 'gateway', 'targets'), str))
        self.event, self.gateway, self.targets = event, gateway, targets
473+
460474
class StartMessage(Message):
461475
"""message indicating the start of a channel communication"""
462476
ident = 'CHA'

lib/ClusterShell/Gateway.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
from ClusterShell.Communication import Channel, ConfigurationMessage, \
4141
ControlMessage, ACKMessage, ErrorMessage, StartMessage, EndMessage, \
4242
StdOutMessage, StdErrMessage, RetcodeMessage, TimeoutMessage, \
43-
MessageProcessingError
43+
RoutingMessage, MessageProcessingError
4444

4545

4646
def _gw_print_debug(task, line):
@@ -148,6 +148,14 @@ def ev_close(self, worker, timedout):
148148
self.ev_timer(None)
149149
self.timer.invalidate()
150150

151+
def _ev_routing(self, worker, arg):
    """
    Routing event (private). Called to indicate that a (meta)worker has just
    updated one of its route paths.

    The event is logged, then forwarded through self.gwchan as a
    RoutingMessage built from `arg` and tagged with self.srcwkr as srcid.

    worker -- worker that emitted the event (not used here)
    arg -- dict expanded into RoutingMessage keyword arguments, so its
           keys must be accepted by RoutingMessage ('event', 'gateway',
           'targets')
    """
    self.logger.debug("TreeWorkerResponder: ev_routing arg=%s", arg)
    # The explicit keyword goes before the ** expansion: the call is the
    # same, but this ordering is accepted by every Python version, whereas
    # a keyword placed after ** is a syntax error before Python 3.5.
    self.gwchan.send(RoutingMessage(srcid=self.srcwkr, **arg))
158+
151159

152160
class GatewayChannel(Channel):
153161
"""high level logic for gateways"""

lib/ClusterShell/Propagation.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@
2929

3030
from ClusterShell.Defaults import DEFAULTS
3131
from ClusterShell.NodeSet import NodeSet
32-
from ClusterShell.Communication import Channel
33-
from ClusterShell.Communication import ControlMessage, StdOutMessage
34-
from ClusterShell.Communication import StdErrMessage, RetcodeMessage
35-
from ClusterShell.Communication import StartMessage, EndMessage
36-
from ClusterShell.Communication import RoutedMessageBase, ErrorMessage
37-
from ClusterShell.Communication import ConfigurationMessage, TimeoutMessage
32+
from ClusterShell.Communication import (Channel, ControlMessage, StdOutMessage,
33+
StdErrMessage, RetcodeMessage,
34+
StartMessage, EndMessage,
35+
RoutedMessageBase, ErrorMessage,
36+
ConfigurationMessage, TimeoutMessage,
37+
RoutingMessage)
3838
from ClusterShell.Topology import TopologyError
3939

4040

@@ -376,6 +376,13 @@ def recv_ctl(self, msg):
376376
self.logger.debug("TimeoutMessage for %s", msg.nodes)
377377
for node in NodeSet(msg.nodes):
378378
metaworker._on_remote_node_timeout(node, self.gateway)
379+
elif msg.type == RoutingMessage.ident:
380+
self.logger.debug("RoutingMessage for %s (gw %s)", msg.targets,
381+
msg.gateway)
382+
routing_arg = { "event": msg.event,
383+
"targets": msg.targets,
384+
"gateway": msg.gateway }
385+
metaworker._on_routing_event(routing_arg)
379386
elif msg.type == ErrorMessage.ident:
380387
# tree runtime error, could generate a new event later
381388
raise TopologyError("%s: %s" % (self.gateway, msg.reason))

lib/ClusterShell/Worker/Tree.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -394,13 +394,15 @@ def _relaunch(self, previous_gateway):
394394
the relaunch is going to be performed using gateways (that's a feature).
395395
"""
396396
targets = NodeSet.fromlist(self.gwtargets[previous_gateway])
397-
self.logger.debug("_relaunch on targets %s from previous_gateway %s",
397+
self.logger.debug("_relaunch on targets %s from previous gateway %s",
398398
targets, previous_gateway)
399-
400399
self.gwtargets[previous_gateway].difference_update(targets)
401-
402400
self._check_fini(previous_gateway)
403401
self._target_count -= len(targets)
402+
if self.eh is not None:
403+
self.eh._ev_routing(self, { "event": "reroute",
404+
"gateway": previous_gateway,
405+
"targets": targets })
404406
self._launch(targets)
405407

406408
def _engine_clients(self):
@@ -486,10 +488,14 @@ def _on_node_timeout(self, node):
486488
self._close_count += 1
487489
self._has_timeout = True
488490

491+
def _on_routing_event(self, arg):
    """
    Relay a routing event to the event handler, if there is one.

    arg -- routing event description, passed unchanged to
           self.eh._ev_routing() along with this worker
    """
    self.logger.debug("_on_routing_event %s", arg)
    # self.eh may be None (other methods of this worker test for it before
    # using it); without a handler there is nobody to notify.
    if self.eh is not None:
        self.eh._ev_routing(self, arg)
494+
489495
def _check_ini(self):
490496
self.logger.debug("TreeWorker: _check_ini (%d, %d)", self._start_count,
491497
self._child_count)
492-
if self.eh and self._start_count >= self._child_count:
498+
if self.eh is not None and self._start_count >= self._child_count:
493499
# this part is called once
494500
self.eh.ev_start(self)
495501
# Blindly generate pickup events: this could maybe be improved, for

tests/TreeWorkerTest.py

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99
Host your_hostname localhost 127.0.0.*
1010
StrictHostKeyChecking no
1111
LogLevel ERROR
12+
13+
The hostname mock command available in tests/bin needs to be
14+
used on the remote nodes (tests/bin added to PATH in .bashrc).
1215
"""
1316

1417
import logging
@@ -18,6 +21,8 @@
1821
import warnings
1922

2023
from ClusterShell.NodeSet import NodeSet
24+
from ClusterShell.Propagation import RouteResolvingError
25+
from ClusterShell.Event import EventHandler
2126
from ClusterShell.Task import task_self, task_terminate, task_wait
2227
from ClusterShell.Task import Task, task_cleanup
2328
from ClusterShell.Topology import TopologyGraph
@@ -36,7 +41,7 @@
3641
NODE_FOREIGN = '127.0.0.5'
3742

3843

39-
class TEventHandlerBase(object):
44+
class TEventHandlerBase(EventHandler):
4045
"""Base Test class for EventHandler"""
4146

4247
def __init__(self):
@@ -103,6 +108,16 @@ def ev_close(self, worker, timedout):
103108
if timedout:
104109
self.ev_timedout_cnt += 1
105110

111+
class TRoutingEventHandler(TEventHandler):
    """Test event handler that also keeps track of routing events"""

    def __init__(self):
        TEventHandler.__init__(self)
        # (worker, arg) tuples, in the order they were received
        self.routing_events = list()

    def _ev_routing(self, worker, arg):
        """Record the routing event as a (worker, arg) tuple."""
        received = (worker, arg)
        self.routing_events.append(received)
120+
106121

107122
class TreeWorkerTestBase(unittest.TestCase):
108123
"""
@@ -348,6 +363,38 @@ def test_tree_run_event(self):
348363
self.assertEqual(teh.ev_close_cnt, 1)
349364
self.assertEqual(teh.last_read, b'Lorem Ipsum')
350365

366+
def test_tree_run_event_multiple(self):
    """test multiple tree runs with EventHandler (1.8+)"""
    # Test for GH#566: the same handler instance is reused by three
    # successive runs, so its counters accumulate from one run to the next.
    runs = (
        ('echo Lorem Ipsum Unum', b'Lorem Ipsum Unum'),
        ('echo Lorem Ipsum Duo', b'Lorem Ipsum Duo'),
        ('echo Lorem Ipsum Tres', b'Lorem Ipsum Tres'),
    )
    teh = TEventHandler()
    for run_count, (command, output) in enumerate(runs, 1):
        self.task.run(command, nodes=NODE_DISTANT, handler=teh)
        self.assertEqual(teh.ev_start_cnt, run_count)
        self.assertEqual(teh.ev_pickup_cnt, run_count)
        self.assertEqual(teh.ev_read_cnt, run_count)
        # nothing is ever written and no run times out
        self.assertEqual(teh.ev_written_cnt, 0)
        self.assertEqual(teh.ev_hup_cnt, run_count)
        self.assertEqual(teh.ev_timedout_cnt, 0)
        self.assertEqual(teh.ev_close_cnt, run_count)
        # last_read only reflects the most recent run
        self.assertEqual(teh.last_read, output)
397+
351398
def test_tree_run_event_timeout(self):
352399
"""test tree run with EventHandler (1.8+) with timeout"""
353400
teh = TEventHandler()
@@ -713,6 +760,17 @@ def ev_hup(self, worker, node, rc):
713760
self.assertEqual(teh.ev_close_cnt, 1)
714761
self.assertEqual(teh.last_read, None)
715762

763+
def test_tree_gateway_bogus_single(self):
    """test tree run with bogus single gateway"""
    # Part of GH#566
    teh = TEventHandler()
    envkey = 'CLUSTERSHELL_GW_PYTHON_EXECUTABLE'
    # Remember any value already exported so it can be put back afterwards;
    # blindly deleting the variable would leak a changed environment into
    # the tests that run after this one.
    saved = os.environ.get(envkey)
    os.environ[envkey] = '/test/bogus'
    try:
        # with an unusable gateway interpreter, no route can be resolved
        self.assertRaises(RouteResolvingError, self.task.run, 'echo Lorem Ipsum',
                          nodes=NODE_DISTANT, handler=teh)
    finally:
        if saved is None:
            del os.environ[envkey]
        else:
            os.environ[envkey] = saved
773+
716774

717775
@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'")
718776
class TreeWorkerGW2Test(TreeWorkerTestBase):
@@ -858,3 +916,24 @@ def test_tree_run_gw2f1_write_distant(self):
858916
def test_tree_run_gw2f1_write_distant2_mt(self):
859917
"""test tree run with write(), 1/2 gateways, distant 2 targets, separate thread"""
860918
self._tree_run_write(NODE_DISTANT2, separate_thread=True)
919+
920+
def test_tree_run_gw2f1_reroute(self):
    """test tree run with reroute event, 1/2 gateways"""
    teh = TRoutingEventHandler()
    self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh)
    # a single routing event must have been recorded
    self.assertEqual(len(teh.routing_events), 1)
    first_event = teh.routing_events[0]
    worker, arg = first_event
    self.assertEqual(worker.command, "echo Lorem Ipsum")
    self.assertEqual(arg["event"], "reroute")
    self.assertIn(arg["targets"], NodeSet(NODE_DISTANT2))
    # event handler checks
    expected = (
        ('ev_start_cnt', 1),
        ('ev_pickup_cnt', 2),
        # read_cnt += 1 for gateway error on stderr (so currently not fully
        # transparent to the user)
        ('ev_read_cnt', 3),
        ('ev_written_cnt', 0),
        ('ev_hup_cnt', 2),
        ('ev_timedout_cnt', 0),
        ('ev_close_cnt', 1),
        ('last_read', b'Lorem Ipsum'),
    )
    for name, value in expected:
        self.assertEqual(getattr(teh, name), value)

0 commit comments

Comments
 (0)