Skip to content

Commit 4e55b70

Browse files
committed
Worker/Tree.py: implement reverse file copy
Implement tree mode reverse copy using tar commands. Principle is similar to the one used for regular copy, but tar data are transferred back through stdout (instead of sent through stdin). Data is also encoded using base64. To ease tar extraction on head node, each hostname is added as suffix to the base file (or directory) by tar itself, hence the complicated command. Closes #290 (fix clush --rcopy). Change-Id: Ia7a7ab4ab513520b46fbea96836243497fc2c94a
1 parent d14b619 commit 4e55b70

File tree

5 files changed

+156
-61
lines changed

5 files changed

+156
-61
lines changed

ChangeLog

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
2016-02-27 S. Thiell <[email protected]>
22

3+
* Worker/Tree.py: implement tree mode reverse copy using tar commands to
4+
fix clush --rcopy (ticket #290).
5+
36
* Communication.py: remove 76-char base64 encoding fixed length
47
restriction for tree XML payload communication. The default max length is
58
now 64K, which gives good results. The environment variable

lib/ClusterShell/CLI/Clush.py

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -640,11 +640,6 @@ def run_command(task, cmd, ns, timeout, display, remote):
640640
"""
641641
task.set_default("USER_running", True)
642642

643-
if display.verbosity >= VERB_VERB and task.topology:
644-
print Display.COLOR_RESULT_FMT % '-' * 15
645-
print Display.COLOR_RESULT_FMT % task.topology,
646-
print Display.COLOR_RESULT_FMT % '-' * 15
647-
648643
if (display.gather or display.line_mode) and ns is not None:
649644
if display.gather and display.line_mode:
650645
handler = LiveGatherOutputHandler(display, ns)
@@ -675,20 +670,15 @@ def run_copy(task, sources, dest, ns, timeout, preserve_flag, display):
675670
task.set_default("USER_running", True)
676671
task.set_default("USER_copies", len(sources))
677672

678-
if display.verbosity >= VERB_VERB and task.topology:
679-
print Display.COLOR_RESULT_FMT % '-' * 15
680-
print Display.COLOR_RESULT_FMT % task.topology,
681-
print Display.COLOR_RESULT_FMT % '-' * 15
682-
683673
copyhandler = CopyOutputHandler(display)
684674
if display.verbosity in (VERB_STD, VERB_VERB):
685675
copyhandler.runtimer_init(task, len(ns) * len(sources))
686676

687677
# Sources check
688678
for source in sources:
689679
if not exists(source):
690-
display.vprint_err(VERB_QUIET, "ERROR: file \"%s\" not found" % \
691-
source)
680+
display.vprint_err(VERB_QUIET,
681+
'ERROR: file "%s" not found' % source)
692682
clush_exit(1, task)
693683
task.copy(source, dest, ns, handler=copyhandler, timeout=timeout,
694684
preserve=preserve_flag)
@@ -701,20 +691,20 @@ def run_rcopy(task, sources, dest, ns, timeout, preserve_flag, display):
701691

702692
# Sanity checks
703693
if not exists(dest):
704-
display.vprint_err(VERB_QUIET, "ERROR: directory \"%s\" not found" % \
705-
dest)
694+
display.vprint_err(VERB_QUIET,
695+
'ERROR: directory "%s" not found' % dest)
706696
clush_exit(1, task)
707697
if not isdir(dest):
708-
display.vprint_err(VERB_QUIET, \
709-
"ERROR: destination \"%s\" is not a directory" % dest)
698+
display.vprint_err(VERB_QUIET,
699+
'ERROR: destination "%s" is not a directory' % dest)
710700
clush_exit(1, task)
711701

712702
copyhandler = CopyOutputHandler(display, True)
713703
if display.verbosity == VERB_STD or display.verbosity == VERB_VERB:
714704
copyhandler.runtimer_init(task, len(ns) * len(sources))
715705
for source in sources:
716706
task.rcopy(source, dest, ns, handler=copyhandler, timeout=timeout,
717-
preserve=preserve_flag)
707+
stderr=True, preserve=preserve_flag)
718708
task.resume()
719709

720710
def set_fdlimit(fd_max, display):
@@ -977,9 +967,12 @@ def main():
977967

978968
if options.grooming_delay:
979969
if config.verbosity >= VERB_VERB:
980-
print Display.COLOR_RESULT_FMT % ("Grooming delay: %f" % \
981-
options.grooming_delay)
970+
print Display.COLOR_RESULT_FMT % ("Grooming delay: %f"
971+
% options.grooming_delay)
982972
task.set_info("grooming_delay", options.grooming_delay)
973+
elif options.rcopy:
974+
# By default, --rcopy should inhibit grooming
975+
task.set_info("grooming_delay", 0)
983976

984977
if config.ssh_user:
985978
task.set_info("ssh_user", config.ssh_user)
@@ -1043,6 +1036,10 @@ def main():
10431036
config.command_timeout,
10441037
op))
10451038
if not task.default("USER_interactive"):
1039+
if display.verbosity >= VERB_VERB and task.topology:
1040+
print Display.COLOR_RESULT_FMT % '-' * 15
1041+
print Display.COLOR_RESULT_FMT % task.topology,
1042+
print Display.COLOR_RESULT_FMT % '-' * 15
10461043
if options.copy:
10471044
run_copy(task, args, options.dest_path, nodeset_base, timeout,
10481045
options.preserve_flag, display)

lib/ClusterShell/Propagation.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/usr/bin/env python
22
#
3-
# Copyright CEA/DAM/DIF (2010-2015)
3+
# Copyright CEA/DAM/DIF (2010-2016)
44
# Contributor: Henri DOREAU <[email protected]>
55
# Contributor: Stephane THIELL <[email protected]>
66
#
@@ -263,7 +263,7 @@ def start(self):
263263

264264
def recv(self, msg):
265265
"""process incoming messages"""
266-
self.logger.debug("[DBG] rcvd from: %s", msg)
266+
self.logger.debug("recv: %s", msg)
267267
if msg.type == EndMessage.ident:
268268
#??#self.ptree.notify_close()
269269
self.logger.debug("got EndMessage; closing")
@@ -344,7 +344,6 @@ def recv_cfg(self, msg):
344344

345345
def recv_ctl(self, msg):
346346
"""handle incoming messages for state 'control'"""
347-
self.logger.debug("recv_ctl")
348347
if msg.type == 'ACK':
349348
self.logger.debug("got ack (%s)", msg.type)
350349
# check if ack matches write history msgid to generate ev_written
@@ -361,7 +360,8 @@ def recv_ctl(self, msg):
361360
if metaworker.eh:
362361
nodeset = NodeSet(msg.nodes)
363362
decoded = msg.data_decode() + '\n'
364-
self.logger.debug("StdOutMessage: \"%s\"", decoded)
363+
# this is way too verbose for --rcopy
364+
#self.logger.debug("StdOutMessage: \"%s\"", decoded)
365365
for line in decoded.splitlines():
366366
for node in nodeset:
367367
metaworker._on_remote_node_msgline(node,

lib/ClusterShell/Worker/Tree.py

Lines changed: 113 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
ClusterShell v2 tree propagation worker
3535
"""
3636

37+
import base64
3738
import logging
3839
import os
3940
from os.path import basename, dirname, isfile, normpath
@@ -67,8 +68,6 @@ def ev_read(self, worker):
6768
"""
6869
Called to indicate that a worker has data to read.
6970
"""
70-
self.logger.debug("MetaWorkerEventHandler: ev_read (%s)",
71-
worker.current_sname)
7271
self.metaworker._on_node_msgline(worker.current_node,
7372
worker.current_msg,
7473
'stdout')
@@ -127,7 +126,12 @@ class WorkerTree(DistantWorker):
127126
ClusterShell tree worker Class.
128127
129128
"""
130-
UNTAR_CMD_FMT = 'tar -xf - -C "%s"'
129+
# copy and rcopy tar command formats
130+
# the choice of single or double quotes is essential
131+
UNTAR_CMD_FMT = "tar -xf - -C '%s'"
132+
TAR_CMD_FMT = "tar -cf - -C '%s' " \
133+
"--transform \"s,^\\([^/]*\\)[/]*,\\1.$(hostname -s)/,\" " \
134+
"'%s' | base64 -w 65536"
131135

132136
def __init__(self, nodes, handler, timeout, **kwargs):
133137
"""
@@ -142,6 +146,7 @@ def __init__(self, nodes, handler, timeout, **kwargs):
142146
"""
143147
DistantWorker.__init__(self, handler)
144148

149+
self.logger = logging.getLogger(__name__)
145150
self.workers = []
146151
self.nodes = NodeSet(nodes)
147152
self.timeout = timeout
@@ -150,18 +155,27 @@ def __init__(self, nodes, handler, timeout, **kwargs):
150155
self.dest = kwargs.get('dest')
151156
autoclose = kwargs.get('autoclose', False)
152157
self.stderr = kwargs.get('stderr', False)
158+
self.logger.debug("stderr=%s", self.stderr)
153159
self.remote = kwargs.get('remote', True)
160+
self.preserve = kwargs.get('preserve', None)
161+
self.reverse = kwargs.get('reverse', False)
162+
self._rcopy_bufs = {}
163+
self._rcopy_tars = {}
154164
self._close_count = 0
155165
self._start_count = 0
156166
self._child_count = 0
157167
self._target_count = 0
158168
self._has_timeout = False
159-
self.logger = logging.getLogger(__name__)
160169

161170
if self.command is None and self.source is None:
162171
raise ValueError("missing command or source parameter in "
163172
"WorkerTree constructor")
164173

174+
# rcopy is enforcing separated stderr to handle tar error messages
175+
# because stdout is used for data transfer
176+
if self.source and self.reverse:
177+
self.stderr = True
178+
165179
# build gateway invocation command
166180
invoke_gw_args = []
167181
for envname in ('PYTHONPATH',
@@ -183,6 +197,7 @@ def __init__(self, nodes, handler, timeout, **kwargs):
183197
self.router = None
184198

185199
self.upchannel = None
200+
186201
self.metahandler = MetaWorkerEventHandler(self)
187202

188203
# gateway -> active targets selection
@@ -211,19 +226,29 @@ def _launch(self, nodes):
211226
# Prepare copy params if source is defined
212227
destdir = None
213228
if self.source:
214-
self.logger.debug("copy self.dest=%s", self.dest)
215-
# Special processing to determine best arcname and destdir for tar.
216-
# The only case that we don't support is when source is a file and
217-
# dest is a dir without a finishing / (in that case we cannot
218-
# determine remotely whether it is a file or a directory).
219-
if isfile(self.source):
220-
# dest is not normalized here
221-
arcname = basename(self.dest) or basename(normpath(self.source))
222-
destdir = dirname(self.dest)
229+
if self.reverse:
230+
self.logger.debug("rcopy source=%s, dest=%s", self.source,
231+
self.dest)
232+
# dest is a directory
233+
destdir = self.dest
223234
else:
224-
arcname = basename(normpath(self.source))
225-
destdir = os.path.normpath(self.dest)
226-
self.logger.debug("copy arcname=%s destdir=%s", arcname, destdir)
235+
self.logger.debug("copy source=%s, dest=%s", self.source,
236+
self.dest)
237+
# Special processing to determine best arcname and destdir for
238+
# tar. The only case that we don't support is when source is a
239+
# file and dest is a dir without a finishing / (in that case we
240+
# cannot determine remotely whether it is a file or a
241+
# directory).
242+
if isfile(self.source):
243+
# dest is not normalized here
244+
arcname = basename(self.dest) or \
245+
basename(normpath(self.source))
246+
destdir = dirname(self.dest)
247+
else:
248+
arcname = basename(normpath(self.source))
249+
destdir = os.path.normpath(self.dest)
250+
self.logger.debug("copy arcname=%s destdir=%s", arcname,
251+
destdir)
227252

228253
# And launch stuffs
229254
next_hops = self._distribute(self.task.info("fanout"), nodes.copy())
@@ -238,15 +263,18 @@ def _launch(self, nodes):
238263
self._target_count += len(targets)
239264
if self.remote:
240265
if self.source:
241-
self.logger.debug('_launch remote untar (destdir=%s)',
242-
destdir)
243-
self.command = self.UNTAR_CMD_FMT % destdir
244-
worker = self.task.shell(self.command,
245-
nodes=targets,
246-
timeout=self.timeout,
247-
handler=self.metahandler,
248-
stderr=self.stderr,
249-
tree=False)
266+
# Note: specific case where targets are not in topology
267+
# as self.source is never used on remote gateways
268+
# so we try a direct copy/rcopy:
269+
self.logger.debug('_launch copy r=%s source=%s dest=%s',
270+
self.reverse, self.source, self.dest)
271+
worker = self.task.copy(self.source, self.dest, targets,
272+
handler=self.metahandler,
273+
stderr=self.stderr,
274+
timeout=self.timeout,
275+
preserve=self.preserve,
276+
reverse=self.reverse,
277+
tree=False)
250278
else:
251279
worker = self.task.shell(self.command,
252280
nodes=targets,
@@ -270,13 +298,13 @@ def _launch(self, nodes):
270298
self.logger.debug("trying gateway %s to reach %s", gw, targets)
271299
if self.source:
272300
self._copy_remote(self.source, destdir, targets, gw,
273-
self.timeout)
301+
self.timeout, self.reverse)
274302
else:
275303
self._execute_remote(self.command, targets, gw,
276304
self.timeout)
277305

278306
# Copy mode: send tar data after above workers have been initialized
279-
if self.source:
307+
if self.source and not self.reverse:
280308
try:
281309
# create temporary tar file with all source files
282310
tmptar = tempfile.TemporaryFile()
@@ -305,16 +333,25 @@ def _distribute(self, fanout, dst_nodeset):
305333
distribution[gw] = dstset
306334
return distribution
307335

308-
def _copy_remote(self, source, dest, targets, gateway, timeout):
336+
def _copy_remote(self, source, dest, targets, gateway, timeout, reverse):
309337
"""run a remote copy in tree mode (using gateway)"""
310-
self.logger.debug("_copy_remote gateway=%s source=%s dest=%s",
311-
gateway, source, dest)
338+
self.logger.debug("_copy_remote gateway=%s source=%s dest=%s "
339+
"reverse=%s", gateway, source, dest, reverse)
312340

313341
self._target_count += len(targets)
314342

315343
self.gwtargets[gateway] = targets.copy()
316344

317-
cmd = self.UNTAR_CMD_FMT % dest
345+
# tar commands are built here and launched on targets
346+
if reverse:
347+
# these weird replace calls aim to escape single quotes ' within ''
348+
srcdir = dirname(source).replace("'", '\'\"\'\"\'')
349+
srcbase = basename(normpath(self.source)).replace("'", '\'\"\'\"\'')
350+
cmd = self.TAR_CMD_FMT % (srcdir, srcbase)
351+
else:
352+
cmd = self.UNTAR_CMD_FMT % dest.replace("'", '\'\"\'\"\'')
353+
354+
self.logger.debug('_copy_remote: tar cmd: %s', cmd)
318355

319356
pchan = self.task._pchannel(gateway, self)
320357
pchan.shell(nodes=targets, command=cmd, worker=self, timeout=timeout,
@@ -343,17 +380,61 @@ def _engine_clients(self):
343380
return []
344381

345382
def _on_remote_node_msgline(self, node, msg, sname, gateway):
346-
DistantWorker._on_node_msgline(self, node, msg, sname)
383+
"""remote msg received"""
384+
if not self.source or not self.reverse or sname != 'stdout':
385+
DistantWorker._on_node_msgline(self, node, msg, sname)
386+
return
387+
388+
# rcopy only: we expect base64 encoded tar content on stdout
389+
encoded = self._rcopy_bufs.setdefault(node, '') + msg
390+
if node not in self._rcopy_tars:
391+
self._rcopy_tars[node] = tempfile.TemporaryFile()
392+
393+
# partial base64 decoding requires a multiple of 4 characters
394+
encoded_sz = (len(encoded) // 4) * 4
395+
# write decoded binary msg to node temporary tarfile
396+
self._rcopy_tars[node].write(base64.b64decode(encoded[0:encoded_sz]))
397+
# keep trailing encoded chars for next time
398+
self._rcopy_bufs[node] = encoded[encoded_sz:]
347399

348400
def _on_remote_node_rc(self, node, rc, gateway):
401+
"""remote rc received"""
349402
DistantWorker._on_node_rc(self, node, rc)
350403
self.logger.debug("_on_remote_node_rc %s %s via gw %s", node,
351404
self._close_count, gateway)
405+
406+
# finalize rcopy: extract tar data
407+
if self.source and self.reverse:
408+
for node, buf in self._rcopy_bufs.iteritems():
409+
tarfileobj = self._rcopy_tars[node]
410+
if len(buf) > 0:
411+
self.logger.debug("flushing node %s buf %d bytes", node,
412+
len(buf))
413+
tarfileobj.write(buf)
414+
tarfileobj.flush()
415+
tarfileobj.seek(0)
416+
try:
417+
tmptar = tarfile.open(fileobj=tarfileobj)
418+
try:
419+
self.logger.debug("%s extracting %d members in dest %s",
420+
node, len(tmptar.getmembers()),
421+
self.dest)
422+
tmptar.extractall(path=self.dest)
423+
except IOError, ex:
424+
self._on_remote_node_msgline(node, ex, 'stderr',
425+
gateway)
426+
# note: try-except-finally not supported before python 2.5
427+
finally:
428+
tmptar.close()
429+
self._rcopy_bufs = {}
430+
self._rcopy_tars = {}
431+
352432
self.gwtargets[gateway].remove(node)
353433
self._close_count += 1
354434
self._check_fini(gateway)
355435

356436
def _on_remote_node_timeout(self, node, gateway):
437+
"""remote node timeout received"""
357438
DistantWorker._on_node_timeout(self, node)
358439
self.logger.debug("_on_remote_node_timeout %s via gw %s", node, gateway)
359440
self._close_count += 1

0 commit comments

Comments
 (0)