Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/ClusterShell/Communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def _draft_new(self, attributes):
StdErrMessage.ident: StdErrMessage,
RetcodeMessage.ident: RetcodeMessage,
TimeoutMessage.ident: TimeoutMessage,
RoutingMessage.ident: RoutingMessage,
}
try:
msg_type = attributes['type']
Expand Down Expand Up @@ -457,6 +458,19 @@ def __init__(self, nodes='', srcid=0):
self.attr.update({'nodes': str})
self.nodes = nodes

class RoutingMessage(RoutedMessageBase):
"""container message for routing notification"""
ident = 'RTR'

def __init__(self, event='', gateway='', targets='', srcid=0):
"""
"""
RoutedMessageBase.__init__(self, srcid)
self.attr.update({'event': str, 'gateway': str, 'targets': str})
self.event = event
self.gateway = gateway
self.targets = targets

class StartMessage(Message):
"""message indicating the start of a channel communication"""
ident = 'CHA'
Expand Down
10 changes: 9 additions & 1 deletion lib/ClusterShell/Gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from ClusterShell.Communication import Channel, ConfigurationMessage, \
ControlMessage, ACKMessage, ErrorMessage, StartMessage, EndMessage, \
StdOutMessage, StdErrMessage, RetcodeMessage, TimeoutMessage, \
MessageProcessingError
RoutingMessage, MessageProcessingError


def _gw_print_debug(task, line):
Expand Down Expand Up @@ -148,6 +148,14 @@ def ev_close(self, worker, timedout):
self.ev_timer(None)
self.timer.invalidate()

def _ev_routing(self, worker, arg):
"""
Routing event (private). Called to indicate that a (meta)worker has just
updated one of its route path.
"""
self.logger.debug("TreeWorkerResponder: ev_routing arg=%s", arg)
self.gwchan.send(RoutingMessage(**arg, srcid=self.srcwkr))


class GatewayChannel(Channel):
"""high level logic for gateways"""
Expand Down
26 changes: 12 additions & 14 deletions lib/ClusterShell/Propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@

from ClusterShell.Defaults import DEFAULTS
from ClusterShell.NodeSet import NodeSet
from ClusterShell.Communication import Channel
from ClusterShell.Communication import ControlMessage, StdOutMessage
from ClusterShell.Communication import StdErrMessage, RetcodeMessage
from ClusterShell.Communication import StartMessage, EndMessage
from ClusterShell.Communication import RoutedMessageBase, ErrorMessage
from ClusterShell.Communication import ConfigurationMessage, TimeoutMessage
from ClusterShell.Communication import (Channel, ControlMessage, StdOutMessage,
StdErrMessage, RetcodeMessage,
StartMessage, EndMessage,
RoutedMessageBase, ErrorMessage,
ConfigurationMessage, TimeoutMessage,
RoutingMessage)
from ClusterShell.Topology import TopologyError


Expand Down Expand Up @@ -376,19 +376,17 @@ def recv_ctl(self, msg):
self.logger.debug("TimeoutMessage for %s", msg.nodes)
for node in NodeSet(msg.nodes):
metaworker._on_remote_node_timeout(node, self.gateway)
elif msg.type == RoutingMessage.ident:
self.logger.debug("RoutingMessage for %s (gw %s)", msg.targets,
msg.gateway)
metaworker._on_routing_event({ "event": msg.event,
"gateway": msg.gateway,
"targets": msg.targets })
elif msg.type == ErrorMessage.ident:
# tree runtime error, could generate a new event later
raise TopologyError("%s: %s" % (self.gateway, msg.reason))
else:
self.logger.debug("recv_ctl: unhandled msg %s", msg)
"""
return
if self.ptree.upchannel is not None:
self.logger.debug("_state_gather ->upchan %s" % msg)
self.ptree.upchannel.send(msg) # send to according event handler passed by shell()
else:
assert False
"""

def ev_hup(self, worker, node, rc):
"""Channel command is closing"""
Expand Down
14 changes: 10 additions & 4 deletions lib/ClusterShell/Worker/Tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,13 +394,15 @@ def _relaunch(self, previous_gateway):
the relaunch is going to be performed using gateways (that's a feature).
"""
targets = NodeSet.fromlist(self.gwtargets[previous_gateway])
self.logger.debug("_relaunch on targets %s from previous_gateway %s",
self.logger.debug("_relaunch on targets %s from previous gateway %s",
targets, previous_gateway)

self.gwtargets[previous_gateway].difference_update(targets)

self._check_fini(previous_gateway)
self._target_count -= len(targets)
if self.eh is not None:
self.eh._ev_routing(self, { "event": "reroute",
"gateway": previous_gateway,
"targets": targets })
self._launch(targets)

def _engine_clients(self):
Expand Down Expand Up @@ -486,10 +488,14 @@ def _on_node_timeout(self, node):
self._close_count += 1
self._has_timeout = True

def _on_routing_event(self, arg):
self.logger.debug("_on_routing_event %s", arg)
self.eh._ev_routing(self, arg)

def _check_ini(self):
self.logger.debug("TreeWorker: _check_ini (%d, %d)", self._start_count,
self._child_count)
if self.eh and self._start_count >= self._child_count:
if self.eh is not None and self._start_count >= self._child_count:
# this part is called once
self.eh.ev_start(self)
# Blindly generate pickup events: this could maybe be improved, for
Expand Down
81 changes: 80 additions & 1 deletion tests/TreeWorkerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
Host your_hostname localhost 127.0.0.*
StrictHostKeyChecking no
LogLevel ERROR

The hostname mock command available in tests/bin needs to be
used on the remote nodes (tests/bin added to PATH in .bashrc).
"""

import logging
Expand All @@ -18,6 +21,8 @@
import warnings

from ClusterShell.NodeSet import NodeSet
from ClusterShell.Propagation import RouteResolvingError
from ClusterShell.Event import EventHandler
from ClusterShell.Task import task_self, task_terminate, task_wait
from ClusterShell.Task import Task, task_cleanup
from ClusterShell.Topology import TopologyGraph
Expand All @@ -36,7 +41,7 @@
NODE_FOREIGN = '127.0.0.5'


class TEventHandlerBase(object):
class TEventHandlerBase(EventHandler):
"""Base Test class for EventHandler"""

def __init__(self):
Expand Down Expand Up @@ -103,6 +108,16 @@ def ev_close(self, worker, timedout):
if timedout:
self.ev_timedout_cnt += 1

class TRoutingEventHandler(TEventHandler):
"""Test Routing Event Handler"""

def __init__(self):
TEventHandler.__init__(self)
self.routing_events = []

def _ev_routing(self, worker, arg):
self.routing_events.append((worker, arg))


class TreeWorkerTestBase(unittest.TestCase):
"""
Expand Down Expand Up @@ -348,6 +363,38 @@ def test_tree_run_event(self):
self.assertEqual(teh.ev_close_cnt, 1)
self.assertEqual(teh.last_read, b'Lorem Ipsum')

def test_tree_run_event_multiple(self):
"""test multiple tree runs with EventHandler (1.8+)"""
# Test for GH#566
teh = TEventHandler()
self.task.run('echo Lorem Ipsum Unum', nodes=NODE_DISTANT, handler=teh)
self.assertEqual(teh.ev_start_cnt, 1)
self.assertEqual(teh.ev_pickup_cnt, 1)
self.assertEqual(teh.ev_read_cnt, 1)
self.assertEqual(teh.ev_written_cnt, 0)
self.assertEqual(teh.ev_hup_cnt, 1)
self.assertEqual(teh.ev_timedout_cnt, 0)
self.assertEqual(teh.ev_close_cnt, 1)
self.assertEqual(teh.last_read, b'Lorem Ipsum Unum')
self.task.run('echo Lorem Ipsum Duo', nodes=NODE_DISTANT, handler=teh)
self.assertEqual(teh.ev_start_cnt, 2)
self.assertEqual(teh.ev_pickup_cnt, 2)
self.assertEqual(teh.ev_read_cnt, 2)
self.assertEqual(teh.ev_written_cnt, 0)
self.assertEqual(teh.ev_hup_cnt, 2)
self.assertEqual(teh.ev_timedout_cnt, 0)
self.assertEqual(teh.ev_close_cnt, 2)
self.assertEqual(teh.last_read, b'Lorem Ipsum Duo')
self.task.run('echo Lorem Ipsum Tres', nodes=NODE_DISTANT, handler=teh)
self.assertEqual(teh.ev_start_cnt, 3)
self.assertEqual(teh.ev_pickup_cnt, 3)
self.assertEqual(teh.ev_read_cnt, 3)
self.assertEqual(teh.ev_written_cnt, 0)
self.assertEqual(teh.ev_hup_cnt, 3)
self.assertEqual(teh.ev_timedout_cnt, 0)
self.assertEqual(teh.ev_close_cnt, 3)
self.assertEqual(teh.last_read, b'Lorem Ipsum Tres')

def test_tree_run_event_timeout(self):
"""test tree run with EventHandler (1.8+) with timeout"""
teh = TEventHandler()
Expand Down Expand Up @@ -713,6 +760,17 @@ def ev_hup(self, worker, node, rc):
self.assertEqual(teh.ev_close_cnt, 1)
self.assertEqual(teh.last_read, None)

def test_tree_gateway_bogus_single(self):
"""test tree run with bogus single gateway"""
# Part of GH#566
teh = TEventHandler()
os.environ['CLUSTERSHELL_GW_PYTHON_EXECUTABLE'] = '/test/bogus'
try:
self.assertRaises(RouteResolvingError, self.task.run, 'echo Lorem Ipsum',
nodes=NODE_DISTANT, handler=teh)
finally:
del os.environ['CLUSTERSHELL_GW_PYTHON_EXECUTABLE']


@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'")
class TreeWorkerGW2Test(TreeWorkerTestBase):
Expand Down Expand Up @@ -858,3 +916,24 @@ def test_tree_run_gw2f1_write_distant(self):
def test_tree_run_gw2f1_write_distant2_mt(self):
"""test tree run with write(), 1/2 gateways, distant 2 targets, separate thread"""
self._tree_run_write(NODE_DISTANT2, separate_thread=True)

def test_tree_run_gw2f1_reroute(self):
"""test tree run with reroute event, 1/2 gateways"""
teh = TRoutingEventHandler()
self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh)
self.assertEqual(len(teh.routing_events), 1)
worker, arg = teh.routing_events[0]
self.assertEqual(worker.command, "echo Lorem Ipsum")
self.assertEqual(arg["event"], "reroute")
self.assertIn(arg["targets"], NodeSet(NODE_DISTANT2))
# event handler checks
self.assertEqual(teh.ev_start_cnt, 1)
self.assertEqual(teh.ev_pickup_cnt, 2)
# read_cnt += 1 for gateway error on stderr (so currently not fully
# transparent to the user)
self.assertEqual(teh.ev_read_cnt, 3)
self.assertEqual(teh.ev_written_cnt, 0)
self.assertEqual(teh.ev_hup_cnt, 2)
self.assertEqual(teh.ev_timedout_cnt, 0)
self.assertEqual(teh.ev_close_cnt, 1)
self.assertEqual(teh.last_read, b'Lorem Ipsum')
Loading