43
43
import os
44
44
import sys
45
45
from copy import deepcopy
46
+ from typing import Optional
46
47
47
48
from nipype .interfaces import utility as niu
48
49
from nipype .pipeline import engine as pe
49
50
from packaging .version import Version
50
51
51
- from .. import config
52
- from ..interfaces import DerivativesDataSink
53
- from ..interfaces .reports import AboutSummary , SubjectSummary
54
- from .bold import init_func_preproc_wf
52
+ from nibabies import config
53
+ from nibabies .interfaces import DerivativesDataSink
54
+ from nibabies .interfaces .reports import AboutSummary , SubjectSummary
55
+ from nibabies .utils .bids import parse_bids_for_age_months
56
+ from nibabies .workflows .bold import init_func_preproc_wf
55
57
56
58
57
- def init_nibabies_wf (participants_table ):
59
+ def init_nibabies_wf (subworkflows_list ):
58
60
"""
59
61
Build *NiBabies*'s pipeline.
60
62
@@ -76,8 +78,9 @@ def init_nibabies_wf(participants_table):
76
78
77
79
Parameters
78
80
----------
79
- participants_table: :obj:`dict`
80
- Keys of participant labels and values of the sessions to process.
81
+ subworkflows_list: :obj:`list` of :obj:`tuple`
82
+ A list of the subworkflows to create.
83
+ Each subject session is run as an individual workflow.
81
84
"""
82
85
from niworkflows .engine .workflows import LiterateWorkflow as Workflow
83
86
from niworkflows .interfaces .bids import BIDSFreeSurferDir
@@ -86,52 +89,76 @@ def init_nibabies_wf(participants_table):
86
89
nibabies_wf = Workflow (name = f"nibabies_{ ver .major } _{ ver .minor } _wf" )
87
90
nibabies_wf .base_dir = config .execution .work_dir
88
91
92
+ execution_spaces = init_execution_spaces ()
93
+
89
94
freesurfer = config .workflow .run_reconall
90
95
if freesurfer :
91
96
fsdir = pe .Node (
92
97
BIDSFreeSurferDir (
93
98
derivatives = config .execution .output_dir ,
94
99
freesurfer_home = os .getenv ("FREESURFER_HOME" ),
95
- spaces = config . workflow . spaces .get_fs_spaces (),
100
+ spaces = execution_spaces .get_fs_spaces (),
96
101
),
97
102
name = f"fsdir_run_{ config .execution .run_uuid .replace ('-' , '_' )} " ,
98
103
run_without_submitting = True ,
99
104
)
100
105
if config .execution .fs_subjects_dir is not None :
101
106
fsdir .inputs .subjects_dir = str (config .execution .fs_subjects_dir .absolute ())
102
107
103
- for subject_id , sessions in participants_table .items ():
104
- for session_id in sessions :
105
- single_subject_wf = init_single_subject_wf (subject_id , session_id = session_id )
108
+ for subject_id , session_id in subworkflows_list :
109
+ # Calculate the age and age-specific spaces
110
+ age = parse_bids_for_age_months (config .execution .bids_dir , subject_id , session_id )
111
+ if config .workflow .age_months :
112
+ config .loggers .cli .warning (
113
+ "`--age-months` is deprecated and will be removed in a future release."
114
+ "Please use a `sessions.tsv` or `participants.tsv` file to track participants age."
115
+ )
116
+ age = config .workflow .age_months
117
+ if age is None :
118
+ raise RuntimeError (
119
+ "Could not find age for sub-{subject}{session}" .format (
120
+ subject = subject_id , session = f'_ses-{ session_id } ' if session_id else ''
121
+ )
122
+ )
123
+ output_spaces = init_workflow_spaces (execution_spaces , age )
106
124
107
- bids_level = [f"sub-{ subject_id } " ]
108
- if session_id :
109
- bids_level .append (f"ses-{ session_id } " )
125
+ # skull strip template cohort
126
+ single_subject_wf = init_single_subject_wf (
127
+ subject_id ,
128
+ session_id = session_id ,
129
+ age = age ,
130
+ spaces = output_spaces ,
131
+ )
110
132
111
- log_dir = (
112
- config .execution .nibabies_dir .joinpath (* bids_level )
113
- / "log"
114
- / config .execution .run_uuid
115
- )
133
+ bids_level = [f"sub-{ subject_id } " ]
134
+ if session_id :
135
+ bids_level .append (f"ses-{ session_id } " )
116
136
117
- single_subject_wf .config ["execution" ]["crashdump_dir" ] = str (log_dir )
118
- for node in single_subject_wf ._get_all_nodes ():
119
- node .config = deepcopy (single_subject_wf .config )
120
- if freesurfer :
121
- nibabies_wf .connect (
122
- fsdir , "subjects_dir" , single_subject_wf , "inputnode.subjects_dir"
123
- )
124
- else :
125
- nibabies_wf .add_nodes ([single_subject_wf ])
137
+ log_dir = (
138
+ config .execution .nibabies_dir .joinpath (* bids_level ) / "log" / config .execution .run_uuid
139
+ )
126
140
127
- # Dump a copy of the config file into the log directory
128
- log_dir .mkdir (exist_ok = True , parents = True )
129
- config .to_filename (log_dir / "nibabies.toml" )
141
+ single_subject_wf .config ["execution" ]["crashdump_dir" ] = str (log_dir )
142
+ for node in single_subject_wf ._get_all_nodes ():
143
+ node .config = deepcopy (single_subject_wf .config )
144
+ if freesurfer :
145
+ nibabies_wf .connect (fsdir , "subjects_dir" , single_subject_wf , "inputnode.subjects_dir" )
146
+ else :
147
+ nibabies_wf .add_nodes ([single_subject_wf ])
148
+
149
+ # Dump a copy of the config file into the log directory
150
+ log_dir .mkdir (exist_ok = True , parents = True )
151
+ config .to_filename (log_dir / "nibabies.toml" )
130
152
131
153
return nibabies_wf
132
154
133
155
134
- def init_single_subject_wf (subject_id , session_id = None ):
156
+ def init_single_subject_wf (
157
+ subject_id : str ,
158
+ session_id : Optional [str ] = None ,
159
+ age : Optional [int ] = None ,
160
+ spaces = None ,
161
+ ):
135
162
"""
136
163
Organize the preprocessing pipeline for a single subject, at a single session.
137
164
@@ -158,6 +185,8 @@ def init_single_subject_wf(subject_id, session_id=None):
158
185
Subject label for this single-subject workflow.
159
186
session_id : :obj:`str` or None
160
187
Session identifier.
188
+ age: :obj:`int` or None
189
+ Age (in months) of subject.
161
190
162
191
Inputs
163
192
------
@@ -196,7 +225,6 @@ def init_single_subject_wf(subject_id, session_id=None):
196
225
anat_only = config .workflow .anat_only
197
226
derivatives = config .execution .derivatives or {}
198
227
anat_modality = "t1w" if subject_data ["t1w" ] else "t2w"
199
- spaces = config .workflow .spaces
200
228
# Make sure we always go through these two checks
201
229
if not anat_only and not subject_data ["bold" ]:
202
230
task_id = config .execution .task_id
@@ -315,7 +343,7 @@ def init_single_subject_wf(subject_id, session_id=None):
315
343
# Preprocessing of anatomical (includes registration to UNCInfant)
316
344
anat_preproc_wf = init_infant_anat_wf (
317
345
ants_affine_init = True ,
318
- age_months = config . workflow . age_months ,
346
+ age_months = age ,
319
347
anat_modality = anat_modality ,
320
348
t1w = subject_data ["t1w" ],
321
349
t2w = subject_data ["t2w" ],
@@ -419,7 +447,7 @@ def init_single_subject_wf(subject_id, session_id=None):
419
447
func_preproc_wfs = []
420
448
has_fieldmap = bool (fmap_estimators )
421
449
for bold_file in subject_data ['bold' ]:
422
- func_preproc_wf = init_func_preproc_wf (bold_file , has_fieldmap = has_fieldmap )
450
+ func_preproc_wf = init_func_preproc_wf (bold_file , spaces , has_fieldmap = has_fieldmap )
423
451
if func_preproc_wf is None :
424
452
continue
425
453
@@ -526,8 +554,51 @@ def _prefix(subid):
526
554
return subid if subid .startswith ("sub-" ) else f"sub-{ subid } "
527
555
528
556
529
- def _select_iter_idx (in_list , idx ):
530
- """Returns a specific index of a list/tuple"""
531
- if isinstance (in_list , (tuple , list )):
532
- return in_list [idx ]
533
- raise AttributeError (f"Input { in_list } is incompatible type: { type (in_list )} " )
557
+ def init_workflow_spaces (execution_spaces , age ):
558
+ """
559
+ Create output spaces at a per-subworkflow level.
560
+
561
+ This address the case where a multi-session subject is run, and requires separate template cohorts.
562
+ """
563
+ from niworkflows .utils .spaces import Reference
564
+
565
+ from nibabies .utils .misc import cohort_by_months
566
+
567
+ spaces = deepcopy (execution_spaces )
568
+
569
+ if not spaces .references :
570
+ # Ensure age specific template is added if nothing is present
571
+ cohort = cohort_by_months ("MNIInfant" , age )
572
+ spaces .add (("MNIInfant" , {"res" : "native" , "cohort" : cohort }))
573
+
574
+ if not spaces .is_cached ():
575
+ spaces .checkpoint ()
576
+
577
+ # Ensure user-defined spatial references for outputs are correctly parsed.
578
+ # Certain options require normalization to a space not explicitly defined by users.
579
+ # These spaces will not be included in the final outputs.
580
+ if config .workflow .use_aroma :
581
+ # Make sure there's a normalization to FSL for AROMA to use.
582
+ spaces .add (Reference ("MNI152NLin6Asym" , {"res" : "2" }))
583
+
584
+ if config .workflow .cifti_output :
585
+ # CIFTI grayordinates to corresponding FSL-MNI resolutions.
586
+ vol_res = "2" if config .workflow .cifti_output == "91k" else "1"
587
+ spaces .add (Reference ("fsaverage" , {"den" : "164k" }))
588
+ spaces .add (Reference ("MNI152NLin6Asym" , {"res" : vol_res }))
589
+ # Ensure a non-native version of MNIInfant is added as a target
590
+ cohort = cohort_by_months ("MNIInfant" , age )
591
+ spaces .add (Reference ("MNIInfant" , {"cohort" : cohort }))
592
+
593
+ return spaces
594
+
595
+
596
+ def init_execution_spaces ():
597
+ from niworkflows .utils .spaces import Reference , SpatialReferences
598
+
599
+ spaces = config .execution .output_spaces or SpatialReferences ()
600
+ if not isinstance (spaces , SpatialReferences ):
601
+ spaces = SpatialReferences (
602
+ [ref for s in spaces .split (" " ) for ref in Reference .from_string (s )]
603
+ )
604
+ return spaces
0 commit comments