Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions lib/ClusterShell/Communication.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,9 @@ def _open(self):
xmlgen = XMLGenerator(self.worker, encoding=ENCODING)
xmlgen.startElement('channel', {'version': __version__})

def _close(self):
def _close(self, abort=False):
"""close an already opened channel"""
send_endtag = self.opened
if send_endtag:
if self.opened and not abort:
XMLGenerator(self.worker, encoding=ENCODING).endElement('channel')
self.worker.abort()
self.opened = self.setup = False
Expand Down
30 changes: 20 additions & 10 deletions lib/ClusterShell/Propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,7 @@ def recv(self, msg):
if msg.type == EndMessage.ident:
#??#self.ptree.notify_close()
self.logger.debug("got EndMessage; closing")
# abort worker (now working)
self.worker.abort()
self._close()
elif msg.type == StdErrMessage.ident and msg.srcid == 0:
# Handle error messages when channel is not established yet
# or if messages are non-routed (eg. gateway-related)
Expand Down Expand Up @@ -394,24 +393,35 @@ def recv_ctl(self, msg):

def ev_hup(self, worker, node, rc):
    """Channel command is closing.

    Log the event and remember the return code on the instance as
    ``self._rc`` so that it is still available after this handler returns.

    worker -- worker reporting the event; only ``worker.nodes`` is read,
              for the debug message
    node   -- not used by this handler
    rc     -- return code to remember
    """
    self.logger.debug("ev_hup gateway=%s %s", str(worker.nodes), self)
    self._rc = rc

def ev_close(self, worker, timedout):
"""Channel is closing"""
# NOTE(review): this span appears to interleave the pre-change and the
# post-change lines of a diff hunk (two variants of the ev_close debug
# logging, of the rc test and of the "not set up" comment); do not read it
# as one linear function body.
# do not use worker buffer or rc accessors here as we don't use
# common stream names
gateway = str(worker.nodes)
self.logger.debug("ev_close gateway=%s %s", gateway, self)
self.logger.debug("ev_close rc=%s", self._rc) # may be None

# NOTE: self._rc may be None if the communication channel has aborted
if self._rc != 0:
self.logger.debug("error on gateway %s (setup=%s)", gateway,
self.setup)
self.logger.debug("ev_close gateway=%s rc=%s %s", gateway, self._rc,
self)

# NOTE: self._rc is None when _we_ close the channel (abort)
if self._rc is None and not self.setup:
# aborting before the channel is set up is worth a warning
self.logger.warning("ev_close: rc=%s with channel not setup",
self._rc)

if self._rc is not None and self._rc != 0:
# handle gateway channel error
self.logger.debug("error on gateway %s (rc=%s, setup=%s)", gateway,
self._rc, self.setup)
self.task.router.mark_unreachable(gateway)
self.logger.debug("gateway %s now set as unreachable", gateway)

if not self.setup:
# channel was not set up: we can safely repropagate commands
# channel was not set up: we can safely redistribute commands
self.logger.debug("channel was not set up: redistributing...")
for mw in set(self.task.gateways[gateway][1]):
mw._relaunch(gateway)

# update Task that we are closing
worker.task._pchannel_close(gateway, worker)
32 changes: 26 additions & 6 deletions lib/ClusterShell/Task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1365,8 +1365,10 @@ def _pchannel(self, gateway, metaworker):
def _pchannel_release(self, gateway, metaworker):
"""Release propagation channel associated to gateway.

Lookup by gateway, decref associated metaworker set and release
channel worker if needed.
Lookup by gateway, decref associated metaworker set and abort channel
worker if not used anymore.

Called by TreeWorker._check_fini()
"""
logger = logging.getLogger(__name__)
logger.debug("pchannel_release %s %s", gateway, metaworker)
Expand All @@ -1381,11 +1383,29 @@ def _pchannel_release(self, gateway, metaworker):
chanworker, metaworkers = self.gateways[gwstr]
metaworkers.remove(metaworker)
if len(metaworkers) == 0:
logger.debug("pchannel_release: destroying channel %s",
logger.debug("pchannel_release: closing channel %s",
chanworker.eh)
chanworker.abort()
# delete gateway reference
del self.gateways[gwstr]
# Call PropagationChannel._close() that will close the channel
# properly, update the opened/setup flags and abort the worker.
# We might be in an event handler and we want to make sure we
# ignore any pending messages from this gateway from now on.
chanworker.eh._close(abort=True)

def _pchannel_close(self, gateway, chanworker):
    """A propagation channel is closing.

    Perform necessary cleanup actions when a gateway channel is closing:
    call _gateway_abort(gateway) on every metaworker registered for this
    gateway, then remove the gateway entry from self.gateways.

    Called by PropagationChannel.ev_close().

    gateway    -- key of the closing channel in self.gateways
    chanworker -- channel worker that is closing; it must be the very
                  object registered for that key

    Raises KeyError if gateway has no entry in self.gateways, and
    AssertionError if chanworker is not the registered channel worker.
    """
    logger = logging.getLogger(__name__)
    logger.debug("pchannel_closing: %s", gateway)
    chwrk, metaworkers = self.gateways[gateway]
    # sanity check: the closing worker is the one tracked for this gateway
    assert chwrk is chanworker, (chwrk, chanworker)
    # Iterate over a snapshot, not the live collection (presumably because
    # aborting a metaworker can end up removing it from that collection).
    for mw in list(metaworkers):
        mw._gateway_abort(gateway)
    del self.gateways[gateway]


def task_self(defaults=None):
Expand Down
16 changes: 14 additions & 2 deletions lib/ClusterShell/Worker/Tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ def _on_remote_node_msgline(self, node, msg, sname, gateway):
def _on_remote_node_close(self, node, rc, gateway):
"""remote node closing with return code"""
DistantWorker._on_node_close(self, node, rc)
self.logger.debug("_on_remote_node_close %s %s via gw %s", node,
self._close_count, gateway)
self.logger.debug("_on_remote_node_close %s %s via gw %s rc=%s", node,
self._close_count, gateway, rc)

# finalize rcopy: extract tar data
if self.source and self.reverse:
Expand Down Expand Up @@ -564,6 +564,18 @@ def set_write_eof(self):

self._set_write_eof_remote()

def _gateway_abort(self, gateway):
    """Abort on gateway failure.

    Report every target currently recorded for `gateway` in
    self.gwtargets as closed, by calling _on_remote_node_close() with
    return code os.EX_PROTOCOL. If `gateway` has no entry in
    self.gwtargets, log a warning and do nothing else.
    """
    if gateway not in self.gwtargets:
        self.logger.warning("TreeWorker._gateway_abort %s not found",
                            gateway)
        return
    targets = self.gwtargets[gateway]
    self.logger.debug("TreeWorker._gateway_abort %s found: targets=%s",
                      gateway, targets)
    # targets is a mutable list: loop over a NodeSet built from it rather
    # than over the list itself
    for target in NodeSet.fromlist(targets):
        self._on_remote_node_close(target, os.EX_PROTOCOL, gateway)

def abort(self):
"""Abort processing any action by this worker."""
# Not yet supported by TreeWorker
Expand Down
Loading
Loading