Skip to content

Commit f564c21

Browse files
committed
Clush: added -P/--progress option
Added --progress option to clush to force display of the live progress indicator and display global write bandwidth when writing standard input (make use of the ev_written event handler). Added documentation for clush progress indicator. Change-Id: Iaa2b6613939792de9ada3d333c1c5cc5cc3144a9
1 parent fede70f commit f564c21

File tree

9 files changed

+162
-27
lines changed

9 files changed

+162
-27
lines changed

ChangeLog

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
2015-11-01 S. Thiell <[email protected]>
2+
3+
* Clush.py: added -P/--progress to force display of the live progress
4+
indicator and display global write bandwidth when writing standard input.
5+
16
2015-10-25 S. Thiell <[email protected]>
27

38
* Clush.py: added --option/-O clush.conf settings override (pull

doc/man/man1/clush.1

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.\" Man page generated from reStructuredText.
22
.
3-
.TH CLUSH 1 "2015-10-30" "1.7" "ClusterShell User Manual"
3+
.TH CLUSH 1 "2015-11-01" "1.7" "ClusterShell User Manual"
44
.SH NAME
55
clush \- execute shell commands on a cluster
66
.
@@ -213,6 +213,9 @@ disable header block and order output by nodes; additionally, when used in conju
213213
.B \-N
214214
disable labeling of command line
215215
.TP
216+
.B \-P\fP,\fB \-\-progress
217+
show progress during command execution; if writing is performed to standard input, the live progress indicator will display the global bandwidth of data written to the target nodes
218+
.TP
216219
.B \-b\fP,\fB \-\-dshbak
217220
display gathered results in a dshbak\-like way
218221
.TP

doc/sphinx/tools/clush.rst

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,14 +228,15 @@ specified remote hosts in parallel (given the current *fanout* value and the
228228
number of commands to execute (see *fanout* library settings in
229229
:ref:`class-Task-configure`).
230230

231+
.. _clush-gather:
232+
231233
Output gathering options
232234
""""""""""""""""""""""""
233235

234236
If option ``-b`` or ``--dshbak`` is specified, *clush* waits for command
235-
completion while displaying a progress indicator (unless ``-q, --quiet``
236-
switch is provided) and then displays gathered output results. If standard
237-
output is redirected to a file, *clush* detects it and disable any progress
238-
indicator.
237+
completion while displaying a :ref:`progress indicator <clush-progress>` and
238+
then displays gathered output results. If standard output is redirected to a
239+
file, *clush* detects it and disable any progress indicator.
239240

240241
The following is a simple example of *clush* command used to execute ``uname
241242
-r`` on *node40*, *node41* and *node42*, wait for their completion and finally
@@ -312,6 +313,32 @@ Another stdin-bound *clush* usage example::
312313
---------------
313314
/etc/yum.repos.d/cobbler-config.repo
314315

316+
.. _clush-progress:
317+
318+
Progress indicator
319+
""""""""""""""""""
320+
321+
In :ref:`output gathering mode <clush-gather>`, *clush* will display a live
322+
progress indicator as a simple but convenient way to follow the completion of
323+
parallel commands. It can be disabled just by using the ``-q`` or ``--quiet``
324+
options. The progress indicator will appear after 1 to 2 seconds and should
325+
look like this::
326+
327+
clush: <command_completed>/<command_total>
328+
329+
If writing is performed to *clush* standard input, like in ``command |
330+
clush``, the live progress indicator will display the global bandwidth of data
331+
written to the target nodes.
332+
333+
Finally, the special option ``--progress`` can be used to force the display of
334+
the live progress indicator. Using this option may interfere with some command
335+
outputs, but it can be useful when using stdin while remote commands are
336+
silent. As an example, the following command will copy a local file to
337+
node[1-3] and display the global write bandwidth to the target nodes::
338+
339+
$ dd if=/path/to/local/file | clush -w node[1-3] --progress 'dd of=/path/to/remote/file'
340+
clush: 0/3 write: 212.27 MiB/s
341+
315342
.. _clush-interactive:
316343

317344
Interactive mode

doc/txt/clush.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ execute shell commands on a cluster
77
-----------------------------------
88

99
:Author: Stephane Thiell <[email protected]>
10-
:Date: 2015-10-30
10+
:Date: 2015-11-01
1111
:Copyright: CeCILL-C V1
1212
:Version: 1.7
1313
:Manual section: 1
@@ -152,6 +152,7 @@ Output behaviour:
152152
-G, --groupbase do not display group source prefix
153153
-L disable header block and order output by nodes; additionally, when used in conjunction with -b/-B, it will enable "life gathering" of results by line mode, such as the next line is displayed as soon as possible (eg. when all nodes have sent the line)
154154
-N disable labeling of command line
155+
-P, --progress show progress during command execution; if writing is performed to standard input, the live progress indicator will display the global bandwidth of data written to the target nodes
155156
-b, --dshbak display gathered results in a dshbak-like way
156157
-B like -b but including standard error
157158
-r, --regroup fold nodeset using node groups

lib/ClusterShell/CLI/Clush.py

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import resource
5252
import sys
5353
import signal
54+
import time
5455
import threading
5556

5657
from ClusterShell.Defaults import DEFAULTS, _load_workerclass
@@ -59,7 +60,7 @@
5960
from ClusterShell.CLI.Display import VERB_QUIET, VERB_STD, VERB_VERB, VERB_DEBUG
6061
from ClusterShell.CLI.OptionParser import OptionParser
6162
from ClusterShell.CLI.Error import GENERIC_ERRORS, handle_generic_error
62-
from ClusterShell.CLI.Utils import NodeSet, bufnodeset_cmp
63+
from ClusterShell.CLI.Utils import NodeSet, bufnodeset_cmp, human_bi_bytes_unit
6364

6465
from ClusterShell.Event import EventHandler
6566
from ClusterShell.MsgTree import MsgTree
@@ -92,10 +93,11 @@ def __init__(self):
9293
EventHandler.__init__(self)
9394
self._runtimer = None
9495

95-
def runtimer_init(self, task, ntotal):
96+
def runtimer_init(self, task, ntotal=0):
9697
"""Init timer for live command-completed progressmeter."""
97-
self._runtimer = task.timer(2.0, RunTimer(task, ntotal),
98-
interval=1./3., autoclose=True)
98+
thandler = RunTimer(task, ntotal)
99+
self._runtimer = task.timer(1.33, thandler, interval=1./3.,
100+
autoclose=True)
99101

100102
def _runtimer_clean(self):
101103
"""Hide runtimer counter"""
@@ -124,6 +126,16 @@ def update_prompt(self, worker):
124126
if worker.task.default("USER_handle_SIGUSR1"):
125127
os.kill(os.getpid(), signal.SIGUSR1)
126128

129+
def ev_start(self, worker):
130+
"""Worker is starting."""
131+
if self._runtimer:
132+
self._runtimer.eh.start_time = time.time()
133+
134+
def ev_written(self, worker, node, sname, size):
135+
"""Bytes written on worker"""
136+
if self._runtimer:
137+
self._runtimer.eh.bytes_written += size
138+
127139
class DirectOutputHandler(OutputHandler):
128140
"""Direct output event handler class."""
129141

@@ -156,7 +168,29 @@ def ev_timeout(self, worker):
156168
def ev_close(self, worker):
157169
self.update_prompt(worker)
158170

159-
class CopyOutputHandler(DirectOutputHandler):
171+
class DirectProgressOutputHandler(DirectOutputHandler):
172+
"""Direct output event handler class with progress support."""
173+
174+
# NOTE: This class is very similar to DirectOutputHandler, thus it could
175+
# first look overkill, but merging both is slightly impacting ev_read
176+
# performance of current DirectOutputHandler.
177+
178+
def ev_read(self, worker):
179+
self._runtimer_clean()
180+
# it is ~10% faster to avoid calling super here
181+
node = worker.current_node or worker.key
182+
self._display.print_line(node, worker.current_msg)
183+
184+
def ev_error(self, worker):
185+
self._runtimer_clean()
186+
node = worker.current_node or worker.key
187+
self._display.print_line_error(node, worker.current_errmsg)
188+
189+
def ev_close(self, worker):
190+
self._runtimer_clean()
191+
DirectOutputHandler.ev_close(self, worker)
192+
193+
class CopyOutputHandler(DirectProgressOutputHandler):
160194
"""Copy output event handler."""
161195
def __init__(self, display, reverse=False):
162196
DirectOutputHandler.__init__(self, display)
@@ -308,6 +342,9 @@ def __init__(self, task, total):
308342
self.tslen = len(str(self.total))
309343
self.wholelen = 0
310344
self.started = False
345+
# updated by worker handler for progress
346+
self.start_time = 0
347+
self.bytes_written = 0
311348

312349
def ev_timer(self, timer):
313350
self.update()
@@ -318,8 +355,15 @@ def set_dirty(self):
318355
def erase_line(self):
319356
if self.wholelen:
320357
sys.stderr.write(' ' * self.wholelen + '\r')
358+
self.wholelen = 0
321359

322360
def update(self):
361+
"""Update runtime progress info"""
362+
wrbwinfo = ''
363+
if self.bytes_written > 0:
364+
bandwidth = self.bytes_written/(time.time() - self.start_time)
365+
wrbwinfo = " write: %s/s" % human_bi_bytes_unit(bandwidth)
366+
323367
gws = self.task.gateways.keys()
324368
if gws:
325369
# tree mode
@@ -331,11 +375,12 @@ def update(self):
331375
else:
332376
cnt = len(self.task._engine.clients())
333377
gwinfo = ''
334-
if cnt != self.cnt_last:
378+
if self.bytes_written > 0 or cnt != self.cnt_last:
335379
self.cnt_last = cnt
336380
# display completed/total clients
337-
towrite = 'clush: %*d/%*d%s\r' % (self.tslen, self.total - cnt,
338-
self.tslen, self.total, gwinfo)
381+
towrite = 'clush: %*d/%*d%s%s\r' % (self.tslen, self.total - cnt,
382+
self.tslen, self.total, gwinfo,
383+
wrbwinfo)
339384
self.wholelen = len(towrite)
340385
sys.stderr.write(towrite)
341386
self.started = True
@@ -344,6 +389,7 @@ def finalize(self, force_cr):
344389
"""finalize display of runtimer"""
345390
if not self.started:
346391
return
392+
self.erase_line()
347393
# display completed/total clients
348394
fmt = 'clush: %*d/%*d'
349395
if force_cr:
@@ -599,9 +645,14 @@ def run_command(task, cmd, ns, timeout, display, remote):
599645
else:
600646
handler = GatherOutputHandler(display)
601647

602-
if display.verbosity == VERB_STD or display.verbosity == VERB_VERB:
648+
if display.verbosity in (VERB_STD, VERB_VERB) or \
649+
(display.progress and display.verbosity > VERB_QUIET):
603650
handler.runtimer_init(task, len(ns))
651+
elif display.progress and display.verbosity > VERB_QUIET:
652+
handler = DirectProgressOutputHandler(display)
653+
handler.runtimer_init(task, len(ns))
604654
else:
655+
# this is the simpler but faster output handler
605656
handler = DirectOutputHandler(display)
606657

607658
worker = task.shell(cmd, nodes=ns, handler=handler, timeout=timeout,
@@ -614,9 +665,7 @@ def run_command(task, cmd, ns, timeout, display, remote):
614665
task.resume()
615666

616667
def run_copy(task, sources, dest, ns, timeout, preserve_flag, display):
617-
"""
618-
run copy command
619-
"""
668+
"""run copy command"""
620669
task.set_default("USER_running", True)
621670
task.set_default("USER_copies", len(sources))
622671

@@ -626,7 +675,7 @@ def run_copy(task, sources, dest, ns, timeout, preserve_flag, display):
626675
print Display.COLOR_RESULT_FMT % '-' * 15
627676

628677
copyhandler = CopyOutputHandler(display)
629-
if display.verbosity == VERB_STD or display.verbosity == VERB_VERB:
678+
if display.verbosity in (VERB_STD, VERB_VERB):
630679
copyhandler.runtimer_init(task, len(ns) * len(sources))
631680

632681
# Sources check
@@ -640,9 +689,7 @@ def run_copy(task, sources, dest, ns, timeout, preserve_flag, display):
640689
task.resume()
641690

642691
def run_rcopy(task, sources, dest, ns, timeout, preserve_flag, display):
643-
"""
644-
run reverse copy command
645-
"""
692+
"""run reverse copy command"""
646693
task.set_default("USER_running", True)
647694
task.set_default("USER_copies", len(sources))
648695

lib/ClusterShell/CLI/Display.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
#
3-
# Copyright CEA/DAM/DIF (2010, 2011, 2012)
4-
# Contributor: Stephane THIELL <[email protected]>
3+
# Copyright CEA/DAM/DIF (2010-2015)
4+
# Contributor: Stephane THIELL <[email protected]>
55
#
66
# This file is part of the ClusterShell library.
77
#
@@ -82,6 +82,7 @@ def __init__(self, options, config=None, color=None):
8282
self._diffref = None
8383
# diff implies at least -b
8484
self.gather = options.gatherall or options.gather or options.diff
85+
self.progress = getattr(options, 'progress', False) # only in clush
8586
# check parameter combinaison
8687
if options.diff and options.line_mode:
8788
raise ValueError("diff not supported in line_mode")

lib/ClusterShell/CLI/OptionParser.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ def install_display_options(self,
158158
optgrp.add_option("-b", "-c", "--dshbak", action="store_true",
159159
dest="gather", help="gather nodes with same output")
160160
else:
161+
optgrp.add_option("-P", "--progress", action="store_true",
162+
dest="progress", help="show progress during command execution")
161163
optgrp.add_option("-b", "--dshbak", action="store_true",
162164
dest="gather", help="gather nodes with same output")
163165
optgrp.add_option("-B", action="store_true", dest="gatherall",

lib/ClusterShell/CLI/Utils.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env python
22
#
3-
# Copyright CEA/DAM/DIF (2010)
4-
# Contributor: Stephane THIELL <[email protected]>
3+
# Copyright CEA/DAM/DIF (2010-2015)
4+
# Contributor: Stephane THIELL <[email protected]>
55
#
66
# This file is part of the ClusterShell library.
77
#
@@ -46,6 +46,24 @@
4646
"ERROR: ClusterShell node groups configuration error:\n\t%s" % exc
4747
sys.exit(1)
4848

49+
(KIBI, MEBI, GIBI, TEBI) = (1024.0, 1024.0 ** 2, 1024.0 ** 3, 1024.0 ** 4)
50+
51+
def human_bi_bytes_unit(value):
52+
"""
53+
Format numerical `value` to display it using human readable unit with
54+
binary prefix like (KiB, MiB, GiB, ...).
55+
"""
56+
if value >= TEBI:
57+
fmt = "%.2f TiB" % (value / TEBI)
58+
elif value >= GIBI:
59+
fmt = "%.2f GiB" % (value / GIBI)
60+
elif value >= MEBI:
61+
fmt = "%.2f MiB" % (value / MEBI)
62+
elif value >= KIBI:
63+
fmt = "%.2f KiB" % (value / KIBI)
64+
else:
65+
fmt = "%d B" % value
66+
return fmt
4967

5068
def nodeset_cmp(ns1, ns2):
5169
"""Compare 2 nodesets by their length (we want larger nodeset
@@ -65,4 +83,3 @@ def bufnodeset_cmp(bn1, bn2):
6583
node."""
6684
# Extract nodesets and call nodeset_cmp
6785
return nodeset_cmp(bn1[1], bn2[1])
68-

tests/CLIClushTest.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,38 @@ def test_030_config_options(self):
427427
"-O", "color=never", "-w", HOSTNAME, "echo", "ok"],
428428
None, "%s: ok\n" % HOSTNAME)
429429

430+
def test_031_progress(self):
431+
"""test clush -P/--progress"""
432+
self._clush_t(["-w", HOSTNAME, "--progress", "echo", "ok"], None,
433+
"%s: ok\n" % HOSTNAME)
434+
self._clush_t(["-w", HOSTNAME, "--progress", "sleep", "2"], None, '', 0,
435+
re.compile(r'clush: 0/1\r.*'))
436+
self._clush_t(["-w", HOSTNAME, "--progress", "sleep", "2"], 'AAAAAAAA',
437+
'', 0, re.compile(r'clush: 0/1 write: \d B/s\r.*'))
438+
self._clush_t(["-w", "%s,localhost" % HOSTNAME, "--progress",
439+
"sleep", "2"], 'AAAAAAAAAAAAAA', '', 0,
440+
re.compile(r'clush: 0/2 write: \d+ B/s\r.*'))
441+
self._clush_t(["-w", HOSTNAME, "-b", "--progress", "sleep", "2"],
442+
None, '', 0, re.compile(r'clush: 0/1\r.*'))
443+
self._clush_t(["-w", HOSTNAME, "-b", "--progress", "sleep", "2"],
444+
'AAAAAAAAAAAAAAAA', '', 0,
445+
re.compile(r'clush: 0/1 write: \d+ B/s\r.*'))
446+
# -q and --progress: explicit -q wins
447+
self._clush_t(["-w", HOSTNAME, "--progress", "-q", "sleep", "2"], None,
448+
'', 0)
449+
self._clush_t(["-w", HOSTNAME, "-b", "--progress", "-q", "sleep", "2"],
450+
None, '', 0, '')
451+
self._clush_t(["-w", HOSTNAME, "-b", "--progress", "-q", "sleep", "2"],
452+
'AAAAAAAAAAAAAAAA', '', 0, '')
453+
# cover stderr output and --progress
454+
self._clush_t(["-w", HOSTNAME, "--progress",
455+
"echo foo >&2; echo bar; sleep 2"], None,
456+
"%s: bar\n" % HOSTNAME, 0,
457+
re.compile(r'%s: foo\nclush: 0/1\r.*'
458+
% HOSTNAME))
459+
460+
461+
430462
class CLIClushTest_B_StdinFailure(unittest.TestCase):
431463
"""Unit test class for testing CLI/Clush.py and stdin failure"""
432464

0 commit comments

Comments
 (0)