From 1e4bbfab4414a2f41b98dd2d0f992288aded4e19 Mon Sep 17 00:00:00 2001 From: Stephane Thiell Date: Sun, 3 Aug 2025 21:23:56 -0700 Subject: [PATCH 1/2] Tree: fix error handling when gateway channel is closing Fix PropagationChannel.ev_close() where gateway channel termination is handled. If we get an actual rc > 0, that comes from the gateway command itself and that means the gateway is defective/misconfigured, in that case, we mark it as unreachable at the Task level. In addition, in that case, if we have not launched the remote commands yet, they are redistributed to other available gateways. rc=None is now handled as a normal termination of the propagation channel and the corresponding gateway is not marked as unreachable anymore. Fixes #566. --- lib/ClusterShell/Propagation.py | 24 ++++++++++++++++-------- lib/ClusterShell/Worker/Tree.py | 4 ++-- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/lib/ClusterShell/Propagation.py b/lib/ClusterShell/Propagation.py index b1e358fd..6303195e 100644 --- a/lib/ClusterShell/Propagation.py +++ b/lib/ClusterShell/Propagation.py @@ -394,6 +394,7 @@ def recv_ctl(self, msg): def ev_hup(self, worker, node, rc): """Channel command is closing""" + self.logger.debug("ev_hup gateway=%s %s", str(worker.nodes), self) self._rc = rc def ev_close(self, worker, timedout): @@ -401,17 +402,24 @@ def ev_close(self, worker, timedout): # do not use worker buffer or rc accessors here as we doesn't use # common stream names gateway = str(worker.nodes) - self.logger.debug("ev_close gateway=%s %s", gateway, self) - self.logger.debug("ev_close rc=%s", self._rc) # may be None - - # NOTE: self._rc may be None if the communication channel has aborted - if self._rc != 0: - self.logger.debug("error on gateway %s (setup=%s)", gateway, - self.setup) + self.logger.debug("ev_close gateway=%s rc=%s %s", gateway, self._rc, + self) + + # NOTE: self._rc is set None when _we_ close the channel (abort) + if self._rc is None and not self.setup: + # aborting before the channel is setup is worth a warning + self.logger.warning("ev_close: rc=%s with channel not setup", + self._rc) + + if self._rc is not None and self._rc != 0: + # handle gateway channel error + self.logger.debug("error on gateway %s (rc=%s, setup=%s)", gateway, + self._rc, self.setup) self.task.router.mark_unreachable(gateway) self.logger.debug("gateway %s now set as unreachable", gateway) if not self.setup: - # channel was not set up: we can safely repropagate commands + # channel was not set up: we can safely redistribute commands + self.logger.debug("channel was not set up: redistributing...") for mw in set(self.task.gateways[gateway][1]): mw._relaunch(gateway) diff --git a/lib/ClusterShell/Worker/Tree.py b/lib/ClusterShell/Worker/Tree.py index bb326418..47d3a28f 100644 --- a/lib/ClusterShell/Worker/Tree.py +++ b/lib/ClusterShell/Worker/Tree.py @@ -430,8 +430,8 @@ def _on_remote_node_msgline(self, node, msg, sname, gateway): def _on_remote_node_close(self, node, rc, gateway): """remote node closing with return code""" DistantWorker._on_node_close(self, node, rc) - self.logger.debug("_on_remote_node_close %s %s via gw %s", node, - self._close_count, gateway) + self.logger.debug("_on_remote_node_close %s %s via gw %s rc=%s", node, + self._close_count, gateway, rc) # finalize rcopy: extract tar data if self.source and self.reverse: From 812b20cdd25b565f5f1c2642da2b3071dc8b3f6a Mon Sep 17 00:00:00 2001 From: Stephane Thiell Date: Wed, 6 Aug 2025 23:46:44 -0700 Subject: [PATCH 2/2] Tree: implement per-gateway abort This commit adds the functionality to abort a specific gateway channel from the initiator. Until now, this was not properly handled. This also fixes gateway failover. Changes: * Implement TreeWorker._gateway_abort() that can be used to abort/cancel all tasks being done by the TreeWorker via the specified gateway. In case of such abort (likely due to some gateway failure), a special return code 76 (os.EX_PROTOCOL) is used for closing all running remote commands via this gateway. This return code is sometimes used to specify a "Remote protocol error" / "An error occurred in a remote communication protocol" which seems appropriate here. * Implement a new Task._pchannel_closing() method that is called on PropagationChannel.ev_close(), so deterministically every time a gateway channel is closing (self-initiated or not). This method performs necessary cleanup actions, but most notably calls TreeWorker._gateway_abort(gateway) on each worker currently using the gateway channel. * Update Task._pchannel_release() so that it now calls PropagationChannel._close() instead of Worker.abort() to properly reset the channel's opened/setup flags. * Updated TreeWorkerTest with tests to better cover the above and gateway failover. Part of #229 and extended work on #566. --- lib/ClusterShell/Communication.py | 5 +- lib/ClusterShell/Propagation.py | 6 +- lib/ClusterShell/Task.py | 32 +- lib/ClusterShell/Worker/Tree.py | 12 + tests/TreeWorkerTest.py | 495 ++++++++++++++++++++---------- 5 files changed, 378 insertions(+), 172 deletions(-) diff --git a/lib/ClusterShell/Communication.py b/lib/ClusterShell/Communication.py index 23b20756..5cf50707 100644 --- a/lib/ClusterShell/Communication.py +++ b/lib/ClusterShell/Communication.py @@ -203,10 +203,9 @@ def _open(self): xmlgen = XMLGenerator(self.worker, encoding=ENCODING) xmlgen.startElement('channel', {'version': __version__}) - def _close(self): + def _close(self, abort=False): """close an already opened channel""" - send_endtag = self.opened - if send_endtag: + if self.opened and not abort: XMLGenerator(self.worker, encoding=ENCODING).endElement('channel') self.worker.abort() self.opened = self.setup = False diff --git a/lib/ClusterShell/Propagation.py b/lib/ClusterShell/Propagation.py index 6303195e..3ad22cf4 100644 --- a/lib/ClusterShell/Propagation.py +++ b/lib/ClusterShell/Propagation.py @@ -253,8 +253,7 @@ def recv(self, msg): if msg.type == EndMessage.ident: #??#self.ptree.notify_close() self.logger.debug("got EndMessage; closing") - # abort worker (now working) - self.worker.abort() + self._close() elif msg.type == StdErrMessage.ident and msg.srcid == 0: # Handle error messages when channel is not established yet # or if messages are non-routed (eg. gateway-related) @@ -423,3 +422,6 @@ def ev_close(self, worker, timedout): self.logger.debug("channel was not set up: redistributing...") for mw in set(self.task.gateways[gateway][1]): mw._relaunch(gateway) + + # update Task that we are closing + worker.task._pchannel_close(gateway, worker) diff --git a/lib/ClusterShell/Task.py b/lib/ClusterShell/Task.py index f220949a..3a596953 100644 --- a/lib/ClusterShell/Task.py +++ b/lib/ClusterShell/Task.py @@ -1365,8 +1365,10 @@ def _pchannel(self, gateway, metaworker): def _pchannel_release(self, gateway, metaworker): """Release propagation channel associated to gateway. - Lookup by gateway, decref associated metaworker set and release - channel worker if needed. + Lookup by gateway, decref associated metaworker set and abort channel + worker if not used anymore. + + Called by TreeWorker._check_fini() """ logger = logging.getLogger(__name__) logger.debug("pchannel_release %s %s", gateway, metaworker) @@ -1381,11 +1383,29 @@ def _pchannel_release(self, gateway, metaworker): chanworker, metaworkers = self.gateways[gwstr] metaworkers.remove(metaworker) if len(metaworkers) == 0: - logger.debug("pchannel_release: destroying channel %s", + logger.debug("pchannel_release: closing channel %s", chanworker.eh) - chanworker.abort() - # delete gateway reference - del self.gateways[gwstr] + # Call PropagationChannel._close() that will close the channel + # properly, update the opened/setup flags and abort the worker. + # We might be in an event handler and we want to make sure we + # ignore any pending messages from this gateway from now on. + chanworker.eh._close(abort=True) + + def _pchannel_close(self, gateway, chanworker): + """A propagation channel is closing. + + Perform necessary cleanup actions when a gateway channel is closing. + + Called by PropagationChannel.ev_close(). + """ + logger = logging.getLogger(__name__) + logger.debug("pchannel_closing: %s", gateway) + chwrk, metaworkers = self.gateways[gateway] + assert chwrk is chanworker, (chwrk, chanworker) + metaworkers_copy = list(metaworkers) + for mw in metaworkers_copy: + mw._gateway_abort(gateway) + del self.gateways[gateway] def task_self(defaults=None): diff --git a/lib/ClusterShell/Worker/Tree.py b/lib/ClusterShell/Worker/Tree.py index 47d3a28f..9660a09e 100644 --- a/lib/ClusterShell/Worker/Tree.py +++ b/lib/ClusterShell/Worker/Tree.py @@ -564,6 +564,18 @@ def set_write_eof(self): self._set_write_eof_remote() + def _gateway_abort(self, gateway): + """Abort on gateway failure""" + if gateway not in self.gwtargets: + self.logger.warning("TreeWorker._gateway_abort %s not found", + gateway) + return + targets = self.gwtargets[gateway] + self.logger.debug("TreeWorker._gateway_abort %s found: targets=%s", + gateway, targets) + for target in NodeSet.fromlist(targets): # targets is a mutable list + self._on_remote_node_close(target, os.EX_PROTOCOL, gateway) + def abort(self): """Abort processing any action by this worker.""" # Not yet supported by TreeWorker diff --git a/tests/TreeWorkerTest.py b/tests/TreeWorkerTest.py index 236192ab..3c07d806 100644 --- a/tests/TreeWorkerTest.py +++ b/tests/TreeWorkerTest.py @@ -2,7 +2,7 @@ Unit test for ClusterShell.Worker.TreeWorker This unit test requires working ssh connections to the following -local addresses: $HOSTNAME, localhost, 127.0.0.[2-4] +local addresses: $HOSTNAME, localhost, 127.0.0.[2-7] You can use the following options in ~/.ssh/config: @@ -11,6 +11,7 @@ LogLevel ERROR """ +import logging import os from os.path import basename, join import unittest @@ -27,6 +28,8 @@ NODE_HEAD = HOSTNAME NODE_GATEWAY = 'localhost' +NODE_GATEWAY2 = '127.0.0.[6-7]' # two ok +NODE_GATEWAY2F1 = '127.0.0.6,192.0.2.0' # one ok, one failed NODE_DISTANT = '127.0.0.2' NODE_DISTANT2 = '127.0.0.[2-3]' NODE_DIRECT = '127.0.0.4' @@ -101,8 +104,184 @@ def ev_close(self, worker, timedout): self.ev_timedout_cnt += 1 +class TreeWorkerTestBase(unittest.TestCase): + """ + TreeWorkerTestBase: test TreeWorker (base class) + + Override setUp() to set up the tree topology. + + Connections are really established to the target and command results + are tested. + """ + + def setUp(self): + """setup test environment topology""" + task_terminate() # ideally shouldn't be needed... + self.task = task_self() + # set task topology (to override) + + def tearDown(self): + """clean up test environment""" + task_terminate() + self.task = None + + def _tree_run_write(self, target, separate_thread=False): + """helper to write to stdin""" + if separate_thread: + task = Task() + else: + task = self.task + teh = TEventHandler() + worker = task.shell('cat', nodes=target, handler=teh) + worker.write(b'Lorem Ipsum') + worker.set_write_eof() + task.run() + if separate_thread: + task_wait() + task_cleanup() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, target_cnt) + self.assertEqual(teh.ev_written_cnt, target_cnt) + self.assertEqual(teh.ev_written_sz, target_cnt * len('Lorem Ipsum')) + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum') + + def _tree_copy_file(self, target): + """helper to copy file""" + teh = TEventHandler() + srcf = make_temp_file(b'Lorem Ipsum', 'test_tree_copy_file_src') + dest = make_temp_filename('test_tree_copy_file_dest') + try: + worker = self.task.copy(srcf.name, dest, nodes=target, handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + with open(dest, 'r') as destf: + self.assertEqual(destf.read(), 'Lorem Ipsum') + finally: + os.remove(dest) + + def _tree_copy_dir(self, target): + """helper to copy directory""" + teh = TEventHandler() + + srcdir = make_temp_dir() + destdir = make_temp_dir() + file1 = make_temp_file(b'Lorem Ipsum Unum', suffix=".txt", + dir=srcdir.name) + file2 = make_temp_file(b'Lorem Ipsum Duo', suffix=".txt", + dir=srcdir.name) + + try: + # add '/' to dest so that distant does like the others + worker = self.task.copy(srcdir.name, destdir.name + '/', + nodes=target, handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + + # copy successful? + copy_dest = join(destdir.name, srcdir.name) + with open(join(copy_dest, basename(file1.name)), 'rb') as rfile1: + self.assertEqual(rfile1.read(), b'Lorem Ipsum Unum') + with open(join(copy_dest, basename(file2.name)), 'rb') as rfile2: + self.assertEqual(rfile2.read(), b'Lorem Ipsum Duo') + finally: + file1.close() + file2.close() + srcdir.cleanup() + destdir.cleanup() + + def _tree_rcopy_file(self, target): + """helper to rcopy file""" + teh = TEventHandler() + + # The file needs to be large enough to test GH#545 + b1 = b'Lorem Ipsum' * 1100000 + + srcdir = make_temp_dir() + destdir = make_temp_dir() + srcfile = make_temp_file(b1, suffix=".txt", dir=srcdir.name) + + try: + worker = self.task.rcopy(srcfile.name, destdir.name, nodes=target, handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + + # rcopy successful? + for tgt in NodeSet(target): + rcopy_dest = join(destdir.name, basename(srcfile.name) + '.' + tgt) + with open(rcopy_dest, 'rb') as tfile: + self.assertEqual(tfile.read(), b1) + finally: + srcfile.close() + srcdir.cleanup() + destdir.cleanup() + + def _tree_rcopy_dir(self, target): + """helper to rcopy directory""" + teh = TEventHandler() + + b1 = b'Lorem Ipsum Unum' * 100 + b2 = b'Lorem Ipsum Duo' * 100 + + srcdir = make_temp_dir() + destdir = make_temp_dir() + file1 = make_temp_file(b1, suffix=".txt", dir=srcdir.name) + file2 = make_temp_file(b2, suffix=".txt", dir=srcdir.name) + + try: + worker = self.task.rcopy(srcdir.name, destdir.name, nodes=target, + handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + + # rcopy successful? + for tgt in NodeSet(target): + rcopy_dest = join(destdir.name, basename(srcdir.name) + '.' + tgt) + with open(join(rcopy_dest, basename(file1.name)), 'rb') as rfile1: + self.assertEqual(rfile1.read(), b1) + with open(join(rcopy_dest, basename(file2.name)), 'rb') as rfile2: + self.assertEqual(rfile2.read(), b2) + finally: + file1.close() + file2.close() + srcdir.cleanup() + destdir.cleanup() + + @unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") -class TreeWorkerTest(unittest.TestCase): +class TreeWorkerTest(TreeWorkerTestBase): """ TreeWorkerTest: test TreeWorker @@ -116,8 +295,7 @@ class TreeWorkerTest(unittest.TestCase): def setUp(self): """setup test environment topology""" - task_terminate() # ideally shouldn't be needed... - self.task = task_self() + TreeWorkerTestBase.setUp(self) # set task topology graph = TopologyGraph() graph.add_route(NodeSet(HOSTNAME), NodeSet(NODE_GATEWAY)) @@ -126,13 +304,8 @@ def setUp(self): # NODE_FOREIGN is not included self.task.topology = graph.to_tree(HOSTNAME) - def tearDown(self): - """clean up test environment""" - task_terminate() - self.task = None - def test_tree_run_event_legacy(self): - """test simple tree run with legacy EventHandler""" + """test tree run with legacy EventHandler""" teh = TEventHandlerLegacy() with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") @@ -148,7 +321,7 @@ def test_tree_run_event_legacy(self): self.assertEqual(teh.last_read, b'Lorem Ipsum') def test_tree_run_event_legacy_timeout(self): - """test simple tree run with legacy EventHandler with timeout""" + """test tree run with legacy EventHandler with timeout""" teh = TEventHandlerLegacy() with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") @@ -163,7 +336,7 @@ def test_tree_run_event_legacy_timeout(self): self.assertEqual(teh.ev_close_cnt, 1) def test_tree_run_event(self): - """test simple tree run with EventHandler (1.8+)""" + """test tree run with EventHandler (1.8+)""" teh = TEventHandler() self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh) self.assertEqual(teh.ev_start_cnt, 1) @@ -176,7 +349,7 @@ def test_tree_run_event(self): self.assertEqual(teh.last_read, b'Lorem Ipsum') def test_tree_run_event_timeout(self): - """test simple tree run with EventHandler (1.8+) with timeout""" + """test tree run with EventHandler (1.8+) with timeout""" teh = TEventHandler() self.task.run('sleep 10', nodes=NODE_DISTANT, handler=teh, timeout=0.5) self.assertEqual(teh.ev_start_cnt, 1) @@ -242,30 +415,6 @@ def test_tree_run_foreign(self): self.assertEqual(teh.ev_close_cnt, 1) self.assertEqual(teh.last_read, b'Lorem Ipsum') - def _tree_run_write(self, target, separate_thread=False): - if separate_thread: - task = Task() - else: - task = self.task - teh = TEventHandler() - worker = task.shell('cat', nodes=target, handler=teh) - worker.write(b'Lorem Ipsum') - worker.set_write_eof() - task.run() - if separate_thread: - task_wait() - task_cleanup() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, target_cnt) - self.assertEqual(teh.ev_written_cnt, target_cnt) - self.assertEqual(teh.ev_written_sz, target_cnt * len('Lorem Ipsum')) - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - self.assertEqual(teh.last_read, b'Lorem Ipsum') - def test_tree_run_write_distant(self): """test tree run with write(), distant target""" self._tree_run_write(NODE_DISTANT) @@ -306,25 +455,7 @@ def test_tree_run_write_gateway_mt(self): """test tree run with write(), gateway is target, not in topology, separate thread""" self._tree_run_write(NODE_GATEWAY, separate_thread=True) - def _tree_copy_file(self, target): - teh = TEventHandler() - srcf = make_temp_file(b'Lorem Ipsum', 'test_tree_copy_file_src') - dest = make_temp_filename('test_tree_copy_file_dest') - try: - worker = self.task.copy(srcf.name, dest, nodes=target, handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - with open(dest, 'r') as destf: - self.assertEqual(destf.read(), 'Lorem Ipsum') - finally: - os.remove(dest) + ### copy ### def test_tree_copy_file_distant(self): """test tree copy: file, distant target""" @@ -346,42 +477,6 @@ def test_tree_copy_file_gateway(self): """test tree copy: file, gateway is target""" self._tree_copy_file(NODE_GATEWAY) - def _tree_copy_dir(self, target): - teh = TEventHandler() - - srcdir = make_temp_dir() - destdir = make_temp_dir() - file1 = make_temp_file(b'Lorem Ipsum Unum', suffix=".txt", - dir=srcdir.name) - file2 = make_temp_file(b'Lorem Ipsum Duo', suffix=".txt", - dir=srcdir.name) - - try: - # add '/' to dest so that distant does like the others - worker = self.task.copy(srcdir.name, destdir.name + '/', - nodes=target, handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - - # copy successful? - copy_dest = join(destdir.name, srcdir.name) - with open(join(copy_dest, basename(file1.name)), 'rb') as rfile1: - self.assertEqual(rfile1.read(), b'Lorem Ipsum Unum') - with open(join(copy_dest, basename(file2.name)), 'rb') as rfile2: - self.assertEqual(rfile2.read(), b'Lorem Ipsum Duo') - finally: - file1.close() - file2.close() - srcdir.cleanup() - destdir.cleanup() - def test_tree_copy_dir_distant(self): """test tree copy: directory, distant target""" self._tree_copy_dir(NODE_DISTANT) @@ -390,7 +485,6 @@ def test_tree_copy_dir_distant2(self): """test tree copy: directory, distant 2 targets""" self._tree_copy_dir(NODE_DISTANT2) - def test_tree_copy_dir_direct(self): """test tree copy: directory, direct target, in topology""" self._tree_copy_dir(NODE_DIRECT) @@ -403,42 +497,7 @@ def test_tree_copy_dir_gateway(self): """test tree copy: directory, gateway is target""" self._tree_copy_dir(NODE_GATEWAY) - def _tree_rcopy_dir(self, target): - teh = TEventHandler() - - b1 = b'Lorem Ipsum Unum' * 100 - b2 = b'Lorem Ipsum Duo' * 100 - - srcdir = make_temp_dir() - destdir = make_temp_dir() - file1 = make_temp_file(b1, suffix=".txt", dir=srcdir.name) - file2 = make_temp_file(b2, suffix=".txt", dir=srcdir.name) - - try: - worker = self.task.rcopy(srcdir.name, destdir.name, nodes=target, - handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - - # rcopy successful? - for tgt in NodeSet(target): - rcopy_dest = join(destdir.name, basename(srcdir.name) + '.' + tgt) - with open(join(rcopy_dest, basename(file1.name)), 'rb') as rfile1: - self.assertEqual(rfile1.read(), b1) - with open(join(rcopy_dest, basename(file2.name)), 'rb') as rfile2: - self.assertEqual(rfile2.read(), b2) - finally: - file1.close() - file2.close() - srcdir.cleanup() - destdir.cleanup() + ### rcopy ### def test_tree_rcopy_dir_distant(self): """test tree rcopy: directory, distant target""" @@ -462,38 +521,6 @@ def test_tree_rcopy_dir_gateway(self): """test tree rcopy: directory, gateway is target""" self._tree_rcopy_dir(NODE_GATEWAY) - def _tree_rcopy_file(self, target): - teh = TEventHandler() - - # The file needs to be large enough to test GH#545 - b1 = b'Lorem Ipsum' * 1100000 - - srcdir = make_temp_dir() - destdir = make_temp_dir() - srcfile = make_temp_file(b1, suffix=".txt", dir=srcdir.name) - - try: - worker = self.task.rcopy(srcfile.name, destdir.name, nodes=target, handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - - # rcopy successful? - for tgt in NodeSet(target): - rcopy_dest = join(destdir.name, basename(srcfile.name) + '.' + tgt) - with open(rcopy_dest, 'rb') as tfile: - self.assertEqual(tfile.read(), b1) - finally: - srcfile.close() - srcdir.cleanup() - destdir.cleanup() - def test_tree_rcopy_file_distant(self): """test tree rcopy: file, distant target""" self._tree_rcopy_file(NODE_DISTANT) @@ -519,3 +546,149 @@ def test_tree_worker_missing_arguments(self): def test_tree_worker_name_compat(self): """test TreeWorker former name (WorkerTree)""" self.assertEqual(TreeWorker, WorkerTree) + + +@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") +class TreeWorkerGW2Test(TreeWorkerTestBase): + """ + TreeWorkerTest: test TreeWorker with two functional gateways + + NODE_HEAD -> NODE_GATEWAY2 -> NODE_DISTANT2 + + Connections are really established to the target and command results + are tested. + """ + + def setUp(self): + """setup test environment topology""" + TreeWorkerTestBase.setUp(self) + # set task topology + graph = TopologyGraph() + graph.add_route(NodeSet(HOSTNAME), NodeSet(NODE_GATEWAY2)) + graph.add_route(NodeSet(NODE_GATEWAY2), NodeSet(NODE_DISTANT2)) + self.task.topology = graph.to_tree(HOSTNAME) + + def test_tree_run_gw2_event(self): + """test tree run with EventHandler and 2 gateways""" + teh = TEventHandler() + self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 2) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum') + + def test_tree_run_gw2_event_timeout(self): + """test tree run with EventHandler, 2 gateways with timeout""" + teh = TEventHandler() + self.task.run('sleep 10', nodes=NODE_DISTANT2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 0) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 1) # command timed out + self.assertEqual(teh.ev_close_cnt, 1) + + def test_tree_run_gw2_event_timeout_2w(self): + """test tree run with EventHandler, 2 gateways with timeout, 2 workers""" + teh = TEventHandler() + n1, n2 = NodeSet(NODE_DISTANT2).split(2) + self.task.shell('sleep 10', nodes=n1, handler=teh, timeout=0.5) + self.task.run('sleep 10', nodes=n2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 2) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 0) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 2) # command timed out + self.assertEqual(teh.ev_close_cnt, 2) + + def test_tree_run_gw2_write_distant(self): + """test tree run with write(), 2 gateways, distant target""" + self._tree_run_write(NODE_DISTANT) + + def test_tree_run_gw2_write_distant2(self): + """test tree run with write(), 2 gateways, distant 2 targets""" + self._tree_run_write(NODE_DISTANT2) + + def test_tree_run_gw2_write_distant2_mt(self): + """test tree run with write(), 2 gateways, distant 2 targets, separate thread""" + self._tree_run_write(NODE_DISTANT2, separate_thread=True) + + +@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") +class TreeWorkerGW2F1FTest(TreeWorkerTestBase): + """ + TreeWorkerTest: test TreeWorker with two gateways, one being failed + + NODE_HEAD -> NODE_GATEWAY2F1 -> NODE_DISTANT2 + + Connections are really established to the target and command results + are tested. + """ + + def setUp(self): + """setup test environment topology""" + TreeWorkerTestBase.setUp(self) + # set task topology + graph = TopologyGraph() + graph.add_route(NodeSet(HOSTNAME), NodeSet(NODE_GATEWAY2F1)) + graph.add_route(NodeSet(NODE_GATEWAY2F1), NodeSet(NODE_DISTANT2)) + self.task.topology = graph.to_tree(HOSTNAME) + + def test_tree_run_gw2f1_event(self): + """test tree run with EventHandler and 1/2 gateways""" + teh = TEventHandler() + self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 3) # 2 + gw error + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum') + + def test_tree_run_gw2f1_event_timeout(self): + """test tree run with EventHandler, 1/2 gateways with timeout""" + teh = TEventHandler() + self.task.run('sleep 10', nodes=NODE_DISTANT2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 1) # 1 gateway failure to read + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 1) # command timed out + self.assertEqual(teh.ev_close_cnt, 1) + + def test_tree_run_gw2f1_event_timeout_2w(self): + """test tree run with EventHandler, 1/2 gateways with timeout, 2 workers""" + teh = TEventHandler() + n1, n2 = NodeSet(NODE_DISTANT2).split(2) + self.task.shell('sleep 10', nodes=n1, handler=teh, timeout=0.5) + self.task.run('sleep 10', nodes=n2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 2) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 1) # 1 gateway failure to read + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 2) # command timed out + self.assertEqual(teh.ev_close_cnt, 2) + + def test_tree_run_gw2f1_write_distant(self): + """test tree run with write(), 1/2 gateways, distant target""" + self._tree_run_write(NODE_DISTANT) + + # FIXME, issue with stdin write in gw2f1 mode + #def test_tree_run_gw2f1_write_distant2(self): + # """test tree run with write(), 1/2 gateways, distant 2 targets""" + # logging.basicConfig(level=logging.DEBUG) + # self._tree_run_write(NODE_DISTANT2) + + def test_tree_run_gw2f1_write_distant2_mt(self): + """test tree run with write(), 1/2 gateways, distant 2 targets, separate thread""" + self._tree_run_write(NODE_DISTANT2, separate_thread=True)