Skip to content

Commit d76755c

Browse files
committed
ENH: Large refactor of the orchestration workflow [wipe phdiff_*]
This PR: - [x] Refines the work in nipreps#53 addressing nipreps#40. - [x] Adds tests to cover the orchestration workflow. - [x] Fixes nipreps#7 (added regression tests for this bug). - [x] Updates the interface of phdiff workflows: removes the metadata input, which was defined for this workflow solely. - [x] Makes it trivial to extend the phdiff workflow to phase1/phase2 workflows (nipreps#15).
1 parent 83e2dbf commit d76755c

File tree

4 files changed

+78
-57
lines changed

4 files changed

+78
-57
lines changed

sdcflows/workflows/base.py

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,6 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
7979
method (for reporting purposes)
8080
8181
"""
82-
if ignore is None:
83-
ignore = tuple()
84-
85-
if not isinstance(ignore, (list, tuple)):
86-
ignore = tuple(ignore)
87-
88-
# TODO: To be removed (filter out unsupported fieldmaps):
89-
fmaps = [fmap for fmap in fmaps if fmap['suffix'] in FMAP_PRIORITY]
90-
9182
workflow = Workflow(name='sdc_estimate_wf' if fmaps else 'sdc_bypass_wf')
9283
inputnode = pe.Node(niu.IdentityInterface(
9384
fields=['epi_file', 'epi_brain', 'epi_mask', 't1w_brain', 'std2anat_xfm']),
@@ -99,7 +90,8 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
9990
name='outputnode')
10091

10192
# No fieldmaps - forward inputs to outputs
102-
if not fmaps or 'fieldmaps' in ignore:
93+
ignored = False if ignore is None else 'fieldmaps' in ignore
94+
if not fmaps or ignored:
10395
workflow.__postdesc__ = """\
10496
Susceptibility distortion correction (SDC) has been skipped because the
10597
dataset does not contain extra field map acquisitions correctly described
@@ -119,18 +111,26 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
119111
accurate co-registration with the anatomical reference.
120112
"""
121113

122-
# In case there are multiple fieldmaps prefer EPI
123-
fmaps.sort(key=lambda fmap: FMAP_PRIORITY[fmap['suffix']])
124-
fmap = fmaps[0]
114+
only_syn = 'syn' in fmaps and len(fmaps) == 1
125115

126116
# PEPOLAR path
127117
if 'epi' in fmaps:
128118
from .pepolar import init_pepolar_unwarp_wf, check_pes
119+
120+
# SyN works without this metadata
121+
if epi_meta.get('PhaseEncodingDirection') is None:
122+
raise ValueError(
123+
'PhaseEncodingDirection is not defined within the metadata retrieved '
124+
'for the intended EPI (DWI, BOLD, or SBRef) run.')
129125
outputnode.inputs.method = 'PEB/PEPOLAR (phase-encoding based / PE-POLARity)'
130126

131-
# Filter out EPI fieldmaps to be used
132-
fmaps_epi = [(epi.path, epi.get_metadata()['PhaseEncodingDirection'])
133-
for epi in fmaps['epi']]
127+
fmaps_epi = [(v[0], v[1].get('PhaseEncodingDirection'))
128+
for v in fmaps['epi']]
129+
130+
if not all(list(zip(*fmaps_epi))[1]):
131+
raise ValueError(
132+
'At least one of the EPI runs with alternative phase-encoding '
133+
'blips is missing the required "PhaseEncodingDirection" metadata entry.')
134134

135135
# Find matched PE directions
136136
matched_pe = check_pes(fmaps_epi, epi_meta['PhaseEncodingDirection'])
@@ -150,32 +150,34 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
150150
])
151151

152152
# FIELDMAP path
153-
elif 'fieldmap' in fmaps:
153+
elif 'fieldmap' in fmaps or 'phasediff' in fmaps:
154154
from .unwarp import init_sdc_unwarp_wf
155-
# Import specific workflows here, so we don't break everything with one
156-
# unused workflow.
157-
suffices = {f.suffix for f in fmaps['fieldmap']}
158-
if 'fieldmap' in suffices:
155+
156+
# SyN works without this metadata
157+
if epi_meta.get('PhaseEncodingDirection') is None:
158+
raise ValueError(
159+
'PhaseEncodingDirection is not defined within the metadata retrieved '
160+
'for the intended EPI (DWI, BOLD, or SBRef) run.')
161+
162+
if 'fieldmap' in fmaps:
159163
from .fmap import init_fmap_wf
160-
outputnode.inputs.method = 'FMB (fieldmap-based)'
164+
outputnode.inputs.method = 'FMB (fieldmap-based) - directly measured B0 map'
161165
fmap_wf = init_fmap_wf(
162166
omp_nthreads=omp_nthreads,
163167
fmap_bspline=False)
164168
# set inputs
165-
fmap_wf.inputs.inputnode.magnitude = fmap['magnitude']
166-
fmap_wf.inputs.inputnode.fieldmap = fmap['fieldmap']
167-
elif 'phasediff' in suffices:
169+
fmap_wf.inputs.inputnode.magnitude = [
170+
m for m, _ in fmaps['fieldmap']['magnitude']]
171+
fmap_wf.inputs.inputnode.fieldmap = [
172+
m for m, _ in fmaps['fieldmap']['fieldmap']]
173+
elif 'phasediff' in fmaps:
168174
from .phdiff import init_phdiff_wf
175+
outputnode.inputs.method = 'FMB (fieldmap-based) - phase-difference map'
169176
fmap_wf = init_phdiff_wf(omp_nthreads=omp_nthreads)
170177
# set inputs
171-
fmap_wf.inputs.inputnode.phasediff = fmap['phasediff']
172178
fmap_wf.inputs.inputnode.magnitude = [
173-
fmap_ for key, fmap_ in sorted(fmap.items())
174-
if key.startswith("magnitude")
175-
]
176-
else:
177-
raise ValueError('Fieldmaps of types %s are not supported' %
178-
', '.join(['"%s"' % f for f in suffices]))
179+
m for m, _ in fmaps['phasediff']['magnitude']]
180+
fmap_wf.inputs.inputnode.phasediff = fmaps['phasediff']['phases']
179181

180182
sdc_unwarp_wf = init_sdc_unwarp_wf(
181183
omp_nthreads=omp_nthreads,
@@ -193,24 +195,27 @@ def init_sdc_estimate_wf(fmaps, epi_meta, omp_nthreads=1, debug=False, ignore=No
193195
('outputnode.fmap_ref', 'inputnode.fmap_ref'),
194196
('outputnode.fmap_mask', 'inputnode.fmap_mask')]),
195197
])
198+
elif not only_syn:
199+
raise ValueError('Fieldmaps of types %s are not supported' %
200+
', '.join(['"%s"' % f for f in fmaps]))
196201

197202
# FIELDMAP-less path
198-
if any(fm['suffix'] == 'syn' for fm in fmaps):
203+
if 'syn' in fmaps:
199204
from .syn import init_syn_sdc_wf
200205
syn_sdc_wf = init_syn_sdc_wf(
201206
epi_pe=epi_meta.get('PhaseEncodingDirection', None),
202207
omp_nthreads=omp_nthreads)
203208

204209
workflow.connect([
205210
(inputnode, syn_sdc_wf, [
211+
('epi_file', 'inputnode.in_reference'),
212+
('epi_brain', 'inputnode.in_reference_brain'),
206213
('t1w_brain', 'inputnode.t1w_brain'),
207-
('epi_file', 'inputnode.epi_file'),
208-
('epi_brain', 'inputnode.epi_brain'),
209214
('std2anat_xfm', 'inputnode.std2anat_xfm')]),
210215
])
211216

212217
# XXX Eliminate branch when forcing isn't an option
213-
if fmap['suffix'] == 'syn': # No fieldmaps, but --use-syn
218+
if only_syn: # No fieldmaps, but --use-syn
214219
outputnode.inputs.method = 'FLB ("fieldmap-less", SyN-based)'
215220
sdc_unwarp_wf = syn_sdc_wf
216221
else: # --force-syn was called when other fieldmap was present

sdcflows/workflows/gre.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ def init_fmap_postproc_wf(omp_nthreads, fmap_bspline, median_kernel_size=3,
128128
"""
129129
workflow = Workflow(name=name)
130130
inputnode = pe.Node(niu.IdentityInterface(
131-
fields=['fmap_mask', 'fmap_ref', 'fmap']), name='inputnode')
132-
outputnode = pe.Node(niu.IdentityInterface(fields=['out_fmap']),
131+
fields=['fmap_mask', 'fmap_ref', 'fmap', 'metadata']), name='inputnode')
132+
outputnode = pe.Node(niu.IdentityInterface(fields=['out_fmap', 'metadata']),
133133
name='outputnode')
134134
if fmap_bspline:
135135
from ..interfaces.fmap import FieldEnhance
@@ -156,11 +156,12 @@ def init_fmap_postproc_wf(omp_nthreads, fmap_bspline, median_kernel_size=3,
156156

157157
workflow.connect([
158158
(inputnode, cleanup_wf, [('fmap_mask', 'inputnode.in_mask')]),
159-
(inputnode, recenter, [('fmap', 'in_file')]),
159+
(inputnode, recenter, [(('fmap', _pop), 'in_file')]),
160160
(recenter, denoise, [('out', 'in_file')]),
161161
(denoise, demean, [('out_file', 'in_file')]),
162162
(demean, cleanup_wf, [('out', 'inputnode.in_file')]),
163163
(cleanup_wf, outputnode, [('outputnode.out_file', 'out_fmap')]),
164+
(inputnode, outputnode, [(('metadata', _pop), 'metadata')]),
164165
])
165166

166167
return workflow
@@ -218,3 +219,9 @@ def _demean(in_file, in_mask=None, usemode=True):
218219
newpath=getcwd())
219220
nb.Nifti1Image(data, nii.affine, nii.header).to_filename(out_file)
220221
return out_file
222+
223+
224+
def _pop(inlist):
225+
if isinstance(inlist, (tuple, list)):
226+
return inlist[0]
227+
return inlist

sdcflows/workflows/phdiff.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ def init_phdiff_wf(omp_nthreads, name='phdiff_wf'):
5959
6060
Inputs
6161
------
62-
magnitude : pathlike
63-
Path to the corresponding magnitude path(s).
64-
phasediff : pathlike
65-
Path to the corresponding phase-difference file.
66-
metadata : dict
67-
Metadata dictionary corresponding to the phasediff input
62+
magnitude : list of os.pathlike
63+
List of path(s) the GRE magnitude maps.
64+
phasediff : list of tuple(os.pathlike, dict)
65+
List containing one GRE phase-difference map with its corresponding metadata
66+
(requires ``EchoTime1`` and ``EchoTime2``), or the phase maps for the two
67+
subsequent echoes, with their metadata (requires ``EchoTime``).
6868
6969
Outputs
7070
-------
@@ -90,39 +90,48 @@ def init_phdiff_wf(omp_nthreads, name='phdiff_wf'):
9090
further improvements of HCP Pipelines [@hcppipelines].
9191
"""
9292

93-
inputnode = pe.Node(niu.IdentityInterface(fields=['magnitude', 'phasediff', 'metadata']),
93+
inputnode = pe.Node(niu.IdentityInterface(fields=['magnitude', 'phasediff']),
9494
name='inputnode')
9595

9696
outputnode = pe.Node(niu.IdentityInterface(
9797
fields=['fmap', 'fmap_ref', 'fmap_mask']), name='outputnode')
9898

99+
split = pe.MapNode(niu.Function(function=_split, output_names=['map_file', 'meta']),
100+
iterfield=['phasediff'], run_without_submitting=True, name='split')
101+
99102
magnitude_wf = init_magnitude_wf(omp_nthreads=omp_nthreads)
100103

101104
# phase diff -> radians
102-
phmap2rads = pe.Node(PhaseMap2rads(), name='phmap2rads',
103-
run_without_submitting=True)
105+
phmap2rads = pe.MapNode(PhaseMap2rads(), name='phmap2rads',
106+
iterfield=['in_file'], run_without_submitting=True)
104107
# FSL PRELUDE will perform phase-unwrapping
105-
prelude = pe.Node(fsl.PRELUDE(), name='prelude')
108+
prelude = pe.MapNode(fsl.PRELUDE(), iterfield=['phase_file'], name='prelude')
106109

107110
fmap_postproc_wf = init_fmap_postproc_wf(omp_nthreads=omp_nthreads,
108111
fmap_bspline=False)
109112
compfmap = pe.Node(Phasediff2Fieldmap(), name='compfmap')
110113

111114
workflow.connect([
112-
(inputnode, compfmap, [('metadata', 'metadata')]),
115+
(inputnode, split, [('phasediff', 'phasediff')]),
113116
(inputnode, magnitude_wf, [('magnitude', 'inputnode.magnitude')]),
114117
(magnitude_wf, prelude, [('outputnode.fmap_ref', 'magnitude_file'),
115118
('outputnode.fmap_mask', 'mask_file')]),
116-
(inputnode, phmap2rads, [('phasediff', 'in_file')]),
119+
(split, phmap2rads, [('map_file', 'in_file')]),
117120
(phmap2rads, prelude, [('out_file', 'phase_file')]),
121+
(split, fmap_postproc_wf, [('meta', 'inputnode.metadata')]),
118122
(prelude, fmap_postproc_wf, [('unwrapped_phase_file', 'inputnode.fmap')]),
119123
(magnitude_wf, fmap_postproc_wf, [
120124
('outputnode.fmap_mask', 'inputnode.fmap_mask'),
121125
('outputnode.fmap_ref', 'inputnode.fmap_ref')]),
122-
(fmap_postproc_wf, compfmap, [('outputnode.out_fmap', 'in_file')]),
126+
(fmap_postproc_wf, compfmap, [('outputnode.out_fmap', 'in_file'),
127+
('outputnode.metadata', 'metadata')]),
123128
(compfmap, outputnode, [('out_file', 'fmap')]),
124129
(magnitude_wf, outputnode, [('outputnode.fmap_ref', 'fmap_ref'),
125130
('outputnode.fmap_mask', 'fmap_mask')]),
126131
])
127132

128133
return workflow
134+
135+
136+
def _split(phasediff):
137+
return phasediff

sdcflows/workflows/tests/test_phdiff.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@ def test_workflow(bids_layouts, tmpdir, output_path, dataset, workdir):
2323
return_type='file',
2424
extension=['.nii', '.nii.gz'])
2525

26-
phdiff_file = data.get(suffix='phasediff', acq='v4',
27-
extension=['.nii', '.nii.gz'])[0]
26+
phdiff_files = data.get(suffix='phasediff', acq='v4',
27+
extension=['.nii', '.nii.gz'])
2828

29-
phdiff_wf.inputs.inputnode.phasediff = phdiff_file.path
30-
phdiff_wf.inputs.inputnode.metadata = phdiff_file.get_metadata()
29+
phdiff_wf.inputs.inputnode.phasediff = [
30+
(ph.path, ph.get_metadata()) for ph in phdiff_files]
3131

3232
if output_path:
3333
from ...interfaces.reportlets import FieldmapReportlet
@@ -36,7 +36,7 @@ def test_workflow(bids_layouts, tmpdir, output_path, dataset, workdir):
3636
dsink = pe.Node(DerivativesDataSink(
3737
base_directory=str(output_path), keep_dtype=True), name='dsink')
3838
dsink.interface.out_path_base = 'sdcflows'
39-
dsink.inputs.source_file = phdiff_file.path
39+
dsink.inputs.source_file = phdiff_files[0].path
4040

4141
wf.connect([
4242
(phdiff_wf, rep, [

0 commit comments

Comments
 (0)