Skip to content

Commit e91dbfa

Browse files
committed
use concurrent.futures.wait in EasyBlock.skip_extensions_parallel
1 parent 1c7c8b4 commit e91dbfa

File tree

4 files changed

+29
-31
lines changed

4 files changed

+29
-31
lines changed

easybuild/framework/easyblock.py

Lines changed: 21 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
* Davide Vanzo (Vanderbilt University)
4242
* Caspar van Leeuwen (SURF)
4343
"""
44-
4544
import copy
4645
import glob
4746
import inspect
@@ -52,7 +51,7 @@
5251
import tempfile
5352
import time
5453
import traceback
55-
from concurrent.futures import ThreadPoolExecutor
54+
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
5655
from datetime import datetime
5756

5857
import easybuild.tools.environment as env
@@ -1821,42 +1820,41 @@ def skip_extensions_parallel(self, exts_filter):
18211820

18221821
thread_pool = ThreadPoolExecutor(max_workers=self.cfg['parallel'])
18231822

1824-
async_shell_cmd_tasks = {}
1825-
running_checks_ids = []
1823+
async_shell_cmd_tasks = []
18261824
installed_exts_ids = []
18271825
exts_queue = list(enumerate(self.ext_instances[:]))
18281826
checked_exts_cnt = 0
18291827
exts_cnt = len(self.ext_instances)
1828+
done_tasks = []
18301829

18311830
# asynchronously run checks to see whether extensions are already installed
1832-
while exts_queue or running_checks_ids:
1831+
while exts_queue or async_shell_cmd_tasks:
18331832

18341833
# first handle completed checks
1835-
for idx in running_checks_ids[:]:
1834+
for task in done_tasks:
1835+
async_shell_cmd_tasks.remove(task)
1836+
res = task.result()
1837+
idx = res.task_id
18361838
ext_name = self.ext_instances[idx].name
1837-
# check whether command completed
1838-
task = async_shell_cmd_tasks[idx]
1839-
if task.done():
1840-
res = task.result()
1841-
self.log.info(f"exts_filter result for {ext_name}: exit code {res.exit_code}; output: {res.output}")
1842-
running_checks_ids.remove(idx)
1843-
if res.exit_code == 0:
1844-
print_msg(f"skipping extension {ext_name}", log=self.log)
1845-
installed_exts_ids.append(idx)
1846-
1847-
checked_exts_cnt += 1
1848-
exts_pbar_label = "skipping installed extensions "
1849-
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
1850-
self.update_exts_progress_bar(exts_pbar_label)
1839+
self.log.info(f"exts_filter result for {ext_name}: exit code {res.exit_code}; output: {res.output}")
1840+
if res.exit_code == 0:
1841+
print_msg(f"skipping extension {ext_name}", log=self.log)
1842+
installed_exts_ids.append(idx)
1843+
1844+
checked_exts_cnt += 1
1845+
exts_pbar_label = "skipping installed extensions "
1846+
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
1847+
self.update_exts_progress_bar(exts_pbar_label)
18511848

18521849
# start additional checks asynchronously
18531850
while exts_queue:
18541851
idx, ext = exts_queue.pop(0)
18551852
cmd, stdin = resolve_exts_filter_template(exts_filter, ext)
18561853
task = thread_pool.submit(run_shell_cmd, cmd, stdin=stdin, hidden=True,
1857-
fail_on_error=False, asynchronous=True)
1858-
async_shell_cmd_tasks[idx] = task
1859-
running_checks_ids.append(idx)
1854+
fail_on_error=False, asynchronous=True, task_id=idx)
1855+
async_shell_cmd_tasks.append(task)
1856+
1857+
(done_tasks, _) = wait(async_shell_cmd_tasks, timeout=1, return_when=FIRST_COMPLETED)
18601858

18611859
# compose new list of extensions, skip over the ones that are already installed;
18621860
# note: original order in extensions list should be preserved!

easybuild/tools/run.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@
8080

8181

8282
RunShellCmdResult = namedtuple('RunShellCmdResult', ('cmd', 'exit_code', 'output', 'stderr', 'work_dir',
83-
'out_file', 'err_file', 'thread_id'))
83+
'out_file', 'err_file', 'thread_id', 'task_id'))
8484

8585

8686
class RunShellCmdError(BaseException):
@@ -184,7 +184,7 @@ def cache_aware_func(cmd, *args, **kwargs):
184184
@run_shell_cmd_cache
185185
def run_shell_cmd(cmd, fail_on_error=True, split_stderr=False, stdin=None, env=None,
186186
hidden=False, in_dry_run=False, verbose_dry_run=False, work_dir=None, use_bash=True,
187-
output_file=True, stream_output=None, asynchronous=False, with_hooks=True,
187+
output_file=True, stream_output=None, asynchronous=False, task_id=None, with_hooks=True,
188188
qa_patterns=None, qa_wait_patterns=None):
189189
"""
190190
Run specified (interactive) shell command, and capture output + exit code.
@@ -267,7 +267,7 @@ def to_cmd_str(cmd):
267267
dry_run_msg(msg, silent=silent)
268268

269269
return RunShellCmdResult(cmd=cmd_str, exit_code=0, output='', stderr=None, work_dir=work_dir,
270-
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id)
270+
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id, task_id=task_id)
271271

272272
start_time = datetime.now()
273273
if not hidden:
@@ -344,7 +344,7 @@ def to_cmd_str(cmd):
344344
raise EasyBuildError(f"Failed to dump command output to temporary file: {err}")
345345

346346
res = RunShellCmdResult(cmd=cmd_str, exit_code=proc.returncode, output=output, stderr=stderr, work_dir=work_dir,
347-
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id)
347+
out_file=cmd_out_fp, err_file=cmd_err_fp, thread_id=thread_id, task_id=task_id)
348348

349349
# always log command output
350350
cmd_name = cmd_str.split(' ')[0]

test/framework/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ def test_run_shell_cmd_cache(self):
911911
with self.mocked_stdout_stderr():
912912
cached_res = RunShellCmdResult(cmd=cmd, output="123456", exit_code=123, stderr=None,
913913
work_dir='/test_ulimit', out_file='/tmp/foo.out', err_file=None,
914-
thread_id=None)
914+
thread_id=None, task_id=None)
915915
run_shell_cmd.update_cache({(cmd, None): cached_res})
916916
res = run_shell_cmd(cmd)
917917
self.assertEqual(res.cmd, cmd)
@@ -931,7 +931,7 @@ def test_run_shell_cmd_cache(self):
931931
with self.mocked_stdout_stderr():
932932
cached_res = RunShellCmdResult(cmd=cmd, output="bar", exit_code=123, stderr=None,
933933
work_dir='/test_cat', out_file='/tmp/cat.out', err_file=None,
934-
thread_id=None)
934+
thread_id=None, task_id=None)
935935
run_shell_cmd.update_cache({(cmd, 'foo'): cached_res})
936936
res = run_shell_cmd(cmd, stdin='foo')
937937
self.assertEqual(res.cmd, cmd)

test/framework/systemtools.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ def mocked_run_shell_cmd(cmd, **kwargs):
341341
}
342342
if cmd in known_cmds:
343343
return RunShellCmdResult(cmd=cmd, exit_code=0, output=known_cmds[cmd], stderr=None, work_dir=os.getcwd(),
344-
out_file=None, err_file=None, thread_id=None)
344+
out_file=None, err_file=None, thread_id=None, task_id=None)
345345
else:
346346
return run_shell_cmd(cmd, **kwargs)
347347

@@ -774,7 +774,7 @@ def test_gcc_version_darwin(self):
774774
out = "Apple LLVM version 7.0.0 (clang-700.1.76)"
775775
cwd = os.getcwd()
776776
mocked_run_res = RunShellCmdResult(cmd="gcc --version", exit_code=0, output=out, stderr=None, work_dir=cwd,
777-
out_file=None, err_file=None, thread_id=None)
777+
out_file=None, err_file=None, thread_id=None, task_id=None)
778778
st.run_shell_cmd = lambda *args, **kwargs: mocked_run_res
779779
self.assertEqual(get_gcc_version(), None)
780780

0 commit comments

Comments
 (0)