diff --git a/lib/ClusterShell/Communication.py b/lib/ClusterShell/Communication.py index 23b20756..5cf50707 100644 --- a/lib/ClusterShell/Communication.py +++ b/lib/ClusterShell/Communication.py @@ -203,10 +203,9 @@ def _open(self): xmlgen = XMLGenerator(self.worker, encoding=ENCODING) xmlgen.startElement('channel', {'version': __version__}) - def _close(self): + def _close(self, abort=False): """close an already opened channel""" - send_endtag = self.opened - if send_endtag: + if self.opened and not abort: XMLGenerator(self.worker, encoding=ENCODING).endElement('channel') self.worker.abort() self.opened = self.setup = False diff --git a/lib/ClusterShell/Propagation.py b/lib/ClusterShell/Propagation.py index b1e358fd..3ad22cf4 100644 --- a/lib/ClusterShell/Propagation.py +++ b/lib/ClusterShell/Propagation.py @@ -253,8 +253,7 @@ def recv(self, msg): if msg.type == EndMessage.ident: #??#self.ptree.notify_close() self.logger.debug("got EndMessage; closing") - # abort worker (now working) - self.worker.abort() + self._close() elif msg.type == StdErrMessage.ident and msg.srcid == 0: # Handle error messages when channel is not established yet # or if messages are non-routed (eg. gateway-related) @@ -394,6 +393,7 @@ def recv_ctl(self, msg): def ev_hup(self, worker, node, rc): """Channel command is closing""" + self.logger.debug("ev_hup gateway=%s %s", str(worker.nodes), self) self._rc = rc def ev_close(self, worker, timedout): @@ -401,17 +401,27 @@ def ev_close(self, worker, timedout): # do not use worker buffer or rc accessors here as we doesn't use # common stream names gateway = str(worker.nodes) - self.logger.debug("ev_close gateway=%s %s", gateway, self) - self.logger.debug("ev_close rc=%s", self._rc) # may be None - - # NOTE: self._rc may be None if the communication channel has aborted - if self._rc != 0: - self.logger.debug("error on gateway %s (setup=%s)", gateway, - self.setup) + self.logger.debug("ev_close gateway=%s rc=%s %s", gateway, self._rc, + self) + + # NOTE: self._rc is set None when _we_ close the channel (abort) + if self._rc is None and not self.setup: + # aborting before the channel is setup is worth a warning + self.logger.warning("ev_close: rc=%s with channel not setup", + self._rc) + + if self._rc is not None and self._rc != 0: + # handle gateway channel error + self.logger.debug("error on gateway %s (rc=%s, setup=%s)", gateway, + self._rc, self.setup) self.task.router.mark_unreachable(gateway) self.logger.debug("gateway %s now set as unreachable", gateway) if not self.setup: - # channel was not set up: we can safely repropagate commands + # channel was not set up: we can safely redistribute commands + self.logger.debug("channel was not set up: redistributing...") for mw in set(self.task.gateways[gateway][1]): mw._relaunch(gateway) + + # update Task that we are closing + worker.task._pchannel_close(gateway, worker) diff --git a/lib/ClusterShell/Task.py b/lib/ClusterShell/Task.py index f220949a..3a596953 100644 --- a/lib/ClusterShell/Task.py +++ b/lib/ClusterShell/Task.py @@ -1365,8 +1365,10 @@ def _pchannel(self, gateway, metaworker): def _pchannel_release(self, gateway, metaworker): """Release propagation channel associated to gateway. - Lookup by gateway, decref associated metaworker set and release - channel worker if needed. + Lookup by gateway, decref associated metaworker set and abort channel + worker if not used anymore. + + Called by TreeWorker._check_fini() """ logger = logging.getLogger(__name__) logger.debug("pchannel_release %s %s", gateway, metaworker) @@ -1381,11 +1383,29 @@ def _pchannel_release(self, gateway, metaworker): chanworker, metaworkers = self.gateways[gwstr] metaworkers.remove(metaworker) if len(metaworkers) == 0: - logger.debug("pchannel_release: destroying channel %s", + logger.debug("pchannel_release: closing channel %s", chanworker.eh) - chanworker.abort() - # delete gateway reference - del self.gateways[gwstr] + # Call PropagationChannel._close() that will close the channel + # properly, update the opened/setup flags and abort the worker. + # We might be in an event handler and we want to make sure we + # ignore any pending messages from this gateway from now on. + chanworker.eh._close(abort=True) + + def _pchannel_close(self, gateway, chanworker): + """A propagation channel is closing. + + Perform necessary cleanup actions when a gateway channel is closing. + + Called by PropagationChannel.ev_close(). + """ + logger = logging.getLogger(__name__) + logger.debug("pchannel_closing: %s", gateway) + chwrk, metaworkers = self.gateways[gateway] + assert chwrk is chanworker, (chwrk, chanworker) + metaworkers_copy = list(metaworkers) + for mw in metaworkers_copy: + mw._gateway_abort(gateway) + del self.gateways[gateway] def task_self(defaults=None): diff --git a/lib/ClusterShell/Worker/Tree.py b/lib/ClusterShell/Worker/Tree.py index bb326418..9660a09e 100644 --- a/lib/ClusterShell/Worker/Tree.py +++ b/lib/ClusterShell/Worker/Tree.py @@ -430,8 +430,8 @@ def _on_remote_node_msgline(self, node, msg, sname, gateway): def _on_remote_node_close(self, node, rc, gateway): """remote node closing with return code""" DistantWorker._on_node_close(self, node, rc) - self.logger.debug("_on_remote_node_close %s %s via gw %s", node, - self._close_count, gateway) + self.logger.debug("_on_remote_node_close %s %s via gw %s rc=%s", node, + self._close_count, gateway, rc) # finalize rcopy: extract tar data if self.source and self.reverse: @@ -564,6 +564,18 @@ def set_write_eof(self): self._set_write_eof_remote() + def _gateway_abort(self, gateway): + """Abort on gateway failure""" + if gateway not in self.gwtargets: + self.logger.warning("TreeWorker._gateway_abort %s not found", + gateway) + return + targets = self.gwtargets[gateway] + self.logger.debug("TreeWorker._gateway_abort %s found: targets=%s", + gateway, targets) + for target in NodeSet.fromlist(targets): # targets is a mutable list + self._on_remote_node_close(target, os.EX_PROTOCOL, gateway) + def abort(self): """Abort processing any action by this worker.""" # Not yet supported by TreeWorker diff --git a/tests/TreeWorkerTest.py b/tests/TreeWorkerTest.py index 236192ab..3c07d806 100644 --- a/tests/TreeWorkerTest.py +++ b/tests/TreeWorkerTest.py @@ -2,7 +2,7 @@ Unit test for ClusterShell.Worker.TreeWorker This unit test requires working ssh connections to the following -local addresses: $HOSTNAME, localhost, 127.0.0.[2-4] +local addresses: $HOSTNAME, localhost, 127.0.0.[2-7] You can use the following options in ~/.ssh/config: @@ -11,6 +11,7 @@ LogLevel ERROR """ +import logging import os from os.path import basename, join import unittest @@ -27,6 +28,8 @@ NODE_HEAD = HOSTNAME NODE_GATEWAY = 'localhost' +NODE_GATEWAY2 = '127.0.0.[6-7]' # two ok +NODE_GATEWAY2F1 = '127.0.0.6,192.0.2.0' # one ok, one failed NODE_DISTANT = '127.0.0.2' NODE_DISTANT2 = '127.0.0.[2-3]' NODE_DIRECT = '127.0.0.4' @@ -101,8 +104,184 @@ def ev_close(self, worker, timedout): self.ev_timedout_cnt += 1 +class TreeWorkerTestBase(unittest.TestCase): + """ + TreeWorkerTestBase: test TreeWorker (base class) + + Override setUp() to set up the tree topology. + + Connections are really established to the target and command results + are tested. + """ + + def setUp(self): + """setup test environment topology""" + task_terminate() # ideally shouldn't be needed... + self.task = task_self() + # set task topology (to override) + + def tearDown(self): + """clean up test environment""" + task_terminate() + self.task = None + + def _tree_run_write(self, target, separate_thread=False): + """helper to write to stdin""" + if separate_thread: + task = Task() + else: + task = self.task + teh = TEventHandler() + worker = task.shell('cat', nodes=target, handler=teh) + worker.write(b'Lorem Ipsum') + worker.set_write_eof() + task.run() + if separate_thread: + task_wait() + task_cleanup() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, target_cnt) + self.assertEqual(teh.ev_written_cnt, target_cnt) + self.assertEqual(teh.ev_written_sz, target_cnt * len('Lorem Ipsum')) + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum') + + def _tree_copy_file(self, target): + """helper to copy file""" + teh = TEventHandler() + srcf = make_temp_file(b'Lorem Ipsum', 'test_tree_copy_file_src') + dest = make_temp_filename('test_tree_copy_file_dest') + try: + worker = self.task.copy(srcf.name, dest, nodes=target, handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + with open(dest, 'r') as destf: + self.assertEqual(destf.read(), 'Lorem Ipsum') + finally: + os.remove(dest) + + def _tree_copy_dir(self, target): + """helper to copy directory""" + teh = TEventHandler() + + srcdir = make_temp_dir() + destdir = make_temp_dir() + file1 = make_temp_file(b'Lorem Ipsum Unum', suffix=".txt", + dir=srcdir.name) + file2 = make_temp_file(b'Lorem Ipsum Duo', suffix=".txt", + dir=srcdir.name) + + try: + # add '/' to dest so that distant does like the others + worker = self.task.copy(srcdir.name, destdir.name + '/', + nodes=target, handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + + # copy successful? + copy_dest = join(destdir.name, srcdir.name) + with open(join(copy_dest, basename(file1.name)), 'rb') as rfile1: + self.assertEqual(rfile1.read(), b'Lorem Ipsum Unum') + with open(join(copy_dest, basename(file2.name)), 'rb') as rfile2: + self.assertEqual(rfile2.read(), b'Lorem Ipsum Duo') + finally: + file1.close() + file2.close() + srcdir.cleanup() + destdir.cleanup() + + def _tree_rcopy_file(self, target): + """helper to rcopy file""" + teh = TEventHandler() + + # The file needs to be large enough to test GH#545 + b1 = b'Lorem Ipsum' * 1100000 + + srcdir = make_temp_dir() + destdir = make_temp_dir() + srcfile = make_temp_file(b1, suffix=".txt", dir=srcdir.name) + + try: + worker = self.task.rcopy(srcfile.name, destdir.name, nodes=target, handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + + # rcopy successful? + for tgt in NodeSet(target): + rcopy_dest = join(destdir.name, basename(srcfile.name) + '.' + tgt) + with open(rcopy_dest, 'rb') as tfile: + self.assertEqual(tfile.read(), b1) + finally: + srcfile.close() + srcdir.cleanup() + destdir.cleanup() + + def _tree_rcopy_dir(self, target): + """helper to rcopy directory""" + teh = TEventHandler() + + b1 = b'Lorem Ipsum Unum' * 100 + b2 = b'Lorem Ipsum Duo' * 100 + + srcdir = make_temp_dir() + destdir = make_temp_dir() + file1 = make_temp_file(b1, suffix=".txt", dir=srcdir.name) + file2 = make_temp_file(b2, suffix=".txt", dir=srcdir.name) + + try: + worker = self.task.rcopy(srcdir.name, destdir.name, nodes=target, + handler=teh) + self.task.run() + target_cnt = len(NodeSet(target)) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, target_cnt) + self.assertEqual(teh.ev_read_cnt, 0) + #self.assertEqual(teh.ev_written_cnt, 0) # FIXME + self.assertEqual(teh.ev_hup_cnt, target_cnt) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + + # rcopy successful? + for tgt in NodeSet(target): + rcopy_dest = join(destdir.name, basename(srcdir.name) + '.' + tgt) + with open(join(rcopy_dest, basename(file1.name)), 'rb') as rfile1: + self.assertEqual(rfile1.read(), b1) + with open(join(rcopy_dest, basename(file2.name)), 'rb') as rfile2: + self.assertEqual(rfile2.read(), b2) + finally: + file1.close() + file2.close() + srcdir.cleanup() + destdir.cleanup() + + @unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") -class TreeWorkerTest(unittest.TestCase): +class TreeWorkerTest(TreeWorkerTestBase): """ TreeWorkerTest: test TreeWorker @@ -116,8 +295,7 @@ class TreeWorkerTest(unittest.TestCase): def setUp(self): """setup test environment topology""" - task_terminate() # ideally shouldn't be needed... - self.task = task_self() + TreeWorkerTestBase.setUp(self) # set task topology graph = TopologyGraph() graph.add_route(NodeSet(HOSTNAME), NodeSet(NODE_GATEWAY)) @@ -126,13 +304,8 @@ def setUp(self): # NODE_FOREIGN is not included self.task.topology = graph.to_tree(HOSTNAME) - def tearDown(self): - """clean up test environment""" - task_terminate() - self.task = None - def test_tree_run_event_legacy(self): - """test simple tree run with legacy EventHandler""" + """test tree run with legacy EventHandler""" teh = TEventHandlerLegacy() with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") @@ -148,7 +321,7 @@ def test_tree_run_event_legacy(self): self.assertEqual(teh.last_read, b'Lorem Ipsum') def test_tree_run_event_legacy_timeout(self): - """test simple tree run with legacy EventHandler with timeout""" + """test tree run with legacy EventHandler with timeout""" teh = TEventHandlerLegacy() with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") @@ -163,7 +336,7 @@ def test_tree_run_event_legacy_timeout(self): self.assertEqual(teh.ev_close_cnt, 1) def test_tree_run_event(self): - """test simple tree run with EventHandler (1.8+)""" + """test tree run with EventHandler (1.8+)""" teh = TEventHandler() self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT, handler=teh) self.assertEqual(teh.ev_start_cnt, 1) @@ -176,7 +349,7 @@ def test_tree_run_event(self): self.assertEqual(teh.last_read, b'Lorem Ipsum') def test_tree_run_event_timeout(self): - """test simple tree run with EventHandler (1.8+) with timeout""" + """test tree run with EventHandler (1.8+) with timeout""" teh = TEventHandler() self.task.run('sleep 10', nodes=NODE_DISTANT, handler=teh, timeout=0.5) self.assertEqual(teh.ev_start_cnt, 1) @@ -242,30 +415,6 @@ def test_tree_run_foreign(self): self.assertEqual(teh.ev_close_cnt, 1) self.assertEqual(teh.last_read, b'Lorem Ipsum') - def _tree_run_write(self, target, separate_thread=False): - if separate_thread: - task = Task() - else: - task = self.task - teh = TEventHandler() - worker = task.shell('cat', nodes=target, handler=teh) - worker.write(b'Lorem Ipsum') - worker.set_write_eof() - task.run() - if separate_thread: - task_wait() - task_cleanup() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, target_cnt) - self.assertEqual(teh.ev_written_cnt, target_cnt) - self.assertEqual(teh.ev_written_sz, target_cnt * len('Lorem Ipsum')) - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - self.assertEqual(teh.last_read, b'Lorem Ipsum') - def test_tree_run_write_distant(self): """test tree run with write(), distant target""" self._tree_run_write(NODE_DISTANT) @@ -306,25 +455,7 @@ def test_tree_run_write_gateway_mt(self): """test tree run with write(), gateway is target, not in topology, separate thread""" self._tree_run_write(NODE_GATEWAY, separate_thread=True) - def _tree_copy_file(self, target): - teh = TEventHandler() - srcf = make_temp_file(b'Lorem Ipsum', 'test_tree_copy_file_src') - dest = make_temp_filename('test_tree_copy_file_dest') - try: - worker = self.task.copy(srcf.name, dest, nodes=target, handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - with open(dest, 'r') as destf: - self.assertEqual(destf.read(), 'Lorem Ipsum') - finally: - os.remove(dest) + ### copy ### def test_tree_copy_file_distant(self): """test tree copy: file, distant target""" @@ -346,42 +477,6 @@ def test_tree_copy_file_gateway(self): """test tree copy: file, gateway is target""" self._tree_copy_file(NODE_GATEWAY) - def _tree_copy_dir(self, target): - teh = TEventHandler() - - srcdir = make_temp_dir() - destdir = make_temp_dir() - file1 = make_temp_file(b'Lorem Ipsum Unum', suffix=".txt", - dir=srcdir.name) - file2 = make_temp_file(b'Lorem Ipsum Duo', suffix=".txt", - dir=srcdir.name) - - try: - # add '/' to dest so that distant does like the others - worker = self.task.copy(srcdir.name, destdir.name + '/', - nodes=target, handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - - # copy successful? - copy_dest = join(destdir.name, srcdir.name) - with open(join(copy_dest, basename(file1.name)), 'rb') as rfile1: - self.assertEqual(rfile1.read(), b'Lorem Ipsum Unum') - with open(join(copy_dest, basename(file2.name)), 'rb') as rfile2: - self.assertEqual(rfile2.read(), b'Lorem Ipsum Duo') - finally: - file1.close() - file2.close() - srcdir.cleanup() - destdir.cleanup() - def test_tree_copy_dir_distant(self): """test tree copy: directory, distant target""" self._tree_copy_dir(NODE_DISTANT) @@ -390,7 +485,6 @@ def test_tree_copy_dir_distant2(self): """test tree copy: directory, distant 2 targets""" self._tree_copy_dir(NODE_DISTANT2) - def test_tree_copy_dir_direct(self): """test tree copy: directory, direct target, in topology""" self._tree_copy_dir(NODE_DIRECT) @@ -403,42 +497,7 @@ def test_tree_copy_dir_gateway(self): """test tree copy: directory, gateway is target""" self._tree_copy_dir(NODE_GATEWAY) - def _tree_rcopy_dir(self, target): - teh = TEventHandler() - - b1 = b'Lorem Ipsum Unum' * 100 - b2 = b'Lorem Ipsum Duo' * 100 - - srcdir = make_temp_dir() - destdir = make_temp_dir() - file1 = make_temp_file(b1, suffix=".txt", dir=srcdir.name) - file2 = make_temp_file(b2, suffix=".txt", dir=srcdir.name) - - try: - worker = self.task.rcopy(srcdir.name, destdir.name, nodes=target, - handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - - # rcopy successful? - for tgt in NodeSet(target): - rcopy_dest = join(destdir.name, basename(srcdir.name) + '.' + tgt) - with open(join(rcopy_dest, basename(file1.name)), 'rb') as rfile1: - self.assertEqual(rfile1.read(), b1) - with open(join(rcopy_dest, basename(file2.name)), 'rb') as rfile2: - self.assertEqual(rfile2.read(), b2) - finally: - file1.close() - file2.close() - srcdir.cleanup() - destdir.cleanup() + ### rcopy ### def test_tree_rcopy_dir_distant(self): """test tree rcopy: directory, distant target""" @@ -462,38 +521,6 @@ def test_tree_rcopy_dir_gateway(self): """test tree rcopy: directory, gateway is target""" self._tree_rcopy_dir(NODE_GATEWAY) - def _tree_rcopy_file(self, target): - teh = TEventHandler() - - # The file needs to be large enough to test GH#545 - b1 = b'Lorem Ipsum' * 1100000 - - srcdir = make_temp_dir() - destdir = make_temp_dir() - srcfile = make_temp_file(b1, suffix=".txt", dir=srcdir.name) - - try: - worker = self.task.rcopy(srcfile.name, destdir.name, nodes=target, handler=teh) - self.task.run() - target_cnt = len(NodeSet(target)) - self.assertEqual(teh.ev_start_cnt, 1) - self.assertEqual(teh.ev_pickup_cnt, target_cnt) - self.assertEqual(teh.ev_read_cnt, 0) - #self.assertEqual(teh.ev_written_cnt, 0) # FIXME - self.assertEqual(teh.ev_hup_cnt, target_cnt) - self.assertEqual(teh.ev_timedout_cnt, 0) - self.assertEqual(teh.ev_close_cnt, 1) - - # rcopy successful? - for tgt in NodeSet(target): - rcopy_dest = join(destdir.name, basename(srcfile.name) + '.' + tgt) - with open(rcopy_dest, 'rb') as tfile: - self.assertEqual(tfile.read(), b1) - finally: - srcfile.close() - srcdir.cleanup() - destdir.cleanup() - def test_tree_rcopy_file_distant(self): """test tree rcopy: file, distant target""" self._tree_rcopy_file(NODE_DISTANT) @@ -519,3 +546,149 @@ def test_tree_worker_missing_arguments(self): def test_tree_worker_name_compat(self): """test TreeWorker former name (WorkerTree)""" self.assertEqual(TreeWorker, WorkerTree) + + +@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") +class TreeWorkerGW2Test(TreeWorkerTestBase): + """ + TreeWorkerTest: test TreeWorker with two functional gateways + + NODE_HEAD -> NODE_GATEWAY2 -> NODE_DISTANT2 + + Connections are really established to the target and command results + are tested. + """ + + def setUp(self): + """setup test environment topology""" + TreeWorkerTestBase.setUp(self) + # set task topology + graph = TopologyGraph() + graph.add_route(NodeSet(HOSTNAME), NodeSet(NODE_GATEWAY2)) + graph.add_route(NodeSet(NODE_GATEWAY2), NodeSet(NODE_DISTANT2)) + self.task.topology = graph.to_tree(HOSTNAME) + + def test_tree_run_gw2_event(self): + """test tree run with EventHandler and 2 gateways""" + teh = TEventHandler() + self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 2) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum') + + def test_tree_run_gw2_event_timeout(self): + """test tree run with EventHandler, 2 gateways with timeout""" + teh = TEventHandler() + self.task.run('sleep 10', nodes=NODE_DISTANT2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 0) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 1) # command timed out + self.assertEqual(teh.ev_close_cnt, 1) + + def test_tree_run_gw2_event_timeout_2w(self): + """test tree run with EventHandler, 2 gateways with timeout, 2 workers""" + teh = TEventHandler() + n1, n2 = NodeSet(NODE_DISTANT2).split(2) + self.task.shell('sleep 10', nodes=n1, handler=teh, timeout=0.5) + self.task.run('sleep 10', nodes=n2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 2) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 0) + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 2) # command timed out + self.assertEqual(teh.ev_close_cnt, 2) + + def test_tree_run_gw2_write_distant(self): + """test tree run with write(), 2 gateways, distant target""" + self._tree_run_write(NODE_DISTANT) + + def test_tree_run_gw2_write_distant2(self): + """test tree run with write(), 2 gateways, distant 2 targets""" + self._tree_run_write(NODE_DISTANT2) + + def test_tree_run_gw2_write_distant2_mt(self): + """test tree run with write(), 2 gateways, distant 2 targets, separate thread""" + self._tree_run_write(NODE_DISTANT2, separate_thread=True) + + +@unittest.skipIf(HOSTNAME == 'localhost', "does not work with hostname set to 'localhost'") +class TreeWorkerGW2F1FTest(TreeWorkerTestBase): + """ + TreeWorkerTest: test TreeWorker with two gateways, one being failed + + NODE_HEAD -> NODE_GATEWAY2F1 -> NODE_DISTANT2 + + Connections are really established to the target and command results + are tested. + """ + + def setUp(self): + """setup test environment topology""" + TreeWorkerTestBase.setUp(self) + # set task topology + graph = TopologyGraph() + graph.add_route(NodeSet(HOSTNAME), NodeSet(NODE_GATEWAY2F1)) + graph.add_route(NodeSet(NODE_GATEWAY2F1), NodeSet(NODE_DISTANT2)) + self.task.topology = graph.to_tree(HOSTNAME) + + def test_tree_run_gw2f1_event(self): + """test tree run with EventHandler and 1/2 gateways""" + teh = TEventHandler() + self.task.run('echo Lorem Ipsum', nodes=NODE_DISTANT2, handler=teh) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 3) # 2 + gw error + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 2) + self.assertEqual(teh.ev_timedout_cnt, 0) + self.assertEqual(teh.ev_close_cnt, 1) + self.assertEqual(teh.last_read, b'Lorem Ipsum') + + def test_tree_run_gw2f1_event_timeout(self): + """test tree run with EventHandler, 1/2 gateways with timeout""" + teh = TEventHandler() + self.task.run('sleep 10', nodes=NODE_DISTANT2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 1) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 1) # 1 gateway failure to read + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 1) # command timed out + self.assertEqual(teh.ev_close_cnt, 1) + + def test_tree_run_gw2f1_event_timeout_2w(self): + """test tree run with EventHandler, 1/2 gateways with timeout, 2 workers""" + teh = TEventHandler() + n1, n2 = NodeSet(NODE_DISTANT2).split(2) + self.task.shell('sleep 10', nodes=n1, handler=teh, timeout=0.5) + self.task.run('sleep 10', nodes=n2, handler=teh, timeout=0.5) + self.assertEqual(teh.ev_start_cnt, 2) + self.assertEqual(teh.ev_pickup_cnt, 2) + self.assertEqual(teh.ev_read_cnt, 1) # 1 gateway failure to read + self.assertEqual(teh.ev_written_cnt, 0) + self.assertEqual(teh.ev_hup_cnt, 0) # no hup event if timed out + self.assertEqual(teh.ev_timedout_cnt, 2) # command timed out + self.assertEqual(teh.ev_close_cnt, 2) + + def test_tree_run_gw2f1_write_distant(self): + """test tree run with write(), 1/2 gateways, distant target""" + self._tree_run_write(NODE_DISTANT) + + # FIXME, issue with stdin write in gw2f1 mode + #def test_tree_run_gw2f1_write_distant2(self): + # """test tree run with write(), 1/2 gateways, distant 2 targets""" + # logging.basicConfig(level=logging.DEBUG) + # self._tree_run_write(NODE_DISTANT2) + + def test_tree_run_gw2f1_write_distant2_mt(self): + """test tree run with write(), 1/2 gateways, distant 2 targets, separate thread""" + self._tree_run_write(NODE_DISTANT2, separate_thread=True)