diff --git a/CHANGELOG.md b/CHANGELOG.md
index c41b04fefd..cc824259c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Updated some output filenaming conventions for human-readability and to move closer to BIDS-derivatives compliance
 - Changed motion filter from single dictionary to list of dictionaries
 - Changed CI logic to allow non-release tags
+- Refactored ROI resampling to be more efficient (resample to output resolution before analysis, and resample each atlas just once if used for multiple analyses)
 
 ### Upgraded dependencies
 - `nibabel` 2.3.3 → 3.0.1
diff --git a/CPAC/anat_preproc/anat_preproc.py b/CPAC/anat_preproc/anat_preproc.py
index 78fbda4251..cb24e99e47 100644
--- a/CPAC/anat_preproc/anat_preproc.py
+++ b/CPAC/anat_preproc/anat_preproc.py
@@ -18,7 +18,7 @@
     fslmaths_command, \
     VolumeRemoveIslands
 from CPAC.pipeline.engine import flatten_list
-from CPAC.utils.docs import docstring_parameter
+from CPAC.utils.docs import docstring_parameter, list_items_unbracketed
 from CPAC.utils.interfaces.fsl import Merge as fslMerge
 from CPAC.utils.interfaces.function.seg_preproc import \
     pick_tissue_from_labels_file_interface
@@ -1411,9 +1411,10 @@ def acpc_align_brain_with_mask(wf, cfg, strat_pool, pipe_num, opt=None):
     outputs = {
         'desc-preproc_T1w': (acpc_align, 'outputspec.acpc_aligned_head'),
         'desc-acpcbrain_T1w': (acpc_align, 'outputspec.acpc_aligned_brain'),
-        'space-T1w_desc-brain_mask': (acpc_align, 'outputspec.acpc_brain_mask'),
-        'space-T1w_desc-prebrain_mask': (strat_pool.get_data('space-T1w_desc-brain_mask'))
-    }
+        'space-T1w_desc-brain_mask': (acpc_align,
+                                      'outputspec.acpc_brain_mask'),
+        'space-T1w_desc-prebrain_mask': strat_pool.get_data(
+            'space-T1w_desc-brain_mask')}
 
     return (wf, outputs)
 
@@ -2478,9 +2479,11 @@ def brain_extraction_T2(wf, cfg, strat_pool, pipe_num, opt=None):
                  ["space-T2w_desc-brain_mask", "space-T2w_desc-acpcbrain_mask"])],
      "outputs": ["desc-brain_T2w"]}
     '''
-    if cfg.anatomical_preproc['acpc_alignment']['run'] and cfg.anatomical_preproc['acpc_alignment']['acpc_target'] == 'brain':
+    if (cfg.anatomical_preproc['acpc_alignment']['run']
+            and cfg.anatomical_preproc['acpc_alignment']['acpc_target'] == 'brain'
+            ):
         outputs = {
-            'desc-brain_T2w': (strat_pool.get_data(["desc-acpcbrain_T2w"]))
+            'desc-brain_T2w': strat_pool.get_data(["desc-acpcbrain_T2w"])
         }
     else:
         anat_skullstrip_orig_vol = pe.Node(interface=afni.Calc(),
@@ -2679,9 +2682,8 @@ def freesurfer_postproc(wf, cfg, strat_pool, pipe_num, opt=None):
 
 # we're grabbing the postproc outputs and appending them to
 # the reconall outputs
-@docstring_parameter(postproc_outputs=str(flatten_list(
-    freesurfer_postproc, 'outputs')
-    ).lstrip('[').replace("'", '"'))
+@docstring_parameter(postproc_outputs=list_items_unbracketed(
+    flatten_list(freesurfer_postproc, 'outputs')))
 def freesurfer_reconall(wf, cfg, strat_pool, pipe_num, opt=None):
     '''
     {{"name": "freesurfer_reconall",
@@ -2696,7 +2698,7 @@ def freesurfer_reconall(wf, cfg, strat_pool, pipe_num, opt=None):
                  "brainmask",
                  "wmparc",
                  "T1",
-                 {postproc_outputs}}}
+                 {postproc_outputs}]}}
     '''
 
     reconall = pe.Node(interface=freesurfer.ReconAll(),
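
For reference, the `{postproc_outputs}` placeholder in the `freesurfer_reconall` docstring above is filled at import time by the `docstring_parameter` decorator, now via the new `list_items_unbracketed` helper (added in CPAC/utils/docs.py below). A minimal, self-contained sketch of that mechanism, using stand-in implementations and made-up output names rather than the real C-PAC objects:

    # Sketch only: stand-ins for the helpers in CPAC/utils/docs.py; the
    # output names passed to the decorator here are hypothetical.
    def docstring_parameter(**kwargs):
        def decorate(obj):
            # fill {placeholders} in the decorated object's docstring
            obj.__doc__ = obj.__doc__.format(**kwargs)
            return obj
        return decorate

    def list_items_unbracketed(full_list, inner_quotes='"'):
        outer_quotes = '"' if inner_quotes == "'" else "'"
        return str(full_list).lstrip('[').replace(outer_quotes,
                                                  inner_quotes).rstrip(']')

    @docstring_parameter(postproc_outputs=list_items_unbracketed(
        ['hypothetical-output-1', 'hypothetical-output-2']))
    def freesurfer_reconall_stub():
        '''{{"outputs": ["brainmask", "wmparc", "T1", {postproc_outputs}]}}'''

    print(freesurfer_reconall_stub.__doc__)
    # {"outputs": ["brainmask", "wmparc", "T1", "hypothetical-output-1",
    #              "hypothetical-output-2"]}   (printed on one line)

This also shows why the `]` added before `}}` in the hunk above matters: `list_items_unbracketed` strips the list's own brackets, so the docstring must supply the closing bracket itself.
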
diff --git a/CPAC/nuisance/nuisance.py b/CPAC/nuisance/nuisance.py
index b743e33749..738afb5450 100644
--- a/CPAC/nuisance/nuisance.py
+++ b/CPAC/nuisance/nuisance.py
@@ -2492,8 +2492,8 @@ def nuisance_regression(wf, cfg, strat_pool, pipe_num, opt, space, res=None):
             # sometimes mm dimensions match but the voxel dimensions don't,
             # so here we align the mask to the resampled data before applying it
             match_grid = pe.Node(afni.Resample(),
-                                 name='align_template_mask_to_template_data_'
-                                      f'{name_suff}')
+                                 name='align_template_mask_to_template_'
+                                      f'data_{name_suff}')
             match_grid.inputs.outputtype = 'NIFTI_GZ'
             match_grid.inputs.resample_mode = 'Cu'
             node, out = strat_pool.get_data('FSL-AFNI-brain-mask')
diff --git a/CPAC/pipeline/cpac_pipeline.py b/CPAC/pipeline/cpac_pipeline.py
index 0cf2e7997a..b4083da668 100644
--- a/CPAC/pipeline/cpac_pipeline.py
+++ b/CPAC/pipeline/cpac_pipeline.py
@@ -1134,10 +1134,10 @@ def connect_pipeline(wf, cfg, rpool, pipeline_blocks):
 
 def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None,
                    num_ants_cores=1):
-    from CPAC.utils.datasource import gather_extraction_maps
+    from CPAC.utils.datasource import gather_atlases, gather_extraction_maps
 
     # Workflow setup
-    wf = initialize_nipype_wf(cfg, sub_dict)
+    wf = initialize_nipype_wf(cfg, sub_dict, name=pipeline_name)
 
     # Extract credentials path if it exists
     try:
@@ -1396,6 +1396,7 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None,
         pipeline_blocks += [surface_postproc]
 
     # Extractions and Derivatives
+    pipeline_blocks += [gather_atlases]
     tse_atlases, sca_atlases = gather_extraction_maps(cfg)
     cfg.timeseries_extraction['tse_atlases'] = tse_atlases
     cfg.seed_based_correlation_analysis['sca_atlases'] = sca_atlases
diff --git a/CPAC/pipeline/engine.py b/CPAC/pipeline/engine.py
index 13882e77d5..c6188200df 100644
--- a/CPAC/pipeline/engine.py
+++ b/CPAC/pipeline/engine.py
@@ -1507,26 +1507,18 @@ def connect_block(self, wf, cfg, rpool):
                                 if raw_label not in new_json_info['CpacVariant']:
                                     new_json_info['CpacVariant'][raw_label] = []
                                 new_json_info['CpacVariant'][raw_label].append(node_name)
-
-                        rpool.set_data(label,
-                                       connection[0],
-                                       connection[1],
-                                       new_json_info,
-                                       pipe_idx, node_name, fork)
-
-                        wf, post_labels = rpool.post_process(
-                            wf, label, connection, new_json_info, pipe_idx,
-                            pipe_x, outs)
-
-                        if rpool.func_reg:
-                            for postlabel in post_labels:
-                                connection = (postlabel[1], postlabel[2])
-                                wf = rpool.derivative_xfm(wf, postlabel[0],
-                                                          connection,
-                                                          new_json_info,
-                                                          pipe_idx,
-                                                          pipe_x)
+                        if isinstance(connection, list):
+                            for _connection in connection:
+                                wf = load_into_rpool(
+                                    wf, rpool, label, _connection,
+                                    new_json_info, pipe_idx, node_name,
+                                    fork, pipe_x, outs)
+                        else:
+                            wf = load_into_rpool(
+                                wf, rpool, label, connection,
+                                new_json_info, pipe_idx, node_name, fork,
+                                pipe_x, outs)
 
         return wf
 
@@ -1564,6 +1556,50 @@ def flatten_list(node_block_function: Union[FunctionType, list, Tuple],
     return flat_list
 
 
+def load_into_rpool(wf, rpool, label, connection, new_json_info, pipe_idx,
+                    node_name, fork, pipe_x, outs):
+    """
+    Loads a single resource into a ResourcePool
+
+    Parameters
+    ----------
+    wf : pe.Workflow
+
+    rpool : ResourcePool
+
+    label : str
+
+    connection : 2-tuple
+
+    new_json_info : dict
+
+    pipe_idx : dict
+
+    node_name : str
+
+    fork : bool
+
+    pipe_x : int
+
+    outs : dict
+
+    Returns
+    -------
+    wf : pe.Workflow
+    """
+    rpool.set_data(label, connection[0], connection[1], new_json_info,
+                   pipe_idx, node_name, fork)
+    wf, post_labels = rpool.post_process(
+        wf, label, connection, new_json_info, pipe_idx,
+        pipe_x, outs)
+    if rpool.func_reg:
+        for postlabel in post_labels:
+            connection = (postlabel[1], postlabel[2])
+            wf = rpool.derivative_xfm(wf, postlabel[0], connection,
+                                      new_json_info, pipe_idx, pipe_x)
+    return wf
+
+
 def wrap_block(node_blocks, interface, wf, cfg, strat_pool, pipe_num, opt):
     """Wrap a list of node block functions to make them easier to use within
     other node blocks.
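
The refactored `connect_block` above now accepts either a single `(node, output)` connection or a list of them for one resource, which is what lets `gather_atlases` (below) emit several resampled atlases under one key. A runnable toy of the dispatch, with string stand-ins for nipype nodes and for `load_into_rpool` itself:

    from typing import List, Tuple, Union

    Connection = Tuple[str, str]   # stand-in for a (pe.Node, output-name) pair

    def load_one(label: str, connection: Connection,
                 loaded: List[str]) -> None:
        # stand-in for load_into_rpool / rpool.set_data
        loaded.append(f'{label} <- {connection[0]}.{connection[1]}')

    def load(label: str, connection: Union[Connection, List[Connection]],
             loaded: List[str]) -> None:
        # normalize: a bare tuple is treated as a one-element list
        connections = connection if isinstance(connection, list) else [connection]
        for _connection in connections:
            load_one(label, _connection, loaded)

    loaded: List[str] = []
    load('atlas-tse-Avg', [('resampled_atlas_a', 'local_path'),
                           ('resampled_atlas_b', 'local_path')], loaded)
    load('desc-brain_T2w', ('skullstrip', 'out_file'), loaded)
    print(loaded)   # one pool entry per connection, forked atlases included
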
@@ -1612,7 +1648,7 @@ def wrap_block(node_blocks, interface, wf, cfg, strat_pool, pipe_num, opt):
     for in_resource, val in interface.items():
         if isinstance(val, tuple):
             strat_pool.set_data(in_resource, val[0], val[1], {}, "", "",
-                                fork=True)#
+                                fork=True)
     if 'sub_num' not in strat_pool.get_pool_info():
         strat_pool.set_pool_info({'sub_num': 0})
     sub_num = strat_pool.get_pool_info()['sub_num']
diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py
index 214ae36b72..47215707d0 100644
--- a/CPAC/sca/sca.py
+++ b/CPAC/sca/sca.py
@@ -23,9 +23,8 @@
 import nipype.interfaces.utility as util
 
 from CPAC.sca.utils import *
-from CPAC.utils.utils import extract_one_d
-from CPAC.utils.datasource import resample_func_roi, \
-    create_roi_mask_dataflow, create_spatial_map_dataflow
+# from CPAC.utils.utils import extract_one_d
+from CPAC.utils.datasource import resample_func_roi, roi_input_node
 from CPAC.timeseries.timeseries_analysis import get_roi_timeseries, \
     get_spatial_map_timeseries, resample_function
@@ -405,12 +404,14 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
      "switch": ["run"],
      "option_key": "None",
      "option_val": "None",
-     "inputs": ["space-template_desc-preproc_bold"],
+     "inputs": [("atlas-sca-Avg", "atlas-sca-Avg_name"),
+                "space-template_desc-preproc_bold"],
      "outputs": ["desc-MeanSCA_timeseries",
                  "space-template_desc-MeanSCA_correlations",
                  "atlas_name"]}
     '''
-
+    if strat_pool.check_rpool('atlas-sca-Avg') is False:
+        return wf, {}
     # same workflow, except to run TSE and send it to the resource
     # pool so that it will not get sent to SCA
     resample_functional_roi_for_sca = pe.Node(
@@ -429,31 +430,21 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
         cfg.registration_workflows['functional_registration'][
             'func_registration_to_template']['FNIRT_pipelines']['identity_matrix']
 
-    roi_dataflow_for_sca = create_roi_mask_dataflow(
-        cfg.seed_based_correlation_analysis['sca_atlases']['Avg'],
-        f'roi_dataflow_for_sca_{pipe_num}'
-    )
-
-    roi_dataflow_for_sca.inputs.inputspec.set(
-        creds_path=cfg.pipeline_setup['input_creds_path'],
-        dl_dir=cfg.pipeline_setup['working_directory']['path']
-    )
-
     roi_timeseries_for_sca = get_roi_timeseries(
         f'roi_timeseries_for_sca_{pipe_num}')
 
-    node, out = strat_pool.get_data("space-template_desc-preproc_bold")
     # resample the input functional file to roi
-    wf.connect(node, out,
-               resample_functional_roi_for_sca, 'in_func')
-    wf.connect(roi_dataflow_for_sca, 'outputspec.out_file',
-               resample_functional_roi_for_sca, 'in_roi')
+    wf.connect(*strat_pool.get_data("space-template_desc-preproc_bold"),
+               resample_functional_roi_for_sca, 'in_func')
+    fork_atlases = roi_input_node(wf, strat_pool, 'atlas-sca-Avg', pipe_num)
+    wf.connect(fork_atlases, 'atlas_file',
+               resample_functional_roi_for_sca, 'in_roi')
 
     # connect it to the roi_timeseries
     wf.connect(resample_functional_roi_for_sca, 'out_roi',
-                roi_timeseries_for_sca, 'input_roi.roi')
+               roi_timeseries_for_sca, 'input_roi.roi')
     wf.connect(resample_functional_roi_for_sca, 'out_func',
-                roi_timeseries_for_sca, 'inputspec.rest')
+               roi_timeseries_for_sca, 'inputspec.rest')
 
     sca_roi = create_sca(f'sca_roi_{pipe_num}')
@@ -471,7 +462,7 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
         #                                   extract_one_d)),
         'space-template_desc-MeanSCA_correlations':
             (sca_roi, 'outputspec.correlation_stack'),
-        'atlas_name': (roi_dataflow_for_sca, 'outputspec.out_name')
+        'atlas_name': (fork_atlases, 'atlas_name')
     }
 
     return (wf, outputs)
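
`SCA_AVG` above replaces the `node, out = strat_pool.get_data(...)` temporaries with argument unpacking: `get_data` returns a `(node, output)` pair that can be splatted straight into `wf.connect`. A toy sketch of the idiom with stand-in functions (not the real engine or nipype signatures):

    def get_data(resource):
        # stand-in for strat_pool.get_data: returns a (node, output) pair
        return ('bold_node', 'outputspec.bold')

    def connect(src_node, src_out, dest_node, dest_in):
        # stand-in for wf.connect
        print(f'{src_node}.{src_out} -> {dest_node}.{dest_in}')

    # before: node, out = get_data(...); connect(node, out, ...)
    # after, in one statement:
    connect(*get_data('space-template_desc-preproc_bold'),
            'resample_functional_roi_for_sca', 'in_func')
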
@@ -486,13 +477,15 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None):
      "switch": ["run"],
      "option_key": "None",
      "option_val": "None",
-     "inputs": ["space-template_desc-preproc_bold",
-                "space-template_desc-bold_mask"],
+     "inputs": [("atlas-sca-DualReg", "atlas-sca-DualReg_name"),
+                ("space-template_desc-preproc_bold",
+                 "space-template_desc-bold_mask")],
      "outputs": ["space-template_desc-DualReg_correlations",
                  "desc-DualReg_statmap",
                  "atlas_name"]}
     '''
-
+    if strat_pool.check_rpool('atlas-sca-DualReg') is False:
+        return wf, {}
     resample_spatial_map_to_native_space_for_dr = pe.Node(
         interface=fsl.FLIRT(),
         name=f'resample_spatial_map_to_native_space_for_DR_{pipe_num}'
@@ -506,16 +499,6 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None):
             'identity_matrix']
     )
 
-    spatial_map_dataflow_for_dr = create_spatial_map_dataflow(
-        cfg.seed_based_correlation_analysis['sca_atlases']['DualReg'],
-        f'spatial_map_dataflow_for_DR_{pipe_num}'
-    )
-
-    spatial_map_dataflow_for_dr.inputs.inputspec.set(
-        creds_path=cfg.pipeline_setup['input_creds_path'],
-        dl_dir=cfg.pipeline_setup['working_directory']['path']
-    )
-
     spatial_map_timeseries_for_dr = get_spatial_map_timeseries(
         f'spatial_map_timeseries_for_DR_{pipe_num}'
     )
@@ -528,8 +511,9 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None):
                resample_spatial_map_to_native_space_for_dr, 'reference')
     wf.connect(node, out,
                spatial_map_timeseries_for_dr, 'inputspec.subject_rest')
-
-    wf.connect(spatial_map_dataflow_for_dr, 'select_spatial_map.out_file',
+    fork_atlases = roi_input_node(wf, strat_pool, 'atlas-sca-DualReg',
+                                  pipe_num)
+    wf.connect(fork_atlases, 'atlas_file',
                resample_spatial_map_to_native_space_for_dr, 'in_file')
 
     # connect it to the spatial_map_timeseries
@@ -556,11 +540,9 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None):
             (dr_temp_reg, 'outputspec.temp_reg_map'),
         'desc-DualReg_statmap':
             (dr_temp_reg, 'outputspec.temp_reg_map_z'),
-        'atlas_name':
-            (spatial_map_dataflow_for_dr, 'select_spatial_map.out_name')
-    }
+        'atlas_name': (fork_atlases, 'atlas_name')}
 
-    return (wf, outputs)
+    return wf, outputs
 
 
 def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None):
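
Each analysis block now starts with the same guard: if `gather_atlases` loaded no atlases for this analysis, the resource is absent and the block returns an empty outputs dict rather than failing at graph build. A runnable sketch of the pattern with a toy pool (the real `check_rpool` is a method of C-PAC's ResourcePool):

    class ToyPool:
        def __init__(self, resources):
            self._resources = set(resources)

        def check_rpool(self, resource):
            return resource in self._resources

    def dual_regression_stub(wf, strat_pool):
        if strat_pool.check_rpool('atlas-sca-DualReg') is False:
            return wf, {}   # no atlases configured; skip this block
        return wf, {'space-template_desc-DualReg_correlations':
                    ('dr_temp_reg', 'outputspec.temp_reg_map')}

    print(dual_regression_stub('wf', ToyPool([])))
    print(dual_regression_stub('wf', ToyPool(['atlas-sca-DualReg'])))
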
@@ -572,13 +554,15 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None):
      "switch": ["run"],
      "option_key": "None",
      "option_val": "None",
-     "inputs": ["space-template_desc-preproc_bold",
-                "space-template_desc-bold_mask"],
+     "inputs": [("atlas-sca-MultReg", "atlas-sca-MultReg_name"),
+                ("space-template_desc-preproc_bold",
+                 "space-template_desc-bold_mask")],
      "outputs": ["space-template_desc-MultReg_correlations",
                  "desc-MultReg_statmap",
                  "atlas_name"]}
     '''
-
+    if strat_pool.check_rpool('atlas-sca-MultReg') is False:
+        return wf, {}
     # same workflow, except to run TSE and send it to the resource
     # pool so that it will not get sent to SCA
     resample_functional_roi_for_multreg = pe.Node(
@@ -591,25 +575,16 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None):
         cfg.registration_workflows['functional_registration'][
             'func_registration_to_template']['FNIRT_pipelines']['identity_matrix']
 
-    roi_dataflow_for_multreg = create_roi_mask_dataflow(
-        cfg.seed_based_correlation_analysis['sca_atlases']['MultReg'],
-        f'roi_dataflow_for_mult_reg_{pipe_num}')
-
-    roi_dataflow_for_multreg.inputs.inputspec.set(
-        creds_path=cfg.pipeline_setup['input_creds_path'],
-        dl_dir=cfg.pipeline_setup['working_directory']['path']
-    )
-
     roi_timeseries_for_multreg = get_roi_timeseries(
         f'roi_timeseries_for_mult_reg_{pipe_num}')
 
-    node, out = strat_pool.get_data("space-template_desc-preproc_bold")
     # resample the input functional file to roi
-    wf.connect(node, out, resample_functional_roi_for_multreg, 'in_func')
-    wf.connect(roi_dataflow_for_multreg,
-               'outputspec.out_file',
-               resample_functional_roi_for_multreg,
-               'in_roi')
+    wf.connect(*strat_pool.get_data("space-template_desc-preproc_bold"),
+               resample_functional_roi_for_multreg, 'in_func')
+    fork_atlases = roi_input_node(wf, strat_pool, 'atlas-sca-MultReg',
+                                  pipe_num)
+    wf.connect(fork_atlases, 'atlas_file',
+               resample_functional_roi_for_multreg, 'in_roi')
 
     # connect it to the roi_timeseries
     wf.connect(resample_functional_roi_for_multreg,
@@ -647,7 +622,6 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None):
             (sc_temp_reg, 'outputspec.temp_reg_map'),
         'desc-MultReg_statmap':
             (sc_temp_reg, 'outputspec.temp_reg_map_z'),
-        'atlas_name': (roi_dataflow_for_multreg, 'outputspec.out_name')
-    }
+        'atlas_name': (fork_atlases, 'atlas_name')}
 
-    return (wf, outputs)
+    return wf, outputs
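
In the node-block docstrings above, each atlas resource is now listed beside its `_name` twin inside a tuple, nested under `"inputs"`. A consumer that only needs the flat set of resource keys can walk the nested spec; a minimal sketch (the real traversal and the tuple-grouping semantics live in C-PAC's node-block engine, not shown here):

    inputs = [("atlas-sca-MultReg", "atlas-sca-MultReg_name"),
              ("space-template_desc-preproc_bold",
               "space-template_desc-bold_mask")]

    def flat_keys(spec):
        # yield every string key, however deeply nested in lists/tuples
        for item in spec:
            if isinstance(item, (list, tuple)):
                yield from flat_keys(item)
            else:
                yield item

    print(list(flat_keys(inputs)))
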
diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py
index bdc6b2f1b2..107bb01042 100644
--- a/CPAC/timeseries/timeseries_analysis.py
+++ b/CPAC/timeseries/timeseries_analysis.py
@@ -22,9 +22,7 @@
     create_connectome_nilearn, \
     get_connectome_method
 from CPAC.pipeline import nipype_pipeline_engine as pe
-from CPAC.utils.datasource import create_roi_mask_dataflow, \
-    create_spatial_map_dataflow, \
-    resample_func_roi
+from CPAC.utils.datasource import resample_func_roi, roi_input_node
 
 
 def get_voxel_timeseries(wf_name='voxel_timeseries'):
@@ -797,8 +795,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
      "switch": ["run"],
      "option_key": "None",
      "option_val": "None",
-     "inputs": ["space-template_desc-preproc_bold",
-                "space-template_desc-brain_mask"],
+     "inputs": [("atlas-tse-Avg", "atlas-tse-Avg_name"),
+                "space-template_desc-preproc_bold"],
      "outputs": ["space-template_desc-Mean_timeseries",
                  "space-template_desc-ndmg_correlations",
                  "atlas_name",
@@ -807,6 +805,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
                  "space-template_desc-PearsonNilearn_correlations",
                  "space-template_desc-PartialNilearn_correlations"]}
     '''
+    if strat_pool.check_rpool('atlas-tse-Avg') is False:
+        return wf, {}
     resample_functional_roi = pe.Node(resample_function(),
                                       name='resample_functional_roi_'
                                            f'{pipe_num}')
@@ -816,28 +816,16 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
         cfg.registration_workflows['functional_registration'][
             'func_registration_to_template']['FNIRT_pipelines']['identity_matrix']
 
-    roi_dataflow = create_roi_mask_dataflow(
-        cfg.timeseries_extraction['tse_atlases']['Avg'],
-        f'roi_dataflow_{pipe_num}')
-
-    roi_dataflow.inputs.inputspec.set(
-        creds_path=cfg.pipeline_setup['input_creds_path'],
-        dl_dir=cfg.pipeline_setup['working_directory']['path']
-    )
-
     roi_timeseries = get_roi_timeseries(f'roi_timeseries_{pipe_num}')
     #roi_timeseries.inputs.inputspec.output_type = cfg.timeseries_extraction[
     #    'roi_tse_outputs']
 
     node, out = strat_pool.get_data("space-template_desc-preproc_bold")
     wf.connect(node, out, resample_functional_roi, 'in_func')
-
-    wf.connect(roi_dataflow, 'outputspec.out_file',
-               resample_functional_roi, 'in_roi')
+    fork_atlases = roi_input_node(wf, strat_pool, 'atlas-tse-Avg', pipe_num)
+    wf.connect(fork_atlases, 'atlas_file', resample_functional_roi, 'in_roi')
 
     # connect it to the roi_timeseries
-    # workflow.connect(roi_dataflow, 'outputspec.out_file',
-    #                  roi_timeseries, 'input_roi.roi')
     wf.connect(resample_functional_roi, 'out_roi',
                roi_timeseries, 'input_roi.roi')
     wf.connect(resample_functional_roi, 'out_func',
@@ -865,32 +853,11 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
             method=cm_measure,
             pipe_num=pipe_num
         )
-        brain_mask_node, brain_mask_out = strat_pool.get_data([
-            'space-template_desc-brain_mask'])
-        if 'func_to_ROI' in realignment:
-            resample_brain_mask_roi = pe.Node(
-                resample_function(),
-                name=f'resample_brain_mask_roi_{pipe_num}')
-            resample_brain_mask_roi.inputs.realignment = realignment
-            resample_brain_mask_roi.inputs.identity_matrix = (
-                cfg.registration_workflows['functional_registration'][
-                    'func_registration_to_template'
-                ]['FNIRT_pipelines']['identity_matrix'])
-            wf.connect([
-                (brain_mask_node, resample_brain_mask_roi, [
-                    (brain_mask_out, 'in_func')]),
-                (roi_dataflow, resample_brain_mask_roi, [
-                    ('outputspec.out_file', 'in_roi')]),
-                (resample_brain_mask_roi, timeseries_correlation, [
-                    ('out_func', 'inputspec.mask')])])
-        else:
-            wf.connect(brain_mask_node, brain_mask_out,
-                       timeseries_correlation, 'inputspec.mask')
 
         timeseries_correlation.inputs.inputspec.method = cm_measure
         wf.connect([
-            (roi_dataflow, timeseries_correlation, [
-                ('outputspec.out_name', 'inputspec.atlas_name')]),
+            (fork_atlases, timeseries_correlation, [
+                ('atlas_name', 'inputspec.atlas_name')]),
             (resample_functional_roi, timeseries_correlation, [
                 ('out_roi', 'inputspec.in_rois'),
                 ('out_func', 'inputspec.in_file')])])
@@ -903,7 +870,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
     outputs = {
         'space-template_desc-Mean_timeseries': (
            roi_timeseries, 'outputspec.roi_csv'),
-        'atlas_name': (roi_dataflow, 'outputspec.out_name'),
+        'atlas_name': (fork_atlases, 'atlas_name'),
         **matrix_outputs
     }
     # - NDMG
@@ -924,9 +891,9 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None):
                               mem_x=(1928411764134803 /
                                      302231454903657293676544, 'ts'))
     wf.connect(roi_timeseries, 'outputspec.roi_ts', ndmg_graph, 'ts')
-    wf.connect(roi_dataflow, 'outputspec.out_file', ndmg_graph, 'labels')
-    outputs['space-template_desc-ndmg_correlations'
-            ] = (ndmg_graph, 'out_file')
+    wf.connect(fork_atlases, 'atlas_file', ndmg_graph, 'labels')
+    outputs['space-template_desc-ndmg_correlations'] = (ndmg_graph,
+                                                        'out_file')
 
     return (wf, outputs)
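
The `desc-PearsonNilearn`/`desc-PartialNilearn` correlation outputs above are ROI-by-ROI connectome matrices. A standalone sketch of the underlying nilearn call on a random stand-in timeseries; this shows the library API, not C-PAC's `create_connectome_nilearn` wrapper:

    import numpy as np
    from nilearn.connectome import ConnectivityMeasure

    timeseries = np.random.default_rng(0).standard_normal((120, 10))  # TRs x ROIs
    for kind in ('correlation', 'partial correlation'):
        matrix = ConnectivityMeasure(kind=kind).fit_transform([timeseries])[0]
        print(kind, matrix.shape)   # each is a (10, 10) ROI x ROI matrix
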
@@ -938,11 +905,13 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None):
      "switch": ["run"],
      "option_key": "None",
      "option_val": "None",
-     "inputs": ["space-template_desc-preproc_bold"],
+     "inputs": [("atlas-tse-Voxel", "atlas-tse-Voxel_name"),
+                "space-template_desc-preproc_bold"],
      "outputs": ["desc-Voxel_timeseries",
                  "atlas_name"]}
     '''
-
+    if strat_pool.check_rpool('atlas-tse-Voxel') is False:
+        return wf, {}
     resample_functional_to_mask = pe.Node(resample_function(),
                                           name='resample_functional_to_mask_'
                                                f'{pipe_num}')
@@ -953,34 +922,27 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None):
         cfg.registration_workflows['functional_registration'][
             'func_registration_to_template']['FNIRT_pipelines']['identity_matrix']
 
-    mask_dataflow = create_roi_mask_dataflow(cfg.timeseries_extraction[
-                                                 'tse_atlases']['Voxel'],
-                                             f'mask_dataflow_{pipe_num}')
-
     voxel_timeseries = get_voxel_timeseries(
         f'voxel_timeseries_{pipe_num}')
     #voxel_timeseries.inputs.inputspec.output_type = cfg.timeseries_extraction[
     #    'roi_tse_outputs']
-
     node, out = strat_pool.get_data("space-template_desc-preproc_bold")
     # resample the input functional file to mask
-    wf.connect(node, out,
-               resample_functional_to_mask, 'in_func')
-    wf.connect(mask_dataflow, 'outputspec.out_file',
-               resample_functional_to_mask, 'in_roi')
+    wf.connect(node, out, resample_functional_to_mask, 'in_func')
+    fork_atlases = roi_input_node(wf, strat_pool, 'atlas-tse-Voxel', pipe_num)
+    wf.connect(fork_atlases, 'atlas_file',
+               resample_functional_to_mask, 'in_roi')
 
     # connect it to the voxel_timeseries
     wf.connect(resample_functional_to_mask, 'out_roi',
-                voxel_timeseries, 'input_mask.mask')
+               voxel_timeseries, 'input_mask.mask')
     wf.connect(resample_functional_to_mask, 'out_func',
-                voxel_timeseries, 'inputspec.rest')
+               voxel_timeseries, 'inputspec.rest')
 
     outputs = {
-        'desc-Voxel_timeseries':
-            (voxel_timeseries, 'outputspec.mask_outputs'),
-        'atlas_name': (mask_dataflow, 'outputspec.out_name')
-    }
+        'desc-Voxel_timeseries': (voxel_timeseries, 'outputspec.mask_outputs'),
+        'atlas_name': (fork_atlases, 'atlas_name')}
 
     return (wf, outputs)
@@ -999,12 +961,14 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None):
      "switch": ["run"],
      "option_key": "None",
      "option_val": "None",
-     "inputs": ["space-template_desc-preproc_bold",
-                "space-template_desc-bold_mask"],
+     "inputs": [("atlas-tse-SpatialReg", "atlas-tse-SpatialReg_name"),
+                ("space-template_desc-preproc_bold",
+                 "space-template_desc-bold_mask")],
      "outputs": ["desc-SpatReg_timeseries",
                  "atlas_name"]}
     '''
-
+    if strat_pool.check_rpool('atlas-tse-SpatialReg') is False:
+        return wf, {}
     resample_spatial_map_to_native_space = pe.Node(
         interface=fsl.FLIRT(),
         name=f'resample_spatial_map_to_native_space_{pipe_num}',
@@ -1018,24 +982,18 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None):
             'func_registration_to_template']['FNIRT_pipelines'][
             'identity_matrix'])
 
-    spatial_map_dataflow = create_spatial_map_dataflow(
-        cfg.timeseries_extraction['tse_atlases']['SpatialReg'],
-        f'spatial_map_dataflow_{pipe_num}')
-
-    spatial_map_dataflow.inputs.inputspec.set(
-        creds_path=cfg.pipeline_setup['input_creds_path'],
-        dl_dir=cfg.pipeline_setup['working_directory']['path'])
-
     spatial_map_timeseries = get_spatial_map_timeseries(
         f'spatial_map_timeseries_{pipe_num}')
     spatial_map_timeseries.inputs.inputspec.demean = True
 
-    node, out = strat_pool.get_data("space-template_desc-preproc_bold")
+    node, out = strat_pool.get_data('space-template_desc-preproc_bold')
 
     # resample the input functional file and functional mask
     # to spatial map
     wf.connect(node, out, resample_spatial_map_to_native_space, 'reference')
-    wf.connect(spatial_map_dataflow, 'select_spatial_map.out_file',
+    fork_atlases = roi_input_node(wf, strat_pool, 'atlas-tse-SpatialReg',
+                                  pipe_num)
+    wf.connect(fork_atlases, 'atlas_file',
                resample_spatial_map_to_native_space, 'in_file')
 
     wf.connect(node, out, spatial_map_timeseries, 'inputspec.subject_rest')
@@ -1049,9 +1007,8 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None):
 
     # 'atlas_name' will be an iterable and will carry through
     outputs = {
-        'desc-SpatReg_timeseries':
-            (spatial_map_timeseries, 'outputspec.subject_timeseries'),
-        'atlas_name': (spatial_map_dataflow, 'select_spatial_map.out_name')
-    }
+        'desc-SpatReg_timeseries': (spatial_map_timeseries,
+                                    'outputspec.subject_timeseries'),
+        'atlas_name': (fork_atlases, 'atlas_name')}
 
-    return (wf, outputs)
+    return wf, outputs
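
A recurring theme in this diff is interpolation choice: `gather_atlases` (below) resamples label atlases with nearest-neighbour ('NN') so that integer ROI labels survive, while `resolve_resolution` keeps cubic ('Cu') as its default for continuous images. A construction-only sketch of the nipype AFNI interface involved (placeholder paths; actually running the node requires AFNI installed):

    from nipype.interfaces import afni

    resample = afni.Resample()
    resample.inputs.in_file = 'atlas.nii.gz'        # placeholder path
    resample.inputs.voxel_size = (3.0, 3.0, 3.0)    # e.g. a '3mm' output grid
    resample.inputs.outputtype = 'NIFTI_GZ'
    resample.inputs.resample_mode = 'NN'            # 'NN', 'Li', 'Cu' or 'Bk'
    print(resample.cmdline)   # the 3dresample command that would be run
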
diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py
index 57793faa03..4e8bca76af 100644
--- a/CPAC/utils/datasource.py
+++ b/CPAC/utils/datasource.py
@@ -16,17 +16,21 @@
 #    License along with C-PAC. If not, see <https://www.gnu.org/licenses/>.
 import csv
 import json
+import os
 import re
+from itertools import chain
 from typing import Tuple
 
 from nipype import logging
 from nipype.interfaces import utility as util
 
 from CPAC.pipeline import nipype_pipeline_engine as pe
+from CPAC.pipeline.schema import valid_options
 from CPAC.resources.templates.lookup_table import format_identifier, \
     lookup_identifier
 from CPAC.utils import function
 from CPAC.utils.bids_utils import bids_remove_entity
+from CPAC.utils.docs import docstring_parameter, list_items_unbracketed
 from CPAC.utils.interfaces.function import Function
-from CPAC.utils.utils import get_scan_params
+from CPAC.utils.utils import get_scan_params, insert_in_dict_of_lists
 
 logger = logging.getLogger('nipype.workflow')
@@ -688,7 +692,7 @@ def create_check_for_s3_node(name, file_path, img_type='other',
                                               function=check_for_s3,
                                               as_module=True),
                             iterfield=['file_path'],
-                            name='check_for_s3_%s' % name)
+                            name=f'check_for_s3_{name}')
     else:
         check_s3_node = pe.Node(function.Function(input_names=['file_path',
                                                                'creds_path',
@@ -697,7 +701,7 @@ def create_check_for_s3_node(name, file_path, img_type='other',
                                                  output_names=['local_path'],
                                                  function=check_for_s3,
                                                  as_module=True),
-                                name='check_for_s3_%s' % name)
+                                name=f'check_for_s3_{name}')
 
     check_s3_node.inputs.set(
         file_path=file_path,
@@ -921,7 +925,8 @@ def res_string_to_tuple(resolution):
     return (float(resolution.replace('mm', '')),) * 3
 
 
-def resolve_resolution(resolution, template, template_name, tag=None):
+def resolve_resolution(resolution, template, template_name, tag=None,
+                       resample_mode='Cu'):
     from nipype.interfaces import afni
     from CPAC.pipeline import nipype_pipeline_engine as pe
     from CPAC.utils.datasource import check_for_s3
@@ -959,7 +964,7 @@ def resolve_resolution(resolution, template, template_name, tag=None,
                               mem_x=(0.0115, 'in_file', 't'))
         resample.inputs.voxel_size = res_string_to_tuple(resolution)
         resample.inputs.outputtype = 'NIFTI_GZ'
-        resample.inputs.resample_mode = 'Cu'
+        resample.inputs.resample_mode = resample_mode
         resample.inputs.in_file = local_path
         resample.base_dir = '.'
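
`resolve_resolution` above converts a C-PAC resolution string into the `voxel_size` tuple passed to 3dresample via `res_string_to_tuple`. Only the isotropic return is visible in the hunk context; a minimal restatement of that visible behaviour (the full helper in datasource.py may accept more formats):

    def res_string_to_tuple(resolution):
        # isotropic case only, as in the context line above
        return (float(resolution.replace('mm', '')),) * 3

    print(res_string_to_tuple('3mm'))     # (3.0, 3.0, 3.0)
    print(res_string_to_tuple('2.5mm'))   # (2.5, 2.5, 2.5)
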
@@ -1219,6 +1224,105 @@ def create_grp_analysis_dataflow(wf_name='gp_dataflow'):
     return wf
 
 
+@docstring_parameter(atlas_analyses=list_items_unbracketed(list(
+    chain.from_iterable([(f'atlas-tse-{option}_name', f'atlas-tse-{option}')
+                         for option in valid_options['timeseries']['roi_paths']] +
+                        [(f'atlas-sca-{option}_name', f'atlas-sca-{option}')
+                         for option in valid_options['sca']['roi_paths']]))))
+def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None):
+    """
+    Collects all the ROI atlases in a config, resamples them, and adds
+    them to the resource pool. Outputs are dynamically generated during
+    graph build.
+
+    Node Block:
+    {{"name": "gather_atlases",
+      "config": "None",
+      "switch": "None",
+      "option_key": "None",
+      "option_val": "None",
+      "inputs": ["bold"],
+      "outputs": [{atlas_analyses}]}}
+    """
+    outputs = {}
+    if cfg['timeseries_extraction',
+           'run'] or cfg['seed_based_correlation_analysis', 'run']:
+        tse_atlases, sca_atlases = gather_extraction_maps(cfg)
+        atlases = set()
+        specified_atlases = {'tse': tse_atlases, 'sca': sca_atlases}
+        for config_key, _atlases in {
+                'timeseries_extraction': tse_atlases,
+                'seed_based_correlation_analysis': sca_atlases}.items():
+            if cfg[config_key, 'run']:
+                # pylint: disable=consider-using-dict-items
+                for atlas in [atlas for analysis_type in _atlases for atlas
+                              in _atlases[analysis_type]]:
+                    atlases.add(atlas)
+        for atlas in atlases:
+            atlas_name = get_atlas_name(atlas)
+            atlas_name_node = pe.Node(util.IdentityInterface(
+                fields=['atlas_name'], mandatory_inputs=True),
+                name=f'{atlas_name}_name')
+            atlas_name_node.inputs.atlas_name = atlas_name
+            resampled = pe.Node(Function(input_names=['resolution',
+                                                      'template',
+                                                      'template_name',
+                                                      'tag',
+                                                      'resample_mode'],
+                                         output_names=['local_path'],
+                                         function=resolve_resolution,
+                                         as_module=True),
+                                name=f'resampled_{atlas_name}')
+            resampled.inputs.template = atlas
+            resampled.inputs.resolution = cfg[
+                "registration_workflows", "functional_registration",
+                "func_registration_to_template", "output_resolution",
+                "func_preproc_outputs"]
+            resampled.inputs.tag = 'func_preproc_outputs'
+            resampled.inputs.template_name = atlas_name
+            resampled.inputs.resample_mode = 'NN'
+            for spec, _specified in specified_atlases.items():
+                for analysis, _atlases in _specified.items():
+                    if atlas in _atlases:
+                        outputs = insert_in_dict_of_lists(
+                            outputs, f'atlas-{spec}-{analysis}_name',
+                            (atlas_name_node, 'atlas_name'))
+                        outputs = insert_in_dict_of_lists(
+                            outputs, f'atlas-{spec}-{analysis}',
+                            (resampled, 'local_path'))
+    return wf, outputs
+
+
+def get_atlas_name(atlas):
+    """Get a resource name for an atlas
+
+    Parameters
+    ----------
+    atlas : str
+
+    Returns
+    -------
+    str
+    """
+    mask_file = atlas.rstrip('\r\n')
+    name, desc = lookup_identifier(mask_file)
+    if name == 'template':
+        base_file = os.path.basename(mask_file)
+        try:
+            valid_extensions = ['.nii', '.nii.gz']
+            base_name = [base_file[:-len(ext)] for ext in valid_extensions
+                         if base_file.endswith(ext)][0]
+        except IndexError:
+            # pylint: disable=raise-missing-from
+            raise ValueError(
+                f'File extension of {base_file} not ".nii" or ".nii.gz"')
+        except Exception as e:
+            raise e
+    else:
+        base_name = format_identifier(name, desc)
+    return base_name
+
+
 def resample_func_roi(in_func, in_roi, realignment, identity_matrix):
     import os, subprocess
     import nibabel as nb
@@ -1264,3 +1368,32 @@ def resample_func_roi(in_func, in_roi, realignment, identity_matrix):
         out_roi = in_roi
 
     return out_func, out_roi
+
+
+def roi_input_node(wf, strat_pool, atlas_key, pipe_num):
+    """Create a utility node for forking atlases
+
+    Parameters
+    ----------
+    wf : pe.Workflow
+
+    strat_pool : ResourcePool
+
+    atlas_key : str
+
+    pipe_num : int
+
+    Returns
+    -------
+    pe.MapNode
+    """
+    roi_atlas = strat_pool.node_data(atlas_key)
+    atlas_name = strat_pool.node_data(f'{atlas_key}_name')
+    node = pe.MapNode(util.IdentityInterface(fields=['atlas_name',
+                                                     'atlas_file']),
+                      name=f'roi_{atlas_key}_{pipe_num}',
+                      iterfield=['atlas_name', 'atlas_file'])
+    wf.connect([
+        (roi_atlas.node, node, [(roi_atlas.out, 'atlas_file')]),
+        (atlas_name.node, node, [(atlas_name.out, 'atlas_name')])])
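
A construction-only sketch of what `roi_input_node` builds: a MapNode wrapping an IdentityInterface that pairs each atlas file with its name and iterates over both lists in lockstep, so downstream consumers fork once per atlas. Plain nipype stands in for C-PAC's `nipype_pipeline_engine` wrapper here, and the atlas values are placeholders:

    from nipype.interfaces import utility as util
    from nipype.pipeline import engine as pe

    fork = pe.MapNode(util.IdentityInterface(fields=['atlas_name',
                                                     'atlas_file']),
                      name='roi_atlas_fork_0',
                      iterfield=['atlas_name', 'atlas_file'])
    fork.inputs.atlas_name = ['atlas_a', 'atlas_b']                # placeholders
    fork.inputs.atlas_file = ['atlas_a.nii.gz', 'atlas_b.nii.gz']  # placeholders
    print(fork.inputs)   # paired lists, iterated element-wise at run time
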
+    return node
diff --git a/CPAC/utils/docs.py b/CPAC/utils/docs.py
index 466f849afe..b0c5ca7baa 100644
--- a/CPAC/utils/docs.py
+++ b/CPAC/utils/docs.py
@@ -78,4 +78,31 @@ def grab_docstring_dct(fn):
     return dct
 
 
+def list_items_unbracketed(full_list, inner_quotes='"'):
+    """
+    Takes a list of items and returns a string representing that list
+    without the containing brackets.
+
+    Parameters
+    ----------
+    full_list : list
+
+    inner_quotes : str, optional
+        '"' or "'"
+
+    Examples
+    --------
+    >>> list_items_unbracketed(['a', 'b', 'c'])
+    '"a", "b", "c"'
+    >>> list_items_unbracketed(['a', 'b', 'c'], "'")
+    "'a', 'b', 'c'"
+    >>> list_items_unbracketed(["a", "b", "c"])
+    '"a", "b", "c"'
+    >>> list_items_unbracketed(["a", "b", "c"], "'")
+    "'a', 'b', 'c'"
+    """
+    outer_quotes = '"' if inner_quotes == "'" else "'"
+    return str(full_list).lstrip('[').replace(outer_quotes,
+                                              inner_quotes).rstrip(']')
+
+
 DOCS_URL_PREFIX = _docs_url_prefix()
diff --git a/CPAC/utils/utils.py b/CPAC/utils/utils.py
index 0f6423d849..135d38f833 100644
--- a/CPAC/utils/utils.py
+++ b/CPAC/utils/utils.py
@@ -22,12 +22,12 @@
 import json
 import numbers
 import pickle
+from copy import deepcopy
+from itertools import repeat
+from typing import Any
 
 import numpy as np
 import yaml
-
 from click import BadParameter
-from copy import deepcopy
-from itertools import repeat
 from voluptuous.error import Invalid
 
 from CPAC.pipeline import ALL_PIPELINE_CONFIGS, AVAILABLE_PIPELINE_CONFIGS
@@ -1813,6 +1813,42 @@ def concat_list(in_list1=None, in_list2=None):
     return out_list
 
 
+def insert_in_dict_of_lists(dict_of_lists: dict, key: str, value: Any) -> dict:
+    '''
+    Function to insert a value into a list at a key in a dict,
+    creating the list if not yet present
+
+    Parameters
+    ----------
+    dict_of_lists : dict of lists
+
+    key : str
+
+    value : any
+
+    Returns
+    -------
+    dict_of_lists : dict
+
+    Examples
+    --------
+    >>> d = {'a': 100}
+    >>> insert_in_dict_of_lists(d, 'a', 30)
+    {'a': [100, 30]}
+    >>> insert_in_dict_of_lists(d, 'a', 30)
+    {'a': [100, 30, 30]}
+    >>> insert_in_dict_of_lists(d, 'b', 30)
+    {'a': [100, 30, 30], 'b': [30]}
+    '''
+    if key not in dict_of_lists:
+        dict_of_lists[key] = [value]
+    else:
+        if not isinstance(dict_of_lists[key], list):
+            dict_of_lists[key] = [dict_of_lists[key]]
+        dict_of_lists[key].append(value)
+    return dict_of_lists
+
+
 def list_item_replace(l,  # noqa: E741  # pylint: disable=invalid-name
                       old, new):
     '''Function to replace an item in a list