Skip to content

Commit 1c217b6

Browse files
committed
ENH: Refactor workflow for simplicity - explicit iteration over DWIs
This will reduce the attrition of tracking Nipype's iterables, and make the development of workflows easier.
1 parent c868f76 commit 1c217b6

File tree

4 files changed

+173
-134
lines changed

4 files changed

+173
-134
lines changed

dmriprep/utils/misc.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,19 @@ def check_deps(workflow):
1515
and which(node.interface._cmd.split()[0]) is None
1616
)
1717
)
18+
19+
20+
def sub_prefix(subid):
21+
"""
22+
Make sure the subject ID has the sub- prefix.
23+
24+
Examples
25+
--------
26+
>>> sub_prefix("sub-01")
27+
'sub-01'
28+
29+
>>> sub_prefix("01")
30+
'sub-01'
31+
32+
"""
33+
return f"sub-{subid.replace('sub-', '')}"

dmriprep/workflows/base.py

Lines changed: 43 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
from nipype.interfaces import utility as niu
99

1010
from niworkflows.engine.workflows import LiterateWorkflow as Workflow
11-
from niworkflows.anat.coregistration import init_bbreg_wf
1211
from niworkflows.interfaces.bids import BIDSInfo, BIDSFreeSurferDir
1312
from niworkflows.utils.misc import fix_multi_T1w_source_name
1413
from niworkflows.utils.spaces import Reference
@@ -124,6 +123,8 @@ def init_single_subject_wf(subject_id):
124123
FreeSurfer's ``$SUBJECTS_DIR``
125124
126125
"""
126+
from ..utils.misc import sub_prefix as _prefix
127+
127128
name = f"single_subject_{subject_id}_wf"
128129
subject_data = collect_data(config.execution.layout, subject_id)[0]
129130

@@ -285,7 +286,7 @@ def init_single_subject_wf(subject_id):
285286
if anat_only:
286287
return workflow
287288

288-
from .dwi.base import init_early_b0ref_wf
289+
from .dwi.base import init_dwi_preproc_wf
289290
# Append the dMRI section to the existing anatomical excerpt
290291
# That way we do not need to stream down the number of DWI datasets
291292
anat_preproc_wf.__postdesc__ = (
@@ -300,115 +301,39 @@ def init_single_subject_wf(subject_id):
300301
"""
301302
)
302303

303-
layout = config.execution.layout
304-
dwi_data = tuple(
305-
[
306-
(dwi, layout.get_metadata(dwi), layout.get_bvec(dwi), layout.get_bval(dwi))
307-
for dwi in subject_data["dwi"]
308-
]
309-
)
310-
311-
inputnode = pe.Node(niu.IdentityInterface(fields=["dwi_data"]), name="inputnode")
312-
inputnode.iterables = [("dwi_data", dwi_data)]
313-
314-
referencenode = pe.JoinNode(
315-
niu.IdentityInterface(
316-
fields=[
317-
"dwi_file",
318-
"metadata",
319-
"dwi_reference",
320-
"dwi_mask",
321-
"gradients_rasb",
322-
]
323-
),
324-
name="referencenode",
325-
joinsource="inputnode",
326-
run_without_submitting=True,
327-
)
328-
329-
split_info = pe.Node(
330-
niu.Function(
331-
function=_unpack, output_names=["dwi_file", "metadata", "bvec", "bval"]
332-
),
333-
name="split_info",
334-
run_without_submitting=True,
335-
)
336-
337-
early_b0ref_wf = init_early_b0ref_wf()
338-
339-
# fmt:off
340-
workflow.connect([
341-
(inputnode, split_info, [("dwi_data", "in_tuple")]),
342-
(split_info, early_b0ref_wf, [
343-
("dwi_file", "inputnode.dwi_file"),
344-
("bvec", "inputnode.in_bvec"),
345-
("bval", "inputnode.in_bval"),
346-
]),
347-
(split_info, referencenode, [("dwi_file", "dwi_file"),
348-
("metadata", "metadata")]),
349-
(early_b0ref_wf, referencenode, [
350-
("outputnode.dwi_reference", "dwi_reference"),
351-
("outputnode.dwi_mask", "dwi_mask"),
352-
("outputnode.gradients_rasb", "gradients_rasb"),
353-
]),
354-
])
355-
# fmt:on
356-
357-
if config.workflow.run_reconall:
358-
from niworkflows.interfaces.nibabel import ApplyMask
359-
360-
# Mask the T1w
361-
t1w_brain = pe.Node(ApplyMask(), name="t1w_brain")
362-
363-
bbr_wf = init_bbreg_wf(
364-
debug=config.execution.debug,
365-
epi2t1w_init=config.workflow.dwi2t1w_init,
366-
omp_nthreads=config.nipype.omp_nthreads,
367-
)
368-
369-
ds_report_reg = pe.Node(
370-
DerivativesDataSink(base_directory=str(output_dir), datatype="figures",),
371-
name="ds_report_reg",
372-
run_without_submitting=True,
373-
)
374-
375-
def _bold_reg_suffix(fallback):
376-
return "coreg" if fallback else "bbregister"
304+
for dwi_file in subject_data["dwi"]:
305+
dwi_preproc_wf = init_dwi_preproc_wf(dwi_file)
377306

378-
# fmt:off
307+
# fmt: off
379308
workflow.connect([
380-
# T1w Mask
381-
(anat_preproc_wf, t1w_brain, [
382-
("outputnode.t1w_preproc", "in_file"),
383-
("outputnode.t1w_mask", "in_mask"),
384-
]),
385-
# BBRegister
386-
(early_b0ref_wf, bbr_wf, [
387-
("outputnode.dwi_reference", "inputnode.in_file")
388-
]),
389-
(fsinputnode, bbr_wf, [("subjects_dir", "inputnode.subjects_dir")]),
390-
(bids_info, bbr_wf, [(("subject", _prefix), "inputnode.subject_id")]),
391-
(anat_preproc_wf, bbr_wf, [
392-
("outputnode.fsnative2t1w_xfm", "inputnode.fsnative2t1w_xfm")
393-
]),
394-
(split_info, ds_report_reg, [("dwi_file", "source_file")]),
395-
(bbr_wf, ds_report_reg, [
396-
('outputnode.out_report', 'in_file'),
397-
(('outputnode.fallback', _bold_reg_suffix), 'desc')]),
309+
(anat_preproc_wf, dwi_preproc_wf,
310+
[("outputnode.t1w_preproc", "inputnode.t1w_preproc"),
311+
("outputnode.t1w_mask", "inputnode.t1w_mask"),
312+
("outputnode.t1w_dseg", "inputnode.t1w_dseg"),
313+
("outputnode.t1w_aseg", "inputnode.t1w_aseg"),
314+
("outputnode.t1w_aparc", "inputnode.t1w_aparc"),
315+
("outputnode.t1w_tpms", "inputnode.t1w_tpms"),
316+
("outputnode.template", "inputnode.template"),
317+
("outputnode.anat2std_xfm", "inputnode.anat2std_xfm"),
318+
("outputnode.std2anat_xfm", "inputnode.std2anat_xfm"),
319+
# Undefined if --fs-no-reconall, but this is safe
320+
("outputnode.subjects_dir", "inputnode.subjects_dir"),
321+
("outputnode.t1w2fsnative_xfm", "inputnode.t1w2fsnative_xfm"),
322+
("outputnode.fsnative2t1w_xfm", "inputnode.fsnative2t1w_xfm")]),
323+
(bids_info, dwi_preproc_wf, [("subject", "inputnode.subject_id")]),
398324
])
399-
# fmt:on
325+
# fmt: on
400326

401327
if "fieldmap" in config.workflow.ignore:
402328
return workflow
403329

404-
from niworkflows.interfaces.utility import KeySelect
405330
from sdcflows import fieldmaps as fm
406331
from sdcflows.utils.wrangler import find_estimators
407332
from sdcflows.workflows.base import init_fmap_preproc_wf
408333

409334
# SDCFlows connection
410335
# Step 1: Run basic heuristics to identify available data for fieldmap estimation
411-
estimators = find_estimators(layout)
336+
estimators = find_estimators(config.execution.layout)
412337

413338
if not estimators and config.workflow.use_syn: # Add fieldmap-less estimators
414339
# estimators = [fm.FieldmapEstimation()]
@@ -444,31 +369,25 @@ def _bold_reg_suffix(fallback):
444369
s.metadata for s in estimator.sources
445370
]
446371
else:
447-
est_id = estimator.bids_id
448-
fmap_select = pe.MapNode(
449-
KeySelect(fields=["metadata", "dwi_reference", "dwi_mask", "gradients_rasb",]),
450-
name=f"fmap_select_{est_id}",
451-
run_without_submitting=True,
452-
iterfields=["key"]
453-
)
454-
fmap_select.inputs.key = [
455-
str(s.path) for s in estimator.sources if s.suffix in ("epi", "dwi", "sbref")
456-
]
457-
# fmt:off
458-
workflow.connect([
459-
(referencenode, fmap_select, [("dwi_file", "keys"),
460-
("metadata", "metadata"),
461-
("dwi_reference", "dwi_reference"),
462-
("gradients_rasb", "gradients_rasb")]),
463-
])
464-
# fmt:on
372+
raise NotImplementedError
373+
# from niworkflows.interfaces.utility import KeySelect
374+
# est_id = estimator.bids_id
375+
# fmap_select = pe.MapNode(
376+
# KeySelect(fields=["metadata", "dwi_reference", "dwi_mask", "gradients_rasb",]),
377+
# name=f"fmap_select_{est_id}",
378+
# run_without_submitting=True,
379+
# iterfields=["key"]
380+
# )
381+
# fmap_select.inputs.key = [
382+
# str(s.path) for s in estimator.sources if s.suffix in ("epi", "dwi", "sbref")
383+
# ]
384+
# # fmt:off
385+
# workflow.connect([
386+
# (referencenode, fmap_select, [("dwi_file", "keys"),
387+
# ("metadata", "metadata"),
388+
# ("dwi_reference", "dwi_reference"),
389+
# ("gradients_rasb", "gradients_rasb")]),
390+
# ])
391+
# # fmt:on
465392

466393
return workflow
467-
468-
469-
def _prefix(subid):
470-
return "-".join(("sub", subid.lstrip("sub-")))
471-
472-
473-
def _unpack(in_tuple):
474-
return in_tuple

dmriprep/workflows/dwi/base.py

Lines changed: 113 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
"""Orchestrating the dMRI-preprocessing workflow."""
22
from ... import config
3+
from pathlib import Path
34
from nipype.pipeline import engine as pe
45
from nipype.interfaces import utility as niu
56

67
from niworkflows.engine.workflows import LiterateWorkflow as Workflow
7-
from ...interfaces.vectors import CheckGradientTable
8-
from .util import init_dwi_reference_wf
9-
from .outputs import init_reportlets_wf
8+
from ...interfaces import DerivativesDataSink
109

1110

12-
def init_early_b0ref_wf(name="early_b0ref_wf",):
11+
def init_dwi_preproc_wf(dwi_file):
1312
"""
14-
Build an early :math:`b = 0` average reference for internal consumption of *dMRIPrep*.
13+
Build a preprocessing workflow for one DWI run.
1514
1615
Workflow Graph
1716
.. workflow::
@@ -20,9 +19,14 @@ def init_early_b0ref_wf(name="early_b0ref_wf",):
2019
2120
from dmriprep.config.testing import mock_config
2221
from dmriprep import config
23-
from dmriprep.workflows.dwi.base import init_early_b0ref_wf
22+
from dmriprep.workflows.dwi.base import init_dwi_preproc_wf
2423
with mock_config():
25-
wf = init_early_b0ref_wf()
24+
wf = init_dwi_preproc_wf()
25+
26+
Parameters
27+
----------
28+
dwi_file : :obj:`os.PathLike`
29+
One diffusion MRI dataset to be processed.
2630
2731
Inputs
2832
------
@@ -49,13 +53,48 @@ def init_early_b0ref_wf(name="early_b0ref_wf",):
4953
* :py:func:`~dmriprep.workflows.dwi.outputs.init_reportlets_wf`
5054
5155
"""
56+
from ...interfaces.vectors import CheckGradientTable
57+
from .util import init_dwi_reference_wf
58+
from .outputs import init_reportlets_wf
59+
60+
layout = config.execution.layout
61+
62+
dwi_file = Path(dwi_file)
63+
config.loggers.workflow.debug(
64+
f"Creating DWI preprocessing workflow for <{dwi_file.name}>"
65+
)
66+
5267
# Build workflow
53-
workflow = Workflow(name=name)
68+
workflow = Workflow(name=_get_wf_name(dwi_file.name))
5469

5570
inputnode = pe.Node(
56-
niu.IdentityInterface(fields=["dwi_file", "in_bvec", "in_bval"]),
71+
niu.IdentityInterface(
72+
fields=[
73+
# DWI
74+
"dwi_file",
75+
"in_bvec",
76+
"in_bval",
77+
# From anatomical
78+
"t1w_preproc",
79+
"t1w_mask",
80+
"t1w_dseg",
81+
"t1w_aseg",
82+
"t1w_aparc",
83+
"t1w_tpms",
84+
"template",
85+
"anat2std_xfm",
86+
"std2anat_xfm",
87+
"subjects_dir",
88+
"subject_id",
89+
"t1w2fsnative_xfm",
90+
"fsnative2t1w_xfm",
91+
]
92+
),
5793
name="inputnode",
5894
)
95+
inputnode.inputs.dwi_file = str(dwi_file.absolute())
96+
inputnode.inputs.in_bvec = str(layout.get_bvec(dwi_file))
97+
inputnode.inputs.in_bval = str(layout.get_bval(dwi_file))
5998

6099
outputnode = pe.Node(
61100
niu.IdentityInterface(fields=["dwi_reference", "dwi_mask", "gradients_rasb"]),
@@ -84,6 +123,52 @@ def init_early_b0ref_wf(name="early_b0ref_wf",):
84123
])
85124
# fmt:on
86125

126+
if config.workflow.run_reconall:
127+
from niworkflows.interfaces.nibabel import ApplyMask
128+
from niworkflows.anat.coregistration import init_bbreg_wf
129+
from ...utils.misc import sub_prefix as _prefix
130+
131+
# Mask the T1w
132+
t1w_brain = pe.Node(ApplyMask(), name="t1w_brain")
133+
134+
bbr_wf = init_bbreg_wf(
135+
debug=config.execution.debug,
136+
epi2t1w_init=config.workflow.dwi2t1w_init,
137+
omp_nthreads=config.nipype.omp_nthreads,
138+
)
139+
140+
ds_report_reg = pe.Node(
141+
DerivativesDataSink(
142+
base_directory=str(config.execution.output_dir), datatype="figures",
143+
),
144+
name="ds_report_reg",
145+
run_without_submitting=True,
146+
)
147+
148+
def _bold_reg_suffix(fallback):
149+
return "coreg" if fallback else "bbregister"
150+
151+
# fmt:off
152+
workflow.connect([
153+
(inputnode, bbr_wf, [
154+
("fsnative2t1w_xfm", "inputnode.fsnative2t1w_xfm"),
155+
(("subject_id", _prefix), "inputnode.subject_id"),
156+
("subjects_dir", "inputnode.subjects_dir"),
157+
]),
158+
# T1w Mask
159+
(inputnode, t1w_brain, [("t1w_preproc", "in_file"),
160+
("t1w_mask", "in_mask")]),
161+
(inputnode, ds_report_reg, [("dwi_file", "source_file")]),
162+
# BBRegister
163+
(dwi_reference_wf, bbr_wf, [
164+
("outputnode.ref_image", "inputnode.in_file")
165+
]),
166+
(bbr_wf, ds_report_reg, [
167+
('outputnode.out_report', 'in_file'),
168+
(('outputnode.fallback', _bold_reg_suffix), 'desc')]),
169+
])
170+
# fmt:on
171+
87172
# REPORTING ############################################################
88173
reportlets_wf = init_reportlets_wf(str(config.execution.output_dir))
89174
# fmt:off
@@ -98,3 +183,22 @@ def init_early_b0ref_wf(name="early_b0ref_wf",):
98183
# fmt:on
99184

100185
return workflow
186+
187+
188+
def _get_wf_name(filename):
189+
"""
190+
Derive the workflow name for supplied DWI file.
191+
192+
Examples
193+
--------
194+
>>> _get_wf_name('/completely/made/up/path/sub-01_dir-AP_acq-64grad_dwi.nii.gz')
195+
'dwi_preproc_dir_AP_acq_64grad_wf'
196+
197+
>>> _get_wf_name('/completely/made/up/path/sub-01_dir-RL_run-01_echo-1_dwi.nii.gz')
198+
'dwi_preproc_dir_RL_run_01_echo_1_wf'
199+
200+
"""
201+
from pathlib import Path
202+
fname = Path(filename).name.rpartition(".nii")[0].replace("_dwi", "_wf")
203+
fname_nosub = '_'.join(fname.split("_")[1:])
204+
return f"dwi_preproc_{fname_nosub.replace('.', '_').replace(' ', '').replace('-', '_')}"

0 commit comments

Comments
 (0)