Skip to content

Commit 4018851

Browse files
committed
Tree: support offline gateways (#260)
With this change, WorkerTree is now able to safely repropagate commands when the communication channel to a specific gateway cannot be established at all. Such offline gateways were already marked as unreachable, but the 'relaunch' code was missing. A new WorkerTree._relaunch() method has been added for that purpose. Note that if a gateway fails once a propagation channel is "set up", the remote commands cannot be safely repropagated at this point and so will be seen as failed, but this was already the case. Closes #260 Change-Id: I2be52b01638de95f58cedc0f2671f02cfffe8297
1 parent 5fa13e3 commit 4018851

File tree

6 files changed

+42
-27
lines changed

6 files changed

+42
-27
lines changed

lib/ClusterShell/CLI/Error.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from ClusterShell.NodeUtils import GroupSourceNoUpcall
4545
from ClusterShell.NodeSet import NodeSetExternalError, NodeSetParseError
4646
from ClusterShell.NodeSet import RangeSetParseError
47+
from ClusterShell.Propagation import RouteResolvingError
4748
from ClusterShell.Topology import TopologyError
4849
from ClusterShell.Worker.EngineClient import EngineClientError
4950
from ClusterShell.Worker.Worker import WorkerError
@@ -59,6 +60,7 @@
5960
GroupResolverSourceError,
6061
GroupSourceError,
6162
GroupSourceNoUpcall,
63+
RouteResolvingError,
6264
TopologyError,
6365
TypeError,
6466
IOError,
@@ -93,7 +95,7 @@ def handle_generic_error(excobj, prog=os.path.basename(sys.argv[0])):
9395
print(msgfmt % (prog, exc, exc.group_source.name), file=sys.stderr)
9496
except GroupSourceError as exc:
9597
print("%s: Group error: %s" % (prog, exc), file=sys.stderr)
96-
except TopologyError as exc:
98+
except (RouteResolvingError, TopologyError) as exc:
9799
print("%s: TREE MODE: %s" % (prog, exc), file=sys.stderr)
98100
except configparser.Error as exc:
99101
print("%s: %s" % (prog, exc), file=sys.stderr)

lib/ClusterShell/Communication.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,10 @@ def _open(self):
204204
def _close(self):
205205
"""close an already opened channel"""
206206
send_endtag = self.opened
207-
# set to False before sending tag for state test purposes
208-
self.opened = self.setup = False
209207
if send_endtag:
210208
XMLGenerator(self.worker, encoding=ENCODING).endElement('channel')
211209
self.worker.abort()
210+
self.opened = self.setup = False
212211

213212
def ev_start(self, worker):
214213
"""connection established. Open higher level channel"""

lib/ClusterShell/Propagation.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (C) 2010-2016 CEA/DAM
33
# Copyright (C) 2010-2011 Henri Doreau <[email protected]>
4-
# Copyright (C) 2015-2017 Stephane Thiell <[email protected]>
4+
# Copyright (C) 2015-2018 Stephane Thiell <[email protected]>
55
#
66
# This file is part of ClusterShell.
77
#
@@ -41,6 +41,7 @@
4141
class RouteResolvingError(Exception):
4242
"""error raised on invalid conditions during routing operations"""
4343

44+
4445
class PropagationTreeRouter(object):
4546
"""performs routes resolving operations within a propagation tree.
4647
This object provides a next_hop method, that will look for the best
@@ -392,9 +393,13 @@ def ev_close(self, worker, timedout):
392393
self.logger.debug("ev_close gateway=%s %s", gateway, self)
393394
self.logger.debug("ev_close rc=%s", self._rc) # may be None
394395

395-
if self._rc: # got explicit error code
396-
# ev_routing?
397-
self.logger.debug("unreachable gateway %s", gateway)
398-
worker.task.router.mark_unreachable(gateway)
399-
self.logger.debug("worker.task.gateways=%s", worker.task.gateways)
400-
# TODO: find best gateway, update TreeWorker counters, relaunch...
396+
if self._rc:
397+
self.logger.debug("error on gateway %s (setup=%s)", gateway,
398+
self.setup)
399+
self.task.router.mark_unreachable(gateway)
400+
self.logger.debug("gateway %s now set as unreachable", gateway)
401+
402+
if not self.setup:
403+
# channel was not set up: we can safely repropagate commands
404+
for mw in set(self.task.gateways[gateway][1]):
405+
mw._relaunch(gateway)

lib/ClusterShell/Worker/Tree.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ def _copy_remote(self, source, dest, targets, gateway, timeout, reverse):
334334

335335
self._target_count += len(targets)
336336

337-
self.gwtargets[str(gateway)] = targets.copy()
337+
self.gwtargets.setdefault(str(gateway), NodeSet()).add(targets)
338338

339339
# tar commands are built here and launched on targets
340340
if reverse:
@@ -360,13 +360,32 @@ def _execute_remote(self, cmd, targets, gateway, timeout):
360360

361361
self._target_count += len(targets)
362362

363-
self.gwtargets[str(gateway)] = targets.copy()
363+
self.gwtargets.setdefault(str(gateway), NodeSet()).add(targets)
364364

365365
pchan = self.task._pchannel(gateway, self)
366366
pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
367367
stderr=self.stderr, gw_invoke_cmd=self.invoke_gateway,
368368
remote=self.remote)
369369

370+
def _relaunch(self, previous_gateway):
371+
"""Redistribute and relaunch commands on targets that were running
372+
on previous_gateway (which is probably marked unreacheable by now)
373+
374+
NOTE: Relaunch is always called after failed remote execution, so
375+
previous_gateway must be defined. However, it is not guaranted that
376+
the relaunch is going to be performed using gateways (that's a feature).
377+
"""
378+
targets = self.gwtargets[previous_gateway].copy()
379+
self.logger.debug("_relaunch on targets %s from previous_gateway %s",
380+
targets, previous_gateway)
381+
382+
for target in targets:
383+
self.gwtargets[previous_gateway].remove(target)
384+
385+
self._check_fini(previous_gateway)
386+
self._target_count -= len(targets)
387+
self._launch(targets)
388+
370389
def _engine_clients(self):
371390
"""
372391
Access underlying engine clients.

tests/TreeGatewayTest.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,6 @@ def test_basic_noop(self):
157157

158158
self.channel_send_stop()
159159
self.recvxml(EndMessage)
160-
self.assertEqual(self.chan.opened, False)
161-
self.assertEqual(self.chan.setup, False)
162160
# ending tag should abort gateway worker without delay
163161
self.gateway.wait()
164162
self.gateway.close()
@@ -179,8 +177,6 @@ def test_channel_err_dup(self):
179177

180178
# gateway should terminate channel session
181179
msg = self.recvxml(EndMessage)
182-
self.assertEqual(self.chan.opened, False)
183-
self.assertEqual(self.chan.setup, False)
184180
self.gateway.wait()
185181
self.gateway.close()
186182

@@ -217,10 +213,6 @@ def _check_channel_err(self, sendmsg, errback, openchan=True,
217213
else:
218214
self.recvxml()
219215

220-
# flags should be reset
221-
self.assertEqual(self.chan.opened, False)
222-
self.assertEqual(self.chan.setup, False)
223-
224216
# gateway task should exit properly
225217
self.gateway.wait()
226218
self.gateway.close()

tests/TreeTaskTest.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from textwrap import dedent
88
import unittest
99

10+
from ClusterShell.Propagation import RouteResolvingError
1011
from ClusterShell.Task import task_self
1112
from ClusterShell.Topology import TopologyError
1213

@@ -33,14 +34,11 @@ def test_shell_auto_tree_dummy(self):
3334
task = task_self()
3435
task.set_default("auto_tree", True)
3536
task.TOPOLOGY_CONFIGS = [topofile.name]
36-
task.run("/bin/hostname", nodes="dummy-node", stderr=True)
37-
# FIXME gateway errors are not yet being handled correctly
37+
38+
self.assertRaises(RouteResolvingError, task.run, "/bin/hostname",
39+
nodes="dummy-node", stderr=True)
40+
# FIXME: should probably be None
3841
self.assertEqual(task.max_retcode(), 255)
39-
# XXX correct results would be:
40-
#self.assertEqual(task.max_retcode(), None)
41-
#expected = "Name or service not known"
42-
#if not task.node_error("dummy-node").endswith(expected):
43-
# self.assertEqual(task.node_error("dummy-node"), expected)
4442

4543
def test_shell_auto_tree_noconf(self):
4644
"""test task shell auto tree [no topology.conf]"""

0 commit comments

Comments
 (0)