Skip to content

Commit ca3828b

Browse files
committed
refactor EasyBlock.skip_extensions_parallel
1 parent 36febd8 commit ca3828b

File tree

1 file changed

+10
-23
lines changed

1 file changed

+10
-23
lines changed

easybuild/framework/easyblock.py

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
* Davide Vanzo (Vanderbilt University)
4242
* Caspar van Leeuwen (SURF)
4343
"""
44+
import concurrent
4445
import copy
4546
import glob
4647
import inspect
@@ -1818,22 +1819,20 @@ def skip_extensions_parallel(self, exts_filter):
18181819
self.log.experimental("Skipping installed extensions in parallel")
18191820
print_msg("skipping installed extensions (in parallel)", log=self.log)
18201821

1821-
thread_pool = ThreadPoolExecutor(max_workers=self.cfg['parallel'])
1822-
1823-
async_shell_cmd_tasks = []
18241822
installed_exts_ids = []
1825-
exts_queue = list(enumerate(self.ext_instances[:]))
18261823
checked_exts_cnt = 0
18271824
exts_cnt = len(self.ext_instances)
1828-
done_tasks = []
1825+
cmds = [resolve_exts_filter_template(exts_filter, ext) for ext in self.ext_instances]
18291826

1830-
# asynchronously run checks to see whether extensions are already installed
1831-
while exts_queue or async_shell_cmd_tasks:
1827+
with ThreadPoolExecutor(max_workers=self.cfg['parallel']) as thread_pool:
18321828

1833-
# first handle completed checks
1834-
for task in done_tasks:
1835-
async_shell_cmd_tasks.remove(task)
1836-
res = task.result()
1829+
# list of command to run asynchronously
1830+
async_cmds = [thread_pool.submit(run_shell_cmd, cmd, stdin=stdin, hidden=True, fail_on_error=False,
1831+
asynchronous=True, task_id=idx) for (idx, (cmd, stdin)) in enumerate(cmds)]
1832+
1833+
# process result of commands as they have completed running
1834+
for done_task in concurrent.futures.as_completed(async_cmds):
1835+
res = done_task.result()
18371836
idx = res.task_id
18381837
ext_name = self.ext_instances[idx].name
18391838
self.log.info(f"exts_filter result for {ext_name}: exit code {res.exit_code}; output: {res.output}")
@@ -1846,16 +1845,6 @@ def skip_extensions_parallel(self, exts_filter):
18461845
exts_pbar_label += "(%d/%d checked)" % (checked_exts_cnt, exts_cnt)
18471846
self.update_exts_progress_bar(exts_pbar_label)
18481847

1849-
# start additional checks asynchronously
1850-
while exts_queue:
1851-
idx, ext = exts_queue.pop(0)
1852-
cmd, stdin = resolve_exts_filter_template(exts_filter, ext)
1853-
task = thread_pool.submit(run_shell_cmd, cmd, stdin=stdin, hidden=True,
1854-
fail_on_error=False, asynchronous=True, task_id=idx)
1855-
async_shell_cmd_tasks.append(task)
1856-
1857-
(done_tasks, _) = wait(async_shell_cmd_tasks, timeout=1, return_when=FIRST_COMPLETED)
1858-
18591848
# compose new list of extensions, skip over the ones that are already installed;
18601849
# note: original order in extensions list should be preserved!
18611850
retained_ext_instances = []
@@ -1866,8 +1855,6 @@ def skip_extensions_parallel(self, exts_filter):
18661855

18671856
self.ext_instances = retained_ext_instances
18681857

1869-
thread_pool.shutdown()
1870-
18711858
def install_extensions(self, install=True):
18721859
"""
18731860
Install extensions.

0 commit comments

Comments
 (0)