From eca7a540331b380b9d74596a2023ac97a5476d30 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Tue, 7 Feb 2023 16:53:41 -0500 Subject: [PATCH 01/23] :fire: DRY ROI ingress function --- CPAC/sca/sca.py | 4 +- CPAC/timeseries/timeseries_analysis.py | 3 +- CPAC/utils/datasource.py | 81 +------------------------- 3 files changed, 5 insertions(+), 83 deletions(-) diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py index e8a1ca02d9..59ca0cfe4a 100644 --- a/CPAC/sca/sca.py +++ b/CPAC/sca/sca.py @@ -9,7 +9,7 @@ from CPAC.sca.utils import * from CPAC.utils.utils import extract_one_d from CPAC.utils.datasource import resample_func_roi, \ - create_roi_mask_dataflow, create_spatial_map_dataflow + create_roi_mask_dataflow from CPAC.timeseries.timeseries_analysis import get_roi_timeseries, \ get_spatial_map_timeseries @@ -490,7 +490,7 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): 'identity_matrix'] ) - spatial_map_dataflow_for_dr = create_spatial_map_dataflow( + spatial_map_dataflow_for_dr = create_roi_mask_dataflow( cfg.seed_based_correlation_analysis['sca_atlases']['DualReg'], f'spatial_map_dataflow_for_DR_{pipe_num}' ) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index 2cf42a0e72..1d619912af 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -7,7 +7,6 @@ get_connectome_method from CPAC.pipeline import nipype_pipeline_engine as pe from CPAC.utils.datasource import create_roi_mask_dataflow, \ - create_spatial_map_dataflow, \ resample_func_roi @@ -984,7 +983,7 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): 'func_registration_to_template']['FNIRT_pipelines'][ 'identity_matrix']) - spatial_map_dataflow = create_spatial_map_dataflow( + spatial_map_dataflow = create_roi_mask_dataflow( cfg.timeseries_extraction['tse_atlases']['SpatialReg'], f'spatial_map_dataflow_{pipe_num}') diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index ca4082b810..db6c6421b6 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1033,9 +1033,8 @@ def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): except IndexError: # pylint: disable=raise-missing-from - raise ValueError('Error in spatial_map_dataflow: File ' - f'extension of {base_file} not ".nii" or ' - '.nii.gz') + raise ValueError(f'Error in {wf_name}: File extension ' + f'of {base_file} not ".nii" or ".nii.gz"') except Exception as e: raise e @@ -1090,82 +1089,6 @@ def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): return wf -def create_spatial_map_dataflow(spatial_maps, wf_name='datasource_maps'): - import os - - wf = pe.Workflow(name=wf_name) - - spatial_map_dict = {} - - for spatial_map_file in spatial_maps: - - spatial_map_file = spatial_map_file.rstrip('\r\n') - base_file = os.path.basename(spatial_map_file) - - try: - valid_extensions = ['.nii', '.nii.gz'] - - base_name = [ - base_file[:-len(ext)] - for ext in valid_extensions - if base_file.endswith(ext) - ][0] - - if base_name in spatial_map_dict: - raise ValueError( - 'Files with same name not allowed: %s %s' % ( - spatial_map_file, - spatial_map_dict[base_name] - ) - ) - - spatial_map_dict[base_name] = spatial_map_file - - except IndexError as e: - raise Exception('Error in spatial_map_dataflow: ' - 'File extension not in .nii and .nii.gz') - - inputnode = pe.Node(util.IdentityInterface(fields=['spatial_map', - 'spatial_map_file', - 'creds_path', - 'dl_dir'], - mandatory_inputs=True), - name='inputspec') - - spatial_map_keys, spatial_map_values = \ - zip(*spatial_map_dict.items()) - - inputnode.synchronize = True - inputnode.iterables = [ - ('spatial_map', spatial_map_keys), - ('spatial_map_file', spatial_map_values), - ] - - check_s3_node = pe.Node(function.Function(input_names=['file_path', - 'creds_path', - 'dl_dir', - 'img_type'], - output_names=['local_path'], - function=check_for_s3, - as_module=True), - name='check_for_s3') - - wf.connect(inputnode, 'spatial_map_file', check_s3_node, 'file_path') - wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') - wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') - check_s3_node.inputs.img_type = 'mask' - - select_spatial_map = pe.Node(util.IdentityInterface(fields=['out_file', - 'out_name'], - mandatory_inputs=True), - name='select_spatial_map') - - wf.connect(check_s3_node, 'local_path', select_spatial_map, 'out_file') - wf.connect(inputnode, 'spatial_map', select_spatial_map, 'out_name') - - return wf - - def create_grp_analysis_dataflow(wf_name='gp_dataflow'): from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util From 80d19e98754fd4e2a02058f2450203813553ee30 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Wed, 8 Feb 2023 16:58:54 -0500 Subject: [PATCH 02/23] fixup! :fire: DRY ROI ingress function --- CPAC/sca/sca.py | 4 ++-- CPAC/timeseries/timeseries_analysis.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py index 59ca0cfe4a..d8545ca4a7 100644 --- a/CPAC/sca/sca.py +++ b/CPAC/sca/sca.py @@ -513,7 +513,7 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): wf.connect(node, out, spatial_map_timeseries_for_dr, 'inputspec.subject_rest') - wf.connect(spatial_map_dataflow_for_dr, 'select_spatial_map.out_file', + wf.connect(spatial_map_dataflow_for_dr, 'outputspec.out_file', resample_spatial_map_to_native_space_for_dr, 'in_file') # connect it to the spatial_map_timeseries @@ -541,7 +541,7 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): 'desc-DualReg_statmap': (dr_temp_reg, 'outputspec.temp_reg_map_z'), 'atlas_name': - (spatial_map_dataflow_for_dr, 'select_spatial_map.out_name') + (spatial_map_dataflow_for_dr, 'outputspec.out_name') } return (wf, outputs) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index 1d619912af..9e92e34aff 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -1000,7 +1000,7 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): # resample the input functional file and functional mask # to spatial map wf.connect(node, out, resample_spatial_map_to_native_space, 'reference') - wf.connect(spatial_map_dataflow, 'select_spatial_map.out_file', + wf.connect(spatial_map_dataflow, 'outputspec.out_file', resample_spatial_map_to_native_space, 'in_file') wf.connect(node, out, spatial_map_timeseries, 'inputspec.subject_rest') @@ -1016,7 +1016,7 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): outputs = { 'desc-SpatReg_timeseries': (spatial_map_timeseries, 'outputspec.subject_timeseries'), - 'atlas_name': (spatial_map_dataflow, 'select_spatial_map.out_name') + 'atlas_name': (spatial_map_dataflow, 'outputspec.out_name') } return (wf, outputs) From 8349365ed7d7cb452a35925bcc2c2163e9e54a5f Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 9 Feb 2023 11:09:33 -0500 Subject: [PATCH 03/23] :recycle: Create `gather_atlases` NodeBlock --- CPAC/utils/datasource.py | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index db6c6421b6..e07710519b 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1005,6 +1005,45 @@ def create_anat_datasource(wf_name='anat_datasource'): return wf +def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): + """ + Collects all the ROI atlases in a config, resamples them and adds + them to the resource pool + + Node Block: + {"name": "gather_atlases", + "config": "None", + "switch": "None", + "option_key": "None", + "option_val": "None", + "inputs": "None", + "outputs": ['atlas-file', 'atlas-name']} + """ + outputs = {} + if cfg['timeseries_extraction', + 'run'] or cfg['seed_based_correlation_analysis', 'run']: + tse_atlases, sca_atlases = gather_extraction_maps(cfg) + atlases = [] + if cfg['timeseries_extraction', 'run']: + atlases += tse_atlases + if cfg['seed_based_correlation_analysis', 'run']: + atlases += sca_atlases + gather = create_roi_mask_dataflow(atlases, f'gather_rois_{pipe_num}') + outputs['atlas-name'] = (gather, 'outputspec.out_name') + if 'func_to_ROI' in cfg['timeseries_extraction', 'realignment']: + # realign to output res + resample_ROI = pe.Node() + resample_ROI.inputs.resolution = cfg[ + 'registration_workflows', 'functional_registration', + 'func_registration_to_template', 'output_resolution', + 'func_preproc_outputs'] + wf.connect(gather, 'outputspec.out_file', resample_ROI, 'infile') + outputs['atlas-file'] = (resample_ROI, 'out_file') + else: + outputs['atlas-file'] = (gather, 'outputspec.out_file') + return wf, outputs + + def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): import os From 5cbe7029800bd16cd7a08f5e2dde4e7e4a88b131 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 9 Feb 2023 14:31:06 -0500 Subject: [PATCH 04/23] :construction: WIP :recycle: Connect `gather_atlases` NodeBlock --- CPAC/anat_preproc/anat_preproc.py | 9 ++++---- CPAC/timeseries/timeseries_analysis.py | 30 +++++++++++--------------- CPAC/utils/datasource.py | 30 +++++++++++++++----------- CPAC/utils/docs.py | 27 +++++++++++++++++++++++ 4 files changed, 61 insertions(+), 35 deletions(-) diff --git a/CPAC/anat_preproc/anat_preproc.py b/CPAC/anat_preproc/anat_preproc.py index 78fbda4251..64741fb998 100644 --- a/CPAC/anat_preproc/anat_preproc.py +++ b/CPAC/anat_preproc/anat_preproc.py @@ -18,7 +18,7 @@ fslmaths_command, \ VolumeRemoveIslands from CPAC.pipeline.engine import flatten_list -from CPAC.utils.docs import docstring_parameter +from CPAC.utils.docs import docstring_parameter, list_items_unbracketed from CPAC.utils.interfaces.fsl import Merge as fslMerge from CPAC.utils.interfaces.function.seg_preproc import \ pick_tissue_from_labels_file_interface @@ -2679,9 +2679,8 @@ def freesurfer_postproc(wf, cfg, strat_pool, pipe_num, opt=None): # we're grabbing the postproc outputs and appending them to # the reconall outputs -@docstring_parameter(postproc_outputs=str(flatten_list( - freesurfer_postproc, 'outputs') - ).lstrip('[').replace("'", '"')) +@docstring_parameter(postproc_outputs=list_items_unbracketed( + flatten_list(freesurfer_postproc, 'outputs'))) def freesurfer_reconall(wf, cfg, strat_pool, pipe_num, opt=None): ''' {{"name": "freesurfer_reconall", @@ -2696,7 +2695,7 @@ def freesurfer_reconall(wf, cfg, strat_pool, pipe_num, opt=None): "brainmask", "wmparc", "T1", - {postproc_outputs}}} + {postproc_outputs}]}} ''' reconall = pe.Node(interface=freesurfer.ReconAll(), diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index 9e92e34aff..b91678e721 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -765,7 +765,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold"], + "inputs": [("atlas-config-path", "atlas-file", "atlas-name"), + "space-template_desc-preproc_bold"], "outputs": ["space-template_desc-Mean_timeseries", "space-template_space-template_desc-ndmg_correlations", "atlas_name", @@ -774,6 +775,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "space-template_desc-PearsonNilearn_correlations", "space-template_desc-PartialNilearn_correlations"]} ''' + resample_functional_roi = pe.Node(Function(input_names=['in_func', 'in_roi', 'realignment', @@ -791,15 +793,6 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): cfg.registration_workflows['functional_registration'][ 'func_registration_to_template']['FNIRT_pipelines']['identity_matrix'] - roi_dataflow = create_roi_mask_dataflow( - cfg.timeseries_extraction['tse_atlases']['Avg'], - f'roi_dataflow_{pipe_num}') - - roi_dataflow.inputs.inputspec.set( - creds_path=cfg.pipeline_setup['input_creds_path'], - dl_dir=cfg.pipeline_setup['working_directory']['path'] - ) - roi_timeseries = get_roi_timeseries(f'roi_timeseries_{pipe_num}') #roi_timeseries.inputs.inputspec.output_type = cfg.timeseries_extraction[ # 'roi_tse_outputs'] @@ -807,12 +800,12 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): node, out = strat_pool.get_data("space-template_desc-preproc_bold") wf.connect(node, out, resample_functional_roi, 'in_func') - wf.connect(roi_dataflow, 'outputspec.out_file', + roi_atlas = strat_pool.node_data("atlas-file") + wf.connect(roi_atlas.node, roi_atlas.out, resample_functional_roi, 'in_roi') + atlas_name = strat_pool.node_data("atlas-name") # connect it to the roi_timeseries - # workflow.connect(roi_dataflow, 'outputspec.out_file', - # roi_timeseries, 'input_roi.roi') wf.connect(resample_functional_roi, 'out_roi', roi_timeseries, 'input_roi.roi') wf.connect(resample_functional_roi, 'out_func', @@ -847,8 +840,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): timeseries_correlation.inputs.inputspec.method = cm_measure wf.connect([ - (roi_dataflow, timeseries_correlation, [ - ('outputspec.out_name', 'inputspec.atlas_name')]), + (atlas_name.node, timeseries_correlation, [ + (atlas_name.out, 'inputspec.atlas_name')]), (resample_functional_roi, timeseries_correlation, [ ('out_roi', 'inputspec.in_rois'), ('out_func', 'inputspec.in_file')])]) @@ -861,7 +854,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): outputs = { 'space-template_desc-Mean_timeseries': ( roi_timeseries, 'outputspec.roi_csv'), - 'atlas_name': (roi_dataflow, 'outputspec.out_name'), + 'atlas_name': (atlas_name.node, atlas_name.out), **matrix_outputs } # - NDMG @@ -882,7 +875,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): mem_x=(1928411764134803 / 302231454903657293676544, 'ts')) wf.connect(roi_timeseries, 'outputspec.roi_ts', ndmg_graph, 'ts') - wf.connect(roi_dataflow, 'outputspec.out_file', ndmg_graph, 'labels') + wf.connect(roi_atlas.node, roi_atlas.out, ndmg_graph, 'labels') outputs['space-template_space-template_desc-ndmg_correlations' ] = (ndmg_graph, 'out_file') @@ -896,7 +889,8 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold"], + "inputs": [("atlas-config-path", "atlas-file", "atlas-name"), + "space-template_desc-preproc_bold"], "outputs": ["desc-Voxel_timeseries", "atlas_name"]} ''' diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index e07710519b..8b6a72bed6 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -21,9 +21,11 @@ from nipype import logging from nipype.interfaces import utility as util from CPAC.pipeline import nipype_pipeline_engine as pe +from CPAC.pipeline.schema import valid_options from CPAC.resources.templates.lookup_table import format_identifier, \ lookup_identifier from CPAC.utils import function +from CPAC.utils.docs import docstring_parameter, list_items_unbracketed from CPAC.utils.interfaces.function import Function from CPAC.utils.utils import get_scan_params @@ -1005,21 +1007,23 @@ def create_anat_datasource(wf_name='anat_datasource'): return wf +@docstring_parameter(roi_analyses=list_items_unbracketed( + [f'tse-{option}' for option in valid_options['timeseries']['roi_paths']] + + [f'sca-{option}' for option in valid_options['sca']['roi_paths']])) def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): """ Collects all the ROI atlases in a config, resamples them and adds them to the resource pool Node Block: - {"name": "gather_atlases", - "config": "None", - "switch": "None", - "option_key": "None", - "option_val": "None", - "inputs": "None", - "outputs": ['atlas-file', 'atlas-name']} + {{"name": "gather_atlases", + "config": "None", + "switch": "None", + "option_key": "None", + "option_val": "None", + "inputs": "None", + "outputs": ['atlas-file', 'atlas-name', {roi_analyses}]}} """ - outputs = {} if cfg['timeseries_extraction', 'run'] or cfg['seed_based_correlation_analysis', 'run']: tse_atlases, sca_atlases = gather_extraction_maps(cfg) @@ -1029,7 +1033,8 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): if cfg['seed_based_correlation_analysis', 'run']: atlases += sca_atlases gather = create_roi_mask_dataflow(atlases, f'gather_rois_{pipe_num}') - outputs['atlas-name'] = (gather, 'outputspec.out_name') + outputs = {'atlas-config-path': (gather, 'outputspec.config_path'), + 'atlas-name': (gather, 'outputspec.out_name')} if 'func_to_ROI' in cfg['timeseries_extraction', 'realignment']: # realign to output res resample_ROI = pe.Node() @@ -1050,7 +1055,7 @@ def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): mask_dict = {} for mask_file in masks: - + config_path = mask_file mask_file = mask_file.rstrip('\r\n') if mask_file.strip() == '' or mask_file.startswith('#'): @@ -1119,9 +1124,10 @@ def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): check_s3_node.inputs.img_type = 'mask' outputnode = pe.Node(util.IdentityInterface(fields=['out_file', - 'out_name']), + 'out_name', + 'config_path']), name='outputspec') - + outputnode.config_path = config_path wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') wf.connect(inputnode, 'mask', outputnode, 'out_name') diff --git a/CPAC/utils/docs.py b/CPAC/utils/docs.py index 466f849afe..b0c5ca7baa 100644 --- a/CPAC/utils/docs.py +++ b/CPAC/utils/docs.py @@ -78,4 +78,31 @@ def grab_docstring_dct(fn): return dct +def list_items_unbracketed(full_list, inner_quotes='"'): + """ + Takes a list of items and returns a string representing that list + without the containing brackets. + + Parameters + ---------- + full_list : list + + inner_quotes : str, optional + '"' or "'" + + Examples + -------- + >>> list_items_unbracketed(['a', 'b', 'c']) + '"a", "b", "c"' + >>> list_items_unbracketed(['a', 'b', 'c'], "'") + "'a', 'b', 'c'" + >>> list_items_unbracketed(["a", "b", "c"]) + '"a", "b", "c"' + >>> list_items_unbracketed(["a", "b", "c"], "'") + "'a', 'b', 'c'" + """ + outer_quotes = '"' if inner_quotes == "'" else "'" + return str(full_list).lstrip('[').replace(outer_quotes, inner_quotes).rstrip(']') + + DOCS_URL_PREFIX = _docs_url_prefix() From 03b5591fa43d66cc942bf6c005fbff90a5d06bde Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 9 Feb 2023 16:50:38 -0500 Subject: [PATCH 05/23] :construction: WIP :recycle: Connect TSE Node Blocks to `gather_atlases` --- CPAC/timeseries/timeseries_analysis.py | 52 +++----- CPAC/utils/datasource.py | 170 ++++++++++++++++++------- 2 files changed, 138 insertions(+), 84 deletions(-) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index b91678e721..b02f5820a3 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -6,8 +6,7 @@ create_connectome_nilearn, \ get_connectome_method from CPAC.pipeline import nipype_pipeline_engine as pe -from CPAC.utils.datasource import create_roi_mask_dataflow, \ - resample_func_roi +from CPAC.utils.datasource import resample_func_roi def get_voxel_timeseries(wf_name='voxel_timeseries'): @@ -765,7 +764,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-config-path", "atlas-file", "atlas-name"), + "inputs": [("atlas-tse-Avg", "atlas-name"), "space-template_desc-preproc_bold"], "outputs": ["space-template_desc-Mean_timeseries", "space-template_space-template_desc-ndmg_correlations", @@ -775,7 +774,6 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "space-template_desc-PearsonNilearn_correlations", "space-template_desc-PartialNilearn_correlations"]} ''' - resample_functional_roi = pe.Node(Function(input_names=['in_func', 'in_roi', 'realignment', @@ -784,7 +782,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): 'out_roi'], function=resample_func_roi, as_module=True), - name=f'resample_functional_roi_' + name='resample_functional_roi_' f'{pipe_num}') resample_functional_roi.inputs.realignment = cfg.timeseries_extraction[ @@ -800,7 +798,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): node, out = strat_pool.get_data("space-template_desc-preproc_bold") wf.connect(node, out, resample_functional_roi, 'in_func') - roi_atlas = strat_pool.node_data("atlas-file") + roi_atlas = strat_pool.node_data("atlas-tse-Avg") wf.connect(roi_atlas.node, roi_atlas.out, resample_functional_roi, 'in_roi') atlas_name = strat_pool.node_data("atlas-name") @@ -889,12 +887,11 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-config-path", "atlas-file", "atlas-name"), + "inputs": [("atlas-tse-Voxel", "atlas-name"), "space-template_desc-preproc_bold"], "outputs": ["desc-Voxel_timeseries", "atlas_name"]} ''' - resample_functional_to_mask = pe.Node(Function(input_names=['in_func', 'in_roi', 'realignment', @@ -903,7 +900,7 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): 'out_roi'], function=resample_func_roi, as_module=True), - name=f'resample_functional_to_mask_' + name='resample_functional_to_mask_' f'{pipe_num}') resample_functional_to_mask.inputs.realignment = cfg.timeseries_extraction[ @@ -912,34 +909,27 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): cfg.registration_workflows['functional_registration'][ 'func_registration_to_template']['FNIRT_pipelines']['identity_matrix'] - mask_dataflow = create_roi_mask_dataflow(cfg.timeseries_extraction[ - 'tse_atlases']['Voxel'], - f'mask_dataflow_{pipe_num}') - voxel_timeseries = get_voxel_timeseries( f'voxel_timeseries_{pipe_num}') #voxel_timeseries.inputs.inputspec.output_type = cfg.timeseries_extraction[ # 'roi_tse_outputs'] - node, out = strat_pool.get_data("space-template_desc-preproc_bold") # resample the input functional file to mask - wf.connect(node, out, - resample_functional_to_mask, 'in_func') - wf.connect(mask_dataflow, 'outputspec.out_file', - resample_functional_to_mask, 'in_roi') + wf.connect(node, out, resample_functional_to_mask, 'in_func') + wf.connect(strat_pool.get_data('atlas-tse-Voxel'), + resample_functional_to_mask, 'in_roi') # connect it to the voxel_timeseries wf.connect(resample_functional_to_mask, 'out_roi', - voxel_timeseries, 'input_mask.mask') + voxel_timeseries, 'input_mask.mask') wf.connect(resample_functional_to_mask, 'out_func', - voxel_timeseries, 'inputspec.rest') + voxel_timeseries, 'inputspec.rest') outputs = { 'desc-Voxel_timeseries': (voxel_timeseries, 'outputspec.mask_outputs'), - 'atlas_name': (mask_dataflow, 'outputspec.out_name') - } + 'atlas_name': strat_pool.get_data('atlas-name')} return (wf, outputs) @@ -958,7 +948,8 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold", + "inputs": [("atlas-tse-SpatialReg", "atlas-name"), + "space-template_desc-preproc_bold", "space-template_desc-bold_mask"], "outputs": ["desc-SpatReg_timeseries", "atlas_name"]} @@ -977,24 +968,16 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): 'func_registration_to_template']['FNIRT_pipelines'][ 'identity_matrix']) - spatial_map_dataflow = create_roi_mask_dataflow( - cfg.timeseries_extraction['tse_atlases']['SpatialReg'], - f'spatial_map_dataflow_{pipe_num}') - - spatial_map_dataflow.inputs.inputspec.set( - creds_path=cfg.pipeline_setup['input_creds_path'], - dl_dir=cfg.pipeline_setup['working_directory']['path']) - spatial_map_timeseries = get_spatial_map_timeseries( f'spatial_map_timeseries_{pipe_num}') spatial_map_timeseries.inputs.inputspec.demean = True - node, out = strat_pool.get_data("space-template_desc-preproc_bold") + node, out = strat_pool.get_data('space-template_desc-preproc_bold') # resample the input functional file and functional mask # to spatial map wf.connect(node, out, resample_spatial_map_to_native_space, 'reference') - wf.connect(spatial_map_dataflow, 'outputspec.out_file', + wf.connect(strat_pool.get_data("atlas-tse-SpatialReg"), resample_spatial_map_to_native_space, 'in_file') wf.connect(node, out, spatial_map_timeseries, 'inputspec.subject_rest') @@ -1010,7 +993,6 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): outputs = { 'desc-SpatReg_timeseries': (spatial_map_timeseries, 'outputspec.subject_timeseries'), - 'atlas_name': (spatial_map_dataflow, 'outputspec.out_name') - } + 'atlas_name': strat_pool.get_data('atlas_name')} return (wf, outputs) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 8b6a72bed6..e92b60b1db 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -16,6 +16,7 @@ # License along with C-PAC. If not, see . import csv import json +import os import re from typing import Tuple from nipype import logging @@ -1007,13 +1008,15 @@ def create_anat_datasource(wf_name='anat_datasource'): return wf -@docstring_parameter(roi_analyses=list_items_unbracketed( - [f'tse-{option}' for option in valid_options['timeseries']['roi_paths']] - + [f'sca-{option}' for option in valid_options['sca']['roi_paths']])) +@docstring_parameter(atlas_analyses=list_items_unbracketed( + [f'atlas-tse-{option}' for option + in valid_options['timeseries']['roi_paths']] + + [f'atlas-sca-{option}' for option in valid_options['sca']['roi_paths']])) def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): """ Collects all the ROI atlases in a config, resamples them and adds - them to the resource pool + them to the resource pool. Outputs are dynamically generated during + graph build. Node Block: {{"name": "gather_atlases", @@ -1021,8 +1024,8 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): "switch": "None", "option_key": "None", "option_val": "None", - "inputs": "None", - "outputs": ['atlas-file', 'atlas-name', {roi_analyses}]}} + "inputs": ['space-template_bold'], + "outputs": ['atlas-name', {atlas_analyses}]}} """ if cfg['timeseries_extraction', 'run'] or cfg['seed_based_correlation_analysis', 'run']: @@ -1032,63 +1035,132 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): atlases += tse_atlases if cfg['seed_based_correlation_analysis', 'run']: atlases += sca_atlases - gather = create_roi_mask_dataflow(atlases, f'gather_rois_{pipe_num}') - outputs = {'atlas-config-path': (gather, 'outputspec.config_path'), - 'atlas-name': (gather, 'outputspec.out_name')} - if 'func_to_ROI' in cfg['timeseries_extraction', 'realignment']: - # realign to output res - resample_ROI = pe.Node() - resample_ROI.inputs.resolution = cfg[ - 'registration_workflows', 'functional_registration', - 'func_registration_to_template', 'output_resolution', - 'func_preproc_outputs'] - wf.connect(gather, 'outputspec.out_file', resample_ROI, 'infile') - outputs['atlas-file'] = (resample_ROI, 'out_file') - else: - outputs['atlas-file'] = (gather, 'outputspec.out_file') + for atlas in atlases: + atlas_name = get_atlas_name(atlas) + gather = gather_atlas(atlas_name, atlas, pipe_num) + realignment = cfg['timeseries_extraction', 'realignment'] + if 'func_to_ROI' in cfg['timeseries_extraction', 'realignment']: + # realign to output res + resample_roi = pe.Node(Function(input_names=['in_func', + 'in_roi', + 'realignment', + 'identity_matrix'], + output_names=['out_func', + 'out_roi'], + function=resample_func_roi, + as_module=True), + name=f'resample_{atlas}_{pipe_num}') + resample_roi.inputs.identity_matrix = cfg[ + 'registration_workflows', 'functional_registration', + 'func_registration_to_template', 'FNIRT_pipelines', + 'identity_matrix'] + resample_roi.inputs.realignment = realignment + wf.connect(strat_pool.get_data('space-template_bold'), + resample_roi, 'in_func') + wf.connect(gather, 'outputspec.out_file', resample_roi, 'in_roi') + final_subnode, final_out = resample_roi, 'out_roi' + else: + final_subnode, final_out = gather, 'outputspec.out_file' + outputs = {'atlas-name': (gather, 'outputspec.out_name')} + for analysis, _atlases in tse_atlases.items(): + if atlas in _atlases: + outputs[f'atlas-tse-{analysis}'] = (final_subnode, final_out) + for analysis, _atlases in sca_atlases.items(): + if atlas in _atlases: + outputs[f'atlas-sca-{analysis}'] = (final_subnode, final_out) return wf, outputs -def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): - import os +def gather_atlas(atlas_name, atlas, pipe_num): + """ + Injects a single ROI atlas to the resource pool at the required + resolution. - mask_dict = {} + Parameters + ---------- + atlas_name, atlas_path : str - for mask_file in masks: - config_path = mask_file - mask_file = mask_file.rstrip('\r\n') + pipe_num : int - if mask_file.strip() == '' or mask_file.startswith('#'): - continue + Returns + ------- + pe.Workflow + """ + wf = pe.Workflow(name=f'gather_{atlas_name}_{pipe_num}') - name, desc = lookup_identifier(mask_file) + inputnode = pe.Node(util.IdentityInterface(fields=['mask', + 'mask_file', + 'creds_path', + 'dl_dir'], + mandatory_inputs=True), + name='inputspec') - if name == 'template': - base_file = os.path.basename(mask_file) + inputnode.inputs.mask = atlas_name + inputnode.inputs.mask_file = atlas - try: - valid_extensions = ['.nii', '.nii.gz'] + check_s3_node = pe.Node(function.Function(input_names=['file_path', + 'creds_path', + 'dl_dir', + 'img_type'], + output_names=['local_path'], + function=check_for_s3, + as_module=True), + name='check_for_s3') + check_s3_node.inputs.img_type = 'mask' + wf.connect([inputnode, check_s3_node, [('mask_file', 'file_path'), + ('creds_path', 'creds_path')]]) + wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') - base_name = [ - base_file[:-len(ext)] - for ext in valid_extensions - if base_file.endswith(ext) - ][0] + outputnode = pe.Node(util.IdentityInterface(fields=['out_file', + 'out_name', + 'config_path']), + name='outputspec') - except IndexError: - # pylint: disable=raise-missing-from - raise ValueError(f'Error in {wf_name}: File extension ' - f'of {base_file} not ".nii" or ".nii.gz"') + wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') + wf.connect(inputnode, 'mask', outputnode, 'out_name') + + return wf - except Exception as e: - raise e - else: - base_name = format_identifier(name, desc) - if base_name in mask_dict: - raise ValueError('Duplicate templates/atlases not allowed: ' - f'{mask_file} {mask_dict[base_name]}') +def get_atlas_name(atlas): + """Get a resource name for an atlas + + Parameters + ---------- + atlas : str + Returns + ------- + str + """ + mask_file = atlas.rstrip('\r\n') + name, desc = lookup_identifier(mask_file) + if name == 'template': + base_file = os.path.basename(mask_file) + try: + valid_extensions = ['.nii', '.nii.gz'] + base_name = [base_file[:-len(ext)] for ext in valid_extensions + if base_file.endswith(ext)][0] + except IndexError: + # pylint: disable=raise-missing-from + raise ValueError( + f'File extension of {base_file} not ".nii" or ".nii.gz"') + except Exception as e: + raise e + else: + base_name = format_identifier(name, desc) + return base_name + + +def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): + from CPAC.utils.datasource import get_atlas_name + mask_dict = {} + + for mask_file in masks: + config_path = mask_file + if mask_file.strip() == '' or mask_file.startswith('#'): + continue + base_name = get_atlas_name(mask_file) mask_dict[base_name] = mask_file wf = pe.Workflow(name=wf_name) From 43b5c17670fe13707adea75ecb638d39ffdbb6ab Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 9 Feb 2023 16:51:31 -0500 Subject: [PATCH 06/23] :art: Resequence function definitions --- CPAC/utils/datasource.py | 206 +++++++++++++++++++-------------------- 1 file changed, 103 insertions(+), 103 deletions(-) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index e92b60b1db..853b2384f6 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1008,6 +1008,109 @@ def create_anat_datasource(wf_name='anat_datasource'): return wf +def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): + from CPAC.utils.datasource import get_atlas_name + mask_dict = {} + + for mask_file in masks: + config_path = mask_file + if mask_file.strip() == '' or mask_file.startswith('#'): + continue + base_name = get_atlas_name(mask_file) + mask_dict[base_name] = mask_file + + wf = pe.Workflow(name=wf_name) + + inputnode = pe.Node(util.IdentityInterface(fields=['mask', + 'mask_file', + 'creds_path', + 'dl_dir'], + mandatory_inputs=True), + name='inputspec') + + mask_keys, mask_values = \ + zip(*mask_dict.items()) + + inputnode.synchronize = True + inputnode.iterables = [ + ('mask', mask_keys), + ('mask_file', mask_values), + ] + + check_s3_node = pe.Node(function.Function(input_names=['file_path', + 'creds_path', + 'dl_dir', + 'img_type'], + output_names=['local_path'], + function=check_for_s3, + as_module=True), + name='check_for_s3') + + wf.connect(inputnode, 'mask_file', check_s3_node, 'file_path') + wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') + wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') + check_s3_node.inputs.img_type = 'mask' + + outputnode = pe.Node(util.IdentityInterface(fields=['out_file', + 'out_name', + 'config_path']), + name='outputspec') + outputnode.config_path = config_path + wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') + wf.connect(inputnode, 'mask', outputnode, 'out_name') + + return wf + + +def create_grp_analysis_dataflow(wf_name='gp_dataflow'): + from CPAC.pipeline import nipype_pipeline_engine as pe + import nipype.interfaces.utility as util + from CPAC.utils.datasource import select_model_files + + wf = pe.Workflow(name=wf_name) + + inputnode = pe.Node(util.IdentityInterface(fields=['ftest', + 'grp_model', + 'model_name'], + mandatory_inputs=True), + name='inputspec') + + selectmodel = pe.Node(function.Function(input_names=['model', + 'ftest', + 'model_name'], + output_names=['fts_file', + 'con_file', + 'grp_file', + 'mat_file'], + function=select_model_files, + as_module=True), + name='selectnode') + + wf.connect(inputnode, 'ftest', + selectmodel, 'ftest') + wf.connect(inputnode, 'grp_model', + selectmodel, 'model') + wf.connect(inputnode, 'model_name', selectmodel, 'model_name') + + outputnode = pe.Node(util.IdentityInterface(fields=['fts', + 'grp', + 'mat', + 'con'], + mandatory_inputs=True), + name='outputspec') + + wf.connect(selectmodel, 'mat_file', + outputnode, 'mat') + wf.connect(selectmodel, 'grp_file', + outputnode, 'grp') + wf.connect(selectmodel, 'fts_file', + outputnode, 'fts') + wf.connect(selectmodel, 'con_file', + outputnode, 'con') + + return wf + + @docstring_parameter(atlas_analyses=list_items_unbracketed( [f'atlas-tse-{option}' for option in valid_options['timeseries']['roi_paths']] @@ -1152,109 +1255,6 @@ def get_atlas_name(atlas): return base_name -def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): - from CPAC.utils.datasource import get_atlas_name - mask_dict = {} - - for mask_file in masks: - config_path = mask_file - if mask_file.strip() == '' or mask_file.startswith('#'): - continue - base_name = get_atlas_name(mask_file) - mask_dict[base_name] = mask_file - - wf = pe.Workflow(name=wf_name) - - inputnode = pe.Node(util.IdentityInterface(fields=['mask', - 'mask_file', - 'creds_path', - 'dl_dir'], - mandatory_inputs=True), - name='inputspec') - - mask_keys, mask_values = \ - zip(*mask_dict.items()) - - inputnode.synchronize = True - inputnode.iterables = [ - ('mask', mask_keys), - ('mask_file', mask_values), - ] - - check_s3_node = pe.Node(function.Function(input_names=['file_path', - 'creds_path', - 'dl_dir', - 'img_type'], - output_names=['local_path'], - function=check_for_s3, - as_module=True), - name='check_for_s3') - - wf.connect(inputnode, 'mask_file', check_s3_node, 'file_path') - wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') - wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') - check_s3_node.inputs.img_type = 'mask' - - outputnode = pe.Node(util.IdentityInterface(fields=['out_file', - 'out_name', - 'config_path']), - name='outputspec') - outputnode.config_path = config_path - wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') - wf.connect(inputnode, 'mask', outputnode, 'out_name') - - return wf - - -def create_grp_analysis_dataflow(wf_name='gp_dataflow'): - from CPAC.pipeline import nipype_pipeline_engine as pe - import nipype.interfaces.utility as util - from CPAC.utils.datasource import select_model_files - - wf = pe.Workflow(name=wf_name) - - inputnode = pe.Node(util.IdentityInterface(fields=['ftest', - 'grp_model', - 'model_name'], - mandatory_inputs=True), - name='inputspec') - - selectmodel = pe.Node(function.Function(input_names=['model', - 'ftest', - 'model_name'], - output_names=['fts_file', - 'con_file', - 'grp_file', - 'mat_file'], - function=select_model_files, - as_module=True), - name='selectnode') - - wf.connect(inputnode, 'ftest', - selectmodel, 'ftest') - wf.connect(inputnode, 'grp_model', - selectmodel, 'model') - wf.connect(inputnode, 'model_name', selectmodel, 'model_name') - - outputnode = pe.Node(util.IdentityInterface(fields=['fts', - 'grp', - 'mat', - 'con'], - mandatory_inputs=True), - name='outputspec') - - wf.connect(selectmodel, 'mat_file', - outputnode, 'mat') - wf.connect(selectmodel, 'grp_file', - outputnode, 'grp') - wf.connect(selectmodel, 'fts_file', - outputnode, 'fts') - wf.connect(selectmodel, 'con_file', - outputnode, 'con') - - return wf - - def resample_func_roi(in_func, in_roi, realignment, identity_matrix): import os, subprocess import nibabel as nb From 54b5c9470413bac86b3f13b7ba470d7ad7d381a8 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 9 Feb 2023 17:12:04 -0500 Subject: [PATCH 07/23] :construction: WIP :recycle: Connect SCA to `gather_atlases` --- CPAC/anat_preproc/anat_preproc.py | 13 ++-- CPAC/sca/sca.py | 85 ++++++++------------------ CPAC/timeseries/timeseries_analysis.py | 22 +++---- CPAC/utils/datasource.py | 9 +-- 4 files changed, 51 insertions(+), 78 deletions(-) diff --git a/CPAC/anat_preproc/anat_preproc.py b/CPAC/anat_preproc/anat_preproc.py index 64741fb998..086464356f 100644 --- a/CPAC/anat_preproc/anat_preproc.py +++ b/CPAC/anat_preproc/anat_preproc.py @@ -1411,9 +1411,10 @@ def acpc_align_brain_with_mask(wf, cfg, strat_pool, pipe_num, opt=None): outputs = { 'desc-preproc_T1w': (acpc_align, 'outputspec.acpc_aligned_head'), 'desc-acpcbrain_T1w': (acpc_align, 'outputspec.acpc_aligned_brain'), - 'space-T1w_desc-brain_mask': (acpc_align, 'outputspec.acpc_brain_mask'), - 'space-T1w_desc-prebrain_mask': (strat_pool.get_data('space-T1w_desc-brain_mask')) - } + 'space-T1w_desc-brain_mask': (acpc_align, + 'outputspec.acpc_brain_mask'), + 'space-T1w_desc-prebrain_mask': strat_pool.get_data( + 'space-T1w_desc-brain_mask')} return (wf, outputs) @@ -2478,9 +2479,11 @@ def brain_extraction_T2(wf, cfg, strat_pool, pipe_num, opt=None): ["space-T2w_desc-brain_mask", "space-T2w_desc-acpcbrain_mask"])], "outputs": ["desc-brain_T2w"]} ''' - if cfg.anatomical_preproc['acpc_alignment']['run'] and cfg.anatomical_preproc['acpc_alignment']['acpc_target'] == 'brain': + if (cfg.anatomical_preproc['acpc_alignment']['run'] and + and cfg.anatomical_preproc['acpc_alignment']['acpc_target'] == 'brain' + ): outputs = { - 'desc-brain_T2w': (strat_pool.get_data(["desc-acpcbrain_T2w"])) + 'desc-brain_T2w': strat_pool.get_data(["desc-acpcbrain_T2w"]) } else: anat_skullstrip_orig_vol = pe.Node(interface=afni.Calc(), diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py index d8545ca4a7..3e4abc18d4 100644 --- a/CPAC/sca/sca.py +++ b/CPAC/sca/sca.py @@ -7,9 +7,8 @@ import nipype.interfaces.utility as util from CPAC.sca.utils import * -from CPAC.utils.utils import extract_one_d -from CPAC.utils.datasource import resample_func_roi, \ - create_roi_mask_dataflow +# from CPAC.utils.utils import extract_one_d +from CPAC.utils.datasource import resample_func_roi from CPAC.timeseries.timeseries_analysis import get_roi_timeseries, \ get_spatial_map_timeseries @@ -389,7 +388,8 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold"], + "inputs": [("atlas-sca-Avg", "atlas_name", + "space-template_desc-preproc_bold")], "outputs": ["desc-MeanSCA_timeseries", "space-template_desc-MeanSCA_correlations", "atlas_name"]} @@ -413,31 +413,20 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): cfg.registration_workflows['functional_registration'][ 'func_registration_to_template']['FNIRT_pipelines']['identity_matrix'] - roi_dataflow_for_sca = create_roi_mask_dataflow( - cfg.seed_based_correlation_analysis['sca_atlases']['Avg'], - f'roi_dataflow_for_sca_{pipe_num}' - ) - - roi_dataflow_for_sca.inputs.inputspec.set( - creds_path=cfg.pipeline_setup['input_creds_path'], - dl_dir=cfg.pipeline_setup['working_directory']['path'] - ) - roi_timeseries_for_sca = get_roi_timeseries( f'roi_timeseries_for_sca_{pipe_num}') - node, out = strat_pool.get_data("space-template_desc-preproc_bold") # resample the input functional file to roi - wf.connect(node, out, - resample_functional_roi_for_sca, 'in_func') - wf.connect(roi_dataflow_for_sca, 'outputspec.out_file', - resample_functional_roi_for_sca, 'in_roi') + wf.connect(*strat_pool.get_data("space-template_desc-preproc_bold"), + resample_functional_roi_for_sca, 'in_func') + wf.connect(*strat_pool.get_data("atlas-sca-Avg"), + resample_functional_roi_for_sca, 'in_roi') # connect it to the roi_timeseries wf.connect(resample_functional_roi_for_sca, 'out_roi', - roi_timeseries_for_sca, 'input_roi.roi') + roi_timeseries_for_sca, 'input_roi.roi') wf.connect(resample_functional_roi_for_sca, 'out_func', - roi_timeseries_for_sca, 'inputspec.rest') + roi_timeseries_for_sca, 'inputspec.rest') sca_roi = create_sca(f'sca_roi_{pipe_num}') @@ -455,7 +444,7 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): # extract_one_d)), 'space-template_desc-MeanSCA_correlations': (sca_roi, 'outputspec.correlation_stack'), - 'atlas_name': (roi_dataflow_for_sca, 'outputspec.out_name') + 'atlas_name': strat_pool.get_data('atlas_name') } return (wf, outputs) @@ -470,8 +459,9 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold"], - "space-template_desc-bold_mask"], + "inputs": [("atlas-sca-DualReg", "atlas_name", + "space-template_desc-preproc_bold" + "space-template_desc-bold_mask")], "outputs": ["space-template_desc-DualReg_correlations", "desc-DualReg_statmap", "atlas_name"]} @@ -490,16 +480,6 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): 'identity_matrix'] ) - spatial_map_dataflow_for_dr = create_roi_mask_dataflow( - cfg.seed_based_correlation_analysis['sca_atlases']['DualReg'], - f'spatial_map_dataflow_for_DR_{pipe_num}' - ) - - spatial_map_dataflow_for_dr.inputs.inputspec.set( - creds_path=cfg.pipeline_setup['input_creds_path'], - dl_dir=cfg.pipeline_setup['working_directory']['path'] - ) - spatial_map_timeseries_for_dr = get_spatial_map_timeseries( f'spatial_map_timeseries_for_DR_{pipe_num}' ) @@ -513,7 +493,7 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): wf.connect(node, out, spatial_map_timeseries_for_dr, 'inputspec.subject_rest') - wf.connect(spatial_map_dataflow_for_dr, 'outputspec.out_file', + wf.connect(*strat_pool.get_data('atlas-sca-DualReg'), resample_spatial_map_to_native_space_for_dr, 'in_file') # connect it to the spatial_map_timeseries @@ -540,8 +520,7 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): (dr_temp_reg, 'outputspec.temp_reg_map'), 'desc-DualReg_statmap': (dr_temp_reg, 'outputspec.temp_reg_map_z'), - 'atlas_name': - (spatial_map_dataflow_for_dr, 'outputspec.out_name') + 'atlas_name': strat_pool.get_data('atlas_name') } return (wf, outputs) @@ -556,8 +535,9 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold", - "space-template_desc-bold_mask"], + "inputs": [("atlas-sca-DualReg", "atlas_name", + "space-template_desc-preproc_bold", + "space-template_desc-bold_mask")], "outputs": ["space-template_desc-MultReg_correlations", "desc-MultReg_statmap", "atlas_name"]} @@ -567,9 +547,9 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): # pool so that it will not get sent to SCA resample_functional_roi_for_multreg = pe.Node( util.Function(input_names=['in_func', - 'in_roi', - 'realignment', - 'identity_matrix'], + 'in_roi', + 'realignment', + 'identity_matrix'], output_names=['out_func', 'out_roi'], function=resample_func_roi, @@ -582,25 +562,14 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): cfg.registration_workflows['functional_registration'][ 'func_registration_to_template']['FNIRT_pipelines']['identity_matrix'] - roi_dataflow_for_multreg = create_roi_mask_dataflow( - cfg.seed_based_correlation_analysis['sca_atlases']['MultReg'], - f'roi_dataflow_for_mult_reg_{pipe_num}') - - roi_dataflow_for_multreg.inputs.inputspec.set( - creds_path=cfg.pipeline_setup['input_creds_path'], - dl_dir=cfg.pipeline_setup['working_directory']['path'] - ) - roi_timeseries_for_multreg = get_roi_timeseries( f'roi_timeseries_for_mult_reg_{pipe_num}') - node, out = strat_pool.get_data("space-template_desc-preproc_bold") # resample the input functional file to roi - wf.connect(node, out, resample_functional_roi_for_multreg, 'in_func') - wf.connect(roi_dataflow_for_multreg, - 'outputspec.out_file', - resample_functional_roi_for_multreg, - 'in_roi') + wf.connect(*strat_pool.get_data("space-template_desc-preproc_bold"), + resample_functional_roi_for_multreg, 'in_func') + wf.connect(*strat_pool.get_data("atlas-sca-MultReg"), + resample_functional_roi_for_multreg, 'in_roi') # connect it to the roi_timeseries wf.connect(resample_functional_roi_for_multreg, @@ -638,7 +607,7 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): (sc_temp_reg, 'outputspec.temp_reg_map'), 'desc-MultReg_statmap': (sc_temp_reg, 'outputspec.temp_reg_map_z'), - 'atlas_name': (roi_dataflow_for_multreg, 'outputspec.out_name') + 'atlas_name': strat_pool.get_data('atlas_name') } return (wf, outputs) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index b02f5820a3..7ee2b52b3a 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -764,8 +764,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-Avg", "atlas-name"), - "space-template_desc-preproc_bold"], + "inputs": [("atlas-tse-Avg", "atlas_name", + "space-template_desc-preproc_bold")], "outputs": ["space-template_desc-Mean_timeseries", "space-template_space-template_desc-ndmg_correlations", "atlas_name", @@ -801,7 +801,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): roi_atlas = strat_pool.node_data("atlas-tse-Avg") wf.connect(roi_atlas.node, roi_atlas.out, resample_functional_roi, 'in_roi') - atlas_name = strat_pool.node_data("atlas-name") + atlas_name = strat_pool.node_data("atlas_name") # connect it to the roi_timeseries wf.connect(resample_functional_roi, 'out_roi', @@ -887,8 +887,8 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-Voxel", "atlas-name"), - "space-template_desc-preproc_bold"], + "inputs": [("atlas-tse-Voxel", "atlas_name", + "space-template_desc-preproc_bold")], "outputs": ["desc-Voxel_timeseries", "atlas_name"]} ''' @@ -917,7 +917,7 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): node, out = strat_pool.get_data("space-template_desc-preproc_bold") # resample the input functional file to mask wf.connect(node, out, resample_functional_to_mask, 'in_func') - wf.connect(strat_pool.get_data('atlas-tse-Voxel'), + wf.connect(*strat_pool.get_data('atlas-tse-Voxel'), resample_functional_to_mask, 'in_roi') # connect it to the voxel_timeseries @@ -929,7 +929,7 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): outputs = { 'desc-Voxel_timeseries': (voxel_timeseries, 'outputspec.mask_outputs'), - 'atlas_name': strat_pool.get_data('atlas-name')} + 'atlas_name': strat_pool.get_data('atlas_name')} return (wf, outputs) @@ -948,9 +948,9 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-SpatialReg", "atlas-name"), - "space-template_desc-preproc_bold", - "space-template_desc-bold_mask"], + "inputs": [("atlas-tse-SpatialReg", "atlas_name", + "space-template_desc-preproc_bold", + "space-template_desc-bold_mask")], "outputs": ["desc-SpatReg_timeseries", "atlas_name"]} ''' @@ -977,7 +977,7 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): # resample the input functional file and functional mask # to spatial map wf.connect(node, out, resample_spatial_map_to_native_space, 'reference') - wf.connect(strat_pool.get_data("atlas-tse-SpatialReg"), + wf.connect(*strat_pool.get_data("atlas-tse-SpatialReg"), resample_spatial_map_to_native_space, 'in_file') wf.connect(node, out, spatial_map_timeseries, 'inputspec.subject_rest') diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 853b2384f6..67ed435575 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1127,8 +1127,8 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): "switch": "None", "option_key": "None", "option_val": "None", - "inputs": ['space-template_bold'], - "outputs": ['atlas-name', {atlas_analyses}]}} + "inputs": ['space-template_desc-preproc_bold'], + "outputs": ['atlas_name', {atlas_analyses}]}} """ if cfg['timeseries_extraction', 'run'] or cfg['seed_based_correlation_analysis', 'run']: @@ -1158,13 +1158,14 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): 'func_registration_to_template', 'FNIRT_pipelines', 'identity_matrix'] resample_roi.inputs.realignment = realignment - wf.connect(strat_pool.get_data('space-template_bold'), + wf.connect(*strat_pool.get_data('space-template_' + 'desc-preproc_bold'), resample_roi, 'in_func') wf.connect(gather, 'outputspec.out_file', resample_roi, 'in_roi') final_subnode, final_out = resample_roi, 'out_roi' else: final_subnode, final_out = gather, 'outputspec.out_file' - outputs = {'atlas-name': (gather, 'outputspec.out_name')} + outputs = {'atlas_name': (gather, 'outputspec.out_name')} for analysis, _atlases in tse_atlases.items(): if atlas in _atlases: outputs[f'atlas-tse-{analysis}'] = (final_subnode, final_out) From 3585816ecf65fe568a9d284073cf625b90225245 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 9 Feb 2023 17:21:57 -0500 Subject: [PATCH 08/23] :recycle: Connect `gather_atlases` in `pipeline_blocks` --- CHANGELOG.md | 1 + CPAC/pipeline/cpac_pipeline.py | 5 ++-- CPAC/utils/datasource.py | 54 ---------------------------------- 3 files changed, 4 insertions(+), 56 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e576e1b34e..14bfd65a5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Updated some output filenaming conventions for human-readability and to move closer to BIDS-derivatives compliance - Changed motion filter from single dictionary to list of dictionaries - Changed CI logic to allow non-release tags +- Refactored ROI resampling to be more efficient (resample to output resolution before analysis, and resample each atlas just once if used for multiple analyses) ### Upgraded dependencies - `nibabel` 2.3.3 → 3.0.1 diff --git a/CPAC/pipeline/cpac_pipeline.py b/CPAC/pipeline/cpac_pipeline.py index 2141610ac0..77a1228125 100644 --- a/CPAC/pipeline/cpac_pipeline.py +++ b/CPAC/pipeline/cpac_pipeline.py @@ -1135,10 +1135,10 @@ def connect_pipeline(wf, cfg, rpool, pipeline_blocks): def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None, num_ants_cores=1): - from CPAC.utils.datasource import gather_extraction_maps + from CPAC.utils.datasource import gather_atlases, gather_extraction_maps # Workflow setup - wf = initialize_nipype_wf(cfg, sub_dict) + wf = initialize_nipype_wf(cfg, sub_dict, name=pipeline_name) # Extract credentials path if it exists try: @@ -1397,6 +1397,7 @@ def build_workflow(subject_id, sub_dict, cfg, pipeline_name=None, pipeline_blocks += [surface_postproc] # Extractions and Derivatives + pipeline_blocks += [gather_atlases] tse_atlases, sca_atlases = gather_extraction_maps(cfg) cfg.timeseries_extraction['tse_atlases'] = tse_atlases cfg.seed_based_correlation_analysis['sca_atlases'] = sca_atlases diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 67ed435575..11ca51bc65 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1008,60 +1008,6 @@ def create_anat_datasource(wf_name='anat_datasource'): return wf -def create_roi_mask_dataflow(masks, wf_name='datasource_roi_mask'): - from CPAC.utils.datasource import get_atlas_name - mask_dict = {} - - for mask_file in masks: - config_path = mask_file - if mask_file.strip() == '' or mask_file.startswith('#'): - continue - base_name = get_atlas_name(mask_file) - mask_dict[base_name] = mask_file - - wf = pe.Workflow(name=wf_name) - - inputnode = pe.Node(util.IdentityInterface(fields=['mask', - 'mask_file', - 'creds_path', - 'dl_dir'], - mandatory_inputs=True), - name='inputspec') - - mask_keys, mask_values = \ - zip(*mask_dict.items()) - - inputnode.synchronize = True - inputnode.iterables = [ - ('mask', mask_keys), - ('mask_file', mask_values), - ] - - check_s3_node = pe.Node(function.Function(input_names=['file_path', - 'creds_path', - 'dl_dir', - 'img_type'], - output_names=['local_path'], - function=check_for_s3, - as_module=True), - name='check_for_s3') - - wf.connect(inputnode, 'mask_file', check_s3_node, 'file_path') - wf.connect(inputnode, 'creds_path', check_s3_node, 'creds_path') - wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') - check_s3_node.inputs.img_type = 'mask' - - outputnode = pe.Node(util.IdentityInterface(fields=['out_file', - 'out_name', - 'config_path']), - name='outputspec') - outputnode.config_path = config_path - wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') - wf.connect(inputnode, 'mask', outputnode, 'out_name') - - return wf - - def create_grp_analysis_dataflow(wf_name='gp_dataflow'): from CPAC.pipeline import nipype_pipeline_engine as pe import nipype.interfaces.utility as util From 465ea6c55f8549ef625c380e7d72f3bb999524cf Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Fri, 10 Feb 2023 08:54:58 -0500 Subject: [PATCH 09/23] :pencil2: Remove extra `and` --- CPAC/anat_preproc/anat_preproc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CPAC/anat_preproc/anat_preproc.py b/CPAC/anat_preproc/anat_preproc.py index 086464356f..cb24e99e47 100644 --- a/CPAC/anat_preproc/anat_preproc.py +++ b/CPAC/anat_preproc/anat_preproc.py @@ -2479,7 +2479,7 @@ def brain_extraction_T2(wf, cfg, strat_pool, pipe_num, opt=None): ["space-T2w_desc-brain_mask", "space-T2w_desc-acpcbrain_mask"])], "outputs": ["desc-brain_T2w"]} ''' - if (cfg.anatomical_preproc['acpc_alignment']['run'] and + if (cfg.anatomical_preproc['acpc_alignment']['run'] and cfg.anatomical_preproc['acpc_alignment']['acpc_target'] == 'brain' ): outputs = { From cf1327a325b5f0bfdf8c6c013307ac00c28ae9b5 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Fri, 10 Feb 2023 12:02:52 -0500 Subject: [PATCH 10/23] fixup! :recycle: Create `gather_atlases` NodeBlock --- CPAC/utils/datasource.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 11ca51bc65..49920e54a2 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1079,11 +1079,13 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): if cfg['timeseries_extraction', 'run'] or cfg['seed_based_correlation_analysis', 'run']: tse_atlases, sca_atlases = gather_extraction_maps(cfg) - atlases = [] + atlases = set() if cfg['timeseries_extraction', 'run']: - atlases += tse_atlases + for atlas in [atlas for analysis_type in tse_atlases for atlas in tse_atlases[analysis_type]]: + atlases.add(atlas) if cfg['seed_based_correlation_analysis', 'run']: - atlases += sca_atlases + for atlas in [atlas for analysis_type in sca_atlases for atlas in sca_atlases[analysis_type]]: + atlases.add(atlas) for atlas in atlases: atlas_name = get_atlas_name(atlas) gather = gather_atlas(atlas_name, atlas, pipe_num) @@ -1157,8 +1159,8 @@ def gather_atlas(atlas_name, atlas, pipe_num): as_module=True), name='check_for_s3') check_s3_node.inputs.img_type = 'mask' - wf.connect([inputnode, check_s3_node, [('mask_file', 'file_path'), - ('creds_path', 'creds_path')]]) + wf.connect([(inputnode, check_s3_node, [('mask_file', 'file_path'), + ('creds_path', 'creds_path')])]) wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') outputnode = pe.Node(util.IdentityInterface(fields=['out_file', From 30073158b8ecfc2fcbbfdf6bf36950a899c90077 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Fri, 10 Feb 2023 13:12:41 -0500 Subject: [PATCH 11/23] :bug: Dedupe `align_template_mask_to_template_data` --- CPAC/nuisance/nuisance.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CPAC/nuisance/nuisance.py b/CPAC/nuisance/nuisance.py index d751c5ec25..33b0981ad8 100644 --- a/CPAC/nuisance/nuisance.py +++ b/CPAC/nuisance/nuisance.py @@ -2475,7 +2475,8 @@ def nuisance_regression(wf, cfg, strat_pool, pipe_num, opt, space, res=None): # sometimes mm dimensions match but the voxel dimensions don't # so here we align the mask to the resampled data before applying match_grid = pe.Node(afni.Resample(), - name='align_template_mask_to_template_data') + name='align_template_mask_to_template_' + f'data_{name_suff}') match_grid.inputs.outputtype = 'NIFTI_GZ' match_grid.inputs.resample_mode = 'Cu' node, out = strat_pool.get_data('FSL-AFNI-brain-mask') From a8dba26205779575518ae0b58b9d27cc31c8b2e6 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Fri, 10 Feb 2023 13:42:13 -0500 Subject: [PATCH 12/23] :art: Match quotes --- CPAC/utils/datasource.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 49920e54a2..55a1125c93 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1073,8 +1073,8 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): "switch": "None", "option_key": "None", "option_val": "None", - "inputs": ['space-template_desc-preproc_bold'], - "outputs": ['atlas_name', {atlas_analyses}]}} + "inputs": ["space-template_desc-preproc_bold"], + "outputs": ["atlas_name", {atlas_analyses}]}} """ if cfg['timeseries_extraction', 'run'] or cfg['seed_based_correlation_analysis', 'run']: From 7351508ee955d5723819c7c66e9e8538199f8824 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Fri, 10 Feb 2023 13:42:35 -0500 Subject: [PATCH 13/23] :alembic: Print debugging statement --- CPAC/utils/datasource.py | 1 + 1 file changed, 1 insertion(+) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 55a1125c93..8df4e41b82 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1120,6 +1120,7 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): for analysis, _atlases in sca_atlases.items(): if atlas in _atlases: outputs[f'atlas-sca-{analysis}'] = (final_subnode, final_out) + print(outputs) return wf, outputs From bef2f7f8ed2ee61e52252aa81eb0b0b5d2e37035 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Fri, 10 Feb 2023 14:32:31 -0500 Subject: [PATCH 14/23] :recycle: Skip ROI analysis nodes with no atlas assigned --- CPAC/sca/sca.py | 11 +++++++---- CPAC/timeseries/timeseries_analysis.py | 7 ++++++- CPAC/utils/datasource.py | 1 - 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py index 3e4abc18d4..d4f796f8e1 100644 --- a/CPAC/sca/sca.py +++ b/CPAC/sca/sca.py @@ -394,7 +394,8 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "space-template_desc-MeanSCA_correlations", "atlas_name"]} ''' - + if strat_pool.check_rpool('atlas-sca-Avg') is False: + return wf, {} # same workflow, except to run TSE and send it to the resource # pool so that it will not get sent to SCA resample_functional_roi_for_sca = pe.Node( @@ -466,7 +467,8 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): "desc-DualReg_statmap", "atlas_name"]} ''' - + if strat_pool.check_rpool('atlas-sca-DualReg') is False: + return wf, {} resample_spatial_map_to_native_space_for_dr = pe.Node( interface=fsl.FLIRT(), name=f'resample_spatial_map_to_native_space_for_DR_{pipe_num}' @@ -535,14 +537,15 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-DualReg", "atlas_name", + "inputs": [("atlas-sca-MultReg", "atlas_name", "space-template_desc-preproc_bold", "space-template_desc-bold_mask")], "outputs": ["space-template_desc-MultReg_correlations", "desc-MultReg_statmap", "atlas_name"]} ''' - + if strat_pool.check_rpool('atlas-sca-MultReg') is False: + return wf, {} # same workflow, except to run TSE and send it to the resource # pool so that it will not get sent to SCA resample_functional_roi_for_multreg = pe.Node( diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index 7ee2b52b3a..a20c35f762 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -774,6 +774,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "space-template_desc-PearsonNilearn_correlations", "space-template_desc-PartialNilearn_correlations"]} ''' + if strat_pool.check_rpool('atlas-tse-Avg') is False: + return wf, {} resample_functional_roi = pe.Node(Function(input_names=['in_func', 'in_roi', 'realignment', @@ -892,6 +894,8 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): "outputs": ["desc-Voxel_timeseries", "atlas_name"]} ''' + if strat_pool.check_rpool('atlas-tse-Voxel') is False: + return wf, {} resample_functional_to_mask = pe.Node(Function(input_names=['in_func', 'in_roi', 'realignment', @@ -954,7 +958,8 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): "outputs": ["desc-SpatReg_timeseries", "atlas_name"]} ''' - + if strat_pool.check_rpool('atlas-tse-SpatialReg') is False: + return wf, {} resample_spatial_map_to_native_space = pe.Node( interface=fsl.FLIRT(), name=f'resample_spatial_map_to_native_space_{pipe_num}', diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 8df4e41b82..55a1125c93 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1120,7 +1120,6 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): for analysis, _atlases in sca_atlases.items(): if atlas in _atlases: outputs[f'atlas-sca-{analysis}'] = (final_subnode, final_out) - print(outputs) return wf, outputs From 53edfe52e44cbc529d0cfae7f5018185792ca027 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Mon, 13 Feb 2023 11:39:50 -0500 Subject: [PATCH 15/23] :recycle: Fork ROI analyses on atlas --- CPAC/sca/sca.py | 32 ++++---- CPAC/timeseries/timeseries_analysis.py | 39 +++++---- CPAC/utils/datasource.py | 106 ++++++++++++++++--------- CPAC/utils/utils.py | 32 +++++++- 4 files changed, 132 insertions(+), 77 deletions(-) diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py index d4f796f8e1..179087be99 100644 --- a/CPAC/sca/sca.py +++ b/CPAC/sca/sca.py @@ -8,7 +8,7 @@ from CPAC.sca.utils import * # from CPAC.utils.utils import extract_one_d -from CPAC.utils.datasource import resample_func_roi +from CPAC.utils.datasource import resample_func_roi, roi_input_node from CPAC.timeseries.timeseries_analysis import get_roi_timeseries, \ get_spatial_map_timeseries @@ -388,7 +388,7 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-Avg", "atlas_name", + "inputs": [("atlas-sca-Avg", "atlas-sca-Avg_name", "space-template_desc-preproc_bold")], "outputs": ["desc-MeanSCA_timeseries", "space-template_desc-MeanSCA_correlations", @@ -420,7 +420,8 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): # resample the input functional file to roi wf.connect(*strat_pool.get_data("space-template_desc-preproc_bold"), resample_functional_roi_for_sca, 'in_func') - wf.connect(*strat_pool.get_data("atlas-sca-Avg"), + fork_atlases = roi_input_node(wf, strat_pool, 'atlas-sca-Avg', pipe_num) + wf.connect(fork_atlases, 'atlas_file', resample_functional_roi_for_sca, 'in_roi') # connect it to the roi_timeseries @@ -445,7 +446,7 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): # extract_one_d)), 'space-template_desc-MeanSCA_correlations': (sca_roi, 'outputspec.correlation_stack'), - 'atlas_name': strat_pool.get_data('atlas_name') + 'atlas_name': (fork_atlases, 'atlas_name') } return (wf, outputs) @@ -460,7 +461,7 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-DualReg", "atlas_name", + "inputs": [("atlas-sca-DualReg", "atlas-sca-DualReg_name", "space-template_desc-preproc_bold" "space-template_desc-bold_mask")], "outputs": ["space-template_desc-DualReg_correlations", @@ -494,8 +495,9 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): resample_spatial_map_to_native_space_for_dr, 'reference') wf.connect(node, out, spatial_map_timeseries_for_dr, 'inputspec.subject_rest') - - wf.connect(*strat_pool.get_data('atlas-sca-DualReg'), + fork_atlases = roi_input_node(wf, strat_pool, 'atlas-sca-DualReg', + pipe_num) + wf.connect(fork_atlases, 'atlas_file', resample_spatial_map_to_native_space_for_dr, 'in_file') # connect it to the spatial_map_timeseries @@ -522,10 +524,9 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): (dr_temp_reg, 'outputspec.temp_reg_map'), 'desc-DualReg_statmap': (dr_temp_reg, 'outputspec.temp_reg_map_z'), - 'atlas_name': strat_pool.get_data('atlas_name') - } + 'atlas_name': (fork_atlases, 'atlas_name')} - return (wf, outputs) + return wf, outputs def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): @@ -537,7 +538,7 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-MultReg", "atlas_name", + "inputs": [("atlas-sca-MultReg", "atlas-sca-MultReg_name", "space-template_desc-preproc_bold", "space-template_desc-bold_mask")], "outputs": ["space-template_desc-MultReg_correlations", @@ -571,7 +572,9 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): # resample the input functional file to roi wf.connect(*strat_pool.get_data("space-template_desc-preproc_bold"), resample_functional_roi_for_multreg, 'in_func') - wf.connect(*strat_pool.get_data("atlas-sca-MultReg"), + fork_atlases = roi_input_node(wf, strat_pool, 'atlas-sca-MultReg', + pipe_num) + wf.connect(fork_atlases, 'atlas_file', resample_functional_roi_for_multreg, 'in_roi') # connect it to the roi_timeseries @@ -610,7 +613,6 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): (sc_temp_reg, 'outputspec.temp_reg_map'), 'desc-MultReg_statmap': (sc_temp_reg, 'outputspec.temp_reg_map_z'), - 'atlas_name': strat_pool.get_data('atlas_name') - } + 'atlas_name': (fork_atlases, 'atlas_name')} - return (wf, outputs) + return wf, outputs diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index a20c35f762..b884b72166 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -6,7 +6,7 @@ create_connectome_nilearn, \ get_connectome_method from CPAC.pipeline import nipype_pipeline_engine as pe -from CPAC.utils.datasource import resample_func_roi +from CPAC.utils.datasource import resample_func_roi, roi_input_node def get_voxel_timeseries(wf_name='voxel_timeseries'): @@ -764,7 +764,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-Avg", "atlas_name", + "inputs": [("atlas-tse-Avg", "atlas-tse-Avg_name", "space-template_desc-preproc_bold")], "outputs": ["space-template_desc-Mean_timeseries", "space-template_space-template_desc-ndmg_correlations", @@ -799,11 +799,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): node, out = strat_pool.get_data("space-template_desc-preproc_bold") wf.connect(node, out, resample_functional_roi, 'in_func') - - roi_atlas = strat_pool.node_data("atlas-tse-Avg") - wf.connect(roi_atlas.node, roi_atlas.out, - resample_functional_roi, 'in_roi') - atlas_name = strat_pool.node_data("atlas_name") + fork_atlases = roi_input_node(wf, strat_pool, 'atlas-tse-Avg', pipe_num) + wf.connect(fork_atlases, 'atlas_file', resample_functional_roi, 'in_roi') # connect it to the roi_timeseries wf.connect(resample_functional_roi, 'out_roi', @@ -854,7 +851,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): outputs = { 'space-template_desc-Mean_timeseries': ( roi_timeseries, 'outputspec.roi_csv'), - 'atlas_name': (atlas_name.node, atlas_name.out), + 'atlas_name': (fork_atlases, 'atlas_name'), **matrix_outputs } # - NDMG @@ -875,7 +872,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): mem_x=(1928411764134803 / 302231454903657293676544, 'ts')) wf.connect(roi_timeseries, 'outputspec.roi_ts', ndmg_graph, 'ts') - wf.connect(roi_atlas.node, roi_atlas.out, ndmg_graph, 'labels') + wf.connect(fork_atlases, 'atlas_file', ndmg_graph, 'labels') outputs['space-template_space-template_desc-ndmg_correlations' ] = (ndmg_graph, 'out_file') @@ -889,7 +886,7 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-Voxel", "atlas_name", + "inputs": [("atlas-tse-Voxel", "atlas-tse-Voxel_name", "space-template_desc-preproc_bold")], "outputs": ["desc-Voxel_timeseries", "atlas_name"]} @@ -921,7 +918,8 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): node, out = strat_pool.get_data("space-template_desc-preproc_bold") # resample the input functional file to mask wf.connect(node, out, resample_functional_to_mask, 'in_func') - wf.connect(*strat_pool.get_data('atlas-tse-Voxel'), + fork_atlases = roi_input_node(wf, strat_pool, 'atlas-tse-Voxel', pipe_num) + wf.connect(fork_atlases, 'atlas_file', resample_functional_to_mask, 'in_roi') # connect it to the voxel_timeseries @@ -931,9 +929,8 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): voxel_timeseries, 'inputspec.rest') outputs = { - 'desc-Voxel_timeseries': - (voxel_timeseries, 'outputspec.mask_outputs'), - 'atlas_name': strat_pool.get_data('atlas_name')} + 'desc-Voxel_timeseries': (voxel_timeseries, 'outputspec.mask_outputs'), + 'atlas_name': (fork_atlases, 'atlas_name')} return (wf, outputs) @@ -952,7 +949,7 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-SpatialReg", "atlas_name", + "inputs": [("atlas-tse-SpatialReg", "atlas-tse-SpatialReg_name", "space-template_desc-preproc_bold", "space-template_desc-bold_mask")], "outputs": ["desc-SpatReg_timeseries", @@ -982,7 +979,9 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): # resample the input functional file and functional mask # to spatial map wf.connect(node, out, resample_spatial_map_to_native_space, 'reference') - wf.connect(*strat_pool.get_data("atlas-tse-SpatialReg"), + fork_atlases = roi_input_node(wf, strat_pool, 'atlas-tse-SpatialReg', + pipe_num) + wf.connect(fork_atlases, 'atlas_file', resample_spatial_map_to_native_space, 'in_file') wf.connect(node, out, spatial_map_timeseries, 'inputspec.subject_rest') @@ -996,8 +995,8 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): # 'atlas_name' will be an iterable and will carry through outputs = { - 'desc-SpatReg_timeseries': - (spatial_map_timeseries, 'outputspec.subject_timeseries'), - 'atlas_name': strat_pool.get_data('atlas_name')} + 'desc-SpatReg_timeseries': (spatial_map_timeseries, + 'outputspec.subject_timeseries'), + 'atlas_name': (fork_atlases, 'atlas_name')} - return (wf, outputs) + return wf, outputs diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 55a1125c93..6a0d71b8c1 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -18,6 +18,7 @@ import json import os import re +from itertools import chain from typing import Tuple from nipype import logging from nipype.interfaces import utility as util @@ -28,7 +29,7 @@ from CPAC.utils import function from CPAC.utils.docs import docstring_parameter, list_items_unbracketed from CPAC.utils.interfaces.function import Function -from CPAC.utils.utils import get_scan_params +from CPAC.utils.utils import get_scan_params, insert_in_dict_of_lists logger = logging.getLogger('nipype.workflow') @@ -1057,10 +1058,11 @@ def create_grp_analysis_dataflow(wf_name='gp_dataflow'): return wf -@docstring_parameter(atlas_analyses=list_items_unbracketed( - [f'atlas-tse-{option}' for option - in valid_options['timeseries']['roi_paths']] - + [f'atlas-sca-{option}' for option in valid_options['sca']['roi_paths']])) +@docstring_parameter(atlas_analyses=list_items_unbracketed(list( + chain.from_iterable([(f'atlas-tse-{option}_name', f'atlas-tse-{option}') + for option in valid_options['timeseries']['roi_paths']] + + [(f'atlas-sca-{option}-name', f'atlas-sca-{option}') + for option in valid_options['sca']['roi_paths']])))) def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): """ Collects all the ROI atlases in a config, resamples them and adds @@ -1074,18 +1076,22 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): "option_key": "None", "option_val": "None", "inputs": ["space-template_desc-preproc_bold"], - "outputs": ["atlas_name", {atlas_analyses}]}} + "outputs": [{atlas_analyses}]}} """ if cfg['timeseries_extraction', 'run'] or cfg['seed_based_correlation_analysis', 'run']: tse_atlases, sca_atlases = gather_extraction_maps(cfg) atlases = set() - if cfg['timeseries_extraction', 'run']: - for atlas in [atlas for analysis_type in tse_atlases for atlas in tse_atlases[analysis_type]]: - atlases.add(atlas) - if cfg['seed_based_correlation_analysis', 'run']: - for atlas in [atlas for analysis_type in sca_atlases for atlas in sca_atlases[analysis_type]]: - atlases.add(atlas) + specified_atlases = {'tse': tse_atlases, 'sca': sca_atlases} + for config_key, _atlases in { + 'timeseries_extraction': tse_atlases, + 'seed_based_correlation_analysis': sca_atlases}.items(): + if cfg[config_key, 'run']: + # pylint: disable=consider-using-dict-items + for atlas in [atlas for analysis_type in _atlases for atlas + in _atlases[analysis_type]]: + atlases.add(atlas) + outputs = {} for atlas in atlases: atlas_name = get_atlas_name(atlas) gather = gather_atlas(atlas_name, atlas, pipe_num) @@ -1113,17 +1119,19 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): final_subnode, final_out = resample_roi, 'out_roi' else: final_subnode, final_out = gather, 'outputspec.out_file' - outputs = {'atlas_name': (gather, 'outputspec.out_name')} - for analysis, _atlases in tse_atlases.items(): - if atlas in _atlases: - outputs[f'atlas-tse-{analysis}'] = (final_subnode, final_out) - for analysis, _atlases in sca_atlases.items(): - if atlas in _atlases: - outputs[f'atlas-sca-{analysis}'] = (final_subnode, final_out) + for spec, _specified in specified_atlases.items(): + for analysis, _atlases in _specified.items(): + if atlas in _atlases: + outputs = insert_in_dict_of_lists( + outputs, f'atlas-{spec}-{analysis}_name', + (gather, 'outputspec.out_name')) + outputs = insert_in_dict_of_lists( + outputs, f'atlas-{spec}-{analysis}', + (final_subnode, final_out)) return wf, outputs -def gather_atlas(atlas_name, atlas, pipe_num): +def gather_atlas(atlas_name, atlas, pipe_num, s3_options=None): """ Injects a single ROI atlas to the resource pool at the required resolution. @@ -1134,22 +1142,14 @@ def gather_atlas(atlas_name, atlas, pipe_num): pipe_num : int + s3_options : dict or None + Returns ------- pe.Workflow """ wf = pe.Workflow(name=f'gather_{atlas_name}_{pipe_num}') - inputnode = pe.Node(util.IdentityInterface(fields=['mask', - 'mask_file', - 'creds_path', - 'dl_dir'], - mandatory_inputs=True), - name='inputspec') - - inputnode.inputs.mask = atlas_name - inputnode.inputs.mask_file = atlas - check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', 'dl_dir', @@ -1157,19 +1157,18 @@ def gather_atlas(atlas_name, atlas, pipe_num): output_names=['local_path'], function=check_for_s3, as_module=True), - name='check_for_s3') + name='inputnode') + check_s3_node.inputs.file_path = atlas check_s3_node.inputs.img_type = 'mask' - wf.connect([(inputnode, check_s3_node, [('mask_file', 'file_path'), - ('creds_path', 'creds_path')])]) - wf.connect(inputnode, 'dl_dir', check_s3_node, 'dl_dir') + if s3_options: + for option in s3_options: + setattr(check_s3_node.inputs, option, s3_options[option]) outputnode = pe.Node(util.IdentityInterface(fields=['out_file', - 'out_name', - 'config_path']), + 'out_name']), name='outputspec') - - wf.connect(check_s3_node, 'local_path', outputnode, 'out_file') - wf.connect(inputnode, 'mask', outputnode, 'out_name') + wf.connect([(check_s3_node, outputnode, [('local_path', 'out_file'), + ('mask', 'out_name')])]) return wf @@ -1249,3 +1248,32 @@ def resample_func_roi(in_func, in_roi, realignment, identity_matrix): out_roi = in_roi return out_func, out_roi + + +def roi_input_node(wf, strat_pool, atlas_key, pipe_num): + """Create a utility node for forking atlases + + Parameters + ---------- + wf : pe.Workflow + + strat_pool : ResourcePool + + atlas_key : str + + pipe_num : int + + Returns + ------- + pe.Node + """ + roi_atlas = strat_pool.node_data(atlas_key) + atlas_name = strat_pool.node_data(f'{atlas_key}_name') + node = pe.Node(util.IdentityInterface(iterfields=['atlas_name', + 'atlas_file'], + mandatory_inputs=True), + name=f'roi_atlas_{pipe_num}') + wf.connect([ + (roi_atlas.node, node, [(roi_atlas.out, 'atlas_file')]), + (atlas_name.node, node, [(atlas_name.out, 'atlas_name')])]) + return node diff --git a/CPAC/utils/utils.py b/CPAC/utils/utils.py index f209b77296..56d8770836 100644 --- a/CPAC/utils/utils.py +++ b/CPAC/utils/utils.py @@ -22,12 +22,12 @@ import json import numbers import pickle +from copy import deepcopy +from itertools import repeat +from typing import Any import numpy as np import yaml - from click import BadParameter -from copy import deepcopy -from itertools import repeat from voluptuous.error import Invalid from CPAC.pipeline import ALL_PIPELINE_CONFIGS, AVAILABLE_PIPELINE_CONFIGS @@ -1836,6 +1836,32 @@ def concat_list(in_list1=None, in_list2=None): return out_list +def insert_in_dict_of_lists(dict_of_lists: dict, key: str, value: Any) -> dict: + ''' + Function to insert a value into a list at a key in a dict, + creating the list if not yet present + + Parameters + ---------- + dict_of_lists : dict of lists + + key : str + + value : any + + Returns + ------- + dict_of_lists : dict + ''' + if key not in dict_of_lists: + dict_of_lists[key] = [value] + else: + if not isinstance(dict_of_lists[key], list): + dict_of_lists[key] = [dict_of_lists[key]] + dict_of_lists[key].append(value) + return dict_of_lists + + def list_item_replace(l, # noqa: E741 # pylint: disable=invalid-name old, new): '''Function to replace an item in a list From be1d31d5da3d654a137dccd3dc6bd1356bae2b81 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Mon, 13 Feb 2023 11:53:48 -0500 Subject: [PATCH 16/23] :pencil2: Fix typo in NodeBlock docstring [skipci] --- CPAC/utils/datasource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 6a0d71b8c1..76d7e7fb57 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1061,7 +1061,7 @@ def create_grp_analysis_dataflow(wf_name='gp_dataflow'): @docstring_parameter(atlas_analyses=list_items_unbracketed(list( chain.from_iterable([(f'atlas-tse-{option}_name', f'atlas-tse-{option}') for option in valid_options['timeseries']['roi_paths']] - + [(f'atlas-sca-{option}-name', f'atlas-sca-{option}') + + [(f'atlas-sca-{option}_name', f'atlas-sca-{option}') for option in valid_options['sca']['roi_paths']])))) def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): """ From 5e0bd8d2720e42a08c7c855394fecbfd736c2bcb Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Mon, 13 Feb 2023 14:50:59 -0500 Subject: [PATCH 17/23] :necktie: Update engine to allow lists of 2-tuples --- CPAC/pipeline/engine.py | 74 ++++++++++++++++++++++++++++++----------- 1 file changed, 55 insertions(+), 19 deletions(-) diff --git a/CPAC/pipeline/engine.py b/CPAC/pipeline/engine.py index 2e1f77b543..f6c645ceda 100644 --- a/CPAC/pipeline/engine.py +++ b/CPAC/pipeline/engine.py @@ -1503,26 +1503,18 @@ def connect_block(self, wf, cfg, rpool): if raw_label not in new_json_info['CpacVariant']: new_json_info['CpacVariant'][raw_label] = [] new_json_info['CpacVariant'][raw_label].append(node_name) - - rpool.set_data(label, - connection[0], - connection[1], - new_json_info, - pipe_idx, node_name, fork) - - wf, post_labels = rpool.post_process( - wf, label, connection, new_json_info, pipe_idx, - pipe_x, outs) - - if rpool.func_reg: - for postlabel in post_labels: - connection = (postlabel[1], postlabel[2]) - wf = rpool.derivative_xfm(wf, postlabel[0], - connection, - new_json_info, - pipe_idx, - pipe_x) + if isinstance(connection, list): + for _connection in connection: + wf = load_into_rpool( + wf, rpool, label, _connection, + new_json_info, pipe_idx, node_name, + fork, pipe_x, outs) + else: + wf = load_into_rpool( + wf, rpool, label, connection, + new_json_info, pipe_idx, node_name, fork, + pipe_x, outs) return wf @@ -1560,6 +1552,50 @@ def flatten_list(node_block_function: Union[FunctionType, list, Tuple], return flat_list +def load_into_rpool(wf, rpool, label, connection, new_json_info, pipe_idx, + node_name, fork, pipe_x, outs): + """ + Loads a single resource into a ResourcePool + + Parameters + ---------- + wf : pe.Workflow + + rpool : ResourcePool + + label : str + + connection : 2-tuple + + new_json_info : dict + + pipe_idx : dict + + node_name : str + + fork : bool + + pipe_x : int + + outs : dict + + Returns + ------- + wf : pe.Workflow + """ + rpool.set_data(label, connection[0], connection[1], new_json_info, + pipe_idx, node_name, fork) + wf, post_labels = rpool.post_process( + wf, label, connection, new_json_info, pipe_idx, + pipe_x, outs) + if rpool.func_reg: + for postlabel in post_labels: + connection = (postlabel[1], postlabel[2]) + wf = rpool.derivative_xfm(wf, postlabel[0], connection, + new_json_info, pipe_idx, pipe_x) + return wf + + def wrap_block(node_blocks, interface, wf, cfg, strat_pool, pipe_num, opt): """Wrap a list of node block functions to make them easier to use within other node blocks. From 675f723e34a3e85af9e88047541fa9471b5217e6 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Mon, 13 Feb 2023 15:05:15 -0500 Subject: [PATCH 18/23] :white_check_mark: Add doctest for `insert_in_dict_of_lists` --- CPAC/utils/utils.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/CPAC/utils/utils.py b/CPAC/utils/utils.py index 56d8770836..a2a23ed9eb 100644 --- a/CPAC/utils/utils.py +++ b/CPAC/utils/utils.py @@ -1852,6 +1852,16 @@ def insert_in_dict_of_lists(dict_of_lists: dict, key: str, value: Any) -> dict: Returns ------- dict_of_lists : dict + + Examples + -------- + >>> d = {'a': 100} + >>> insert_in_dict_of_lists(d, 'a', 30) + {'a': [100, 30]} + >>> insert_in_dict_of_lists(d, 'a', 30) + {'a': [100, 30, 30]} + >>> insert_in_dict_of_lists(d, 'b', 30) + {'a': [100, 30, 30], 'b': [30]} ''' if key not in dict_of_lists: dict_of_lists[key] = [value] From 7f4cde9db77c59a6c64787277e4f382330b29821 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Mon, 13 Feb 2023 15:24:38 -0500 Subject: [PATCH 19/23] :recycle: Resample, don't realign --- CPAC/timeseries/timeseries_analysis.py | 4 +-- CPAC/utils/datasource.py | 44 +++++++++++--------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index b884b72166..15319763a7 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -837,8 +837,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): timeseries_correlation.inputs.inputspec.method = cm_measure wf.connect([ - (atlas_name.node, timeseries_correlation, [ - (atlas_name.out, 'inputspec.atlas_name')]), + (fork_atlases, timeseries_correlation, [ + ('atlas_name', 'inputspec.atlas_name')]), (resample_functional_roi, timeseries_correlation, [ ('out_roi', 'inputspec.in_rois'), ('out_func', 'inputspec.in_file')])]) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 76d7e7fb57..58ba161841 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1075,7 +1075,7 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): "switch": "None", "option_key": "None", "option_val": "None", - "inputs": ["space-template_desc-preproc_bold"], + "inputs": [], "outputs": [{atlas_analyses}]}} """ if cfg['timeseries_extraction', @@ -1095,30 +1095,22 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): for atlas in atlases: atlas_name = get_atlas_name(atlas) gather = gather_atlas(atlas_name, atlas, pipe_num) - realignment = cfg['timeseries_extraction', 'realignment'] - if 'func_to_ROI' in cfg['timeseries_extraction', 'realignment']: - # realign to output res - resample_roi = pe.Node(Function(input_names=['in_func', - 'in_roi', - 'realignment', - 'identity_matrix'], - output_names=['out_func', - 'out_roi'], - function=resample_func_roi, - as_module=True), - name=f'resample_{atlas}_{pipe_num}') - resample_roi.inputs.identity_matrix = cfg[ - 'registration_workflows', 'functional_registration', - 'func_registration_to_template', 'FNIRT_pipelines', - 'identity_matrix'] - resample_roi.inputs.realignment = realignment - wf.connect(*strat_pool.get_data('space-template_' - 'desc-preproc_bold'), - resample_roi, 'in_func') - wf.connect(gather, 'outputspec.out_file', resample_roi, 'in_roi') - final_subnode, final_out = resample_roi, 'out_roi' - else: - final_subnode, final_out = gather, 'outputspec.out_file' + resampled = pe.Node(Function(input_names=['resolution', + 'template', + 'template_name', + 'tag'], + output_names=['local_path'], + function=resolve_resolution, + as_module=True), + name='resampled_' + atlas_name) + resampled.inputs.resolution = cfg[ + "registration_workflows", "functional_registration", + "func_registration_to_template", "output_resolution", + "func_preproc_outputs"] + resampled.inputs.tag = 'func_preproc_outputs' + wf.connect([(gather, resampled, [ + ('outputspec.out_file', 'template'), + ('outputspec.out_name', 'template_name')])]) for spec, _specified in specified_atlases.items(): for analysis, _atlases in _specified.items(): if atlas in _atlases: @@ -1127,7 +1119,7 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): (gather, 'outputspec.out_name')) outputs = insert_in_dict_of_lists( outputs, f'atlas-{spec}-{analysis}', - (final_subnode, final_out)) + (resampled, 'local_path')) return wf, outputs From b447e90c8549978b4f4d752698e9582abc4db1df Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Wed, 15 Feb 2023 16:27:42 -0500 Subject: [PATCH 20/23] :recycle: Insert lists of 2-tuples in resource pool from NodeBlocks --- CPAC/pipeline/engine.py | 2 +- CPAC/sca/sca.py | 12 ++--- CPAC/timeseries/timeseries_analysis.py | 12 ++--- CPAC/utils/datasource.py | 75 +++++++------------------- 4 files changed, 32 insertions(+), 69 deletions(-) diff --git a/CPAC/pipeline/engine.py b/CPAC/pipeline/engine.py index f6c645ceda..fa5acba331 100644 --- a/CPAC/pipeline/engine.py +++ b/CPAC/pipeline/engine.py @@ -1644,7 +1644,7 @@ def wrap_block(node_blocks, interface, wf, cfg, strat_pool, pipe_num, opt): for in_resource, val in interface.items(): if isinstance(val, tuple): strat_pool.set_data(in_resource, val[0], val[1], {}, "", "", - fork=True)# + fork=True) if 'sub_num' not in strat_pool.get_pool_info(): strat_pool.set_pool_info({'sub_num': 0}) sub_num = strat_pool.get_pool_info()['sub_num'] diff --git a/CPAC/sca/sca.py b/CPAC/sca/sca.py index 179087be99..6cc4a8a622 100644 --- a/CPAC/sca/sca.py +++ b/CPAC/sca/sca.py @@ -388,8 +388,8 @@ def SCA_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-Avg", "atlas-sca-Avg_name", - "space-template_desc-preproc_bold")], + "inputs": [("atlas-sca-Avg", "atlas-sca-Avg_name"), + "space-template_desc-preproc_bold"], "outputs": ["desc-MeanSCA_timeseries", "space-template_desc-MeanSCA_correlations", "atlas_name"]} @@ -461,8 +461,8 @@ def dual_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-DualReg", "atlas-sca-DualReg_name", - "space-template_desc-preproc_bold" + "inputs": [("atlas-sca-DualReg", "atlas-sca-DualReg_name"), + ("space-template_desc-preproc_bold", "space-template_desc-bold_mask")], "outputs": ["space-template_desc-DualReg_correlations", "desc-DualReg_statmap", @@ -538,8 +538,8 @@ def multiple_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-sca-MultReg", "atlas-sca-MultReg_name", - "space-template_desc-preproc_bold", + "inputs": [("atlas-sca-MultReg", "atlas-sca-MultReg_name"), + ("space-template_desc-preproc_bold", "space-template_desc-bold_mask")], "outputs": ["space-template_desc-MultReg_correlations", "desc-MultReg_statmap", diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index 15319763a7..da5a27c9c9 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -764,8 +764,8 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-Avg", "atlas-tse-Avg_name", - "space-template_desc-preproc_bold")], + "inputs": [("atlas-tse-Avg", "atlas-tse-Avg_name"), + "space-template_desc-preproc_bold"], "outputs": ["space-template_desc-Mean_timeseries", "space-template_space-template_desc-ndmg_correlations", "atlas_name", @@ -886,8 +886,8 @@ def timeseries_extraction_Voxel(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-Voxel", "atlas-tse-Voxel_name", - "space-template_desc-preproc_bold")], + "inputs": [("atlas-tse-Voxel", "atlas-tse-Voxel_name"), + "space-template_desc-preproc_bold"], "outputs": ["desc-Voxel_timeseries", "atlas_name"]} ''' @@ -949,8 +949,8 @@ def spatial_regression(wf, cfg, strat_pool, pipe_num, opt=None): "switch": ["run"], "option_key": "None", "option_val": "None", - "inputs": [("atlas-tse-SpatialReg", "atlas-tse-SpatialReg_name", - "space-template_desc-preproc_bold", + "inputs": [("atlas-tse-SpatialReg", "atlas-tse-SpatialReg_name"), + ("space-template_desc-preproc_bold", "space-template_desc-bold_mask")], "outputs": ["desc-SpatReg_timeseries", "atlas_name"]} diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 58ba161841..1bb91074a1 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -1,4 +1,4 @@ -# Copyright (C) 2012-2022 C-PAC Developers +# Copyright (C) 2012-2023 C-PAC Developers # This file is part of C-PAC. @@ -691,7 +691,7 @@ def create_check_for_s3_node(name, file_path, img_type='other', function=check_for_s3, as_module=True), iterfield=['file_path'], - name='check_for_s3_%s' % name) + name=f'check_for_s3_{name}') else: check_s3_node = pe.Node(function.Function(input_names=['file_path', 'creds_path', @@ -700,7 +700,7 @@ def create_check_for_s3_node(name, file_path, img_type='other', output_names=['local_path'], function=check_for_s3, as_module=True), - name='check_for_s3_%s' % name) + name=f'check_for_s3_{name}') check_s3_node.inputs.set( file_path=file_path, @@ -1075,7 +1075,7 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): "switch": "None", "option_key": "None", "option_val": "None", - "inputs": [], + "inputs": ["bold"], "outputs": [{atlas_analyses}]}} """ if cfg['timeseries_extraction', @@ -1094,7 +1094,13 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): outputs = {} for atlas in atlases: atlas_name = get_atlas_name(atlas) - gather = gather_atlas(atlas_name, atlas, pipe_num) + atlas_name_node = pe.Node(util.IdentityInterface( + fields=['atlas_name'], mandatory_inputs=True), + name=f'{atlas_name}_name') + atlas_name_node.inputs.atlas_name = atlas_name + gather = create_check_for_s3_node( + atlas_name, atlas, img_type='mask', + creds_path=cfg['pipeline_setup', 'input_creds_path']) resampled = pe.Node(Function(input_names=['resolution', 'template', 'template_name', @@ -1102,69 +1108,26 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): output_names=['local_path'], function=resolve_resolution, as_module=True), - name='resampled_' + atlas_name) + name=f'resampled_{atlas_name}') resampled.inputs.resolution = cfg[ "registration_workflows", "functional_registration", "func_registration_to_template", "output_resolution", "func_preproc_outputs"] resampled.inputs.tag = 'func_preproc_outputs' - wf.connect([(gather, resampled, [ - ('outputspec.out_file', 'template'), - ('outputspec.out_name', 'template_name')])]) + resampled.inputs.template_name = atlas_name + wf.connect(gather, 'local_path', resampled, 'template') for spec, _specified in specified_atlases.items(): for analysis, _atlases in _specified.items(): if atlas in _atlases: outputs = insert_in_dict_of_lists( outputs, f'atlas-{spec}-{analysis}_name', - (gather, 'outputspec.out_name')) + (atlas_name_node, 'atlas_name')) outputs = insert_in_dict_of_lists( outputs, f'atlas-{spec}-{analysis}', (resampled, 'local_path')) return wf, outputs -def gather_atlas(atlas_name, atlas, pipe_num, s3_options=None): - """ - Injects a single ROI atlas to the resource pool at the required - resolution. - - Parameters - ---------- - atlas_name, atlas_path : str - - pipe_num : int - - s3_options : dict or None - - Returns - ------- - pe.Workflow - """ - wf = pe.Workflow(name=f'gather_{atlas_name}_{pipe_num}') - - check_s3_node = pe.Node(function.Function(input_names=['file_path', - 'creds_path', - 'dl_dir', - 'img_type'], - output_names=['local_path'], - function=check_for_s3, - as_module=True), - name='inputnode') - check_s3_node.inputs.file_path = atlas - check_s3_node.inputs.img_type = 'mask' - if s3_options: - for option in s3_options: - setattr(check_s3_node.inputs, option, s3_options[option]) - - outputnode = pe.Node(util.IdentityInterface(fields=['out_file', - 'out_name']), - name='outputspec') - wf.connect([(check_s3_node, outputnode, [('local_path', 'out_file'), - ('mask', 'out_name')])]) - - return wf - - def get_atlas_name(atlas): """Get a resource name for an atlas @@ -1261,10 +1224,10 @@ def roi_input_node(wf, strat_pool, atlas_key, pipe_num): """ roi_atlas = strat_pool.node_data(atlas_key) atlas_name = strat_pool.node_data(f'{atlas_key}_name') - node = pe.Node(util.IdentityInterface(iterfields=['atlas_name', - 'atlas_file'], - mandatory_inputs=True), - name=f'roi_atlas_{pipe_num}') + node = pe.MapNode(util.IdentityInterface(fields=['atlas_name', + 'atlas_file']), + name=f'roi_{atlas_key}_{pipe_num}', + iterfield=['atlas_name', 'atlas_file']) wf.connect([ (roi_atlas.node, node, [(roi_atlas.out, 'atlas_file')]), (atlas_name.node, node, [(atlas_name.out, 'atlas_name')])]) From f19db2250622e144ed59c75ce00c72c851c298d1 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 16 Feb 2023 10:39:37 -0500 Subject: [PATCH 21/23] :recycle: Resample atlases nearest-neighbor --- CPAC/utils/datasource.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/CPAC/utils/datasource.py b/CPAC/utils/datasource.py index 1bb91074a1..3d9a83cb1c 100644 --- a/CPAC/utils/datasource.py +++ b/CPAC/utils/datasource.py @@ -924,7 +924,8 @@ def res_string_to_tuple(resolution): return (float(resolution.replace('mm', '')),) * 3 -def resolve_resolution(resolution, template, template_name, tag=None): +def resolve_resolution(resolution, template, template_name, tag=None, + resample_mode='Cu'): from nipype.interfaces import afni from CPAC.pipeline import nipype_pipeline_engine as pe from CPAC.utils.datasource import check_for_s3 @@ -962,7 +963,7 @@ def resolve_resolution(resolution, template, template_name, tag=None): mem_x=(0.0115, 'in_file', 't')) resample.inputs.voxel_size = res_string_to_tuple(resolution) resample.inputs.outputtype = 'NIFTI_GZ' - resample.inputs.resample_mode = 'Cu' + resample.inputs.resample_mode = resample_mode resample.inputs.in_file = local_path resample.base_dir = '.' @@ -1098,24 +1099,23 @@ def gather_atlases(wf, cfg, strat_pool, pipe_num, opt=None): fields=['atlas_name'], mandatory_inputs=True), name=f'{atlas_name}_name') atlas_name_node.inputs.atlas_name = atlas_name - gather = create_check_for_s3_node( - atlas_name, atlas, img_type='mask', - creds_path=cfg['pipeline_setup', 'input_creds_path']) resampled = pe.Node(Function(input_names=['resolution', 'template', 'template_name', - 'tag'], + 'tag', + 'resample_mode'], output_names=['local_path'], function=resolve_resolution, as_module=True), name=f'resampled_{atlas_name}') + resampled.inputs.template = atlas resampled.inputs.resolution = cfg[ "registration_workflows", "functional_registration", "func_registration_to_template", "output_resolution", "func_preproc_outputs"] resampled.inputs.tag = 'func_preproc_outputs' resampled.inputs.template_name = atlas_name - wf.connect(gather, 'local_path', resampled, 'template') + resampled.inputs.resample_mode = 'NN' for spec, _specified in specified_atlases.items(): for analysis, _atlases in _specified.items(): if atlas in _atlases: From 80481d51afe72863c64840851040f68a35a1d353 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Thu, 16 Feb 2023 15:37:37 -0500 Subject: [PATCH 22/23] :recycle: Account for not-yet-in-output-res preproc_bold --- CPAC/timeseries/timeseries_analysis.py | 38 +++++++++++++------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index a2625612e8..73b0cf5ac9 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -856,25 +856,25 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): ) brain_mask_node, brain_mask_out = strat_pool.get_data([ 'space-template_desc-brain_mask']) - if 'func_to_ROI' in realignment: - resample_brain_mask_roi = pe.Node( - resample_function(), - name=f'resample_brain_mask_roi_{pipe_num}') - resample_brain_mask_roi.inputs.realignment = realignment - resample_brain_mask_roi.inputs.identity_matrix = ( - cfg.registration_workflows['functional_registration'][ - 'func_registration_to_template' - ]['FNIRT_pipelines']['identity_matrix']) - wf.connect([ - (brain_mask_node, resample_brain_mask_roi, [ - (brain_mask_out, 'in_func')]), - (roi_dataflow, resample_brain_mask_roi, [ - ('outputspec.out_file', 'in_roi')]), - (resample_brain_mask_roi, timeseries_correlation, [ - ('out_func', 'inputspec.mask')])]) - else: - wf.connect(brain_mask_node, brain_mask_out, - timeseries_correlation, 'inputspec.mask') + # if 'func_to_ROI' in realignment: + resample_brain_mask_roi = pe.Node( + resample_function(), + name=f'resample_brain_mask_roi_{pipe_num}') + resample_brain_mask_roi.inputs.realignment = realignment + resample_brain_mask_roi.inputs.identity_matrix = ( + cfg.registration_workflows['functional_registration'][ + 'func_registration_to_template' + ]['FNIRT_pipelines']['identity_matrix']) + wf.connect([ + (brain_mask_node, resample_brain_mask_roi, [ + (brain_mask_out, 'in_func')]), + (fork_atlases, resample_brain_mask_roi, [ + ('atlas_file', 'in_roi')]), + (resample_brain_mask_roi, timeseries_correlation, [ + ('out_func', 'inputspec.mask')])]) + # else: + # wf.connect(brain_mask_node, brain_mask_out, + # timeseries_correlation, 'inputspec.mask') timeseries_correlation.inputs.inputspec.method = cm_measure wf.connect([ From d830d45aec982bfc8b546edcf72d8ee0f0a078d9 Mon Sep 17 00:00:00 2001 From: Jon Clucas Date: Mon, 20 Feb 2023 13:41:00 -0500 Subject: [PATCH 23/23] :necktie: Drop optional mask from AFNI connectivity matrices --- CPAC/timeseries/timeseries_analysis.py | 24 +----------------------- 1 file changed, 1 insertion(+), 23 deletions(-) diff --git a/CPAC/timeseries/timeseries_analysis.py b/CPAC/timeseries/timeseries_analysis.py index 73b0cf5ac9..107bb01042 100644 --- a/CPAC/timeseries/timeseries_analysis.py +++ b/CPAC/timeseries/timeseries_analysis.py @@ -796,8 +796,7 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): "option_key": "None", "option_val": "None", "inputs": [("atlas-tse-Avg", "atlas-tse-Avg_name"), - "space-template_desc-preproc_bold", - "space-template_desc-brain_mask"], + "space-template_desc-preproc_bold"], "outputs": ["space-template_desc-Mean_timeseries", "space-template_desc-ndmg_correlations", "atlas_name", @@ -854,27 +853,6 @@ def timeseries_extraction_AVG(wf, cfg, strat_pool, pipe_num, opt=None): method=cm_measure, pipe_num=pipe_num ) - brain_mask_node, brain_mask_out = strat_pool.get_data([ - 'space-template_desc-brain_mask']) - # if 'func_to_ROI' in realignment: - resample_brain_mask_roi = pe.Node( - resample_function(), - name=f'resample_brain_mask_roi_{pipe_num}') - resample_brain_mask_roi.inputs.realignment = realignment - resample_brain_mask_roi.inputs.identity_matrix = ( - cfg.registration_workflows['functional_registration'][ - 'func_registration_to_template' - ]['FNIRT_pipelines']['identity_matrix']) - wf.connect([ - (brain_mask_node, resample_brain_mask_roi, [ - (brain_mask_out, 'in_func')]), - (fork_atlases, resample_brain_mask_roi, [ - ('atlas_file', 'in_roi')]), - (resample_brain_mask_roi, timeseries_correlation, [ - ('out_func', 'inputspec.mask')])]) - # else: - # wf.connect(brain_mask_node, brain_mask_out, - # timeseries_correlation, 'inputspec.mask') timeseries_correlation.inputs.inputspec.method = cm_measure wf.connect([