Skip to content

Commit e8e9054

Browse files
authored
RF/ENH: Rework workflow generation (#219)
* WIP: Clean up excessively complicated build workflow code * RF: Clean up workflow build - Builds workflow / boilerplate within the same process - Less warnings by default - Set OMP_NUM_THREADS to 1 to reduce VMS load * FIX: Import * FIX: Remove forgotten variables * ENH: Add analysis level attribute to workflow * FIX: Track cwd on entrypoint * FIX: Ensure config is properly initialized * FIX: Suppress warnings on releases
1 parent 8e77801 commit e8e9054

File tree

4 files changed

+212
-210
lines changed

4 files changed

+212
-210
lines changed

nibabies/cli/parser.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,8 +796,13 @@ def parse_args(args=None, namespace=None):
796796
if missing_subjects:
797797
parser.error(
798798
"One or more participant labels were not found in the BIDS directory: "
799-
"%s." % ", ".join(missing_subjects)
799+
f"{', '.join(missing_subjects)}."
800800
)
801801

802802
config.execution.participant_label = sorted(participant_label)
803803
config.workflow.skull_strip_template = config.workflow.skull_strip_template[0]
804+
805+
# finally, write config to file
806+
config_file = config.execution.work_dir / config.execution.run_uuid / "config.toml"
807+
config_file.parent.mkdir(exist_ok=True, parents=True)
808+
config.to_filename(config_file)

nibabies/cli/run.py

Lines changed: 117 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -6,164 +6,136 @@
66

77
def main():
88
"""Entry point."""
9+
import atexit
910
import gc
11+
import os
1012
import sys
11-
from multiprocessing import Manager, Process
12-
from os import EX_SOFTWARE
1313
from pathlib import Path
1414

1515
from ..utils.bids import write_bidsignore, write_derivative_description
1616
from .parser import parse_args
17+
from .workflow import build_boilerplate, build_workflow
18+
19+
_cwd = os.getcwd()
20+
# Revert OMP_NUM_THREADS + other runtime set environment variables
21+
atexit.register(config.restore_env)
1722

1823
parse_args()
1924

20-
# sentry_sdk = None
21-
# if not config.execution.notrack:
22-
# import sentry_sdk
23-
# from ..utils.sentry import sentry_setup
24-
25-
# sentry_setup()
26-
27-
# CRITICAL Save the config to a file. This is necessary because the execution graph
28-
# is built as a separate process to keep the memory footprint low. The most
29-
# straightforward way to communicate with the child process is via the filesystem.
30-
config_file = config.execution.work_dir / config.execution.run_uuid / "config.toml"
31-
config_file.parent.mkdir(exist_ok=True, parents=True)
32-
config.to_filename(config_file)
33-
34-
# CRITICAL Call build_workflow(config_file, retval) in a subprocess.
35-
# Because Python on Linux does not ever free virtual memory (VM), running the
36-
# workflow construction jailed within a process preempts excessive VM buildup.
37-
with Manager() as mgr:
38-
from .workflow import build_workflow
39-
40-
retval = mgr.dict()
41-
p = Process(target=build_workflow, args=(str(config_file), retval))
42-
p.start()
43-
p.join()
44-
45-
retcode = p.exitcode or retval.get("return_code", 0)
46-
nibabies_wf = retval.get("workflow", None)
47-
48-
# CRITICAL Load the config from the file. This is necessary because the ``build_workflow``
49-
# function executed constrained in a process may change the config (and thus the global
50-
# state of NiBabies).
51-
config.load(config_file)
52-
53-
if config.execution.reports_only:
54-
sys.exit(int(retcode > 0))
55-
56-
if nibabies_wf and config.execution.write_graph:
57-
nibabies_wf.write_graph(graph2use="colored", format="svg", simple_form=True)
58-
59-
retcode = retcode or (nibabies_wf is None) * EX_SOFTWARE
60-
if retcode != 0:
61-
sys.exit(retcode)
62-
63-
# Generate boilerplate
64-
with Manager() as mgr:
65-
from .workflow import build_boilerplate
66-
67-
p = Process(target=build_boilerplate, args=(str(config_file), nibabies_wf))
68-
p.start()
69-
p.join()
70-
71-
if config.execution.boilerplate_only:
72-
sys.exit(int(retcode > 0))
73-
74-
# Clean up master process before running workflow, which may create forks
75-
gc.collect()
76-
77-
# Sentry tracking
78-
# if sentry_sdk is not None:
79-
# with sentry_sdk.configure_scope() as scope:
80-
# scope.set_tag("run_uuid", config.execution.run_uuid)
81-
# scope.set_tag("npart", len(config.execution.participant_label))
82-
# sentry_sdk.add_breadcrumb(message="nibabies started", level="info")
83-
# sentry_sdk.capture_message("nibabies started", level="info")
84-
85-
config.loggers.workflow.log(
86-
15,
87-
"\n".join(["nibabies config:"] + ["\t\t%s" % s for s in config.dumps().splitlines()]),
88-
)
89-
config.loggers.workflow.log(25, "nibabies started!")
90-
# errno = 1 # Default is error exit unless otherwise set
91-
try:
92-
nibabies_wf.run(**config.nipype.get_plugin())
93-
except Exception as e:
94-
# if not config.execution.notrack:
95-
# from ..utils.sentry import process_crashfile
96-
97-
# crashfolders = [
98-
# config.execution.nibabies_dir,
99-
# / "sub-{}".format(s)
100-
# / "log"
101-
# / config.execution.run_uuid
102-
# for s in config.execution.participant_label
103-
# ]
104-
# for crashfolder in crashfolders:
105-
# for crashfile in crashfolder.glob("crash*.*"):
106-
# process_crashfile(crashfile)
107-
108-
# if "Workflow did not execute cleanly" not in str(e):
109-
# sentry_sdk.capture_exception(e)
110-
config.loggers.workflow.critical("nibabies failed: %s", e)
111-
raise
112-
else:
113-
config.loggers.workflow.log(25, "nibabies finished successfully!")
114-
# if not config.execution.notrack:
115-
# success_message = "nibabies finished without errors"
116-
# sentry_sdk.add_breadcrumb(message=success_message, level="info")
117-
# sentry_sdk.capture_message(success_message, level="info")
118-
119-
# Bother users with the boilerplate only iff the workflow went okay.
120-
boiler_file = config.execution.nibabies_dir / "logs" / "CITATION.md"
121-
if boiler_file.exists():
122-
if config.environment.exec_env in (
123-
"singularity",
124-
"docker",
125-
"nibabies-docker",
126-
):
127-
boiler_file = Path("<OUTPUT_PATH>") / boiler_file.relative_to(
128-
config.execution.output_dir
129-
)
130-
config.loggers.workflow.log(
131-
25,
132-
"Works derived from this nibabies execution should include the "
133-
f"boilerplate text found in {boiler_file}.",
25+
if "participant" in config.workflow.analysis_level:
26+
_pool = None
27+
if config.nipype.plugin == "MultiProc":
28+
import multiprocessing as mp
29+
from concurrent.futures import ProcessPoolExecutor
30+
from contextlib import suppress
31+
32+
# should drastically reduce VMS
33+
# see https://github.com/nipreps/mriqc/pull/984 for more details
34+
os.environ["OMP_NUM_THREADS"] = "1"
35+
36+
with suppress(RuntimeError):
37+
mp.set_start_method("fork")
38+
gc.collect()
39+
40+
_pool = ProcessPoolExecutor(
41+
max_workers=config.nipype.nprocs,
42+
initializer=config._process_initializer,
43+
initargs=(_cwd, config.nipype.omp_nthreads),
13444
)
13545

136-
if config.workflow.run_reconall:
137-
from niworkflows.utils.misc import _copy_any
138-
from templateflow import api
46+
config_file = config.execution.work_dir / config.execution.run_uuid / "config.toml"
47+
config_file.parent.mkdir(exist_ok=True, parents=True)
48+
config.to_filename(config_file)
13949

140-
dseg_tsv = str(api.get("fsaverage", suffix="dseg", extension=[".tsv"]))
141-
_copy_any(dseg_tsv, str(config.execution.nibabies_dir / "desc-aseg_dseg.tsv"))
142-
_copy_any(dseg_tsv, str(config.execution.nibabies_dir / "desc-aparcaseg_dseg.tsv"))
143-
# errno = 0
144-
finally:
145-
from pkg_resources import resource_filename as pkgrf
146-
147-
from ..reports.core import generate_reports
148-
149-
# Generate reports phase
150-
generate_reports(
151-
config.execution.participant_label,
152-
config.execution.session_id,
153-
config.execution.nibabies_dir,
154-
config.execution.run_uuid,
155-
config=pkgrf("nibabies", "data/reports-spec.yml"),
156-
packagename="nibabies",
50+
# build the workflow within the same process
51+
# it still needs to be saved / loaded to be properly initialized
52+
retval = build_workflow(config_file)
53+
retcode = retval['return_code']
54+
nibabies_wf = retval['workflow']
55+
56+
if nibabies_wf is None:
57+
if config.execution.reports_only:
58+
sys.exit(int(retcode > 0))
59+
sys.exit(os.EX_SOFTWARE)
60+
61+
if config.execution.write_graph:
62+
nibabies_wf.write_graph(graph2use="colored", format="svg", simple_form=True)
63+
64+
if retcode != 0:
65+
sys.exit(retcode)
66+
67+
# generate boilerplate
68+
build_boilerplate(nibabies_wf)
69+
if config.execution.boilerplate_only:
70+
sys.exit(0)
71+
72+
gc.collect()
73+
74+
config.loggers.workflow.log(
75+
15,
76+
"\n".join(["nibabies config:"] + ["\t\t%s" % s for s in config.dumps().splitlines()]),
15777
)
158-
write_derivative_description(config.execution.bids_dir, config.execution.nibabies_dir)
159-
write_bidsignore(config.execution.nibabies_dir)
160-
161-
# if failed_reports and not config.execution.notrack:
162-
# sentry_sdk.capture_message(
163-
# "Report generation failed for %d subjects" % failed_reports,
164-
# level="error",
165-
# )
166-
# sys.exit(int((errno + failed_reports) > 0))
78+
config.loggers.workflow.log(25, "nibabies started!")
79+
80+
# Hack MultiProc's pool to reduce VMS
81+
_plugin = config.nipype.get_plugin()
82+
if _pool:
83+
from nipype.pipeline.plugins.multiproc import MultiProcPlugin
84+
85+
multiproc = MultiProcPlugin(plugin_args=config.nipype.plugin_args)
86+
multiproc.pool = _pool
87+
_plugin = {"plugin": multiproc}
88+
89+
gc.collect()
90+
try:
91+
nibabies_wf.run(**_plugin)
92+
except Exception as e:
93+
config.loggers.workflow.critical("nibabies failed: %s", e)
94+
raise
95+
else:
96+
config.loggers.workflow.log(25, "nibabies finished successfully!")
97+
98+
# Bother users with the boilerplate only iff the workflow went okay.
99+
boiler_file = config.execution.nibabies_dir / "logs" / "CITATION.md"
100+
if boiler_file.exists():
101+
if config.environment.exec_env in (
102+
"singularity",
103+
"docker",
104+
"nibabies-docker",
105+
):
106+
boiler_file = Path("<OUTPUT_PATH>") / boiler_file.relative_to(
107+
config.execution.output_dir
108+
)
109+
config.loggers.workflow.log(
110+
25,
111+
"Works derived from this nibabies execution should include the "
112+
f"boilerplate text found in {boiler_file}.",
113+
)
114+
115+
if config.workflow.run_reconall:
116+
from niworkflows.utils.misc import _copy_any
117+
from templateflow import api
118+
119+
dseg_tsv = str(api.get("fsaverage", suffix="dseg", extension=[".tsv"]))
120+
_copy_any(dseg_tsv, str(config.execution.nibabies_dir / "desc-aseg_dseg.tsv"))
121+
_copy_any(dseg_tsv, str(config.execution.nibabies_dir / "desc-aparcaseg_dseg.tsv"))
122+
# errno = 0
123+
finally:
124+
from pkg_resources import resource_filename as pkgrf
125+
126+
from ..reports.core import generate_reports
127+
128+
# Generate reports phase
129+
generate_reports(
130+
config.execution.participant_label,
131+
config.execution.session_id,
132+
config.execution.nibabies_dir,
133+
config.execution.run_uuid,
134+
config=pkgrf("nibabies", "data/reports-spec.yml"),
135+
packagename="nibabies",
136+
)
137+
write_derivative_description(config.execution.bids_dir, config.execution.nibabies_dir)
138+
write_bidsignore(config.execution.nibabies_dir)
167139

168140

169141
if __name__ == "__main__":

nibabies/cli/workflow.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
"""
1111

1212

13-
def build_workflow(config_file, retval):
13+
def build_workflow(config_file):
1414
"""Create the Nipype Workflow that supports the whole execution graph."""
1515
from niworkflows.utils.bids import check_pipeline_version, collect_participants
1616
from niworkflows.utils.misc import check_valid_fs_license
@@ -20,19 +20,19 @@ def build_workflow(config_file, retval):
2020
from ..utils.misc import check_deps
2121
from ..workflows.base import init_nibabies_wf
2222

23+
# initalize config
2324
config.load(config_file)
24-
build_log = config.loggers.workflow
25+
build_logger = config.loggers.workflow
2526

2627
nibabies_dir = config.execution.nibabies_dir
2728
version = config.environment.version
2829

29-
retval["return_code"] = 1
30-
retval["workflow"] = None
30+
retval = {"return_code": 1, "workflow": None}
3131

3232
# warn if older results exist: check for dataset_description.json in output folder
3333
msg = check_pipeline_version(version, nibabies_dir / "dataset_description.json")
3434
if msg is not None:
35-
build_log.warning(msg)
35+
build_logger.warning(msg)
3636

3737
# Please note this is the input folder's dataset_description.json
3838
dset_desc_path = config.execution.bids_dir / "dataset_description.json"
@@ -57,7 +57,7 @@ def build_workflow(config_file, retval):
5757
if config.execution.reports_only:
5858
from pkg_resources import resource_filename as pkgrf
5959

60-
build_log.log(25, "Running --reports-only on participants %s", ", ".join(subject_list))
60+
build_logger.log(25, "Running --reports-only on participants %s", ", ".join(subject_list))
6161
retval["return_code"] = generate_reports(
6262
subject_list,
6363
nibabies_dir,
@@ -82,13 +82,13 @@ def build_workflow(config_file, retval):
8282
if config.execution.fs_subjects_dir:
8383
init_msg += f"""
8484
* Pre-run FreeSurfer's SUBJECTS_DIR: {config.execution.fs_subjects_dir}."""
85-
build_log.log(25, init_msg)
85+
build_logger.log(25, init_msg)
8686

8787
retval["workflow"] = init_nibabies_wf(subjects_sessions)
8888

8989
# Check for FS license after building the workflow
9090
if not check_valid_fs_license():
91-
build_log.critical(
91+
build_logger.critical(
9292
"""\
9393
ERROR: a valid license file is required for FreeSurfer to run. nibabies looked for an existing \
9494
license file at several paths, in this order: 1) command line argument ``--fs-license-file``; \
@@ -101,32 +101,29 @@ def build_workflow(config_file, retval):
101101
# Check workflow for missing commands
102102
missing = check_deps(retval["workflow"])
103103
if missing:
104-
build_log.critical(
104+
build_logger.critical(
105105
"Cannot run nibabies. Missing dependencies:%s",
106106
"\n\t* ".join([""] + [f"{cmd} (Interface: {iface})" for iface, cmd in missing]),
107107
)
108108
retval["return_code"] = 127 # 127 == command not found.
109109
return retval
110110

111-
config.to_filename(config_file)
112-
build_log.info(
111+
# config.to_filename(config_file)
112+
build_logger.info(
113113
"NiBabies workflow graph with %d nodes built successfully.",
114114
len(retval["workflow"]._get_all_nodes()),
115115
)
116116
retval["return_code"] = 0
117117
return retval
118118

119119

120-
def build_boilerplate(config_file, workflow):
120+
def build_boilerplate(workflow):
121121
"""Write boilerplate in an isolated process."""
122122
from .. import config
123123

124-
config.load(config_file)
125124
logs_path = config.execution.nibabies_dir / "logs"
126125
boilerplate = workflow.visit_desc()
127-
citation_files = {
128-
ext: logs_path / ("CITATION.%s" % ext) for ext in ("bib", "tex", "md", "html")
129-
}
126+
citation_files = {ext: logs_path / f"CITATION.{ext}" for ext in ("bib", "tex", "md", "html")}
130127

131128
if boilerplate:
132129
# To please git-annex users and also to guarantee consistency

0 commit comments

Comments (0)