Skip to content

Commit c14a130

Browse files
authored
Merge pull request #4444 from boegel/run_shell_cmd_async
add support for running shell commands asynchronously with `run_shell_cmd`
2 parents 360b053 + f7c0ff2 commit c14a130

File tree

9 files changed

+199
-154
lines changed

9 files changed

+199
-154
lines changed

easybuild/framework/easyblock.py

Lines changed: 43 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
* Davide Vanzo (Vanderbilt University)
4242
* Caspar van Leeuwen (SURF)
4343
"""
44-
44+
import concurrent
4545
import copy
4646
import glob
4747
import inspect
@@ -52,6 +52,7 @@
5252
import tempfile
5353
import time
5454
import traceback
55+
from concurrent.futures import ThreadPoolExecutor
5556
from datetime import datetime
5657

5758
import easybuild.tools.environment as env
@@ -87,7 +88,7 @@
8788
from easybuild.tools.hooks import MODULE_STEP, MODULE_WRITE, PACKAGE_STEP, PATCH_STEP, PERMISSIONS_STEP, POSTITER_STEP
8889
from easybuild.tools.hooks import POSTPROC_STEP, PREPARE_STEP, READY_STEP, SANITYCHECK_STEP, SOURCE_STEP
8990
from easybuild.tools.hooks import SINGLE_EXTENSION, TEST_STEP, TESTCASES_STEP, load_hooks, run_hook
90-
from easybuild.tools.run import RunShellCmdError, check_async_cmd, run_cmd, run_shell_cmd
91+
from easybuild.tools.run import RunShellCmdError, raise_run_shell_cmd_error, run_shell_cmd
9192
from easybuild.tools.jenkins import write_to_xml
9293
from easybuild.tools.module_generator import ModuleGeneratorLua, ModuleGeneratorTcl, module_generator, dependencies_for
9394
from easybuild.tools.module_naming_scheme.utilities import det_full_ec_version
@@ -1818,41 +1819,31 @@ def skip_extensions_parallel(self, exts_filter):
18181819
self.log.experimental("Skipping installed extensions in parallel")
18191820
print_msg("skipping installed extensions (in parallel)", log=self.log)
18201821

1821-
async_cmd_info_cache = {}
1822-
running_checks_ids = []
18231822
installed_exts_ids = []
1824-
exts_queue = list(enumerate(self.ext_instances[:]))
18251823
checked_exts_cnt = 0
18261824
exts_cnt = len(self.ext_instances)
1825+
cmds = [resolve_exts_filter_template(exts_filter, ext) for ext in self.ext_instances]
1826+
1827+
with ThreadPoolExecutor(max_workers=self.cfg['parallel']) as thread_pool:
18271828

1828-
# asynchronously run checks to see whether extensions are already installed
1829-
while exts_queue or running_checks_ids:
1829+
# list of commands to run asynchronously
1830+
async_cmds = [thread_pool.submit(run_shell_cmd, cmd, stdin=stdin, hidden=True, fail_on_error=False,
1831+
asynchronous=True, task_id=idx) for (idx, (cmd, stdin)) in enumerate(cmds)]
18301832

1831-
# first handle completed checks
1832-
for idx in running_checks_ids[:]:
1833+
# process results of commands as they complete
1834+
for done_task in concurrent.futures.as_completed(async_cmds):
1835+
res = done_task.result()
1836+
idx = res.task_id
18331837
ext_name = self.ext_instances[idx].name
1834-
# don't read any output, just check whether command completed
1835-
async_cmd_info = check_async_cmd(*async_cmd_info_cache[idx], output_read_size=0, fail_on_error=False)
1836-
if async_cmd_info['done']:
1837-
out, ec = async_cmd_info['output'], async_cmd_info['exit_code']
1838-
self.log.info("exts_filter result for %s: exit code %s; output: %s", ext_name, ec, out)
1839-
running_checks_ids.remove(idx)
1840-
if ec == 0:
1841-
print_msg("skipping extension %s" % ext_name, log=self.log)
1842-
installed_exts_ids.append(idx)
1843-
1844-
checked_exts_cnt += 1
1845-
exts_pbar_label = "skipping installed extensions "
1846-
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
1847-
self.update_exts_progress_bar(exts_pbar_label)
1848-
1849-
# start additional checks asynchronously
1850-
while exts_queue and len(running_checks_ids) < self.cfg['parallel']:
1851-
idx, ext = exts_queue.pop(0)
1852-
cmd, stdin = resolve_exts_filter_template(exts_filter, ext)
1853-
async_cmd_info_cache[idx] = run_cmd(cmd, log_all=False, log_ok=False, simple=False, inp=stdin,
1854-
regexp=False, trace=False, asynchronous=True)
1855-
running_checks_ids.append(idx)
1838+
self.log.info(f"exts_filter result for {ext_name}: exit code {res.exit_code}; output: {res.output}")
1839+
if res.exit_code == 0:
1840+
print_msg(f"skipping extension {ext_name}", log=self.log)
1841+
installed_exts_ids.append(idx)
1842+
1843+
checked_exts_cnt += 1
1844+
exts_pbar_label = "skipping installed extensions "
1845+
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
1846+
self.update_exts_progress_bar(exts_pbar_label)
18561847

18571848
# compose new list of extensions, skip over the ones that are already installed;
18581849
# note: original order in extensions list should be preserved!
@@ -1957,6 +1948,8 @@ def install_extensions_parallel(self, install=True):
19571948
"""
19581949
self.log.info("Installing extensions in parallel...")
19591950

1951+
thread_pool = ThreadPoolExecutor(max_workers=self.cfg['parallel'])
1952+
19601953
running_exts = []
19611954
installed_ext_names = []
19621955

@@ -1993,16 +1986,23 @@ def update_exts_progress_bar_helper(running_exts, progress_size):
19931986

19941987
# check for extension installations that have completed
19951988
if running_exts:
1996-
self.log.info("Checking for completed extension installations (%d running)...", len(running_exts))
1989+
self.log.info(f"Checking for completed extension installations ({len(running_exts)} running)...")
19971990
for ext in running_exts[:]:
1998-
if self.dry_run or ext.async_cmd_check():
1999-
self.log.info("Installation of %s completed!", ext.name)
2000-
ext.postrun()
2001-
running_exts.remove(ext)
2002-
installed_ext_names.append(ext.name)
2003-
update_exts_progress_bar_helper(running_exts, 1)
1991+
if self.dry_run or ext.async_cmd_task.done():
1992+
res = ext.async_cmd_task.result()
1993+
if res.exit_code == 0:
1994+
self.log.info(f"Installation of extension {ext.name} completed!")
1995+
# run post-install method for extension from same working dir as installation of extension
1996+
cwd = change_dir(res.work_dir)
1997+
ext.postrun()
1998+
change_dir(cwd)
1999+
running_exts.remove(ext)
2000+
installed_ext_names.append(ext.name)
2001+
update_exts_progress_bar_helper(running_exts, 1)
2002+
else:
2003+
raise_run_shell_cmd_error(res)
20042004
else:
2005-
self.log.debug("Installation of %s is still running...", ext.name)
2005+
self.log.debug(f"Installation of extension {ext.name} is still running...")
20062006

20072007
# try to start as many extension installations as we can, taking into account number of available cores,
20082008
# but only consider first 100 extensions still in the queue
@@ -2069,9 +2069,9 @@ def update_exts_progress_bar_helper(running_exts, progress_size):
20692069
rpath_filter_dirs=self.rpath_filter_dirs)
20702070
if install:
20712071
ext.prerun()
2072-
ext.run_async()
2072+
ext.async_cmd_task = ext.run_async(thread_pool)
20732073
running_exts.append(ext)
2074-
self.log.info("Started installation of extension %s in the background...", ext.name)
2074+
self.log.info(f"Started installation of extension {ext.name} in the background...")
20752075
update_exts_progress_bar_helper(running_exts, 0)
20762076

20772077
# print progress info after every iteration (unless that info is already shown via progress bar)
@@ -2084,6 +2084,8 @@ def update_exts_progress_bar_helper(running_exts, progress_size):
20842084
running_ext_names = ', '.join(x.name for x in running_exts[:3]) + ", ..."
20852085
print_msg(msg % (installed_cnt, exts_cnt, queued_cnt, running_cnt, running_ext_names), log=self.log)
20862086

2087+
thread_pool.shutdown()
2088+
20872089
#
20882090
# MISCELLANEOUS UTILITY FUNCTIONS
20892091
#

easybuild/framework/extension.py

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
from easybuild.framework.easyconfig.templates import TEMPLATE_NAMES_EASYBLOCK_RUN_STEP, template_constant_dict
4343
from easybuild.tools.build_log import EasyBuildError, raise_nosupport
4444
from easybuild.tools.filetools import change_dir
45-
from easybuild.tools.run import check_async_cmd, run_cmd, run_shell_cmd
45+
from easybuild.tools.run import run_shell_cmd
4646

4747

4848
def resolve_exts_filter_template(exts_filter, ext):
@@ -150,12 +150,7 @@ def __init__(self, mself, ext, extra_params=None):
150150
self.sanity_check_module_loaded = False
151151
self.fake_mod_data = None
152152

153-
self.async_cmd_info = None
154-
self.async_cmd_output = None
155-
self.async_cmd_check_cnt = None
156-
# initial read size should be relatively small,
157-
# to avoid hanging for a long time until desired output is available in async_cmd_check
158-
self.async_cmd_read_size = 1024
153+
self.async_cmd_task = None
159154

160155
@property
161156
def name(self):
@@ -195,44 +190,6 @@ def postrun(self):
195190
"""
196191
self.master.run_post_install_commands(commands=self.cfg.get('postinstallcmds', []))
197192

198-
def async_cmd_start(self, cmd, inp=None):
199-
"""
200-
Start installation asynchronously using specified command.
201-
"""
202-
self.async_cmd_output = ''
203-
self.async_cmd_check_cnt = 0
204-
self.async_cmd_info = run_cmd(cmd, log_all=True, simple=False, inp=inp, regexp=False, asynchronous=True)
205-
206-
def async_cmd_check(self):
207-
"""
208-
Check progress of installation command that was started asynchronously.
209-
210-
:return: True if command completed, False otherwise
211-
"""
212-
if self.async_cmd_info is None:
213-
raise EasyBuildError("No installation command running asynchronously for %s", self.name)
214-
elif self.async_cmd_info is False:
215-
self.log.info("No asynchronous command was started for extension %s", self.name)
216-
return True
217-
else:
218-
self.log.debug("Checking on installation of extension %s...", self.name)
219-
# use small read size, to avoid waiting for a long time until sufficient output is produced
220-
res = check_async_cmd(*self.async_cmd_info, output_read_size=self.async_cmd_read_size)
221-
self.async_cmd_output += res['output']
222-
if res['done']:
223-
self.log.info("Installation of extension %s completed!", self.name)
224-
self.async_cmd_info = None
225-
else:
226-
self.async_cmd_check_cnt += 1
227-
self.log.debug("Installation of extension %s still running (checked %d times)",
228-
self.name, self.async_cmd_check_cnt)
229-
# increase read size after sufficient checks,
230-
# to avoid that installation hangs due to output buffer filling up...
231-
if self.async_cmd_check_cnt % 10 == 0 and self.async_cmd_read_size < (1024 ** 2):
232-
self.async_cmd_read_size *= 2
233-
234-
return res['done']
235-
236193
@property
237194
def required_deps(self):
238195
"""Return list of required dependencies for this extension."""
@@ -273,7 +230,6 @@ def sanity_check_step(self):
273230
self.log.info("modulename set to False for '%s' extension, so skipping sanity check", self.name)
274231
elif exts_filter:
275232
cmd, stdin = resolve_exts_filter_template(exts_filter, self)
276-
# set log_ok to False so we can catch the error instead of run_cmd
277233
cmd_res = run_shell_cmd(cmd, fail_on_error=False, stdin=stdin)
278234

279235
if cmd_res.exit_code:

easybuild/tools/run.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@
4949
from collections import namedtuple
5050
from datetime import datetime
5151

52+
try:
53+
# get_native_id is only available in Python >= 3.8
54+
from threading import get_native_id as get_thread_id
55+
except ImportError:
56+
# get_ident is available in Python >= 3.3
57+
from threading import get_ident as get_thread_id
58+
5259
import easybuild.tools.asyncprocess as asyncprocess
5360
from easybuild.base import fancylogger
5461
from easybuild.tools.build_log import EasyBuildError, dry_run_msg, print_msg, time_str_since
@@ -79,7 +86,7 @@
7986

8087

8188
RunShellCmdResult = namedtuple('RunShellCmdResult', ('cmd', 'exit_code', 'output', 'stderr', 'work_dir',
82-
'out_file', 'err_file'))
89+
'out_file', 'err_file', 'thread_id', 'task_id'))
8390

8491

8592
class RunShellCmdError(BaseException):
@@ -183,7 +190,7 @@ def cache_aware_func(cmd, *args, **kwargs):
183190
@run_shell_cmd_cache
184191
def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None,
185192
hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True,
186-
output_file=True, stream_output=None, asynchronous=False, with_hooks=True,
193+
output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True,
187194
qa_patterns=None, qa_wait_patterns=None):
188195
"""
189196
Run specified (interactive) shell command, and capture output + exit code.
@@ -199,7 +206,8 @@ def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=N
199206
:param use_bash: execute command through bash shell (enabled by default)
200207
:param output_file: collect command output in temporary output file
201208
:param stream_output: stream command output to stdout (auto-enabled with --logtostdout if None)
202-
:param asynchronous: run command asynchronously
209+
:param asynchronous: indicate that command is being run asynchronously
210+
:param task_id: task ID for specified shell command (included in return value)
203211
:param with_hooks: trigger pre/post run_shell_cmd hooks (if defined)
204212
:param qa_patterns: list of 2-tuples with patterns for questions + corresponding answers
205213
:param qa_wait_patterns: list of 2-tuples with patterns for non-questions
@@ -223,9 +231,6 @@ def to_cmd_str(cmd):
223231
return cmd_str
224232

225233
# temporarily raise a NotImplementedError until all options are implemented
226-
if asynchronous:
227-
raise NotImplementedError
228-
229234
if qa_patterns or qa_wait_patterns:
230235
raise NotImplementedError
231236

@@ -235,6 +240,11 @@ def to_cmd_str(cmd):
235240
cmd_str = to_cmd_str(cmd)
236241
cmd_name = os.path.basename(cmd_str.split(' ')[0])
237242

243+
thread_id = None
244+
if asynchronous:
245+
thread_id = get_thread_id()
246+
_log.info(f"Initiating running of shell command '{cmd_str}' via thread with ID {thread_id}")
247+
238248
# auto-enable streaming of command output under --logtostdout/-l, unless it was disabled explicitly
239249
if stream_output is None and build_option('logtostdout'):
240250
_log.info(f"Auto-enabling streaming output of '{cmd_str}' command because logging to stdout is enabled")
@@ -259,16 +269,16 @@ def to_cmd_str(cmd):
259269
if not in_dry_run and build_option('extended_dry_run'):
260270
if not hidden or verbose_dry_run:
261271
silent = build_option('silent')
262-
msg = f" running command \"{cmd_str}\"\n"
272+
msg = f" running shell command \"{cmd_str}\"\n"
263273
msg += f" (in {work_dir})"
264274
dry_run_msg(msg, silent=silent)
265275

266276
return RunShellCmdResult(cmd=cmd_str, exit_code=0, output='', stderr=None, work_dir=work_dir,
267-
out_file=cmd_out_fp, err_file=cmd_err_fp)
277+
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id, task_id=task_id)
268278

269279
start_time = datetime.now()
270280
if not hidden:
271-
cmd_trace_msg(cmd_str, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp)
281+
_cmd_trace_msg(cmd_str, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp, thread_id)
272282

273283
if stream_output:
274284
print_msg(f"(streaming) output for command '{cmd_str}':")
@@ -293,7 +303,11 @@ def to_cmd_str(cmd):
293303

294304
stderr = subprocess.PIPE if split_stderr else subprocess.STDOUT
295305

296-
_log.info(f"Running command '{cmd_str}' in {work_dir}")
306+
log_msg = f"Running shell command '{cmd_str}' in {work_dir}"
307+
if thread_id:
308+
log_msg += f" (via thread with ID {thread_id})"
309+
_log.info(log_msg)
310+
297311
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=stderr, stdin=subprocess.PIPE,
298312
cwd=work_dir, env=env, shell=shell, executable=executable)
299313

@@ -337,7 +351,7 @@ def to_cmd_str(cmd):
337351
raise EasyBuildError(f"Failed to dump command output to temporary file: {err}")
338352

339353
res = RunShellCmdResult(cmd=cmd_str, exit_code=proc.returncode, output=output, stderr=stderr, work_dir=work_dir,
340-
out_file=cmd_out_fp, err_file=cmd_err_fp)
354+
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id, task_id=task_id)
341355

342356
# always log command output
343357
cmd_name = cmd_str.split(' ')[0]
@@ -370,7 +384,7 @@ def to_cmd_str(cmd):
370384
return res
371385

372386

373-
def cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp):
387+
def _cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp, thread_id):
374388
"""
375389
Helper function to construct and print trace message for command being run
376390
@@ -380,11 +394,18 @@ def cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp):
380394
:param stdin: stdin input value for command
381395
:param cmd_out_fp: path to output file for command
382396
:param cmd_err_fp: path to errors/warnings output file for command
397+
:param thread_id: thread ID (None when not running shell command asynchronously)
383398
"""
384399
start_time = start_time.strftime('%Y-%m-%d %H:%M:%S')
385400

401+
if thread_id:
402+
run_cmd_msg = f"running shell command (asynchronously, thread ID: {thread_id}):"
403+
else:
404+
run_cmd_msg = "running shell command:"
405+
386406
lines = [
387-
"running command:",
407+
run_cmd_msg,
408+
f"\t{cmd}",
388409
f"\t[started at: {start_time}]",
389410
f"\t[working dir: {work_dir}]",
390411
]
@@ -395,8 +416,6 @@ def cmd_trace_msg(cmd, start_time, work_dir, stdin, cmd_out_fp, cmd_err_fp):
395416
if cmd_err_fp:
396417
lines.append(f"\t[errors/warnings saved to {cmd_err_fp}]")
397418

398-
lines.append('\t' + cmd)
399-
400419
trace_msg('\n'.join(lines))
401420

402421

0 commit comments

Comments
 (0)