Skip to content

Commit b75a928

Browse files
committed
Tree: implement per-gateway abort
This commit adds the ability to abort a specific gateway channel from the initiator. Until now, this was not properly handled. This also fixes gateway failover.
Changes:
* Implement TreeWorker._gateway_abort(), which can be used to abort/cancel all tasks being performed by the TreeWorker via the specified gateway. In case of such an abort (likely due to some gateway failure), a special return code 76 (os.EX_PROTOCOL) is used for closing all running remote commands via this gateway. This return code is sometimes used to specify a "Remote protocol error" / "An error occurred in a remote communication protocol", which seems appropriate here.
* Implement a new Task._pchannel_close() method that is called from PropagationChannel.ev_close(), i.e. deterministically every time a gateway channel is closing (whether self-initiated or not). This method performs the necessary cleanup actions, most notably calling TreeWorker._gateway_abort(gateway) on each worker currently using the gateway channel.
* Update Task._pchannel_release() so that it now calls PropagationChannel._close(abort=True) instead of Worker.abort(), to properly reset the channel's opened/setup flags.
* Update TreeWorkerTest with tests to better cover the above and gateway failover.
Part of #229 and extended work on #566.
1 parent 7e5e3f6 commit b75a928

File tree

5 files changed

+378
-172
lines changed

5 files changed

+378
-172
lines changed

lib/ClusterShell/Communication.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,9 @@ def _open(self):
203203
xmlgen = XMLGenerator(self.worker, encoding=ENCODING)
204204
xmlgen.startElement('channel', {'version': __version__})
205205

206-
def _close(self):
206+
def _close(self, abort=False):
207207
"""close an already opened channel"""
208-
send_endtag = self.opened
209-
if send_endtag:
208+
if self.opened and not abort:
210209
XMLGenerator(self.worker, encoding=ENCODING).endElement('channel')
211210
self.worker.abort()
212211
self.opened = self.setup = False

lib/ClusterShell/Propagation.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,7 @@ def recv(self, msg):
253253
if msg.type == EndMessage.ident:
254254
#??#self.ptree.notify_close()
255255
self.logger.debug("got EndMessage; closing")
256-
# abort worker (now working)
257-
self.worker.abort()
256+
self._close()
258257
elif msg.type == StdErrMessage.ident and msg.srcid == 0:
259258
# Handle error messages when channel is not established yet
260259
# or if messages are non-routed (eg. gateway-related)
@@ -424,3 +423,6 @@ def ev_close(self, worker, timedout):
424423
self.logger.debug("channel was not set up: redistributing...")
425424
for mw in set(self.task.gateways[gateway][1]):
426425
mw._relaunch(gateway)
426+
427+
# update Task that we are closing
428+
worker.task._pchannel_close(gateway, worker)

lib/ClusterShell/Task.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1365,8 +1365,10 @@ def _pchannel(self, gateway, metaworker):
13651365
def _pchannel_release(self, gateway, metaworker):
13661366
"""Release propagation channel associated to gateway.
13671367
1368-
Lookup by gateway, decref associated metaworker set and release
1369-
channel worker if needed.
1368+
Lookup by gateway, decref associated metaworker set and abort channel
1369+
worker if not used anymore.
1370+
1371+
Called by TreeWorker._check_fini()
13701372
"""
13711373
logger = logging.getLogger(__name__)
13721374
logger.debug("pchannel_release %s %s", gateway, metaworker)
@@ -1381,11 +1383,29 @@ def _pchannel_release(self, gateway, metaworker):
13811383
chanworker, metaworkers = self.gateways[gwstr]
13821384
metaworkers.remove(metaworker)
13831385
if len(metaworkers) == 0:
1384-
logger.debug("pchannel_release: destroying channel %s",
1386+
logger.debug("pchannel_release: closing channel %s",
13851387
chanworker.eh)
1386-
chanworker.abort()
1387-
# delete gateway reference
1388-
del self.gateways[gwstr]
1388+
# Call PropagationChannel._close() that will close the channel
1389+
# properly, update the opened/setup flags and abort the worker.
1390+
# We might be in an event handler and we want to make sure we
1391+
# ignore any pending messages from this gateway from now on.
1392+
chanworker.eh._close(abort=True)
1393+
1394+
def _pchannel_close(self, gateway, chanworker):
1395+
"""A propagation channel is closing.
1396+
1397+
Perform necessary cleanup actions when a gateway channel is closing.
1398+
1399+
Called by PropagationChannel.ev_close().
1400+
"""
1401+
logger = logging.getLogger(__name__)
1402+
logger.debug("pchannel_closing: %s", gateway)
1403+
chwrk, metaworkers = self.gateways[gateway]
1404+
assert chwrk is chanworker, (chwrk, chanworker)
1405+
metaworkers_copy = list(metaworkers)
1406+
for mw in metaworkers_copy:
1407+
mw._gateway_abort(gateway)
1408+
del self.gateways[gateway]
13891409

13901410

13911411
def task_self(defaults=None):

lib/ClusterShell/Worker/Tree.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,18 @@ def set_write_eof(self):
564564

565565
self._set_write_eof_remote()
566566

567+
def _gateway_abort(self, gateway):
568+
"""Abort on gateway failure"""
569+
if gateway not in self.gwtargets:
570+
self.logger.warning("TreeWorker._gateway_abort %s not found",
571+
gateway)
572+
return
573+
targets = self.gwtargets[gateway]
574+
self.logger.debug("TreeWorker._gateway_abort %s found: targets=%s",
575+
gateway, targets)
576+
for target in NodeSet.fromlist(targets): # targets is a mutable list
577+
self._on_remote_node_close(target, os.EX_PROTOCOL, gateway)
578+
567579
def abort(self):
568580
"""Abort processing any action by this worker."""
569581
# Not yet supported by TreeWorker

0 commit comments

Comments
 (0)