diff --git a/src/fmripost_tedana/__main__.py b/src/fmripost_tedana/__main__.py index e69de29..a32a191 100644 --- a/src/fmripost_tedana/__main__.py +++ b/src/fmripost_tedana/__main__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2023-present Chris Markiewicz +# +# SPDX-License-Identifier: Apache-2.0 +import sys + +if __name__ == '__main__': + from .cli import fmripost_tedana + + sys.exit(fmripost_tedana()) diff --git a/src/fmripost_tedana/cli/parser.py b/src/fmripost_tedana/cli/parser.py index 141a63a..7a5186f 100644 --- a/src/fmripost_tedana/cli/parser.py +++ b/src/fmripost_tedana/cli/parser.py @@ -36,6 +36,7 @@ def _build_parser(**kwargs): from pathlib import Path from packaging.version import Version + from tedana.workflows.parser_utils import check_tedpca_value from fmripost_tedana.cli.version import check_latest, is_flagged @@ -170,7 +171,7 @@ def _bids_filter(value, parser): ) g_bids.add_argument( "-d", - "--derivatives", + "--datasets", action="store", metavar="PATH", type=Path, @@ -255,13 +256,42 @@ def _bids_filter(value, parser): ) g_conf = parser.add_argument_group("Workflow configuration") + g_t2star = parser.add_mutually_exclusive_group() + g_t2star.add_argument( + "--average-t2star", + required=False, + action="store_true", + default=False, + help="Average T2* maps across runs before applying the tedana workflow", + ) + g_t2star.add_argument( + "--concatenate-t2star", + required=False, + action="store_true", + default=False, + help="Concatenate runs before estimating T2* maps", + ) + g_conf.add_argument( + "--force", + required=False, + action="store", + nargs="+", + default=[], + choices=["t2star", "fmap-jacobian"], + help=( + "Force selected processing steps, overriding automatic decisions " + "(a space delimited list). " + "t2star: re-estimate T2* maps from the data. " + "fmap-jacobian: apply Jacobian scaling during distortion correction." + ), + ) g_conf.add_argument( "--ignore", required=False, action="store", nargs="+", default=[], - choices=["fieldmaps", "slicetiming", "sbref", "t2w", "flair"], + choices=["fieldmaps", "slicetiming", "sbref", "t2w", "flair", "fmap-jacobian"], help=( "Ignore selected aspects of the input dataset to disable corresponding " "parts of the workflow (a space delimited list)" diff --git a/src/fmripost_tedana/cli/run.py b/src/fmripost_tedana/cli/run.py index 64ae61e..776c0b8 100644 --- a/src/fmripost_tedana/cli/run.py +++ b/src/fmripost_tedana/cli/run.py @@ -1,5 +1,269 @@ -"""Tedana postprocessing workflow.""" +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""fMRIPost-tedana postprocessing workflow.""" + +from fmripost_tedana import config + +EXITCODE: int = -1 def main(): -
+ """Entry point.""" + + import gc + import sys + from multiprocessing import Manager, Process + from os import EX_SOFTWARE + from pathlib import Path + + from fmripost_tedana.cli.parser import parse_args + from fmripost_tedana.cli.workflow import build_workflow + from fmripost_tedana.utils.bids import write_bidsignore, write_derivative_description + + parse_args() + + # Code Carbon + if config.execution.track_carbon: + from codecarbon import OfflineEmissionsTracker + + country_iso_code = config.execution.country_code + config.loggers.workflow.log(25, 'CodeCarbon tracker started ...') + config.loggers.workflow.log(25, f'Using country_iso_code: {country_iso_code}') + config.loggers.workflow.log(25, f'Saving logs at: {config.execution.log_dir}') + + tracker = OfflineEmissionsTracker( + output_dir=config.execution.log_dir, country_iso_code=country_iso_code + ) + tracker.start() + + if 'pdb' in config.execution.debug: + from fmripost_tedana.utils.debug import setup_exceptionhook + + setup_exceptionhook() + config.nipype.plugin = 'Linear' + + sentry_sdk = None + if not config.execution.notrack and not config.execution.debug: + import atexit + + import sentry_sdk + + from fmripost_tedana.utils.telemetry import sentry_setup, setup_migas + + sentry_setup() + setup_migas(init_ping=True) + atexit.register(migas_exit) + + # CRITICAL Save the config to a file. This is necessary because the execution graph + # is built as a separate process to keep the memory footprint low. The most + # straightforward way to communicate with the child process is via the filesystem. + config_file = config.execution.work_dir / config.execution.run_uuid / 'config.toml' + config_file.parent.mkdir(exist_ok=True, parents=True) + config.to_filename(config_file) + + # CRITICAL Call build_workflow(config_file, retval) in a subprocess. + # Because Python on Linux does not ever free virtual memory (VM), running the + # workflow construction jailed within a process preempts excessive VM buildup. + if 'pdb' not in config.execution.debug: + with Manager() as mgr: + retval = mgr.dict() + p = Process(target=build_workflow, args=(str(config_file), retval)) + p.start() + p.join() + retval = dict(retval.items()) # Convert to base dictionary + + if p.exitcode: + retval['return_code'] = p.exitcode + + else: + retval = build_workflow(str(config_file), {}) + + global EXITCODE + EXITCODE = retval.get('return_code', 0) + fmripost_tedana_wf = retval.get('workflow', None) + + # CRITICAL Load the config from the file. This is necessary because the ``build_workflow`` + # function executed constrained in a process may change the config (and thus the global + # state of fMRIPost-tedana). 
+ config.load(config_file) + + if config.execution.reports_only: + sys.exit(int(EXITCODE > 0)) + + if fmripost_tedana_wf and config.execution.write_graph: + fmripost_tedana_wf.write_graph(graph2use='colored', format='svg', simple_form=True) + + EXITCODE = EXITCODE or (fmripost_tedana_wf is None) * EX_SOFTWARE + if EXITCODE != 0: + sys.exit(EXITCODE) + + # Generate boilerplate + with Manager() as mgr: + from fmripost_tedana.cli.workflow import build_boilerplate + + p = Process(target=build_boilerplate, args=(str(config_file), fmripost_tedana_wf)) + p.start() + p.join() + + if config.execution.boilerplate_only: + sys.exit(int(EXITCODE > 0)) + + # Clean up master process before running workflow, which may create forks + gc.collect() + + # Sentry tracking + if sentry_sdk is not None: + with sentry_sdk.configure_scope() as scope: + scope.set_tag('run_uuid', config.execution.run_uuid) + scope.set_tag('npart', len(config.execution.participant_label)) + sentry_sdk.add_breadcrumb(message='fMRIPost-tedana started', level='info') + sentry_sdk.capture_message('fMRIPost-tedana started', level='info') + + config.loggers.workflow.log( + 15, + '\n'.join(['fMRIPost-tedana config:'] + [f'\t\t{s}' for s in config.dumps().splitlines()]), + ) + config.loggers.workflow.log(25, 'fMRIPost-tedana started!') + errno = 1 # Default is error exit unless otherwise set + try: + fmripost_tedana_wf.run(**config.nipype.get_plugin()) + except Exception as e: + if not config.execution.notrack: + from fmripost_tedana.utils.telemetry import process_crashfile + + crashfolders = [ + config.execution.output_dir / f'sub-{s}' / 'log' / config.execution.run_uuid + for s in config.execution.participant_label + ] + for crashfolder in crashfolders: + for crashfile in crashfolder.glob('crash*.*'): + process_crashfile(crashfile) + + if sentry_sdk is not None and 'Workflow did not execute cleanly' not in str(e): + sentry_sdk.capture_exception(e) + + config.loggers.workflow.critical('fMRIPost-tedana failed: %s', e) + raise + + else: + config.loggers.workflow.log(25, 'fMRIPost-tedana finished successfully!') + if sentry_sdk is not None: + success_message = 'fMRIPost-tedana finished without errors' + sentry_sdk.add_breadcrumb(message=success_message, level='info') + sentry_sdk.capture_message(success_message, level='info') + + # Bother users with the boilerplate only iff the workflow went okay. 
+ boiler_file = config.execution.output_dir / 'logs' / 'CITATION.md' + if boiler_file.exists(): + if config.environment.exec_env in ( + 'singularity', + 'docker', + 'fmripost_tedana-docker', + ): + boiler_file = Path('') / boiler_file.relative_to( + config.execution.output_dir + ) + + config.loggers.workflow.log( + 25, + 'Works derived from this fMRIPost-tedana execution should include the ' + f'boilerplate text found in {boiler_file}.', + ) + + if config.workflow.run_reconall: + from niworkflows.utils.misc import _copy_any + from templateflow import api + + dseg_tsv = str(api.get('fsaverage', suffix='dseg', extension=['.tsv'])) + _copy_any(dseg_tsv, str(config.execution.output_dir / 'desc-aseg_dseg.tsv')) + _copy_any(dseg_tsv, str(config.execution.output_dir / 'desc-aparcaseg_dseg.tsv')) + errno = 0 + finally: + # Code Carbon + if config.execution.track_carbon: + emissions: float = tracker.stop() + config.loggers.workflow.log(25, 'CodeCarbon tracker has stopped.') + config.loggers.workflow.log(25, f'Saving logs at: {config.execution.log_dir}') + config.loggers.workflow.log(25, f'Carbon emissions: {emissions} kg') + + from fmripost_tedana.reports.core import generate_reports + + # Generate reports phase + failed_reports = generate_reports( + subject_list=config.execution.participant_label, + output_dir=config.execution.output_dir, + run_uuid=config.execution.run_uuid, + ) + write_derivative_description( + input_dir=config.execution.bids_dir, + output_dir=config.execution.output_dir, + dataset_links=config.execution.dataset_links, + ) + write_bidsignore(config.execution.output_dir) + + if sentry_sdk is not None and failed_reports: + sentry_sdk.capture_message( + f'Report generation failed for {failed_reports} subjects', + level='error', + ) + sys.exit(int((errno + len(failed_reports)) > 0)) + + +def migas_exit() -> None: + """Exit migas. + + Send a final crumb to the migas server signaling if the run successfully completed + This function should be registered with `atexit` to run at termination. + """ + import sys + + from fmripost_tedana.utils.telemetry import send_breadcrumb + + global EXITCODE + migas_kwargs = {'status': 'C'} + # `sys` will not have these attributes unless an error has been handled + if hasattr(sys, 'last_type'): + migas_kwargs = { + 'status': 'F', + 'status_desc': 'Finished with error(s)', + 'error_type': sys.last_type, + 'error_desc': sys.last_value, + } + elif EXITCODE != 0: + migas_kwargs.update( + { + 'status': 'F', + 'status_desc': f'Completed with exitcode {EXITCODE}', + } + ) + else: + migas_kwargs['status_desc'] = 'Success' + + send_breadcrumb(**migas_kwargs) + + +if __name__ == '__main__': + raise RuntimeError( + 'fmripost_tedana/cli/run.py should not be run directly;\n' + 'Please `pip install` fmripost_tedana and use the `fmripost_tedana` command' + ) diff --git a/src/fmripost_tedana/cli/workflow.py b/src/fmripost_tedana/cli/workflow.py index bda0794..8beb28c 100644 --- a/src/fmripost_tedana/cli/workflow.py +++ b/src/fmripost_tedana/cli/workflow.py @@ -1,11 +1,206 @@ -"""The workflow builder factory method.""" +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +""" +The workflow builder factory method. +All the checks and the construction of the workflow are done +inside this function that has pickleable inputs and output +dictionary (``retval``) to allow isolation using a +``multiprocessing.Process`` that allows fmripost_tedana to enforce +a hard-limited memory-scope. -def build_workflow(): - """Build the Tedana workflow.""" - ... +""" -def build_boilerplate(): +def build_workflow(config_file, retval): + """Create the Nipype Workflow that supports the whole execution graph.""" + from pathlib import Path + + from fmriprep.utils.bids import check_pipeline_version + from fmriprep.utils.misc import check_deps + from niworkflows.utils.bids import collect_participants + from pkg_resources import resource_filename as pkgrf + + from fmripost_tedana import config + from fmripost_tedana.reports.core import generate_reports + from fmripost_tedana.workflows.base import init_fmripost_tedana_wf + + config.load(config_file) + build_log = config.loggers.workflow + + output_dir = config.execution.output_dir + version = config.environment.version + + retval['return_code'] = 1 + retval['workflow'] = None + + banner = [f'Running fMRIPost-tedana version {version}'] + notice_path = Path(pkgrf('fmripost_tedana', 'data/NOTICE')) + if notice_path.exists(): + banner[0] += '\n' + banner += [f'License NOTICE {"#" * 50}'] + banner += [f'fMRIPost-tedana {version}'] + banner += notice_path.read_text().splitlines(keepends=False)[1:] + banner += ['#' * len(banner[1])] + build_log.log(25, f'\n{" " * 9}'.join(banner)) + + # warn if older results exist: check for dataset_description.json in output folder + msg = check_pipeline_version( + 'fMRIPost-tedana', + version, + output_dir / 'dataset_description.json', + ) + if msg is not None: + build_log.warning(msg) + + # Please note this is the input folder's dataset_description.json + dset_desc_path = config.execution.bids_dir / 'dataset_description.json' + if dset_desc_path.exists(): + from hashlib import sha256 + + desc_content = dset_desc_path.read_bytes() + config.execution.bids_description_hash = sha256(desc_content).hexdigest() + + # First check that bids_dir looks like a BIDS folder + subject_list = collect_participants( + config.execution.layout, + participant_label=config.execution.participant_label, + ) + + # Called with reports only + if config.execution.reports_only: + build_log.log(25, 'Running --reports-only on participants %s', ', '.join(subject_list)) + failed_reports = generate_reports( + subject_list=config.execution.participant_label, + output_dir=config.execution.output_dir, + run_uuid=config.execution.run_uuid, + ) + retval['return_code'] = len(failed_reports) + return retval + + # Build main workflow + init_msg = [ + "Building fMRIPost-tedana's workflow:", + f'BIDS dataset path: {config.execution.bids_dir}.', + f'Participant list: {subject_list}.', + f'Run identifier: {config.execution.run_uuid}.', + f'Output spaces: 
{config.execution.output_spaces}.', + ] + + if config.execution.datasets: + init_msg += [f'Searching for datasets: {config.execution.datasets}.'] + + build_log.log(25, f'\n{" " * 11}* '.join(init_msg)) + + retval['workflow'] = init_fmripost_tedana_wf() + + # Check workflow for missing commands + missing = check_deps(retval['workflow']) + if missing: + build_log.critical( + 'Cannot run fMRIPost-tedana. Missing dependencies:%s', + '\n\t* '.join([''] + [f'{cmd} (Interface: {iface})' for iface, cmd in missing]), + ) + retval['return_code'] = 127 # 127 == command not found. + return retval + + config.to_filename(config_file) + build_log.info( + 'fMRIPost-tedana workflow graph with %d nodes built successfully.', + len(retval['workflow']._get_all_nodes()), + ) + retval['return_code'] = 0 + return retval + + +def build_boilerplate(config_file, workflow): """Write boilerplate in an isolated process.""" - ... + from fmripost_tedana import config + + config.load(config_file) + logs_path = config.execution.output_dir / 'logs' + boilerplate = workflow.visit_desc() + citation_files = {ext: logs_path / f'CITATION.{ext}' for ext in ('bib', 'tex', 'md', 'html')} + + if boilerplate: + # To please git-annex users and also to guarantee consistency + # among different renderings of the same file, first remove any + # existing one + for citation_file in citation_files.values(): + try: + citation_file.unlink() + except FileNotFoundError: + pass + + citation_files['md'].write_text(boilerplate) + + if not config.execution.md_only_boilerplate and citation_files['md'].exists(): + from pathlib import Path + from subprocess import CalledProcessError, TimeoutExpired, check_call + + from pkg_resources import resource_filename as pkgrf + + bib_text = Path(pkgrf('fmripost_tedana', 'data/boilerplate.bib')).read_text() + citation_files['bib'].write_text( + bib_text.replace( + 'fMRIPost-tedana ', + f'fMRIPost-tedana {config.environment.version}', + ) + ) + + # Generate HTML file resolving citations + cmd = [ + 'pandoc', + '-s', + '--bibliography', + str(citation_files['bib']), + '--citeproc', + '--metadata', + 'pagetitle="fMRIPost-tedana citation boilerplate"', + str(citation_files['md']), + '-o', + str(citation_files['html']), + ] + + config.loggers.cli.info('Generating an HTML version of the citation boilerplate...') + try: + check_call(cmd, timeout=10) + except (FileNotFoundError, CalledProcessError, TimeoutExpired): + config.loggers.cli.warning('Could not generate CITATION.html file:\n%s', ' '.join(cmd)) + + # Generate LaTex file resolving citations + cmd = [ + 'pandoc', + '-s', + '--bibliography', + str(citation_files['bib']), + '--natbib', + str(citation_files['md']), + '-o', + str(citation_files['tex']), + ] + config.loggers.cli.info('Generating a LaTeX version of the citation boilerplate...') + try: + check_call(cmd, timeout=10) + except (FileNotFoundError, CalledProcessError, TimeoutExpired): + config.loggers.cli.warning('Could not generate CITATION.tex file:\n%s', ' '.join(cmd)) diff --git a/src/fmripost_tedana/config.py b/src/fmripost_tedana/config.py new file mode 100644 index 0000000..20bae4e --- /dev/null +++ b/src/fmripost_tedana/config.py @@ -0,0 +1,757 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +r""" +A Python module to maintain unique, run-wide *fMRIPost-tedana* settings. + +This module implements the memory structures to keep a consistent, singleton config. +Settings are passed across processes via filesystem, and a copy of the settings for +each run and subject is left under +``<output_dir>/sub-<participant_id>/log/<run_unique_id>/fmripost_tedana.toml``. +Settings are stored using :abbr:`TOML (Tom's Obvious, Minimal Language)`. +The module has a :py:func:`~fmripost_tedana.config.to_filename` function to allow writing out +the settings to hard disk in *TOML* format, which looks like: + +.. literalinclude:: ../fmripost_tedana/data/tests/config.toml + :language: toml + :name: fmripost_tedana.toml + :caption: **Example file representation of fMRIPost-tedana settings**. + +This config file is used to pass the settings across processes, +using the :py:func:`~fmripost_tedana.config.load` function. + +Configuration sections +---------------------- +.. autoclass:: environment + :members: +.. autoclass:: execution + :members: +.. autoclass:: workflow + :members: +.. autoclass:: nipype + :members: + +Usage +----- +A config file is used to pass settings and collect information as the execution +graph is built across processes. + +.. code-block:: Python + + from fmripost_tedana import config + config_file = config.execution.work_dir / '.fmripost_tedana.toml' + config.to_filename(config_file) + # Call build_workflow(config_file, retval) in a subprocess + with Manager() as mgr: + from .workflow import build_workflow + retval = mgr.dict() + p = Process(target=build_workflow, args=(str(config_file), retval)) + p.start() + p.join() + config.load(config_file) + # Access configs from any code section as: + value = config.section.setting + +Logging +------- +.. autoclass:: loggers + :members: + +Other responsibilities +---------------------- +The :py:mod:`config` is responsible for other convenience actions. + + * Switching Python's :obj:`multiprocessing` to *forkserver* mode. + * Set up a filter for warnings as early as possible. + * Automated I/O magic operations. Some conversions need to happen in the + store/load processes (e.g., from/to :obj:`~pathlib.Path` \<-\> :obj:`str`, + :py:class:`~bids.layout.BIDSLayout`, etc.)
+ +""" + +import os +from multiprocessing import set_start_method + +from templateflow.conf import TF_LAYOUT + +# Disable NiPype etelemetry always +_disable_et = bool(os.getenv('NO_ET') is not None or os.getenv('NIPYPE_NO_ET') is not None) +os.environ['NIPYPE_NO_ET'] = '1' +os.environ['NO_ET'] = '1' + +CONFIG_FILENAME = 'fmripost_tedana.toml' + +try: + set_start_method('forkserver') +except RuntimeError: + pass # context has been already set +finally: + # Defer all custom import for after initializing the forkserver and + # ignoring the most annoying warnings + import random + import sys + from pathlib import Path + from time import strftime + from uuid import uuid4 + + from nipype import __version__ as _nipype_ver + from templateflow import __version__ as _tf_ver + + from . import __version__ + +if not hasattr(sys, '_is_pytest_session'): + sys._is_pytest_session = False # Trick to avoid sklearn's FutureWarnings +# Disable all warnings in main and children processes only on production versions +if not any( + ( + '+' in __version__, + __version__.endswith('.dirty'), + os.getenv('FMRIPREP_DEV', '0').lower() in ('1', 'on', 'true', 'y', 'yes'), + ) +): + from fmriprep._warnings import logging + + os.environ['PYTHONWARNINGS'] = 'ignore' +elif os.getenv('FMRIPREP_WARNINGS', '0').lower() in ('1', 'on', 'true', 'y', 'yes'): + # allow disabling warnings on development versions + # https://github.com/nipreps/fmripost_tedana/pull/2080#discussion_r409118765 + from fmriprep._warnings import logging +else: + import logging + +logging.addLevelName(25, 'IMPORTANT') # Add a new level between INFO and WARNING +logging.addLevelName(15, 'VERBOSE') # Add a new level between INFO and DEBUG + +DEFAULT_MEMORY_MIN_GB = 0.01 + +# Ping NiPype eTelemetry once if env var was not set +# workers on the pool will have the env variable set from the master process +if not _disable_et: + # Just get so analytics track one hit + from contextlib import suppress + + from requests import ConnectionError, ReadTimeout # noqa: A004 + from requests import get as _get_url + + with suppress((ConnectionError, ReadTimeout)): + _get_url('https://rig.mit.edu/et/projects/nipy/nipype', timeout=0.05) + +# Execution environment +_exec_env = os.name +_docker_ver = None +# special variable set in the container +if os.getenv('IS_DOCKER_8395080871'): + _exec_env = 'singularity' + _cgroup = Path('/proc/1/cgroup') + if _cgroup.exists() and 'docker' in _cgroup.read_text(): + _docker_ver = os.getenv('DOCKER_VERSION_8395080871') + _exec_env = 'fmripost_tedana-docker' if _docker_ver else 'docker' + del _cgroup + +_fs_license = os.getenv('FS_LICENSE') +if not _fs_license and os.getenv('FREESURFER_HOME'): + _fs_home = os.getenv('FREESURFER_HOME') + if _fs_home and (Path(_fs_home) / 'license.txt').is_file(): + _fs_license = str(Path(_fs_home) / 'license.txt') + del _fs_home + +_templateflow_home = Path( + os.getenv('TEMPLATEFLOW_HOME', os.path.join(os.getenv('HOME'), '.cache', 'templateflow')) +) + +try: + from psutil import virtual_memory + + _free_mem_at_start = round(virtual_memory().available / 1024**3, 1) +except ImportError: + _free_mem_at_start = None + +_oc_limit = 'n/a' +_oc_policy = 'n/a' +try: + # Memory policy may have a large effect on types of errors experienced + _proc_oc_path = Path('/proc/sys/vm/overcommit_memory') + if _proc_oc_path.exists(): + _oc_policy = {'0': 'heuristic', '1': 'always', '2': 'never'}.get( + _proc_oc_path.read_text().strip(), 'unknown' + ) + if _oc_policy != 'never': + _proc_oc_kbytes = 
Path('/proc/sys/vm/overcommit_kbytes') + if _proc_oc_kbytes.exists(): + _oc_limit = _proc_oc_kbytes.read_text().strip() + if _oc_limit in ('0', 'n/a') and Path('/proc/sys/vm/overcommit_ratio').exists(): + _oc_limit = '{}%'.format(Path('/proc/sys/vm/overcommit_ratio').read_text().strip()) +except Exception: # noqa: S110, BLE001 + pass + + +# Debug modes are names that influence the exposure of internal details to +# the user, either through additional derivatives or increased verbosity +DEBUG_MODES = ('compcor', 'fieldmaps', 'pdb') + + +class _Config: + """An abstract class forbidding instantiation.""" + + _paths = () + + def __init__(self): + """Avert instantiation.""" + raise RuntimeError('Configuration type is not instantiable.') + + @classmethod + def load(cls, settings, init=True, ignore=None): + """Store settings from a dictionary.""" + ignore = ignore or {} + for k, v in settings.items(): + if k in ignore or v is None: + continue + + if k in cls._paths: + if isinstance(v, list | tuple): + setattr(cls, k, [Path(val).absolute() for val in v]) + elif isinstance(v, dict): + setattr(cls, k, {key: Path(val).absolute() for key, val in v.items()}) + else: + setattr(cls, k, Path(v).absolute()) + + elif hasattr(cls, k): + setattr(cls, k, v) + + if init: + try: + cls.init() + except AttributeError: + pass + + @classmethod + def get(cls): + """Return defined settings.""" + from niworkflows.utils.spaces import Reference, SpatialReferences + + out = {} + for k, v in cls.__dict__.items(): + if k.startswith('_') or v is None: + continue + + if callable(getattr(cls, k)): + continue + + if k in cls._paths: + if isinstance(v, list | tuple): + v = [str(val) for val in v] + elif isinstance(v, dict): + v = {key: str(val) for key, val in v.items()} + else: + v = str(v) + + if isinstance(v, SpatialReferences): + v = ' '.join(str(s) for s in v.references) or None + + if isinstance(v, Reference): + v = str(v) or None + + out[k] = v + return out + + +class environment(_Config): + """ + Read-only options regarding the platform and environment. + + Crawls runtime descriptive settings (e.g., default FreeSurfer license, + execution environment, nipype and *fMRIPost-tedana* versions, etc.). + The ``environment`` section is not loaded in from file, + only written out when settings are exported. + This config section is useful when reporting issues, + and these variables are tracked whenever the user does not + opt-out using the ``--notrack`` argument. 
+ + """ + + cpu_count = os.cpu_count() + """Number of available CPUs.""" + exec_docker_version = _docker_ver + """Version of Docker Engine.""" + exec_env = _exec_env + """A string representing the execution platform.""" + free_mem = _free_mem_at_start + """Free memory at start.""" + overcommit_policy = _oc_policy + """Linux's kernel virtual memory overcommit policy.""" + overcommit_limit = _oc_limit + """Linux's kernel virtual memory overcommit limits.""" + nipype_version = _nipype_ver + """Nipype's current version.""" + templateflow_version = _tf_ver + """The TemplateFlow client version installed.""" + version = __version__ + """*fMRIPost-tedana*'s version.""" + + +class nipype(_Config): + """Nipype settings.""" + + crashfile_format = 'txt' + """The file format for crashfiles, either text (txt) or pickle (pklz).""" + get_linked_libs = False + """Run NiPype's tool to enlist linked libraries for every interface.""" + memory_gb = None + """Estimation in GB of the RAM this workflow can allocate at any given time.""" + nprocs = os.cpu_count() + """Number of processes (compute tasks) that can be run in parallel (multiprocessing only).""" + omp_nthreads = None + """Number of CPUs a single process can access for multithreaded execution.""" + plugin = 'MultiProc' + """NiPype's execution plugin.""" + plugin_args = { + 'maxtasksperchild': 1, + 'raise_insufficient': False, + } + """Settings for NiPype's execution plugin.""" + remove_unnecessary_outputs = True + """Clean up unused outputs after running""" + resource_monitor = False + """Enable resource monitor.""" + stop_on_first_crash = True + """Whether the workflow should stop or continue after the first error.""" + + @classmethod + def get_plugin(cls): + """Format a dictionary for Nipype consumption.""" + out = { + 'plugin': cls.plugin, + 'plugin_args': cls.plugin_args, + } + if cls.plugin in ('MultiProc', 'LegacyMultiProc'): + out['plugin_args']['n_procs'] = int(cls.nprocs) + if cls.memory_gb: + out['plugin_args']['memory_gb'] = float(cls.memory_gb) + return out + + @classmethod + def init(cls): + """Set NiPype configurations.""" + from nipype import config as ncfg + + # Configure resource_monitor + if cls.resource_monitor: + ncfg.update_config( + { + 'monitoring': { + 'enabled': cls.resource_monitor, + 'sample_frequency': '0.5', + 'summary_append': True, + } + } + ) + ncfg.enable_resource_monitor() + + # Nipype config (logs and execution) + ncfg.update_config( + { + 'execution': { + 'crashdump_dir': str(execution.log_dir), + 'crashfile_format': cls.crashfile_format, + 'get_linked_libs': cls.get_linked_libs, + 'remove_unnecessary_outputs': cls.remove_unnecessary_outputs, + 'stop_on_first_crash': cls.stop_on_first_crash, + 'check_version': False, # disable future telemetry + } + } + ) + + if cls.omp_nthreads is None: + cls.omp_nthreads = min(cls.nprocs - 1 if cls.nprocs > 1 else os.cpu_count(), 8) + + +class execution(_Config): + """Configure run-level settings.""" + + bids_dir = None + """An existing path to the dataset, which must be BIDS-compliant.""" + datasets = {} + """Path(s) to search for pre-computed derivatives""" + bids_database_dir = None + """Path to the directory containing SQLite database indices for the input BIDS dataset.""" + bids_description_hash = None + """Checksum (SHA256) of the ``dataset_description.json`` of the BIDS dataset.""" + bids_filters = None + """A dictionary of BIDS selection filters.""" + boilerplate_only = False + """Only generate a boilerplate.""" + sloppy = False + """Run in sloppy mode (meaning, suboptimal 
parameters that minimize run-time).""" + debug = [] + """Debug mode(s).""" + fs_license_file = _fs_license + """An existing file containing a FreeSurfer license.""" + layout = None + """A :py:class:`~bids.layout.BIDSLayout` object, see :py:func:`init`.""" + log_dir = None + """The path to a directory that contains execution logs.""" + log_level = 25 + """Output verbosity.""" + low_mem = None + """Utilize uncompressed NIfTIs and other tricks to minimize memory allocation.""" + md_only_boilerplate = False + """Do not convert boilerplate from MarkDown to LaTex and HTML.""" + notrack = False + """Do not collect telemetry information for *fMRIPost-tedana*.""" + track_carbon = False + """Tracks power draws using CodeCarbon package.""" + country_code = 'CAN' + """Country ISO code used by carbon trackers.""" + output_dir = None + """Folder where derivatives will be stored.""" + aggr_ses_reports = None + """Maximum number of sessions aggregated in one subject's visual report.""" + output_spaces = None + """List of (non)standard spaces designated (with the ``--output-spaces`` flag of + the command line) as spatial references for outputs.""" + reports_only = False + """Only build the reports, based on the reportlets found in a cached working directory.""" + run_uuid = f'{strftime("%Y%m%d-%H%M%S")}_{uuid4()}' + """Unique identifier of this particular run.""" + participant_label = None + """List of participant identifiers that are to be preprocessed.""" + task_id = None + """Select a particular task from all available in the dataset.""" + templateflow_home = _templateflow_home + """The root folder of the TemplateFlow client.""" + work_dir = Path('work').absolute() + """Path to a working directory where intermediate results will be available.""" + write_graph = False + """Write out the computational graph corresponding to the planned preprocessing.""" + + _layout = None + + _paths = ( + 'bids_dir', + 'datasets', + 'bids_database_dir', + 'fs_license_file', + 'layout', + 'log_dir', + 'output_dir', + 'templateflow_home', + 'work_dir', + ) + + @classmethod + def init(cls): + """Create a new BIDS Layout accessible with :attr:`~execution.layout`.""" + if cls.fs_license_file and Path(cls.fs_license_file).is_file(): + os.environ['FS_LICENSE'] = str(cls.fs_license_file) + + if cls._layout is None: + import re + + from bids.layout import BIDSLayout + from bids.layout.index import BIDSLayoutIndexer + + _db_path = cls.bids_database_dir or (cls.work_dir / cls.run_uuid / 'bids_db') + _db_path.mkdir(exist_ok=True, parents=True) + + # Recommended after PyBIDS 12.1 + _indexer = BIDSLayoutIndexer( + validate=False, + ignore=( + 'code', + 'stimuli', + 'sourcedata', + 'models', + re.compile(r'^\.'), + re.compile( + r'sub-[a-zA-Z0-9]+(/ses-[a-zA-Z0-9]+)?/(beh|dwi|eeg|ieeg|meg|perf)' + ), + ), + ) + cls._layout = BIDSLayout( + str(cls.bids_dir), + database_path=_db_path, + reset_database=cls.bids_database_dir is None, + indexer=_indexer, + ) + cls.bids_database_dir = _db_path + cls.layout = cls._layout + if cls.bids_filters: + from bids.layout import Query + + def _process_value(value): + """Convert string with "Query" in it to Query object.""" + if isinstance(value, list): + return [_process_value(val) for val in value] + else: + return ( + getattr(Query, value[7:-4]) + if not isinstance(value, Query) and 'Query' in value + else value + ) + + # unserialize pybids Query enum values + for entity, values in cls.bids_filters.items(): + cls.bids_filters[entity] = _process_value(values) + + dataset_links = { + 'input': 
cls.bids_dir, + 'templateflow': Path(TF_LAYOUT.root), + } + for deriv_name, deriv_path in cls.datasets.items(): + dataset_links[deriv_name] = deriv_path + cls.dataset_links = dataset_links + + if 'all' in cls.debug: + cls.debug = list(DEBUG_MODES) + + +# These variables are not necessary anymore +del _fs_license +del _exec_env +del _nipype_ver +del _templateflow_home +del _tf_ver +del _free_mem_at_start +del _oc_limit +del _oc_policy + + +class workflow(_Config): + """Configure the particular execution graph of this workflow.""" + + err_on_warn = False + """Cast tedana warnings to errors.""" + melodic_dim = None + """Number of ICA components to be estimated by MELODIC + (positive = exact, negative = maximum).""" + denoise_method = None + """Denoising strategy to be used.""" + ignore = None + """Ignore particular steps for *fMRIPost-tedana*.""" + cifti_output = None + """Generate HCP Grayordinates, accepts either ``'91k'`` (default) or ``'170k'``.""" + dummy_scans = None + """Set a number of initial scans to be considered nonsteady states.""" + slice_time_ref = 0.5 + """The time of the reference slice to correct BOLD values to, as a fraction + acquisition time. 0 indicates the start, 0.5 the midpoint, and 1 the end + of acquisition. The alias `start` corresponds to 0, and `middle` to 0.5. + The default value is 0.5.""" + + +class loggers: + """Keep loggers easily accessible (see :py:func:`init`).""" + + _fmt = '%(asctime)s,%(msecs)d %(name)-2s %(levelname)-2s:\n\t %(message)s' + _datefmt = '%y%m%d-%H:%M:%S' + + default = logging.getLogger() + """The root logger.""" + cli = logging.getLogger('cli') + """Command-line interface logging.""" + workflow = logging.getLogger('nipype.workflow') + """NiPype's workflow logger.""" + interface = logging.getLogger('nipype.interface') + """NiPype's interface logger.""" + utils = logging.getLogger('nipype.utils') + """NiPype's utils logger.""" + + @classmethod + def init(cls): + """ + Set the log level, initialize all loggers into :py:class:`loggers`. + + * Add new logger levels (25: IMPORTANT, and 15: VERBOSE). + * Add a new sub-logger (``cli``). + * Logger configuration. 
+ + """ + from nipype import config as ncfg + + if not cls.cli.hasHandlers(): + _handler = logging.StreamHandler(stream=sys.stdout) + _handler.setFormatter(logging.Formatter(fmt=cls._fmt, datefmt=cls._datefmt)) + cls.cli.addHandler(_handler) + cls.default.setLevel(execution.log_level) + cls.cli.setLevel(execution.log_level) + cls.interface.setLevel(execution.log_level) + cls.workflow.setLevel(execution.log_level) + cls.utils.setLevel(execution.log_level) + ncfg.update_config( + {'logging': {'log_directory': str(execution.log_dir), 'log_to_file': True}} + ) + + +class seeds(_Config): + """Initialize the PRNG and track random seed assignments""" + + _random_seed = None + master = None + """Master random seed to initialize the Pseudorandom Number Generator (PRNG)""" + ants = None + """Seed used for antsRegistration, antsAI, antsMotionCorr""" + numpy = None + """Seed used by NumPy""" + + @classmethod + def init(cls): + if cls._random_seed is not None: + cls.master = cls._random_seed + if cls.master is None: + cls.master = random.randint(1, 65536) + random.seed(cls.master) # initialize the PRNG + # functions to set program specific seeds + cls.ants = _set_ants_seed() + cls.numpy = _set_numpy_seed() + + +def _set_ants_seed(): + """Fix random seed for antsRegistration, antsAI, antsMotionCorr""" + val = random.randint(1, 65536) + os.environ['ANTS_RANDOM_SEED'] = str(val) + return val + + +def _set_numpy_seed(): + """NumPy's random seed is independent from Python's `random` module""" + import numpy as np + + val = random.randint(1, 65536) + np.random.seed(val) + return val + + +def from_dict(settings, init=True, ignore=None): + """Read settings from a flat dictionary. + + Arguments + --------- + setting : dict + Settings to apply to any configuration + init : `bool` or :py:class:`~collections.abc.Container` + Initialize all, none, or a subset of configurations. + ignore : :py:class:`~collections.abc.Container` + Collection of keys in ``setting`` to ignore + """ + + # Accept global True/False or container of configs to initialize + def initialize(x): + return init if init in (True, False) else x in init + + nipype.load(settings, init=initialize('nipype'), ignore=ignore) + execution.load(settings, init=initialize('execution'), ignore=ignore) + workflow.load(settings, init=initialize('workflow'), ignore=ignore) + seeds.load(settings, init=initialize('seeds'), ignore=ignore) + + loggers.init() + + +def load(filename, skip=None, init=True): + """Load settings from file. + + Arguments + --------- + filename : :py:class:`os.PathLike` + TOML file containing fMRIPost-tedana configuration. + skip : dict or None + Sets of values to ignore during load, keyed by section name + init : `bool` or :py:class:`~collections.abc.Container` + Initialize all, none, or a subset of configurations. 
+ """ + from toml import loads + + skip = skip or {} + + # Accept global True/False or container of configs to initialize + def initialize(x): + return init if init in (True, False) else x in init + + filename = Path(filename) + settings = loads(filename.read_text()) + for sectionname, configs in settings.items(): + if sectionname != 'environment': + section = getattr(sys.modules[__name__], sectionname) + ignore = skip.get(sectionname) + section.load(configs, ignore=ignore, init=initialize(sectionname)) + init_spaces() + + +def get(flat=False): + """Get config as a dict.""" + settings = { + 'environment': environment.get(), + 'execution': execution.get(), + 'workflow': workflow.get(), + 'nipype': nipype.get(), + 'seeds': seeds.get(), + } + if not flat: + return settings + + return { + '.'.join((section, k)): v + for section, configs in settings.items() + for k, v in configs.items() + } + + +def dumps(): + """Format config into toml.""" + from toml import dumps + + return dumps(get()) + + +def to_filename(filename): + """Write settings to file.""" + filename = Path(filename) + filename.write_text(dumps()) + + +def init_spaces(checkpoint=True): + """Initialize the :attr:`~workflow.spaces` setting.""" + from niworkflows.utils.spaces import Reference, SpatialReferences + + spaces = execution.output_spaces or SpatialReferences() + if not isinstance(spaces, SpatialReferences): + spaces = SpatialReferences( + [ref for s in spaces.split(' ') for ref in Reference.from_string(s)] + ) + + if checkpoint and not spaces.is_cached(): + spaces.checkpoint() + + # Add the default standard space if not already present (required by several sub-workflows) + if 'MNI152NLin6Asym' not in spaces.get_spaces(nonstandard=False, dim=(3,)): + spaces.add(Reference('MNI152NLin6Asym', {})) + + # Ensure user-defined spatial references for outputs are correctly parsed. + # Certain options require normalization to a space not explicitly defined by users. + # These spaces will not be included in the final outputs. + cifti_output = workflow.cifti_output + if cifti_output: + # CIFTI grayordinates to corresponding FSL-MNI resolutions. + vol_res = '2' if cifti_output == '91k' else '1' + spaces.add(Reference('MNI152NLin6Asym', {'res': vol_res})) + + # Make the SpatialReferences object available + workflow.spaces = spaces diff --git a/src/fmripost_tedana/config.yml b/src/fmripost_tedana/config.yml deleted file mode 100644 index e69de29..0000000 diff --git a/src/fmripost_tedana/utils/bids.py b/src/fmripost_tedana/utils/bids.py index 626d16a..a567be8 100644 --- a/src/fmripost_tedana/utils/bids.py +++ b/src/fmripost_tedana/utils/bids.py @@ -1,78 +1,473 @@ -"""BIDS-related functions for fmripost_tedana.""" -import yaml -from bids.layout import Query +"""Utilities to handle BIDS inputs.""" -from fmripost_tedana import config +from __future__ import annotations +import json +from collections import defaultdict +from pathlib import Path -def collect_data(): - """Collect first echo's preprocessed file for all requested runs.""" - layout = config.layout - bids_filters = config.bids_filters or {} - participant_label = config.participant_label +from bids.layout import BIDSLayout, Query +from bids.utils import listify +from niworkflows.utils.spaces import SpatialReferences - query = { - "echo": 1, - "space": ["boldref", Query.NONE], - "suffix": "bold", - "extension": [".nii", ".nii.gz"], - } +from fmripost_tedana.data import load as load_data - # Apply filters. These may override anything. 
- if "echo_files" in bids_filters.keys(): - for acq, entities in bids_filters["echo_files"].items(): - query[acq].update(entities) - echo1_files = layout.get(return_type="file", subject=participant_label, **query) +def extract_entities(file_list: str | list[str]) -> dict: + """Return a dictionary of common entities given a list of files. - return echo1_files + Parameters + ---------- + file_list : str | list[str] + File path or list of file paths. + Returns + ------- + entities : dict + Dictionary of entities. -def collect_run_data(echo1_file): - """Use pybids to retrieve the input data for a given participant.""" - layout = config.layout - bids_filters = config.bids_filters or {} + Examples + -------- + >>> extract_entities("sub-01/anat/sub-01_T1w.nii.gz") + {'subject': '01', 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'} + >>> extract_entities(["sub-01/anat/sub-01_T1w.nii.gz"] * 2) + {'subject': '01', 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'} + >>> extract_entities(["sub-01/anat/sub-01_run-1_T1w.nii.gz", + ... "sub-01/anat/sub-01_run-2_T1w.nii.gz"]) + {'subject': '01', 'run': [1, 2], 'suffix': 'T1w', 'datatype': 'anat', 'extension': '.nii.gz'} - queries = { - "echo_files": { - "echo": Query.ANY, - }, - "mask": { - "echo": Query.NONE, - "desc": "brain", - "suffix": "mask", - }, - "confounds": { - "echo": Query.NONE, - "desc": "confounds", - "suffix": "timeseries", - "extension": ".tsv", + """ + from collections import defaultdict + + from bids.layout import parse_file_entities + + entities = defaultdict(list) + for e, v in [ + ev_pair for f in listify(file_list) for ev_pair in parse_file_entities(f).items() + ]: + entities[e].append(v) + + def _unique(inlist): + inlist = sorted(set(inlist)) + if len(inlist) == 1: + return inlist[0] + return inlist + + return {k: _unique(v) for k, v in entities.items()} + + +def collect_derivatives( + raw_dataset: Path | BIDSLayout | None, + derivatives_dataset: Path | BIDSLayout | None, + entities: dict | None, + fieldmap_id: str | None, + spec: dict | None = None, + patterns: list[str] | None = None, + allow_multiple: bool = False, + spaces: SpatialReferences | None = None, +) -> dict: + """Gather existing derivatives and compose a cache. + + TODO: Ingress 'spaces' and search for BOLD+mask in the spaces *or* xfms. + + Parameters + ---------- + raw_dataset : Path | BIDSLayout | None + Path to the raw dataset or a BIDSLayout instance. + derivatives_dataset : Path | BIDSLayout + Path to the derivatives dataset or a BIDSLayout instance. + entities : dict + Dictionary of entities to use for filtering. + fieldmap_id : str | None + Fieldmap ID to use for filtering. + spec : dict | None + Specification dictionary. + patterns : list[str] | None + List of patterns to use for filtering. + allow_multiple : bool + Allow multiple files to be returned for a given query. + spaces : SpatialReferences | None + Spatial references to select for. + + Returns + ------- + derivs_cache : dict + Dictionary with keys corresponding to the derivatives and values + corresponding to the file paths. 
+ """ + if not entities: + entities = {} + + if spec is None or patterns is None: + _spec = json.loads(load_data.readable('io_spec.json').read_text()) + + if spec is None: + spec = _spec['queries'] + + if patterns is None: + patterns = _spec['patterns'] + + # Search for derivatives data + derivs_cache = defaultdict(list, {}) + if derivatives_dataset is not None: + layout = derivatives_dataset + if isinstance(layout, Path): + layout = BIDSLayout( + layout, + config=['bids', 'derivatives'], + validate=False, + ) + + for k, q in spec['derivatives'].items(): + if k.startswith('anat'): + # Allow anatomical derivatives at session level or subject level + query = { + **{'subject': entities['subject'], 'session': [entities.get('session'), None]}, + **q, + } + else: + # Combine entities with query. Query values override file entities. + query = {**entities, **q} + + item = layout.get(return_type='filename', **query) + if k.startswith('anat') and not item: + # If the anatomical derivative is not found, try to find it + # across sessions + query = {**{'subject': entities['subject'], 'session': [Query.ANY]}, **q} + item = layout.get(return_type='filename', **query) + + if not item: + derivs_cache[k] = None + elif not allow_multiple and len(item) > 1 and k.startswith('anat'): + # Raise an error if multiple derivatives are found from different sessions + item_sessions = [layout.get_file(f).entities['session'] for f in item] + if len(set(item_sessions)) > 1: + raise ValueError(f'Multiple anatomical derivatives found for {k}: {item}') + + # Anatomical derivatives are allowed to have multiple files (e.g., T1w and T2w) + # but we just grab the first one + derivs_cache[k] = item[0] + elif not allow_multiple and len(item) > 1: + raise ValueError(f'Multiple files found for {k}: {item}') + else: + derivs_cache[k] = item[0] if len(item) == 1 else item + + for k, q in spec['transforms'].items(): + if k.startswith('anat'): + # Allow anatomical derivatives at session level or subject level + query = { + **{'subject': entities['subject'], 'session': [entities.get('session'), None]}, + **q, + } + else: + # Combine entities with query. Query values override file entities. + query = {**entities, **q} + + if k == 'boldref2fmap': + query['to'] = fieldmap_id + + item = layout.get(return_type='filename', **query) + if k.startswith('anat') and not item: + # If the anatomical derivative is not found, try to find it + # across sessions + query = {**{'subject': entities['subject'], 'session': [Query.ANY]}, **q} + item = layout.get(return_type='filename', **query) + + if not item: + derivs_cache[k] = None + elif not allow_multiple and len(item) > 1 and k.startswith('anat'): + # Anatomical derivatives are allowed to have multiple files (e.g., T1w and T2w) + # but we just grab the first one + derivs_cache[k] = item[0] + elif not allow_multiple and len(item) > 1: + raise ValueError(f'Multiple files found for {k}: {item}') + else: + derivs_cache[k] = item[0] if len(item) == 1 else item + + # Search for requested output spaces + if spaces is not None: + # Put the output-space files/transforms in lists so they can be parallelized with + # template_iterator_wf. 
+ spaces_found, bold_outputspaces, bold_mask_outputspaces = [], [], [] + for space in spaces.references: + # First try to find processed BOLD+mask files in the requested space + bold_query = {**entities, **spec['derivatives']['bold_mni152nlin6asym']} + bold_query['space'] = space.space + bold_query = {**bold_query, **space.spec} + bold_item = layout.get(return_type='filename', **bold_query) + bold_outputspaces.append(bold_item[0] if bold_item else None) + + mask_query = {**entities, **spec['derivatives']['bold_mask_mni152nlin6asym']} + mask_query['space'] = space.space + mask_query = {**mask_query, **space.spec} + mask_item = layout.get(return_type='filename', **mask_query) + bold_mask_outputspaces.append(mask_item[0] if mask_item else None) + + spaces_found.append(bool(bold_item) and bool(mask_item)) + + if all(spaces_found): + derivs_cache['bold_outputspaces'] = bold_outputspaces + derivs_cache['bold_mask_outputspaces'] = bold_mask_outputspaces + else: + # The requested spaces were not found, try to find transforms + print( + 'Not all requested output spaces were found. ' + 'We will try to find transforms to these spaces and apply them to the BOLD data.', + flush=True, + ) + + spaces_found, anat2outputspaces_xfm = [], [] + for space in spaces.references: + base_file = derivs_cache['anat2mni152nlin6asym'] + base_file = layout.get_file(base_file) + # Now try to find transform to the requested space, using the + # entities from the transform to MNI152NLin6Asym + anat2space_query = base_file.entities + anat2space_query['to'] = space.space + item = layout.get(return_type='filename', **anat2space_query) + anat2outputspaces_xfm.append(item[0] if item else None) + spaces_found.append(bool(item)) + + if all(spaces_found): + derivs_cache['anat2outputspaces_xfm'] = anat2outputspaces_xfm + else: + missing_spaces = ', '.join( + [ + s.space + for s, found in zip(spaces.references, spaces_found, strict=False) + if not found + ] + ) + raise ValueError( + f'Transforms to the following requested spaces not found: {missing_spaces}.' + ) + + # Search for raw BOLD data + if not derivs_cache and raw_dataset is not None: + if isinstance(raw_dataset, Path): + raw_layout = BIDSLayout(raw_dataset, config=['bids'], validate=False) + else: + raw_layout = raw_dataset + + for k, q in spec['raw'].items(): + # Combine entities with query. Query values override file entities. + query = {**entities, **q} + item = raw_layout.get(return_type='filename', **query) + if not item: + derivs_cache[k] = None + elif not allow_multiple and len(item) > 1: + raise ValueError(f'Multiple files found for {k}: {item}') + else: + derivs_cache[k] = item[0] if len(item) == 1 else item + + return derivs_cache + + +def write_bidsignore(deriv_dir): + bids_ignore = ( + '*.html', + 'logs/', + 'figures/', # Reports + '*_xfm.*', # Unspecified transform files + '*.surf.gii', # Unspecified structural outputs + # Unspecified functional outputs + '*_boldref.nii.gz', + '*_bold.func.gii', + '*_mixing.tsv', + '*_timeseries.tsv', + ) + ignore_file = Path(deriv_dir) / '.bidsignore' + + ignore_file.write_text('\n'.join(bids_ignore) + '\n') + + +def write_derivative_description(input_dir, output_dir, dataset_links=None): + """Write dataset_description.json file for derivatives. + + Parameters + ---------- + input_dir : :obj:`str` + Path to the primary input dataset being ingested. + This may be a raw BIDS dataset (in the case of raw+derivatives workflows) + or a preprocessing derivatives dataset (in the case of derivatives-only workflows). 
+ output_dir : :obj:`str` + Path to the output fMRIPost-tedana dataset. + dataset_links : :obj:`dict`, optional + Dictionary of dataset links to include in the dataset description. + """ + import json + import os + + from packaging.version import Version + + from fmripost_tedana import __version__ + + DOWNLOAD_URL = f'https://github.com/nipreps/fmripost_tedana/archive/{__version__}.tar.gz' + + input_dir = Path(input_dir) + output_dir = Path(output_dir) + + orig_dset_description = os.path.join(input_dir, 'dataset_description.json') + if not os.path.isfile(orig_dset_description): + raise FileNotFoundError(f'Dataset description does not exist: {orig_dset_description}') + + with open(orig_dset_description) as fobj: + desc = json.load(fobj) + + # Update dataset description + desc['Name'] = 'fMRIPost-tedana' + desc['BIDSVersion'] = '1.9.0dev' + desc['DatasetType'] = 'derivative' + desc['HowToAcknowledge'] = 'Include the generated boilerplate in the methods section.' + + # Start with GeneratedBy from the primary input dataset's dataset_description.json + desc['GeneratedBy'] = desc.get('GeneratedBy', []) + + # Add GeneratedBy from derivatives' dataset_description.jsons + for name, link in dataset_links.items(): + if name not in ('templateflow', 'input'): + dataset_desc = Path(link) / 'dataset_description.json' + if dataset_desc.is_file(): + with open(dataset_desc) as fobj: + dataset_desc_dict = json.load(fobj) + + if 'GeneratedBy' in dataset_desc_dict: + desc['GeneratedBy'].insert(0, dataset_desc_dict['GeneratedBy'][0]) + + # Add GeneratedBy from fMRIPost-tedana + desc['GeneratedBy'].insert( + 0, + { + 'Name': 'fMRIPost-tedana', + 'Version': __version__, + 'CodeURL': DOWNLOAD_URL, }, - } + ) + + # Keys that can only be set by environment + if 'FMRIPOST_TEDANA_DOCKER_TAG' in os.environ: + desc['GeneratedBy'][0]['Container'] = { + 'Type': 'docker', + 'Tag': f'nipreps/fmripost_tedana:{os.environ["FMRIPOST_TEDANA_DOCKER_TAG"]}', + } + + if 'FMRIPOST_TEDANA__SINGULARITY_URL' in os.environ: + desc['GeneratedBy'][0]['Container'] = { + 'Type': 'singularity', + 'URI': os.getenv('FMRIPOST_TEDANA__SINGULARITY_URL'), + } - bids_file = layout.get_file(echo1_file) - # Apply filters. These may override anything. - for acq, entities in bids_filters.items(): - if acq in queries.keys(): - queries[acq].update(entities) - - run_data = { - dtype: layout.get_nearest( - bids_file.path, - return_type="file", - strict=True, - **query, - ) - for dtype, query in queries.items() + # Replace local templateflow path with URL + dataset_links = dataset_links.copy() + dataset_links['templateflow'] = 'https://github.com/templateflow/templateflow' + + # Add DatasetLinks + desc['DatasetLinks'] = desc.get('DatasetLinks', {}) + for k, v in dataset_links.items(): + if k in desc['DatasetLinks'].keys() and str(desc['DatasetLinks'][k]) != str(v): + print(f'"{k}" is already a dataset link.
Overwriting.') + + desc['DatasetLinks'][k] = str(v) + + out_desc = Path(output_dir / 'dataset_description.json') + if out_desc.is_file(): + old_desc = json.loads(out_desc.read_text()) + old_version = old_desc['GeneratedBy'][0]['Version'] + if Version(__version__).public != Version(old_version).public: + print(f'Previous output generated by version {old_version} found.') + else: + out_desc.write_text(json.dumps(desc, indent=4)) + + +def validate_input_dir(exec_env, bids_dir, participant_label, need_T1w=True): + # Ignore issues and warnings that should not influence FMRIPREP + import subprocess + import sys + import tempfile + + validator_config_dict = { + 'ignore': [ + 'EVENTS_COLUMN_ONSET', + 'EVENTS_COLUMN_DURATION', + 'TSV_EQUAL_ROWS', + 'TSV_EMPTY_CELL', + 'TSV_IMPROPER_NA', + 'VOLUME_COUNT_MISMATCH', + 'BVAL_MULTIPLE_ROWS', + 'BVEC_NUMBER_ROWS', + 'DWI_MISSING_BVAL', + 'INCONSISTENT_SUBJECTS', + 'INCONSISTENT_PARAMETERS', + 'BVEC_ROW_LENGTH', + 'B_FILE', + 'PARTICIPANT_ID_COLUMN', + 'PARTICIPANT_ID_MISMATCH', + 'TASK_NAME_MUST_DEFINE', + 'PHENOTYPE_SUBJECTS_MISSING', + 'STIMULUS_FILE_MISSING', + 'DWI_MISSING_BVEC', + 'EVENTS_TSV_MISSING', + 'TSV_IMPROPER_NA', + 'ACQTIME_FMT', + 'Participants age 89 or higher', + 'DATASET_DESCRIPTION_JSON_MISSING', + 'FILENAME_COLUMN', + 'WRONG_NEW_LINE', + 'MISSING_TSV_COLUMN_CHANNELS', + 'MISSING_TSV_COLUMN_IEEG_CHANNELS', + 'MISSING_TSV_COLUMN_IEEG_ELECTRODES', + 'UNUSED_STIMULUS', + 'CHANNELS_COLUMN_SFREQ', + 'CHANNELS_COLUMN_LOWCUT', + 'CHANNELS_COLUMN_HIGHCUT', + 'CHANNELS_COLUMN_NOTCH', + 'CUSTOM_COLUMN_WITHOUT_DESCRIPTION', + 'ACQTIME_FMT', + 'SUSPICIOUSLY_LONG_EVENT_DESIGN', + 'SUSPICIOUSLY_SHORT_EVENT_DESIGN', + 'MALFORMED_BVEC', + 'MALFORMED_BVAL', + 'MISSING_TSV_COLUMN_EEG_ELECTRODES', + 'MISSING_SESSION', + ], + 'error': ['NO_T1W'] if need_T1w else [], + 'ignoredFiles': ['/dataset_description.json', '/participants.tsv'], } - # Add metadata to dictionary now. - run_data["EchoTimes"] = [layout.get_EchoTime(f) for f in run_data["echo_files"]] - - config.loggers.workflow.info( - ( - f"Collected run data for {echo1_file}:\n" - f"{yaml.dump(run_data, default_flow_style=False, indent=4)}" - ), - ) + # Limit validation only to data from requested participants + if participant_label: + all_subs = {s.name[4:] for s in bids_dir.glob('sub-*')} + selected_subs = {s[4:] if s.startswith('sub-') else s for s in participant_label} + bad_labels = selected_subs.difference(all_subs) + if bad_labels: + error_msg = ( + 'Data for requested participant(s) label(s) not found. Could ' + 'not find data for participant(s): %s. Please verify the requested ' + 'participant labels.' + ) + if exec_env == 'docker': + error_msg += ( + ' This error can be caused by the input data not being ' + 'accessible inside the docker container. Please make sure all ' + 'volumes are mounted properly (see https://docs.docker.com/' + 'engine/reference/commandline/run/#mount-volume--v---read-only)' + ) + if exec_env == 'singularity': + error_msg += ( + ' This error can be caused by the input data not being ' + 'accessible inside the singularity container. 
Please make sure ' + 'all paths are mapped properly (see https://www.sylabs.io/' + 'guides/3.0/user-guide/bind_paths_and_mounts.html)' + ) + raise RuntimeError(error_msg % ','.join(bad_labels)) - return run_data + ignored_subs = all_subs.difference(selected_subs) + if ignored_subs: + for sub in ignored_subs: + validator_config_dict['ignoredFiles'].append(f'/sub-{sub}/**') + with tempfile.NamedTemporaryFile(mode='w+', suffix='.json') as temp: + temp.write(json.dumps(validator_config_dict)) + temp.flush() + try: + subprocess.check_call(['bids-validator', str(bids_dir), '-c', temp.name]) # noqa: S607 + except FileNotFoundError: + print('bids-validator does not appear to be installed', file=sys.stderr) diff --git a/src/fmripost_tedana/utils/resampler.py b/src/fmripost_tedana/utils/resampler.py new file mode 100644 index 0000000..5764226 --- /dev/null +++ b/src/fmripost_tedana/utils/resampler.py @@ -0,0 +1,763 @@ +"""Resampler methods for fMRI data.""" + +from __future__ import annotations + +import asyncio +import os +from collections.abc import Callable +from functools import partial +from pathlib import Path +from typing import Annotated, TypeVar + +import h5py +import nibabel as nb +import nitransforms as nt +import niworkflows.data +import numpy as np +import typer +from bids import BIDSLayout +from nitransforms.io.itk import ITKCompositeH5 +from scipy import ndimage as ndi +from scipy.sparse import hstack as sparse_hstack +from sdcflows.transform import grid_bspline_weights +from sdcflows.utils.tools import ensure_positive_cosines +from templateflow import api as tf + +R = TypeVar('R') + +nipreps_cfg = niworkflows.data.load('nipreps.json') + + +def find_bids_root(path: Path) -> Path: + for parent in path.parents: + if Path.exists(parent / 'dataset_description.json'): + return parent + raise ValueError(f'Cannot detect BIDS dataset containing {path}') + + +def resample_vol( + data: np.ndarray, + coordinates: np.ndarray, + pe_info: tuple[int, float], + hmc_xfm: np.ndarray | None, + fmap_hz: np.ndarray, + output: np.dtype | np.ndarray | None = None, + order: int = 3, + mode: str = 'constant', + cval: float = 0.0, + prefilter: bool = True, +) -> np.ndarray: + """Resample a volume at specified coordinates + + This function implements simultaneous head-motion correction and + susceptibility-distortion correction. It accepts coordinates in + the source voxel space. It is the responsibility of the caller to + transform coordinates from any other target space. + + Parameters + ---------- + data + The data array to resample + coordinates + The first-approximation voxel coordinates to sample from ``data`` + The first dimension should have length ``data.ndim``. The further + dimensions have the shape of the target array. + pe_info + The readout vector in the form of (axis, signed-readout-time) + ``(1, -0.04)`` becomes ``[0, -0.04, 0]``, which indicates that a + +1 Hz deflection in the field shifts 0.04 voxels toward the start + of the data array in the second dimension. + hmc_xfm + Affine transformation accounting for head motion from the individual + volume into the BOLD reference space. This affine must be in VOX2VOX + form. + fmap_hz + The fieldmap, sampled to the target space, in Hz + output + The dtype or a pre-allocated array for sampling into the target space. + If pre-allocated, ``output.shape == coordinates.shape[1:]``. + order + Order of interpolation (default: 3 = cubic) + mode + How ``data`` is extended beyond its boundaries. See + :func:`scipy.ndimage.map_coordinates` for more details. 
+ cval + Value to fill past edges of ``data`` if ``mode`` is ``'constant'``. + prefilter + Determines if ``data`` is pre-filtered before interpolation. + + Returns + ------- + resampled_array + The resampled array, with shape ``coordinates.shape[1:]``. + """ + if hmc_xfm is not None: + # Move image with the head + coords_shape = coordinates.shape + coordinates = nb.affines.apply_affine( + hmc_xfm, coordinates.reshape(coords_shape[0], -1).T + ).T.reshape(coords_shape) + else: + # Copy coordinates to avoid interfering with other calls + coordinates = coordinates.copy() + + vsm = fmap_hz * pe_info[1] + coordinates[pe_info[0], ...] += vsm + + jacobian = 1 + np.gradient(vsm, axis=pe_info[0]) + + result = ndi.map_coordinates( + data, + coordinates, + output=output, + order=order, + mode=mode, + cval=cval, + prefilter=prefilter, + ) + result *= jacobian + return result + + +async def worker(job: Callable[[], R], semaphore) -> R: + async with semaphore: + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, job) + + +async def resample_series_async( + data: np.ndarray, + coordinates: np.ndarray, + pe_info: list[tuple[int, float]], + hmc_xfms: list[np.ndarray] | None, + fmap_hz: np.ndarray, + output_dtype: np.dtype | None = None, + order: int = 3, + mode: str = 'constant', + cval: float = 0.0, + prefilter: bool = True, + max_concurrent: int = min(os.cpu_count(), 12), +) -> np.ndarray: + """Resample a 4D time series at specified coordinates + + This function implements simultaneous head-motion correction and + susceptibility-distortion correction. It accepts coordinates in + the source voxel space. It is the responsibility of the caller to + transform coordinates from any other target space. + + Parameters + ---------- + data + The data array to resample + coordinates + The first-approximation voxel coordinates to sample from ``data``. + The first dimension should have length 3. + The further dimensions determine the shape of the target array. + pe_info + A list of readout vectors in the form of (axis, signed-readout-time) + ``(1, -0.04)`` becomes ``[0, -0.04, 0]``, which indicates that a + +1 Hz deflection in the field shifts 0.04 voxels toward the start + of the data array in the second dimension. + hmc_xfm + A sequence of affine transformations accounting for head motion from + the individual volume into the BOLD reference space. + These affines must be in VOX2VOX form. + fmap_hz + The fieldmap, sampled to the target space, in Hz + output_dtype + The dtype of the output array. + order + Order of interpolation (default: 3 = cubic) + mode + How ``data`` is extended beyond its boundaries. See + :func:`scipy.ndimage.map_coordinates` for more details. + cval + Value to fill past edges of ``data`` if ``mode`` is ``'constant'``. + prefilter + Determines if ``data`` is pre-filtered before interpolation. + max_concurrent + Maximum number of volumes to resample concurrently + + Returns + ------- + resampled_array + The resampled array, with shape ``coordinates.shape[1:] + (N,)``, + where N is the number of volumes in ``data``. 
+ """ + if data.ndim == 3: + return resample_vol( + data, + coordinates, + pe_info[0], + hmc_xfms[0] if hmc_xfms else None, + fmap_hz, + output_dtype, + order, + mode, + cval, + prefilter, + ) + + semaphore = asyncio.Semaphore(max_concurrent) + + # Order F ensures individual volumes are contiguous in memory + # Also matches NIfTI, making final save more efficient + out_array = np.zeros(coordinates.shape[1:] + data.shape[-1:], dtype=output_dtype, order='F') + + tasks = [ + asyncio.create_task( + worker( + partial( + resample_vol, + data=volume, + coordinates=coordinates, + pe_info=pe_info[volid], + hmc_xfm=hmc_xfms[volid] if hmc_xfms else None, + fmap_hz=fmap_hz, + output=out_array[..., volid], + order=order, + mode=mode, + cval=cval, + prefilter=prefilter, + ), + semaphore, + ) + ) + for volid, volume in enumerate(np.rollaxis(data, -1, 0)) + ] + + await asyncio.gather(*tasks) + + return out_array + + +def resample_series( + data: np.ndarray, + coordinates: np.ndarray, + pe_info: list[tuple[int, float]], + hmc_xfms: list[np.ndarray] | None, + fmap_hz: np.ndarray, + output_dtype: np.dtype | None = None, + order: int = 3, + mode: str = 'constant', + cval: float = 0.0, + prefilter: bool = True, + nthreads: int = 1, +) -> np.ndarray: + """Resample a 4D time series at specified coordinates + + This function implements simultaneous head-motion correction and + susceptibility-distortion correction. It accepts coordinates in + the source voxel space. It is the responsibility of the caller to + transform coordinates from any other target space. + + Parameters + ---------- + data + The data array to resample + coordinates + The first-approximation voxel coordinates to sample from ``data``. + The first dimension should have length 3. + The further dimensions determine the shape of the target array. + pe_info + A list of readout vectors in the form of (axis, signed-readout-time) + ``(1, -0.04)`` becomes ``[0, -0.04, 0]``, which indicates that a + +1 Hz deflection in the field shifts 0.04 voxels toward the start + of the data array in the second dimension. + hmc_xfm + A sequence of affine transformations accounting for head motion from + the individual volume into the BOLD reference space. + These affines must be in VOX2VOX form. + fmap_hz + The fieldmap, sampled to the target space, in Hz + output_dtype + The dtype of the output array. + order + Order of interpolation (default: 3 = cubic) + mode + How ``data`` is extended beyond its boundaries. See + :func:`scipy.ndimage.map_coordinates` for more details. + cval + Value to fill past edges of ``data`` if ``mode`` is ``'constant'``. + prefilter + Determines if ``data`` is pre-filtered before interpolation. + + Returns + ------- + resampled_array + The resampled array, with shape ``coordinates.shape[1:] + (N,)``, + where N is the number of volumes in ``data``. 
+ """ + return asyncio.run( + resample_series_async( + data=data, + coordinates=coordinates, + pe_info=pe_info, + hmc_xfms=hmc_xfms, + fmap_hz=fmap_hz, + output_dtype=output_dtype, + order=order, + mode=mode, + cval=cval, + prefilter=prefilter, + max_concurrent=nthreads, + ) + ) + + +def parse_combined_hdf5(h5_fn, to_ras=True): + # Borrowed from https://github.com/feilong/process + # process.resample.parse_combined_hdf5() + h = h5py.File(h5_fn) + xform = ITKCompositeH5.from_h5obj(h) + affine = xform[0].to_ras() + # Confirm these transformations are applicable + if h['TransformGroup']['2']['TransformType'][:][0] != b'DisplacementFieldTransform_float_3_3': + raise ValueError('Unsupported transform type') + + if not np.array_equal( + h['TransformGroup']['2']['TransformFixedParameters'][:], + np.array( + [ + 193.0, + 229.0, + 193.0, + 96.0, + 132.0, + -78.0, + 1.0, + 1.0, + 1.0, + -1.0, + 0.0, + 0.0, + 0.0, + -1.0, + 0.0, + 0.0, + 0.0, + 1.0, + ] + ), + ): + raise ValueError('Unsupported fixed parameters') + + warp = h['TransformGroup']['2']['TransformParameters'][:] + warp = warp.reshape((193, 229, 193, 3)).transpose(2, 1, 0, 3) + warp *= np.array([-1, -1, 1]) + warp_affine = np.array( + [ + [1.0, 0.0, 0.0, -96.0], + [0.0, 1.0, 0.0, -132.0], + [0.0, 0.0, 1.0, -78.0], + [0.0, 0.0, 0.0, 1.0], + ] + ) + return affine, warp, warp_affine + + +def load_ants_h5(filename: Path) -> nt.TransformChain: + """Load ANTs H5 files as a nitransforms TransformChain""" + affine, warp, warp_affine = parse_combined_hdf5(filename) + warp_transform = nt.DenseFieldTransform(nb.Nifti1Image(warp, warp_affine)) + return nt.TransformChain([warp_transform, nt.Affine(affine)]) + + +def load_transforms(xfm_paths: list[Path]) -> nt.base.TransformBase: + """Load a series of transforms as a nitransforms TransformChain + + An empty list will return an identity transform + """ + chain = None + for path in xfm_paths[::-1]: + path = Path(path) + if path.suffix == '.h5': + xfm = load_ants_h5(path) + else: + xfm = nt.linear.load(path) + if chain is None: + chain = xfm + else: + chain += xfm + if chain is None: + chain = nt.base.TransformBase() + return chain + + +def aligned(aff1: np.ndarray, aff2: np.ndarray) -> bool: + """Determine if two affines have aligned grids""" + return np.allclose( + np.linalg.norm(np.cross(aff1[:-1, :-1].T, aff2[:-1, :-1].T), axis=1), + 0, + atol=1e-3, + ) + + +def as_affine(xfm: nt.base.TransformBase) -> nt.Affine | None: + # Identity transform + if type(xfm) is nt.base.TransformBase: + return nt.Affine() + + if isinstance(xfm, nt.Affine): + return xfm + + if isinstance(xfm, nt.TransformChain) and all(isinstance(x, nt.Affine) for x in xfm): + return xfm.asaffine() + + return None + + +def resample_fieldmap( + coefficients: list[nb.Nifti1Image], + fmap_reference: nb.Nifti1Image, + target: nb.Nifti1Image, + transforms: nt.TransformChain, +) -> nb.Nifti1Image: + """Resample a fieldmap from B-Spline coefficients into a target space + + If the coefficients and target are aligned, the field is reconstructed + directly in the target space. + If not, then the field is reconstructed to the ``fmap_reference`` + resolution, and then resampled according to transforms. + + The former method only applies if the transform chain can be + collapsed to a single affine transform. + + Parameters + ---------- + coefficients + list of B-spline coefficient files. The affine matrices are used + to reconstruct the knot locations. 
+ fmap_reference + The intermediate reference to reconstruct the fieldmap in, if + it cannot be reconstructed directly in the target space. + target + The target space to to resample the fieldmap into. + transforms + A nitransforms TransformChain that maps images from the fieldmap + space into the target space. + + Returns + ------- + fieldmap + The fieldmap encoded in ``coefficients``, resampled in the same + space as ``target`` + """ + + direct = False + affine_xfm = as_affine(transforms) + if affine_xfm is not None: + # Transforms maps RAS coordinates in the target to RAS coordinates in + # the fieldmap space. Composed with target.affine, we have a target voxel + # to fieldmap RAS affine. Hence, this is projected into fieldmap space. + projected_affine = affine_xfm.matrix @ target.affine + # If the coordinates have the same rotation from voxels, we can construct + # bspline weights efficiently. + direct = aligned(projected_affine, coefficients[-1].affine) + + if direct: + reference, _ = ensure_positive_cosines( + target.__class__(target.dataobj, projected_affine, target.header), + ) + else: + if not aligned(fmap_reference.affine, coefficients[-1].affine): + raise ValueError('Reference passed is not aligned with spline grids') + reference, _ = ensure_positive_cosines(fmap_reference) + + # Generate tensor-product B-Spline weights + colmat = sparse_hstack( + [grid_bspline_weights(reference, level) for level in coefficients] + ).tocsr() + coefficients = np.hstack( + [level.get_fdata(dtype='float32').reshape(-1) for level in coefficients] + ) + + # Reconstruct the fieldmap (in Hz) from coefficients + fmap_img = nb.Nifti1Image( + np.reshape(colmat @ coefficients, reference.shape[:3]), + reference.affine, + ) + + if not direct: + fmap_img = transforms.apply(fmap_img, reference=target) + + fmap_img.header.set_intent('estimate', name='fieldmap Hz') + fmap_img.header.set_data_dtype('float32') + fmap_img.header['cal_max'] = max((abs(fmap_img.dataobj.min()), fmap_img.dataobj.max())) + fmap_img.header['cal_min'] = -fmap_img.header['cal_max'] + + return fmap_img + + +def resample_bold( + source: nb.Nifti1Image, + target: nb.Nifti1Image, + transforms: nt.TransformChain, + fieldmap: nb.Nifti1Image | None, + pe_info: list[tuple[int, float]] | None, + nthreads: int = 1, +) -> nb.Nifti1Image: + """Resample a 4D bold series into a target space, applying head-motion + and susceptibility-distortion correction simultaneously. + + Parameters + ---------- + source + The 4D bold series to resample. + target + An image sampled in the target space. + transforms + A nitransforms TransformChain that maps images from the individual + BOLD volume space into the target space. + fieldmap + The fieldmap, in Hz, sampled in the target space + pe_info + A list of readout vectors in the form of (axis, signed-readout-time) + ``(1, -0.04)`` becomes ``[0, -0.04, 0]``, which indicates that a + +1 Hz deflection in the field shifts 0.04 voxels toward the start + of the data array in the second dimension. 
+ nthreads + Number of threads to use for parallel resampling + + Returns + ------- + resampled_bold + The BOLD series resampled into the target space + """ + # HMC goes last + if not isinstance(transforms[-1], nt.linear.LinearTransformsMapping): + raise ValueError('Last transform must be a linear mapping') + + # Retrieve the RAS coordinates of the target space + coordinates = nt.base.SpatialReference.factory(target).ndcoords.astype('f4').T + + # We will operate in voxel space, so get the source affine + vox2ras = source.affine + ras2vox = np.linalg.inv(vox2ras) + # Transform RAS2RAS head motion transforms to VOX2VOX + hmc_xfms = [ras2vox @ xfm.matrix @ vox2ras for xfm in transforms[-1]] + + # Remove the head-motion transforms and add a mapping from boldref + # world space to voxels. This new transform maps from world coordinates + # in the target space to voxel coordinates in the source space. + ref2vox = nt.TransformChain(transforms[:-1] + [nt.Affine(ras2vox)]) + mapped_coordinates = ref2vox.map(coordinates) + + # Some identities to reduce special casing downstream + if fieldmap is None: + fieldmap = nb.Nifti1Image(np.zeros(target.shape[:3], dtype='f4'), target.affine) + if pe_info is None: + pe_info = [[0, 0] for _ in range(source.shape[-1])] + + resampled_data = resample_series( + data=source.get_fdata(dtype='f4'), + coordinates=mapped_coordinates.T.reshape((3, *target.shape[:3])), + pe_info=pe_info, + hmc_xfms=hmc_xfms, + fmap_hz=fieldmap.get_fdata(dtype='f4'), + output_dtype='f4', + nthreads=nthreads, + ) + resampled_img = nb.Nifti1Image(resampled_data, target.affine, target.header) + resampled_img.set_data_dtype('f4') + + return resampled_img + + +def genref( + source_img: nb.Nifti1Image, + target_zooms: float | tuple[float, float, float], +) -> nb.Nifti1Image: + """Create a reference image with target voxel sizes, preserving + the original field of view + """ + factor = np.array(target_zooms) / source_img.header.get_zooms()[:3] + # Generally round up to the nearest voxel, but not for slivers of voxels + target_shape = np.ceil(np.array(source_img.shape[:3]) / factor - 0.01) + target_affine = nb.affines.rescale_affine( + source_img.affine, source_img.shape, target_zooms, target_shape + ) + return nb.Nifti1Image( + nb.fileslice.strided_scalar(target_shape.astype(int)), + target_affine, + source_img.header, + ) + + +def mkents(source, target, **entities): + """Helper to create entity query for transforms""" + return {'from': source, 'to': target, 'suffix': 'xfm', **entities} + + +def main( + bold_file: Path, + derivs_path: Path, + output_dir: Path, + space: Annotated[str, typer.Option(help='Target space to resample to')], + resolution: Annotated[str, typer.Option(help='Target resolution')] = None, + nthreads: Annotated[ + int, + typer.Option(help='Number of resampling threads (0 for all cores)'), + ] = 1, +): + """Resample a bold file to a target space using the transforms found + in a derivatives directory. 
+ """ + bids_root = find_bids_root(bold_file) + raw = BIDSLayout(bids_root) + derivs = BIDSLayout(derivs_path, config=[nipreps_cfg], validate=False) + + if resolution is not None: + zooms = tuple(int(dim) for dim in resolution.split('x')) + if len(zooms) not in (1, 3): + raise ValueError(f'Unknown resolution: {resolution}') + + cpu_count = os.cpu_count() + if nthreads < 1: + nthreads = cpu_count + elif nthreads > cpu_count: + print(f'Warning: More threads requested ({nthreads}) than cores ({cpu_count})') + + bold = raw.files[str(bold_file)] + bold_meta = bold.get_metadata() + entities = bold.get_entities() + entities.pop('datatype') + entities.pop('extension') + entities.pop('suffix') + + bold_xfms = [] + fmap_xfms = [] + + try: + hmc = derivs.get(extension='.txt', **mkents('orig', 'boldref', **entities))[0] + except IndexError as err: + raise ValueError('Could not find HMC transforms') from err + + bold_xfms.append(hmc) + + if space == 'boldref': + reference = derivs.get(desc='coreg', suffix='boldref', extension='.nii.gz', **entities)[0] + else: + try: + coreg = derivs.get(extension='.txt', **mkents('boldref', 'T1w', **entities))[0] + except IndexError as err: + raise ValueError('Could not find coregistration transform') from err + + bold_xfms.append(coreg) + fmap_xfms.append(coreg) + + if space in ('anat', 'T1w'): + reference = derivs.get( + subject=entities['subject'], + desc='preproc', + suffix='T1w', + extension='.nii.gz', + )[0] + if resolution is not None: + ref_img = genref(nb.load(reference), zooms) + elif space not in ('anat', 'boldref', 'T1w'): + try: + template_reg = derivs.get( + datatype='anat', + extension='.h5', + subject=entities['subject'], + **mkents('T1w', space), + )[0] + except IndexError as err: + raise ValueError(f'Could not find template registration for {space}') from err + + bold_xfms.append(template_reg) + fmap_xfms.append(template_reg) + + # Get mask, as shape/affine is all we need + reference = tf.get( + template=space, + extension='.nii.gz', + desc='brain', + suffix='mask', + resolution=resolution, + ) + if not reference: + # Get a hires image to resample + reference = tf.get( + template=space, + extension='.nii.gz', + desc='brain', + suffix='mask', + resolution='1', + ) + ref_img = genref(nb.load(reference), zooms) + + fmapregs = derivs.get(extension='.txt', **mkents('boldref', derivs.get_fmapids(), **entities)) + if not fmapregs: + print('No fieldmap registrations found') + elif len(fmapregs) > 1: + raise ValueError(f'Found fieldmap registrations: {fmapregs}\nPass one as an argument.') + + fieldmap = None + if fmapregs: + fmapreg = fmapregs[0] + fmapid = fmapregs[0].entities['to'] + fieldmap_coeffs = derivs.get( + fmapid=fmapid, + desc=['coeff', 'coeff0', 'coeff1'], + extension='.nii.gz', + ) + fmapref = derivs.get( + fmapid=fmapid, + desc='preproc', + extension='.nii.gz', + )[0] + transforms = load_transforms(fmap_xfms) + # We get an inverse transform, so need to add it separately + fmap_xfms.insert(0, fmapreg) + transforms += ~nt.linear.load(Path(fmapreg)) + print(transforms.transforms) + + print(f'Resampling fieldmap {fmapid} into {space}:{resolution}') + print('Coefficients:') + print('\n'.join(f'\t{Path(c).name}' for c in fieldmap_coeffs)) + print(f'Reference: {Path(reference).name}') + print('Transforms:') + print('\n'.join(f'\t{Path(xfm).name}' for xfm in fmap_xfms)) + fieldmap = resample_fieldmap( + coefficients=[nb.load(coeff) for coeff in fieldmap_coeffs], + fmap_reference=nb.load(fmapref), + target=ref_img, + transforms=transforms, + ) + 
fieldmap.to_filename(output_dir / f'{fmapid}.nii.gz') + + pe_dir = bold_meta['PhaseEncodingDirection'] + ro_time = bold_meta['TotalReadoutTime'] + pe_axis = 'ijk'.index(pe_dir[0]) + pe_flip = pe_dir.endswith('-') + + bold_img = nb.load(bold_file) + source, axcodes = ensure_positive_cosines(bold_img) + axis_flip = axcodes[pe_axis] in 'LPI' + + pe_info = (pe_axis, -ro_time if (axis_flip ^ pe_flip) else ro_time) + + if ref_img is None: + ref_img = nb.load(reference) + + print() + print(f'Resampling BOLD {bold_file.name} ({pe_info})') + print(f'Reference: {Path(reference).name}') + print('Transforms:') + print('\n'.join(f'\t{Path(xfm).name}' for xfm in bold_xfms)) + output_file = output_dir / bold_file.name + resample_bold( + source=source, + target=ref_img, + transforms=load_transforms(bold_xfms), + fieldmap=fieldmap, + pe_info=[pe_info for _ in range(source.shape[-1])], + nthreads=nthreads, + ).to_filename(output_file) + return output_file + + +if __name__ == '__main__': + typer.run(main) diff --git a/src/fmripost_tedana/utils/utils.py b/src/fmripost_tedana/utils/utils.py new file mode 100644 index 0000000..41240d0 --- /dev/null +++ b/src/fmripost_tedana/utils/utils.py @@ -0,0 +1,492 @@ +"""Utility functions for ICA-AROMA.""" + +import json +import logging +import os.path as op +import shutil + +import nibabel as nb +import numpy as np +import pandas as pd +from nilearn import masking +from nilearn._utils import load_niimg + +# Define criteria needed for classification (thresholds and +# hyperplane-parameters) +THR_CSF = 0.10 +THR_HFC = 0.35 +HYPERPLANE = np.array([-19.9751070082159, 9.95127547670627, 24.8333160239175]) + +LGR = logging.getLogger(__name__) + + +def cross_correlation(a, b): + """Perform cross-correlations between columns of two matrices. + + Parameters + ---------- + a : (M x X) array_like + First array to cross-correlate + b : (N x X) array_like + Second array to cross-correlate + + Returns + ------- + correlations : (M x N) array_like + Cross-correlations of columns of a against columns of b. + """ + if a.ndim != 2: + raise ValueError(f'Input `a` must be 2D, not {a.ndim}D') + + if b.ndim != 2: + raise ValueError(f'Input `b` must be 2D, not {b.ndim}D') + + _, ncols_a = a.shape + # nb variables in columns rather than rows hence transpose + # extract just the cross terms between cols in a and cols in b + return np.corrcoef(a.T, b.T)[:ncols_a, ncols_a:] + + +def classification(features_df: pd.DataFrame): + """Classify components as motion or non-motion based on four features. + + The four features used for classification are: maximum RP correlation, + high-frequency content, edge-fraction, and CSF-fraction. + + Parameters + ---------- + features_df : (C x 4) :obj:`pandas.DataFrame` + DataFrame with the following columns: + "edge_fract", "csf_fract", "max_RP_corr", and "HFC". + + Returns + ------- + clf_df + clf_metadata + """ + clf_metadata = { + 'classification': { + 'LongName': 'Component classification', + 'Description': ('Classification from the classification procedure.'), + 'Levels': { + 'accepted': 'A component that is determined not to be associated with motion.', + 'rejected': 'A motion-related component.', + }, + }, + 'rationale': { + 'LongName': 'Rationale for component classification', + 'Description': ( + 'The reason for the classification. ' + 'In cases where components are classified based on more than one criterion, ' + 'they are listed sequentially, separated by semicolons.' 
+ ), + 'Levels': { + 'CSF': f'The csf_fract value is higher than {THR_CSF}', + 'HFC': f'The HFC value is higher than {THR_HFC}', + 'hyperplane': ( + 'After the max_RP_corr and edge_fract values are projected ' + 'to a hyperplane, the projected point is less than zero.' + ), + }, + }, + } + + # Classify the ICs as motion (rejected) or non-motion (accepted) + clf_df = pd.DataFrame(index=features_df.index, columns=['classification', 'rationale']) + clf_df['classification'] = 'accepted' + clf_df['rationale'] = '' + + # CSF + rej_csf = features_df['csf_fract'] > THR_CSF + clf_df.loc[rej_csf, 'classification'] = 'rejected' + clf_df.loc[rej_csf, 'rationale'] += 'CSF;' + + # HFC + rej_hfc = features_df['HFC'] > THR_HFC + clf_df.loc[rej_hfc, 'classification'] = 'rejected' + clf_df.loc[rej_hfc, 'rationale'] += 'HFC;' + + # Hyperplane + # Project edge & max_RP_corr feature scores to new 1D space + x = features_df[['max_RP_corr', 'edge_fract']].values + proj = HYPERPLANE[0] + np.dot(x, HYPERPLANE[1:]) + rej_hyperplane = proj > 0 + clf_df.loc[rej_hyperplane, 'classification'] = 'rejected' + clf_df.loc[rej_hyperplane, 'rationale'] += 'hyperplane;' + + # Check the classifications + is_motion = (features_df['csf_fract'] > THR_CSF) | (features_df['HFC'] > THR_HFC) | (proj > 0) + if not np.array_equal(is_motion, (clf_df['classification'] == 'rejected').values): + raise ValueError('Classification error: classifications do not match criteria.') + + # Remove trailing semicolons + clf_df['rationale'] = clf_df['rationale'].str.rstrip(';') + + return clf_df, clf_metadata + + +def write_metrics(features_df, out_dir, metric_metadata=None): + """Write out feature/classification information and metadata. + + Parameters + ---------- + features_df : (C x 5) :obj:`pandas.DataFrame` + DataFrame with metric values and classifications. + Must have the following columns: "edge_fract", "csf_fract", "max_RP_corr", "HFC", and + "classification". + out_dir : :obj:`str` + Output directory. + metric_metadata : :obj:`dict` or None, optional + Metric metadata in a dictionary. + + Returns + ------- + motion_ICs : array_like + Array containing the indices of the components identified as motion components. + + Output + ------ + AROMAnoiseICs.csv : A text file containing the indices of the + components identified as motion components + desc-AROMA_metrics.tsv + desc-AROMA_metrics.json + """ + # Put the indices of motion-classified ICs in a text file (starting with 1) + motion_ICs = features_df['classification'][features_df['classification'] == 'rejected'].index + motion_ICs = motion_ICs.values + + with open(op.join(out_dir, 'AROMAnoiseICs.csv'), 'w') as file_obj: + out_str = ','.join(motion_ICs.astype(str)) + file_obj.write(out_str) + + # Create a summary overview of the classification + out_file = op.join(out_dir, 'desc-AROMA_metrics.tsv') + features_df.to_csv(out_file, sep='\t', index_label='IC') + + if isinstance(metric_metadata, dict): + with open(op.join(out_dir, 'desc-AROMA_metrics.json'), 'w') as file_obj: + json.dump(metric_metadata, file_obj, sort_keys=True, indent=4) + + return motion_ICs + + +def denoising(in_file, out_dir, mixing, den_type, den_idx): + """Remove noise components from fMRI data. + + Parameters + ---------- + in_file : str + Full path to the data file (nii.gz) which has to be denoised + out_dir : str + Full path of the output directory + mixing : numpy.ndarray of shape (T, C) + Mixing matrix. 
+ den_type : {"aggr", "nonaggr", "both"} + Type of requested denoising ('aggr': aggressive, 'nonaggr': + non-aggressive, 'both': both aggressive and non-aggressive + den_idx : array_like + Index of the components that should be regressed out + + Output + ------ + desc-smoothAROMA_bold.nii.gz : The denoised fMRI data + """ + # Check if denoising is needed (i.e. are there motion components?) + motion_components_found = den_idx.size > 0 + + nonaggr_denoised_file = op.join(out_dir, 'desc-smoothAROMAnonaggr_bold.nii.gz') + aggr_denoised_file = op.join(out_dir, 'desc-smoothAROMAaggr_bold.nii.gz') + + if motion_components_found: + motion_components = mixing[:, den_idx] + + # Create a fake mask to make it easier to reshape the full data to 2D + img = load_niimg(in_file) + full_mask = nb.Nifti1Image(np.ones(img.shape[:3], int), img.affine) + data = masking.apply_mask(img, full_mask) # T x S + + # Non-aggressive denoising of the data using fsl_regfilt + # (partial regression), if requested + if den_type in ('nonaggr', 'both'): + # Fit GLM to all components + betas = np.linalg.lstsq(mixing, data, rcond=None)[0] + + # Denoise the data using the betas from just the bad components. + pred_data = np.dot(motion_components, betas[den_idx, :]) + data_denoised = data - pred_data + + # Save to file. + img_denoised = masking.unmask(data_denoised, full_mask) + img_denoised.to_filename(nonaggr_denoised_file) + + # Aggressive denoising of the data using fsl_regfilt (full regression) + if den_type in ('aggr', 'both'): + # Denoise the data with the bad components. + betas = np.linalg.lstsq(motion_components, data, rcond=None)[0] + pred_data = np.dot(motion_components, betas) + data_denoised = data - pred_data + + # Save to file. + img_denoised = masking.unmask(data_denoised, full_mask) + img_denoised.to_filename(aggr_denoised_file) + else: + LGR.warning( + ' - None of the components were classified as motion, ' + 'so no denoising is applied (the input file is copied ' + 'as-is).' + ) + if den_type in ('nonaggr', 'both'): + shutil.copyfile(in_file, nonaggr_denoised_file) + + if den_type in ('aggr', 'both'): + shutil.copyfile(in_file, aggr_denoised_file) + + +def motpars_fmriprep2fsl(confounds): + """Convert fMRIPrep motion parameters to FSL format. + + Parameters + ---------- + confounds : str or pandas.DataFrame + Confounds data from fMRIPrep. + Relevant columns have the format "[rot|trans]_[x|y|z]". + Rotations are in radians. + + Returns + ------- + motpars_fsl : (T x 6) numpy.ndarray + Motion parameters in FSL format, with rotations first (in radians) and + translations second. + """ + if isinstance(confounds, str) and op.isfile(confounds): + confounds = pd.read_table(confounds) + elif not isinstance(confounds, pd.DataFrame): + raise ValueError('Input must be an existing file or a DataFrame.') + + # Rotations are in radians + motpars_fsl = confounds[['rot_x', 'rot_y', 'rot_z', 'trans_x', 'trans_y', 'trans_z']].values + return motpars_fsl + + +def motpars_spm2fsl(motpars): + """Convert SPM format motion parameters to FSL format. + + Parameters + ---------- + motpars : str or array_like + SPM-format motion parameters. + Rotations are in degrees and translations come first. + + Returns + ------- + motpars_fsl : (T x 6) numpy.ndarray + Motion parameters in FSL format, with rotations first (in radians) and + translations second. 
+ """ + if isinstance(motpars, str) and op.isfile(motpars): + motpars = np.loadtxt(motpars) + elif not isinstance(motpars, np.ndarray): + raise ValueError('Input must be an existing file or a numpy array.') + + if motpars.shape[1] != 6: + raise ValueError(f'Motion parameters must have exactly 6 columns, not {motpars.shape[1]}.') + + # Split translations from rotations + trans, rot = motpars[:, :3], motpars[:, 3:] + + # Convert rotations from degrees to radians + rot *= np.pi / 180.0 + + # Place rotations first + motpars_fsl = np.hstack((rot, trans)) + return motpars_fsl + + +def motpars_afni2fsl(motpars): + """Convert AFNI format motion parameters to FSL format. + + Parameters + ---------- + motpars : str or array_like + AfNI-format motion parameters in 1D file. + Rotations are in degrees and translations come first. + + Returns + ------- + motpars_fsl : (T x 6) numpy.ndarray + Motion parameters in FSL format, with rotations first (in radians) and + translations second. + """ + if isinstance(motpars, str) and op.isfile(motpars): + motpars = np.loadtxt(motpars) + elif not isinstance(motpars, np.ndarray): + raise ValueError('Input must be an existing file or a numpy array.') + + if motpars.shape[1] != 6: + raise ValueError(f'Motion parameters must have exactly 6 columns, not {motpars.shape[1]}.') + + # Split translations from rotations + trans, rot = motpars[:, :3], motpars[:, 3:] + + # Convert rotations from degrees to radians + rot *= np.pi / 180.0 + + # Place rotations first + motpars_fsl = np.hstack((rot, trans)) + return motpars_fsl + + +def load_motpars(motion_file, source='auto'): + """Load motion parameters from file. + + Parameters + ---------- + motion_file : str + Motion file. + source : {"auto", "spm", "afni", "fsl", "fmriprep"}, optional + Source of the motion data. + If "auto", try to deduce the source based on the name of the file. + + Returns + ------- + motpars : (T x 6) numpy.ndarray + Motion parameters in FSL format, with rotations first (in radians) and + translations second. + """ + if source == 'auto': + if op.basename(motion_file).startswith('rp_') and motion_file.endswith('.txt'): + source = 'spm' + elif motion_file.endswith('.1D'): + source = 'afni' + elif motion_file.endswith('.tsv'): + source = 'fmriprep' + elif motion_file.endswith('.txt'): + source = 'fsl' + else: + raise Exception('Motion parameter source could not be determined automatically.') + + if source == 'spm': + motpars = motpars_spm2fsl(motion_file) + elif source == 'afni': + motpars = motpars_afni2fsl(motion_file) + elif source == 'fsl': + motpars = np.loadtxt(motion_file) + elif source == 'fmriprep': + motpars = motpars_fmriprep2fsl(motion_file) + else: + raise ValueError(f'Source "{source}" not supported.') + + return motpars + + +def get_resource_path(): + """Return the path to general resources. + + Returns the path to general resources, terminated with separator. + Resources are kept outside package folder in "resources". + Based on function by Yaroslav Halchenko used in Neurosynth Python package. + + Returns + ------- + resource_path : str + Absolute path to resources folder. + """ + return op.abspath(op.join(op.dirname(__file__), 'resources') + op.sep) + + +def get_spectrum(data: np.array, tr: float): + """Return the power spectrum and corresponding frequencies of a time series. + + Parameters + ---------- + data : numpy.ndarray of shape (T, C) or (T,) + A time series of shape T (time) by C (component), + on which you would like to perform an fft. 
+ tr : :obj:`float` + Repetition time (TR) of the data, in seconds. + + Returns + ------- + power_spectrum : numpy.ndarray of shape (F, C) + Power spectrum of the input time series. C is component, F is frequency. + freqs : numpy.ndarray of shape (F,) + Frequencies corresponding to the columns of power_spectrum. + """ + if data.ndim > 2: + raise ValueError(f'Input `data` must be 1D or 2D, not {data.ndim}D') + + if data.ndim == 1: + data = data[:, None] + + power_spectrum = np.abs(np.fft.rfft(data, axis=0)) ** 2 + freqs = np.fft.rfftfreq((power_spectrum.shape[0] * 2) - 1, tr) + idx = np.argsort(freqs) + return power_spectrum[idx, :], freqs[idx] + + +def _get_wf_name(bold_fname, prefix): + """Derive the workflow name for supplied BOLD file. + + >>> _get_wf_name("/completely/made/up/path/sub-01_task-nback_bold.nii.gz", "aroma") + 'aroma_task_nback_wf' + >>> _get_wf_name( + ... "/completely/made/up/path/sub-01_task-nback_run-01_echo-1_bold.nii.gz", + ... "preproc", + ... ) + 'preproc_task_nback_run_01_echo_1_wf' + + """ + from nipype.utils.filemanip import split_filename + + fname = split_filename(bold_fname)[1] + fname_nosub = '_'.join(fname.split('_')[1:-1]) + return f'{prefix}_{fname_nosub.replace("-", "_")}_wf' + + +def update_dict(orig_dict, new_dict): + """Update dictionary with values from another dictionary. + + Parameters + ---------- + orig_dict : dict + Original dictionary. + new_dict : dict + Dictionary with new values. + + Returns + ------- + updated_dict : dict + Updated dictionary. + """ + updated_dict = orig_dict.copy() + for key, value in new_dict.items(): + if (orig_dict.get(key) is not None) and (value is not None): + print(f'Updating {key} from {orig_dict[key]} to {value}') + updated_dict[key].update(value) + elif value is not None: + updated_dict[key] = value + + return updated_dict + + +def _convert_to_tsv(in_file): + """Convert a file to TSV format. + + Parameters + ---------- + in_file : str + Input file. + + Returns + ------- + out_file : str + Output file. + """ + import os + + import numpy as np + + out_file = os.path.abspath('out_file.tsv') + arr = np.loadtxt(in_file) + np.savetxt(out_file, arr, delimiter='\t') + return out_file diff --git a/src/fmripost_tedana/workflows/__init__.py b/src/fmripost_tedana/workflows/__init__.py index e69de29..f83238c 100644 --- a/src/fmripost_tedana/workflows/__init__.py +++ b/src/fmripost_tedana/workflows/__init__.py @@ -0,0 +1,5 @@ +"""Workflow functions for fMRIPost-tedana.""" + +from fmripost_tedana.workflows import tedana, base + +__all__ = ['tedana', 'base'] diff --git a/src/fmripost_tedana/workflows/base.py b/src/fmripost_tedana/workflows/base.py new file mode 100644 index 0000000..776666d --- /dev/null +++ b/src/fmripost_tedana/workflows/base.py @@ -0,0 +1,563 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +""" +fMRIPost tedana workflows +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autofunction:: init_fmripost_tedana_wf +.. autofunction:: init_single_subject_wf + +""" + +import os +import sys +from collections import defaultdict +from copy import deepcopy + +import yaml +from nipype.pipeline import engine as pe +from packaging.version import Version + +from fmripost_tedana import config +from fmripost_tedana.utils.utils import _get_wf_name, update_dict + + +def init_fmripost_tedana_wf(): + """Build *fMRIPost-tedana*'s pipeline. + + This workflow organizes the execution of fMRIPost-tedana, + with a sub-workflow for each subject. + + Workflow Graph + .. workflow:: + :graph2use: orig + :simple_form: yes + + from fmripost_tedana.workflows.tests import mock_config + from fmripost_tedana.workflows.base import init_fmripost_tedana_wf + + with mock_config(): + wf = init_fmripost_tedana_wf() + + """ + from niworkflows.engine.workflows import LiterateWorkflow as Workflow + + ver = Version(config.environment.version) + + fmripost_tedana_wf = Workflow(name=f'fmripost_tedana_{ver.major}_{ver.minor}_wf') + fmripost_tedana_wf.base_dir = config.execution.work_dir + + for subject_id in config.execution.participant_label: + single_subject_wf = init_single_subject_wf(subject_id) + + single_subject_wf.config['execution']['crashdump_dir'] = str( + config.execution.output_dir / f'sub-{subject_id}' / 'log' / config.execution.run_uuid + ) + for node in single_subject_wf._get_all_nodes(): + node.config = deepcopy(single_subject_wf.config) + + fmripost_tedana_wf.add_nodes([single_subject_wf]) + + # Dump a copy of the config file into the log directory + log_dir = ( + config.execution.output_dir / f'sub-{subject_id}' / 'log' / config.execution.run_uuid + ) + log_dir.mkdir(exist_ok=True, parents=True) + config.to_filename(log_dir / 'fmripost_tedana.toml') + + return fmripost_tedana_wf + + +def init_single_subject_wf(subject_id: str): + """Organize the postprocessing pipeline for a single subject. + + It collects and reports information about the subject, + and prepares sub-workflows to postprocess each BOLD series. + + Workflow Graph + .. workflow:: + :graph2use: orig + :simple_form: yes + + from fmripost_tedana.workflows.tests import mock_config + from fmripost_tedana.workflows.base import init_single_subject_wf + + with mock_config(): + wf = init_single_subject_wf('01') + + Parameters + ---------- + subject_id : :obj:`str` + Subject label for this single-subject workflow. + + Notes + ----- + 1. Load fMRIPost-tedana config file. + 2. Collect fMRIPrep derivatives. + - Echo-wise BOLD files in native space. + - Two main possibilities: + 1. bids_dir is a raw BIDS dataset and preprocessing derivatives + are provided through ``--datasets``. + In this scenario, we only need minimal derivatives. + 2. bids_dir is a derivatives dataset and we need to collect compliant + derivatives to get the data into the right space. + - This is possible if the user ran fMRIPrep with ``--me-output-echos``. + 3. Optionally estimate T2* across runs. + - Need to warp T2* or BOLD data to same space, depending on approach (avg vs concat). + 4. Loop over runs. + 5. Collect each run's associated files. + - Transform(s) to MNI152NLin2009cAsym (for component visualization). + - Confounds file (for dummy scans and if external confounds are needed for the + decision tree). + - Optional T2* map from fMRIPrep. 
+ - Optional ICA mixing matrix from another dataset (e.g., fMRIPost-AROMA). + 6. Run tedana. + 7. Denoise available BOLD data in requested spaces with ICA-tedana. + - Do we need to optimally combine with new T2* map and warp within fMRIPost-tedana? + + """ + from bids.utils import listify + from niworkflows.engine.workflows import LiterateWorkflow as Workflow + from niworkflows.interfaces.bids import BIDSInfo + from niworkflows.interfaces.nilearn import NILEARN_VERSION + + from fmripost_tedana.interfaces.bids import DerivativesDataSink + from fmripost_tedana.interfaces.reportlets import AboutSummary, SubjectSummary + from fmripost_tedana.utils.bids import collect_derivatives + + spaces = config.workflow.spaces + + workflow = Workflow(name=f'sub_{subject_id}_wf') + workflow.__desc__ = f""" +Results included in this manuscript come from postprocessing +performed using *fMRIPost-tedana* {config.environment.version} (@tedana), +which is based on *Nipype* {config.environment.nipype_version} +(@nipype1; @nipype2; RRID:SCR_002502). + +""" + workflow.__postdesc__ = f""" + +Many internal operations of *fMRIPost-tedana* use +*Nilearn* {NILEARN_VERSION} [@nilearn, RRID:SCR_001362]. +For more details of the pipeline, see [the section corresponding +to workflows in *fMRIPost-tedana*'s documentation]\ +(https://fmripost_tedana.readthedocs.io/en/latest/workflows.html \ +"FMRIPrep's documentation"). + + +### Copyright Waiver + +The above boilerplate text was automatically generated by fMRIPost-tedana +with the express intention that users should copy and paste this +text into their manuscripts *unchanged*. +It is released under the [CC0]\ +(https://creativecommons.org/publicdomain/zero/1.0/) license. + +### References + +""" + entities = config.execution.bids_filters or {} + entities['subject'] = subject_id + + if config.execution.datasets: + # Raw dataset + derivatives dataset + config.loggers.workflow.info('Raw+derivatives workflow mode enabled') + subject_data = collect_derivatives( + raw_dataset=config.execution.layout, + derivatives_dataset=None, + entities=entities, + fieldmap_id=None, + allow_multiple=True, + spaces=None, + ) + subject_data['bold'] = listify(subject_data['bold_raw']) + else: + # Derivatives dataset only + config.loggers.workflow.info('Derivatives-only workflow mode enabled') + subject_data = collect_derivatives( + raw_dataset=None, + derivatives_dataset=config.execution.layout, + entities=entities, + fieldmap_id=None, + allow_multiple=True, + spaces=None, + ) + # Patch standard-space BOLD files into 'bold' key + subject_data['bold'] = listify(subject_data['bold_mni152nlin6asym']) + + # Make sure we always go through these two checks + if not subject_data['bold']: + task_id = config.execution.task_id + raise RuntimeError( + f'No BOLD images found for participant {subject_id} and ' + f'task {task_id if task_id else ""}. ' + 'All workflows require BOLD images. ' + f'Please check your BIDS filters: {config.execution.bids_filters}.' 
+ ) + + config.loggers.workflow.info( + f'Collected subject data:\n{yaml.dump(subject_data, default_flow_style=False, indent=4)}', + ) + + bids_info = pe.Node( + BIDSInfo( + bids_dir=config.execution.bids_dir, + bids_validate=False, + in_file=subject_data['bold'][0], + ), + name='bids_info', + ) + + summary = pe.Node( + SubjectSummary( + bold=subject_data['bold'], + std_spaces=spaces.get_spaces(nonstandard=False), + nstd_spaces=spaces.get_spaces(standard=False), + ), + name='summary', + run_without_submitting=True, + ) + workflow.connect([(bids_info, summary, [('subject', 'subject_id')])]) + + about = pe.Node( + AboutSummary(version=config.environment.version, command=' '.join(sys.argv)), + name='about', + run_without_submitting=True, + ) + + ds_report_summary = pe.Node( + DerivativesDataSink( + source_file=subject_data['bold'][0], + base_directory=config.execution.output_dir, + desc='summary', + datatype='figures', + ), + name='ds_report_summary', + run_without_submitting=True, + ) + workflow.connect([(summary, ds_report_summary, [('out_report', 'in_file')])]) + + ds_report_about = pe.Node( + DerivativesDataSink( + source_file=subject_data['bold'][0], + base_directory=config.execution.output_dir, + desc='about', + datatype='figures', + ), + name='ds_report_about', + run_without_submitting=True, + ) + workflow.connect([(about, ds_report_about, [('out_report', 'in_file')])]) + + # Append the functional section to the existing anatomical excerpt + # That way we do not need to stream down the number of bold datasets + func_pre_desc = f""" +Functional data postprocessing + +: For each of the {len(subject_data['bold'])} BOLD runs found per subject +(across all tasks and sessions), the following postprocessing was performed. +""" + workflow.__desc__ += func_pre_desc + + # TODO: Estimate T2* (or maybe just collect T2* from the fMRIPrep derivatives?), + # warp it to shared space, and average it across runs. + # Then pass that along to the single-run workflow. 
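+    # A minimal sketch of the averaging step described in the TODO above
+    # (hypothetical: it assumes per-run T2* maps have already been collected
+    # into subject_data['t2star'] and resampled to a shared space):
+    #
+    #     import nibabel as nb
+    #     import numpy as np
+    #
+    #     t2s_imgs = [nb.load(f) for f in subject_data['t2star']]
+    #     mean_t2s = nb.Nifti1Image(
+    #         np.mean([img.get_fdata() for img in t2s_imgs], axis=0),
+    #         t2s_imgs[0].affine,
+    #         t2s_imgs[0].header,
+    #     )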
+ for bold_file in subject_data['bold']: + single_run_wf = init_single_run_wf(bold_file) + workflow.add_nodes([single_run_wf]) + + return clean_datasinks(workflow) + + +def init_single_run_wf(bold_file): + """Set up a single-run workflow for fMRIPost-tedana.""" + from fmriprep.utils.misc import estimate_bold_mem_usage + from nipype.interfaces import utility as niu + from niworkflows.engine.workflows import LiterateWorkflow as Workflow + + from fmripost_tedana.utils.bids import collect_derivatives, extract_entities + from fmripost_tedana.workflows.tedana import init_denoise_wf, init_ica_tedana_wf + from fmripost_tedana.workflows.outputs import init_func_fit_reports_wf + + spaces = config.workflow.spaces + omp_nthreads = config.nipype.omp_nthreads + + workflow = Workflow(name=_get_wf_name(bold_file, 'single_run')) + workflow.__desc__ = '' + + bold_metadata = config.execution.layout.get_metadata(bold_file) + mem_gb = estimate_bold_mem_usage(bold_file)[1] + + entities = config.execution.bids_filters or {} + entities = {**entities, **extract_entities(bold_file)} + + functional_cache = defaultdict(list, {}) + if config.execution.datasets: + # Collect native-space derivatives and transforms + functional_cache = collect_derivatives( + raw_dataset=config.execution.layout, + derivatives_dataset=None, + entities=entities, + fieldmap_id=None, + allow_multiple=False, + spaces=None, + ) + for deriv_dir in config.execution.datasets.values(): + functional_cache = update_dict( + functional_cache, + collect_derivatives( + raw_dataset=None, + derivatives_dataset=deriv_dir, + entities=entities, + fieldmap_id=None, + allow_multiple=False, + spaces=spaces, + ), + ) + + if not functional_cache['bold_confounds']: + if config.workflow.dummy_scans is None: + raise ValueError( + 'No confounds detected. ' + 'Automatical dummy scan detection cannot be performed. ' + 'Please set the `--dummy-scans` flag explicitly.' + ) + + # TODO: Calculate motion parameters from motion correction transform + raise ValueError('Motion parameters cannot be extracted from transforms yet.') + + else: + # Collect MNI152NLin6Asym:res-2 derivatives + # Only derivatives dataset was passed in, so we expected standard-space derivatives + functional_cache.update( + collect_derivatives( + raw_dataset=None, + derivatives_dataset=config.execution.layout, + entities=entities, + fieldmap_id=None, + allow_multiple=False, + spaces=spaces, + ), + ) + + config.loggers.workflow.info( + ( + f'Collected run data for {os.path.basename(bold_file)}:\n' + f'{yaml.dump(functional_cache, default_flow_style=False, indent=4)}' + ), + ) + + if config.workflow.dummy_scans is not None: + skip_vols = config.workflow.dummy_scans + else: + if not functional_cache['bold_confounds']: + raise ValueError( + 'No confounds detected. ' + 'Automatic dummy scan detection cannot be performed. ' + 'Please set the `--dummy-scans` flag explicitly.' 
+ ) + skip_vols = get_nss(functional_cache['bold_confounds']) + + # Run ICA-tedana + ica_tedana_wf = init_ica_tedana_wf(bold_file=bold_file, metadata=bold_metadata, mem_gb=mem_gb) + ica_tedana_wf.inputs.inputnode.confounds = functional_cache['bold_confounds'] + ica_tedana_wf.inputs.inputnode.skip_vols = skip_vols + + mni6_buffer = pe.Node(niu.IdentityInterface(fields=['bold', 'bold_mask']), name='mni6_buffer') + + if ('bold_mni152nlin6asym' not in functional_cache) and ('bold_raw' in functional_cache): + # Resample to MNI152NLin6Asym:res-2, for ICA-tedana classification + from fmriprep.workflows.bold.apply import init_bold_volumetric_resample_wf + from fmriprep.workflows.bold.stc import init_bold_stc_wf + from niworkflows.interfaces.header import ValidateImage + from templateflow.api import get as get_template + + from fmripost_tedana.interfaces.misc import ApplyTransforms + + workflow.__desc__ += """\ +Raw BOLD series were resampled to MNI152NLin6Asym:res-2, for ICA-tedana classification. +""" + + validate_bold = pe.Node( + ValidateImage(in_file=functional_cache['bold_raw']), + name='validate_bold', + ) + + stc_buffer = pe.Node( + niu.IdentityInterface(fields=['bold_file']), + name='stc_buffer', + ) + run_stc = ('SliceTiming' in bold_metadata) and 'slicetiming' not in config.workflow.ignore + if run_stc: + bold_stc_wf = init_bold_stc_wf( + mem_gb=mem_gb, + metadata=bold_metadata, + name='bold_stc_wf', + ) + bold_stc_wf.inputs.inputnode.skip_vols = skip_vols + workflow.connect([ + (validate_bold, bold_stc_wf, [('out_file', 'inputnode.bold_file')]), + (bold_stc_wf, stc_buffer, [('outputnode.stc_file', 'bold_file')]), + ]) # fmt:skip + else: + workflow.connect([(validate_bold, stc_buffer, [('out_file', 'bold_file')])]) + + mni6_mask = str( + get_template( + 'MNI152NLin6Asym', + resolution=2, + desc='brain', + suffix='mask', + extension=['.nii', '.nii.gz'], + ) + ) + + bold_MNI6_wf = init_bold_volumetric_resample_wf( + metadata=bold_metadata, + fieldmap_id=None, # XXX: Ignoring the field map for now + omp_nthreads=omp_nthreads, + mem_gb=mem_gb, + jacobian='fmap-jacobian' not in config.workflow.ignore, + name='bold_MNI6_wf', + ) + bold_MNI6_wf.inputs.inputnode.motion_xfm = functional_cache['bold_hmc'] + bold_MNI6_wf.inputs.inputnode.boldref2fmap_xfm = functional_cache['boldref2fmap'] + bold_MNI6_wf.inputs.inputnode.boldref2anat_xfm = functional_cache['boldref2anat'] + bold_MNI6_wf.inputs.inputnode.anat2std_xfm = functional_cache['anat2mni152nlin6asym'] + bold_MNI6_wf.inputs.inputnode.resolution = '02' + # use mask as boldref? 
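+    # Note that the native-space brain mask doubles as the reference image for
+    # the resampling workflow (see the question above); the same mask is also
+    # warped to MNI152NLin6Asym below so ICA-tedana receives a matching mask.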
+ bold_MNI6_wf.inputs.inputnode.bold_ref_file = functional_cache['bold_mask_native'] + bold_MNI6_wf.inputs.inputnode.target_mask = mni6_mask + bold_MNI6_wf.inputs.inputnode.target_ref_file = mni6_mask + + workflow.connect([ + # Resample BOLD to MNI152NLin6Asym, may duplicate bold_std_wf above + # XXX: Ignoring the field map for now + # (inputnode, bold_MNI6_wf, [ + # ('fmap_ref', 'inputnode.fmap_ref'), + # ('fmap_coeff', 'inputnode.fmap_coeff'), + # ('fmap_id', 'inputnode.fmap_id'), + # ]), + (stc_buffer, bold_MNI6_wf, [('bold_file', 'inputnode.bold_file')]), + (bold_MNI6_wf, mni6_buffer, [('outputnode.bold_file', 'bold')]), + ]) # fmt:skip + + # Warp the mask as well + mask_to_mni6 = pe.Node( + ApplyTransforms( + interpolation='GenericLabel', + input_image=functional_cache['bold_mask_native'], + reference_image=mni6_mask, + transforms=[ + functional_cache['anat2mni152nlin6asym'], + functional_cache['boldref2anat'], + ], + ), + name='mask_to_mni6', + ) + workflow.connect([(mask_to_mni6, mni6_buffer, [('output_image', 'bold_mask')])]) + + elif 'bold_mni152nlin6asym' in functional_cache: + workflow.__desc__ += """\ +Preprocessed BOLD series in MNI152NLin6Asym:res-2 space were collected for ICA-tedana +classification. +""" + mni6_buffer.inputs.bold = functional_cache['bold_mni152nlin6asym'] + mni6_buffer.inputs.bold_mask = functional_cache['bold_mask_mni152nlin6asym'] + + else: + raise ValueError('No valid BOLD series found for ICA-tedana classification.') + + workflow.connect([ + (mni6_buffer, ica_tedana_wf, [ + ('bold', 'inputnode.bold_std'), + ('bold_mask', 'inputnode.bold_mask_std'), + ]), + ]) # fmt:skip + + # Generate reportlets + func_fit_reports_wf = init_func_fit_reports_wf(output_dir=config.execution.output_dir) + func_fit_reports_wf.inputs.inputnode.source_file = bold_file + func_fit_reports_wf.inputs.inputnode.anat2std_xfm = functional_cache['anat2mni152nlin6asym'] + func_fit_reports_wf.inputs.inputnode.anat_dseg = functional_cache['anat_dseg'] + workflow.connect([(mni6_buffer, func_fit_reports_wf, [('bold', 'inputnode.bold_mni6')])]) + + if config.workflow.denoise_method: + # Now denoise the output-space BOLD data using ICA-tedana + denoise_wf = init_denoise_wf(bold_file=bold_file, metadata=bold_metadata) + denoise_wf.inputs.inputnode.skip_vols = skip_vols + denoise_wf.inputs.inputnode.space = 'MNI152NLin6Asym' + denoise_wf.inputs.inputnode.res = '2' + denoise_wf.inputs.inputnode.confounds_file = functional_cache['bold_confounds'] + + workflow.connect([ + (mni6_buffer, denoise_wf, [ + ('bold', 'inputnode.bold_file'), + ('bold_mask', 'inputnode.bold_mask'), + ]), + (ica_tedana_wf, denoise_wf, [ + ('outputnode.mixing', 'inputnode.mixing'), + ('outputnode.tedana_features', 'inputnode.classifications'), + ]), + ]) # fmt:skip + + # Fill-in datasinks seen so far + for node in workflow.list_node_names(): + if node.split('.')[-1].startswith('ds_'): + workflow.get_node(node).inputs.base_directory = config.execution.output_dir + workflow.get_node(node).inputs.source_file = bold_file + + return workflow + + +def _prefix(subid): + return subid if subid.startswith('sub-') else f'sub-{subid}' + + +def clean_datasinks(workflow: pe.Workflow) -> pe.Workflow: + """Overwrite ``out_path_base`` of smriprep's DataSinks.""" + for node in workflow.list_node_names(): + if node.split('.')[-1].startswith('ds_'): + workflow.get_node(node).interface.out_path_base = '' + return workflow + + +def get_nss(confounds_file): + """Get number of non-steady state volumes.""" + import numpy as np + import pandas as pd 
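+    # fMRIPrep encodes non-steady-state volumes as one-hot confound columns
+    # (non_steady_state_outlier00, non_steady_state_outlier01, ...), which is
+    # what the column scan below looks for.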
+ + df = pd.read_table(confounds_file) + + nss_cols = [c for c in df.columns if c.startswith('non_steady_state_outlier')] + + dummy_scans = 0 + if nss_cols: + initial_volumes_df = df[nss_cols] + dummy_scans = np.any(initial_volumes_df.to_numpy(), axis=1) + dummy_scans = np.where(dummy_scans)[0] + + # reasonably assumes all NSS volumes are contiguous + dummy_scans = int(dummy_scans[-1] + 1) + + return dummy_scans diff --git a/src/fmripost_tedana/workflows/confounds.py b/src/fmripost_tedana/workflows/confounds.py new file mode 100644 index 0000000..84973af --- /dev/null +++ b/src/fmripost_tedana/workflows/confounds.py @@ -0,0 +1,207 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +""" +Calculate BOLD confounds +^^^^^^^^^^^^^^^^^^^^^^^^ + +.. autofunction:: init_carpetplot_wf + +""" + + +def init_carpetplot_wf( + mem_gb: float, + metadata: dict, + cifti_output: bool, + name: str = 'bold_carpet_wf', +): + """Build a workflow to generate *carpet* plots. + + Resamples the MNI parcellation + (ad-hoc parcellation derived from the Harvard-Oxford template and others). + + Parameters + ---------- + mem_gb : :obj:`float` + Size of BOLD file in GB - please note that this size + should be calculated after resamplings that may extend + the FoV + metadata : :obj:`dict` + BIDS metadata for BOLD file + name : :obj:`str` + Name of workflow (default: ``bold_carpet_wf``) + + Inputs + ------ + bold + BOLD image, in MNI152NLin6Asym space + 2mm resolution. + bold_mask + BOLD series mask in same space as ``bold``. + confounds_file + TSV of all aggregated confounds + cifti_bold + BOLD image in CIFTI format, to be used in place of volumetric BOLD + crown_mask + Mask of brain edge voxels + acompcor_mask + Mask of deep WM+CSF + dummy_scans + Number of nonsteady states to be dropped at the beginning of the timeseries. 
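+    desc
+        Descriptor (``desc`` entity) to use when saving the carpet plot reportlet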
+
+    Outputs
+    -------
+    out_carpetplot
+        Path of the generated SVG file
+    """
+    from fmriprep.interfaces.confounds import FMRISummary
+    from nipype.interfaces import utility as niu
+    from nipype.pipeline import engine as pe
+    from niworkflows.engine.workflows import LiterateWorkflow as Workflow
+    from templateflow.api import get as get_template
+
+    from fmripost_tedana.config import DEFAULT_MEMORY_MIN_GB
+    from fmripost_tedana.interfaces.bids import DerivativesDataSink
+    from fmripost_tedana.interfaces.misc import ApplyTransforms
+
+    inputnode = pe.Node(
+        niu.IdentityInterface(
+            fields=[
+                'bold',
+                'bold_mask',
+                'confounds_file',
+                'cifti_bold',
+                'dummy_scans',
+                'desc',
+            ],
+        ),
+        name='inputnode',
+    )
+
+    outputnode = pe.Node(niu.IdentityInterface(fields=['out_carpetplot']), name='outputnode')
+
+    # Carpetplot and confounds plot
+    conf_plot = pe.Node(
+        FMRISummary(
+            tr=metadata['RepetitionTime'],
+            confounds_list=[
+                ('trans_x', 'mm', 'x'),
+                ('trans_y', 'mm', 'y'),
+                ('trans_z', 'mm', 'z'),
+                ('rot_x', 'deg', 'pitch'),
+                ('rot_y', 'deg', 'roll'),
+                ('rot_z', 'deg', 'yaw'),
+                ('framewise_displacement', 'mm', 'FD'),
+            ],
+        ),
+        name='conf_plot',
+        mem_gb=mem_gb,
+    )
+    ds_report_bold_conf = pe.Node(
+        DerivativesDataSink(
+            datatype='figures',
+            extension='svg',
+            dismiss_entities=('echo', 'den', 'res'),
+        ),
+        name='ds_report_bold_conf',
+        run_without_submitting=True,
+        mem_gb=DEFAULT_MEMORY_MIN_GB,
+    )
+
+    parcels = pe.Node(niu.Function(function=_carpet_parcellation), name='parcels')
+    parcels.inputs.nifti = not cifti_output
+
+    # Warp segmentation into MNI152NLin6Asym space
+    resample_parc = pe.Node(
+        ApplyTransforms(
+            dimension=3,
+            input_image=str(
+                get_template(
+                    'MNI152NLin2009cAsym',
+                    resolution=1,
+                    desc='carpet',
+                    suffix='dseg',
+                    extension=['.nii', '.nii.gz'],
+                )
+            ),
+            transforms=[
+                str(
+                    get_template(
+                        template='MNI152NLin6Asym',
+                        mode='image',
+                        suffix='xfm',
+                        extension='.h5',
+                        **{'from': 'MNI152NLin2009cAsym'},
+                    ),
+                ),
+            ],
+            interpolation='GenericLabel',
+            args='-u int',
+        ),
+        name='resample_parc',
+    )
+
+    workflow = Workflow(name=name)
+    if cifti_output:
+        workflow.connect(inputnode, 'cifti_bold', conf_plot, 'in_cifti')
+
+    workflow.connect([
+        (inputnode, resample_parc, [('bold_mask', 'reference_image')]),
+        (inputnode, conf_plot, [
+            ('bold', 'in_nifti'),
+            ('confounds_file', 'confounds_file'),
+            ('dummy_scans', 'drop_trs'),
+        ]),
+        (resample_parc, parcels, [('output_image', 'segmentation')]),
+        (parcels, conf_plot, [('out', 'in_segm')]),
+        (inputnode, ds_report_bold_conf, [('desc', 'desc')]),
+        (conf_plot, ds_report_bold_conf, [('out_file', 'in_file')]),
+        (conf_plot, outputnode, [('out_file', 'out_carpetplot')]),
+    ])  # fmt:skip
+    return workflow
+
+
+def _carpet_parcellation(segmentation, nifti=False):
+    """Relabel a standard-space tissue segmentation into carpet plot segments."""
+    from pathlib import Path
+
+    import nibabel as nb
+    import numpy as np
+
+    img = nb.load(segmentation)
+
+    lut = np.zeros((256,), dtype='uint8')
+    lut[100:201] = 1 if nifti else 0  # Ctx GM
+    lut[30:99] = 2 if nifti else 0  # dGM
+    lut[1:11] = 3 if nifti else 1  # WM+CSF
+    lut[255] = 5 if nifti else 0  # Cerebellum
+    # Apply lookup table
+    seg = lut[np.uint16(img.dataobj)]
+    # seg[np.bool_(nb.load(crown_mask).dataobj)] = 6 if nifti else 2
+    # Separate deep from shallow WM+CSF
+    # seg[np.bool_(nb.load(acompcor_mask).dataobj)] = 4 if nifti else 1
+
+    outimg = img.__class__(seg.astype('uint8'), img.affine, img.header)
+    outimg.set_data_dtype('uint8')
+    out_file = Path('segments.nii.gz').absolute()
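+    # nipype runs this helper function in the node's own working directory,
+    # so an absolute path in the cwd is safe to write to and return.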
outimg.to_filename(out_file) + return str(out_file) diff --git a/src/fmripost_tedana/workflows/main.py b/src/fmripost_tedana/workflows/main.py deleted file mode 100644 index 44cda50..0000000 --- a/src/fmripost_tedana/workflows/main.py +++ /dev/null @@ -1,44 +0,0 @@ -"""Base workflows for the tedana BIDS App.""" -from niworkflows.engine.workflows import LiterateWorkflow as Workflow - - -def init_tedana_wf(): - """Identify subjects in dataset.""" - ... - - -def init_single_subject_wf(): - """Identify runs for a given subject.""" - ... - - -def init_denoise_run_wf(): - """Run tedana on a single run. - - Steps - ----- - 1. A BIDS data grabber to collect the first echo from each run. - 2. A BIDS data grabber to grab the other echoes, brain mask, echo times, - and potentially confounds. - 3. Drop dummy volumes as requested. - 4. Run tedana. - 5. Reorganize tedana outputs into BIDS format. - 6. Generate nireports HTML report. - """ - from fmripost_tedana.interfaces.bids import collect_run_data - from fmripost_tedana.interfaces.misc import RemoveNonSteadyStateVolumes - from fmripost_tedana.interfaces.nilearn import LoadConfounds - from fmripost_tedana.interfaces.tedana import t2smap_workflow_mark, tedana_workflow_mark - - # creating workflow with two input fields - wf = Workflow( - name="denoise_run_wf", - input_spec=["first_echo"], - ) - # adding a task and connecting task's input to the workflow input - wf.add(collect_run_data(name="collect_run_data")) - # adding another task and connecting - # task's input to the "mult" task's output - wf.add(tedana_workflow_mark(name="tedana", x=wf.collect_run_data.lzout.out)) - # setting workflow output - wf.set_output([("out", wf.add.lzout.out)]) diff --git a/src/fmripost_tedana/workflows/outputs.py b/src/fmripost_tedana/workflows/outputs.py new file mode 100644 index 0000000..f4f0e55 --- /dev/null +++ b/src/fmripost_tedana/workflows/outputs.py @@ -0,0 +1,144 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# We support and encourage derived works from this project, please read +# about our expectations at +# +# https://www.nipreps.org/community/licensing/ +# +"""Writing out derivative files.""" + +from __future__ import annotations + +from fmriprep.utils.bids import dismiss_echo +from nipype.interfaces import utility as niu +from nipype.pipeline import engine as pe +from niworkflows.utils.images import dseg_label + +from fmripost_tedana.config import DEFAULT_MEMORY_MIN_GB +from fmripost_tedana.interfaces.bids import DerivativesDataSink +from fmripost_tedana.interfaces.misc import ApplyTransforms + + +def init_func_fit_reports_wf( + *, + output_dir: str, + name='func_fit_reports_wf', +) -> pe.Workflow: + """Set up a battery of datasinks to store reports in the right location. 
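+
+    At the moment this builds a single EPI-to-MNI152NLin6Asym registration
+    reportlet from the mean of the resampled BOLD series.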
+ + Parameters + ---------- + freesurfer : :obj:`bool` + FreeSurfer was enabled + output_dir : :obj:`str` + Directory in which to save derivatives + name : :obj:`str` + Workflow name (default: func_fit_reports_wf) + + Inputs + ------ + source_file + Input BOLD images + """ + from nireports.interfaces.reporting.base import ( + SimpleBeforeAfterRPT as SimpleBeforeAfter, + ) + from templateflow.api import get as get_template + + from fmripost_tedana.interfaces.nilearn import MeanImage + + workflow = pe.Workflow(name=name) + + inputfields = [ + 'source_file', + 'bold_mni6', + 'anat_dseg', + 'anat2std_xfm', + ] + inputnode = pe.Node(niu.IdentityInterface(fields=inputfields), name='inputnode') + + # Average the BOLD image over time + calculate_mean_bold = pe.Node( + MeanImage(), + name='calculate_mean_bold', + mem_gb=1, + ) + workflow.connect([(inputnode, calculate_mean_bold, [('bold_mni6', 'bold_file')])]) + + # Warp the tissue segmentation to MNI + dseg_to_mni6 = pe.Node( + ApplyTransforms(interpolation='GenericLabel'), + name='dseg_to_mni6', + mem_gb=1, + ) + workflow.connect([ + (inputnode, dseg_to_mni6, [ + ('anat_dseg', 'input_image'), + ('anat2std_xfm', 'transforms'), + ('bold_mni6', 'reference_image'), + ]), + ]) # fmt:skip + + mni6_wm = pe.Node( + niu.Function(function=dseg_label), + name='mni6_wm', + mem_gb=DEFAULT_MEMORY_MIN_GB, + ) + mni6_wm.inputs.label = 2 # BIDS default is WM=2 + workflow.connect([(dseg_to_mni6, mni6_wm, [('output_image', 'in_seg')])]) + + # EPI-MNI registration + epi_mni_report = pe.Node( + SimpleBeforeAfter( + after=str( + get_template( + 'MNI152NLin6Asym', + resolution=2, + desc='brain', + suffix='T1w', + extension=['.nii', '.nii.gz'], + ) + ), + before_label='EPI', + after_label='MNI152NLin6Asym', + dismiss_affine=True, + ), + name='epi_mni_report', + mem_gb=0.1, + ) + workflow.connect([ + (calculate_mean_bold, epi_mni_report, [('out_file', 'before')]), + (mni6_wm, epi_mni_report, [('out', 'wm_seg')]), + ]) # fmt:skip + + ds_epi_mni_report = pe.Node( + DerivativesDataSink( + base_directory=output_dir, + desc='normalization', + suffix='bold', + datatype='figures', + dismiss_entities=dismiss_echo(), + ), + name='ds_epi_mni_report', + ) + workflow.connect([ + (inputnode, ds_epi_mni_report, [('source_file', 'source_file')]), + (epi_mni_report, ds_epi_mni_report, [('out_report', 'in_file')]), + ]) # fmt:skip + + return workflow diff --git a/src/fmripost_tedana/workflows/tedana.py b/src/fmripost_tedana/workflows/tedana.py new file mode 100644 index 0000000..66dabab --- /dev/null +++ b/src/fmripost_tedana/workflows/tedana.py @@ -0,0 +1,581 @@ +# emacs: -*- mode: python; py-indent-offset: 4; indent-tabs-mode: nil -*- +# vi: set ft=python sts=4 ts=4 sw=4 et: +# +# Copyright 2023 The NiPreps Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+#
+# We support and encourage derived works from this project, please read
+# about our expectations at
+#
+# https://www.nipreps.org/community/licensing/
+#
+"""fMRIPost-tedana workflows to run ME-ICA."""
+
+from nipype.interfaces import utility as niu
+from nipype.pipeline import engine as pe
+
+from fmripost_tedana import config
+from fmripost_tedana.interfaces.aroma import AROMAClassifier
+from fmripost_tedana.interfaces.bids import DerivativesDataSink
+from fmripost_tedana.utils.utils import _get_wf_name
+
+
+def init_tedana_wf(
+    *,
+    bold_file: str,
+    metadata: dict,
+    mem_gb: dict,
+    susan_fwhm: float = 6.0,
+):
+    """Build a workflow that runs `ME-ICA`_.
+
+    This workflow wraps `ME-ICA`_ to identify and remove motion-related
+    independent components from a BOLD time series.
+
+    The following steps are performed:
+
+    #. Remove non-steady state volumes from the bold series.
+    #. Smooth data using FSL `susan`, with a kernel width FWHM=6.0mm.
+    #. Run FSL `melodic` outside of ME-ICA to generate the report
+    #. Run ME-ICA
+    #. Aggregate components and classifications to TSVs
+
+    There is a current discussion on whether other confounds should be extracted
+    before or after denoising `here
+    `__.
+
+    .. _ME-ICA: https://github.com/ME-ICA/tedana
+
+    Workflow Graph
+        .. workflow::
+            :graph2use: orig
+            :simple_form: yes
+
+            from fmripost_tedana.workflows.tedana import init_tedana_wf
+
+            wf = init_tedana_wf(
+                bold_file="fake.nii.gz",
+                metadata={"RepetitionTime": 1.0},
+                mem_gb={"resampled": 1},
+                susan_fwhm=6.0,
+            )
+
+    Parameters
+    ----------
+    bold_file
+        BOLD series used as name source for derivatives
+    metadata : :obj:`dict`
+        BIDS metadata for BOLD file
+    mem_gb : :obj:`dict`
+        Memory estimates (in GB); the ``resampled`` entry is used for the
+        smoothing and MELODIC nodes
+    susan_fwhm : :obj:`float`
+        Kernel width (FWHM in mm) for the smoothing step with
+        FSL ``susan`` (default: 6.0mm)
+
+    Inputs
+    ------
+    bold_std
+        BOLD series in template space
+    bold_mask_std
+        BOLD series mask in template space
+    confounds
+        fMRIPrep-formatted confounds file, which must include the following columns:
+        "trans_x", "trans_y", "trans_z", "rot_x", "rot_y", "rot_z".
+    skip_vols
+        number of non steady state volumes
+
+    Outputs
+    -------
+    mixing
+        FSL MELODIC mixing matrix
+    aroma_features
+        TSV of feature values used to classify components in ``mixing``.
+    features_metadata
+        Dictionary describing the ME-ICA run
+    aroma_confounds
+        TSV of confounds identified as noise by ME-ICA
+    """
+
+    from nipype.interfaces import fsl
+    from niworkflows.engine.workflows import LiterateWorkflow as Workflow
+
+    from fmripost_tedana.interfaces.confounds import ICAConfounds
+    from fmripost_tedana.interfaces.nilearn import MeanImage, MedianValue
+    from fmripost_tedana.interfaces.reportlets import ICAAROMARPT, ICAAROMAMetricsRPT
+    from fmripost_tedana.utils.utils import _convert_to_tsv
+
+    workflow = Workflow(name=_get_wf_name(bold_file, 'aroma'))
+    workflow.__postdesc__ = f"""\
+Automatic removal of motion artifacts using independent component analysis
+[ME-ICA, @ica_aroma] was performed on the *preprocessed BOLD on MNI152NLin6Asym space*
+time-series after removal of non-steady state volumes and spatial smoothing
+with a nonlinear filter that preserves underlying structure [SUSAN, @susan],
+using a FWHM of {susan_fwhm} mm.
+Additionally, the component time-series classified as "noise" were collected and placed
+in the corresponding confounds file.
+""" + + inputnode = pe.Node( + niu.IdentityInterface( + fields=[ + 'bold_std', + 'bold_mask_std', + 'confounds', + 'skip_vols', + ], + ), + name='inputnode', + ) + + outputnode = pe.Node( + niu.IdentityInterface( + fields=[ + 'mixing', + 'aroma_features', + 'features_metadata', + 'aroma_confounds', + ], + ), + name='outputnode', + ) + + rm_non_steady_state = pe.Node( + niu.Function(function=_remove_volumes, output_names=['bold_cut']), + name='rm_nonsteady', + ) + workflow.connect([ + (inputnode, rm_non_steady_state, [ + ('skip_vols', 'skip_vols'), + ('bold_std', 'bold_file'), + ]), + ]) # fmt:skip + + calc_median_val = pe.Node( + MedianValue(), + name='calc_median_val', + ) + workflow.connect([ + (inputnode, calc_median_val, [('bold_mask_std', 'mask_file')]), + (rm_non_steady_state, calc_median_val, [('bold_cut', 'bold_file')]), + ]) # fmt:skip + + calc_bold_mean = pe.Node( + MeanImage(), + name='calc_bold_mean', + ) + workflow.connect([ + (inputnode, calc_bold_mean, [('bold_mask_std', 'mask_file')]), + (rm_non_steady_state, calc_bold_mean, [('bold_cut', 'bold_file')]), + ]) # fmt:skip + + getusans = pe.Node( + niu.Function(function=_getusans_func, output_names=['usans']), + name='getusans', + mem_gb=0.01, + ) + workflow.connect([ + (calc_median_val, getusans, [('median_value', 'thresh')]), + (calc_bold_mean, getusans, [('out_file', 'image')]), + ]) # fmt:skip + + smooth = pe.Node( + fsl.SUSAN( + fwhm=susan_fwhm, + output_type='NIFTI' if config.execution.low_mem else 'NIFTI_GZ', + ), + name='smooth', + mem_gb=mem_gb['resampled'], + ) + workflow.connect([ + (rm_non_steady_state, smooth, [('bold_cut', 'in_file')]), + (getusans, smooth, [('usans', 'usans')]), + (calc_median_val, smooth, [(('median_value', _getbtthresh), 'brightness_threshold')]), + ]) # fmt:skip + + # ICA with MELODIC + melodic = pe.Node( + fsl.MELODIC( + no_bet=True, + tr_sec=float(metadata['RepetitionTime']), + mm_thresh=0.5, + out_stats=True, + dim=config.workflow.melodic_dim, + ), + name='melodic', + mem_gb=mem_gb['resampled'], + ) + workflow.connect([ + (inputnode, melodic, [('bold_mask_std', 'mask')]), + (smooth, melodic, [('smoothed_file', 'in_files')]), + ]) # fmt:skip + + select_melodic_files = pe.Node( + niu.Function( + function=_select_melodic_files, + input_names=['melodic_dir'], + output_names=['mixing', 'component_maps', 'component_stats'], + ), + name='select_melodic_files', + ) + workflow.connect([(melodic, select_melodic_files, [('out_dir', 'melodic_dir')])]) + + # Run the ME-ICA classifier + ica_aroma = pe.Node( + AROMAClassifier(TR=metadata['RepetitionTime']), + name='ica_aroma', + ) + workflow.connect([ + (inputnode, ica_aroma, [ + ('confounds', 'motpars'), + ('skip_vols', 'skip_vols'), + ]), + (select_melodic_files, ica_aroma, [ + ('mixing', 'mixing'), + ('component_maps', 'component_maps'), + ('component_stats', 'component_stats'), + ]), + (ica_aroma, outputnode, [ + ('aroma_features', 'aroma_features'), + ('aroma_metadata', 'features_metadata'), + ]), + ]) # fmt:skip + + # Generate reportlets + aroma_rpt = pe.Node( + ICAAROMARPT(), + name='aroma_rpt', + ) + workflow.connect([ + (inputnode, aroma_rpt, [('bold_mask_std', 'report_mask')]), + (smooth, aroma_rpt, [('smoothed_file', 'in_file')]), + (melodic, aroma_rpt, [('out_dir', 'melodic_dir')]), + (ica_aroma, aroma_rpt, [('aroma_noise_ics', 'aroma_noise_ics')]), + ]) # fmt:skip + + ds_report_ica_aroma = pe.Node( + DerivativesDataSink( + base_directory=config.execution.output_dir, + source_file=bold_file, + datatype='figures', + desc='aroma', + 
dismiss_entities=('echo', 'den', 'res'), + ), + name='ds_report_ica_aroma', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([(aroma_rpt, ds_report_ica_aroma, [('out_report', 'in_file')])]) + + # extract the confound ICs from the results + ica_aroma_confound_extraction = pe.Node( + ICAConfounds(err_on_aroma_warn=config.workflow.err_on_warn), + name='ica_aroma_confound_extraction', + ) + workflow.connect([ + (inputnode, ica_aroma_confound_extraction, [('skip_vols', 'skip_vols')]), + (select_melodic_files, ica_aroma_confound_extraction, [('mixing', 'mixing')]), + (ica_aroma, ica_aroma_confound_extraction, [('aroma_features', 'aroma_features')]), + (ica_aroma_confound_extraction, outputnode, [ + ('aroma_confounds', 'aroma_confounds'), + ('mixing', 'mixing'), + ]), + ]) # fmt:skip + + ds_components = pe.Node( + DerivativesDataSink( + base_directory=config.execution.output_dir, + source_file=bold_file, + compress=True, + datatype='func', + threshold='0p5', + desc='melodic', + suffix='components', + ), + name='ds_components', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([(select_melodic_files, ds_components, [('component_maps', 'in_file')])]) + + convert_to_tsv = pe.Node( + niu.Function(function=_convert_to_tsv, output_names=['out_file']), + name='convert_to_tsv', + ) + workflow.connect([(ica_aroma_confound_extraction, convert_to_tsv, [('mixing', 'in_file')])]) + + ds_mixing = pe.Node( + DerivativesDataSink( + base_directory=config.execution.output_dir, + source_file=bold_file, + datatype='func', + res='2', + desc='melodic', + suffix='mixing', + extension='tsv', + ), + name='ds_mixing', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([(convert_to_tsv, ds_mixing, [('out_file', 'in_file')])]) + + ds_aroma_features = pe.Node( + DerivativesDataSink( + base_directory=config.execution.output_dir, + source_file=bold_file, + datatype='func', + desc='aroma', + suffix='metrics', + extension='tsv', + dismiss_entities=('echo', 'den', 'res'), + ), + name='ds_aroma_features', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([ + (ica_aroma, ds_aroma_features, [ + ('aroma_features', 'in_file'), + ('aroma_metadata', 'meta_dict'), + ]), + ]) # fmt:skip + + ds_aroma_confounds = pe.Node( + DerivativesDataSink( + base_directory=config.execution.output_dir, + source_file=bold_file, + datatype='func', + desc='aroma', + suffix='timeseries', + extension='tsv', + dismiss_entities=('echo', 'den', 'res'), + ), + name='ds_aroma_confounds', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([ + (ica_aroma_confound_extraction, ds_aroma_confounds, [('aroma_confounds', 'in_file')]), + ]) # fmt:skip + + metrics_rpt = pe.Node( + ICAAROMAMetricsRPT(), + name='metrics_rpt', + ) + workflow.connect([(ica_aroma, metrics_rpt, [('aroma_features', 'aroma_features')])]) + + ds_report_metrics = pe.Node( + DerivativesDataSink( + base_directory=config.execution.output_dir, + source_file=bold_file, + datatype='figures', + desc='metrics', + dismiss_entities=('echo', 'den', 'res'), + ), + name='ds_report_metrics', + run_without_submitting=True, + mem_gb=config.DEFAULT_MEMORY_MIN_GB, + ) + workflow.connect([(metrics_rpt, ds_report_metrics, [('out_report', 'in_file')])]) + + return workflow + + +def init_denoise_wf(bold_file, metadata): + """Build a workflow that denoises a BOLD series using AROMA confounds. 
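+
+    For each requested denoising method, a denoised BOLD series and a carpet
+    plot reportlet are written to the output directory.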
+
+    This workflow performs the denoising in the requested output space(s).
+    It doesn't currently work on CIFTIs.
+    """
+    from niworkflows.engine.workflows import LiterateWorkflow as Workflow
+
+    from fmripost_tedana.interfaces.confounds import ICADenoise
+    from fmripost_tedana.workflows.confounds import init_carpetplot_wf
+
+    workflow = Workflow(name=_get_wf_name(bold_file, 'denoise'))
+
+    inputnode = pe.Node(
+        niu.IdentityInterface(
+            fields=[
+                'bold_file',
+                'bold_mask',
+                'confounds_file',
+                'mixing',
+                'classifications',
+                'skip_vols',
+                'space',
+                'cohort',
+                'res',
+            ],
+        ),
+        name='inputnode',
+    )
+
+    rm_non_steady_state = pe.Node(
+        niu.Function(function=_remove_volumes, output_names=['bold_cut']),
+        name='rm_nonsteady',
+    )
+    workflow.connect([
+        (inputnode, rm_non_steady_state, [
+            ('skip_vols', 'skip_vols'),
+            ('bold_file', 'bold_file'),
+        ]),
+    ])  # fmt:skip
+
+    preproc_carpetplot_wf = init_carpetplot_wf(
+        mem_gb=config.DEFAULT_MEMORY_MIN_GB,
+        metadata=metadata,
+        cifti_output=False,
+        name='preproc_carpet_wf',
+    )
+    preproc_carpetplot_wf.inputs.inputnode.desc = 'preprocCarpetplot'
+    workflow.connect([
+        (inputnode, preproc_carpetplot_wf, [
+            ('bold_mask', 'inputnode.bold_mask'),
+            ('confounds_file', 'inputnode.confounds_file'),
+            ('skip_vols', 'inputnode.dummy_scans'),
+        ]),
+        (inputnode, preproc_carpetplot_wf, [('bold_file', 'inputnode.bold')]),
+    ])  # fmt:skip
+
+    for denoise_method in config.workflow.denoise_method:
+        denoise = pe.Node(
+            ICADenoise(method=denoise_method),
+            name=f'denoise_{denoise_method}',
+        )
+        workflow.connect([
+            (inputnode, denoise, [
+                ('mixing', 'mixing'),
+                ('classifications', 'metrics'),
+                ('bold_mask', 'mask_file'),
+                ('skip_vols', 'skip_vols'),
+            ]),
+            (rm_non_steady_state, denoise, [('bold_cut', 'bold_file')]),
+        ])  # fmt:skip
+
+        add_non_steady_state = pe.Node(
+            niu.Function(function=_add_volumes, output_names=['bold_add']),
+            name=f'add_non_steady_state_{denoise_method}',
+        )
+        workflow.connect([
+            (inputnode, add_non_steady_state, [
+                ('bold_file', 'bold_file'),
+                ('skip_vols', 'skip_vols'),
+            ]),
+            (denoise, add_non_steady_state, [('denoised_file', 'bold_cut_file')]),
+        ])  # fmt:skip
+
+        ds_denoised = pe.Node(
+            DerivativesDataSink(
+                base_directory=config.execution.output_dir,
+                source_file=bold_file,
+                compress=True,
+                datatype='func',
+                desc=f'{denoise_method}Denoised',
+                suffix='bold',
+            ),
+            name=f'ds_denoised_{denoise_method}',
+            run_without_submitting=True,
+            mem_gb=config.DEFAULT_MEMORY_MIN_GB,
+        )
+        workflow.connect([
+            (inputnode, ds_denoised, [
+                ('space', 'space'),
+                ('cohort', 'cohort'),
+                ('res', 'res'),
+            ]),
+            (add_non_steady_state, ds_denoised, [('bold_add', 'in_file')]),
+        ])  # fmt:skip
+
+        carpetplot_wf = init_carpetplot_wf(
+            mem_gb=config.DEFAULT_MEMORY_MIN_GB,
+            metadata=metadata,
+            cifti_output=False,
+            name=f'{denoise_method}_carpet_wf',
+        )
+        carpetplot_wf.inputs.inputnode.desc = f'{denoise_method}Carpetplot'
+        workflow.connect([
+            (inputnode, carpetplot_wf, [
+                ('bold_mask', 'inputnode.bold_mask'),
+                ('confounds_file', 'inputnode.confounds_file'),
+                ('skip_vols', 'inputnode.dummy_scans'),
+            ]),
+            (ds_denoised, carpetplot_wf, [('out_file', 'inputnode.bold')]),
+        ])  # fmt:skip
+
+    return workflow
+
+
+def _getbtthresh(medianval):
+    return 0.75 * medianval
+
+
+def _getusans_func(image, thresh):
+    return [(image, thresh)]
+
+
+def _remove_volumes(bold_file, skip_vols):
+    """Remove skip_vols from bold_file."""
+    import os
+
+    import nibabel as nb
+    from nipype.utils.filemanip import fname_presuffix
+
+    if skip_vols
== 0: + return bold_file + + out = fname_presuffix(bold_file, prefix='cut_', newpath=os.getcwd()) + bold_img = nb.load(bold_file) + bold_img.__class__( + bold_img.dataobj[..., skip_vols:], bold_img.affine, bold_img.header + ).to_filename(out) + return out + + +def _add_volumes(bold_file, bold_cut_file, skip_vols): + """Prepend skip_vols from bold_file onto bold_cut_file.""" + import os + + import nibabel as nb + import numpy as np + from nipype.utils.filemanip import fname_presuffix + + if skip_vols == 0: + return bold_cut_file + + bold_img = nb.load(bold_file) + bold_cut_img = nb.load(bold_cut_file) + + bold_data = np.concatenate((bold_img.dataobj[..., :skip_vols], bold_cut_img.dataobj), axis=3) + + out = fname_presuffix(bold_cut_file, prefix='addnonsteady_', newpath=os.getcwd()) + bold_img.__class__(bold_data, bold_img.affine, bold_img.header).to_filename(out) + return out + + +def _select_melodic_files(melodic_dir): + """Select the mixing and component maps from the Melodic output.""" + import os + + mixing = os.path.join(melodic_dir, 'melodic_mix') + if not os.path.isfile(mixing): + raise FileNotFoundError(f'Missing MELODIC mixing matrix: {mixing}') + + component_maps = os.path.join(melodic_dir, 'melodic_IC.nii.gz') + if not os.path.isfile(component_maps): + raise FileNotFoundError(f'Missing MELODIC ICs: {component_maps}') + + component_stats = os.path.join(melodic_dir, 'melodic_ICstats') + if not os.path.isfile(component_stats): + raise FileNotFoundError(f'Missing MELODIC IC stats: {component_stats}') + + return mixing, component_maps, component_stats