Skip to content

Commit 356dc30

Browse files
authored
Merge pull request #1778 from FCP-INDI/deeper-config-path
🚚 Move C-PAC generated configs into `log/sub-${sub}_ses-${ses}` subdirectory
2 parents b1ce64b + 55667a2 commit 356dc30

File tree

9 files changed

+212
-118
lines changed

9 files changed

+212
-118
lines changed

.pylintrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ contextmanager-decorators=contextlib.contextmanager
291291
# List of members which are set dynamically and missed by pylint inference
292292
# system, and so shouldn't trigger E1101 when accessed. Python regular
293293
# expressions are accepted.
294-
generated-members=
294+
generated-members=CPAC.utils.configuration.configuration.Configuration.*
295295

296296
# Tells whether missing members accessed in mixin class should be ignored. A
297297
# mixin class is detected if its name ends with "mixin" (case insensitive).

CPAC/pipeline/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@
2828
if preconfig != 'benchmark-ANTS' and
2929
not preconfig.startswith('regtest-')]
3030

31+
3132
__all__ = ['ALL_PIPELINE_CONFIGS', 'AVAILABLE_PIPELINE_CONFIGS']

CPAC/pipeline/cpac_pipeline.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@
199199
from CPAC.pipeline.random_state import set_up_random_state_logger
200200
from CPAC.pipeline.schema import valid_options
201201
from CPAC.utils.trimmer import the_trimmer
202-
from CPAC.utils import Configuration
202+
from CPAC.utils import Configuration, set_subject
203203

204204
from CPAC.qc.pipeline import create_qc_workflow
205205
from CPAC.qc.xcp import qc_xcp
@@ -259,17 +259,8 @@ def run_workflow(sub_dict, c, run, pipeline_timing_info=None, p_name=None,
259259
# Assure that changes on config will not affect other parts
260260
c = copy.copy(c)
261261

262-
subject_id = sub_dict['subject_id']
263-
if sub_dict['unique_id']:
264-
subject_id += "_" + sub_dict['unique_id']
265-
262+
subject_id, p_name, log_dir = set_subject(sub_dict, c)
266263
c['subject_id'] = subject_id
267-
if p_name is None:
268-
p_name = f'pipeline_{c.pipeline_setup["pipeline_name"]}'
269-
log_dir = os.path.join(c.pipeline_setup['log_directory']['path'],
270-
p_name, subject_id)
271-
if not os.path.exists(log_dir):
272-
os.makedirs(os.path.join(log_dir))
273264

274265
set_up_logger(f'{subject_id}_expectedOutputs',
275266
filename=f'{bidsier_prefix(c["subject_id"])}_'

CPAC/pipeline/cpac_runner.py

Lines changed: 83 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,15 @@
1717
import os
1818
import sys
1919
import warnings
20-
import yaml
2120
from multiprocessing import Process
2221
from time import strftime
22+
import yaml
2323
from voluptuous.error import Invalid
24-
from CPAC.utils.configuration import Configuration
24+
from CPAC.utils.configuration import check_pname, Configuration, set_subject
2525
from CPAC.utils.ga import track_run
2626
from CPAC.utils.monitoring import failed_to_start, log_nodes_cb
27-
from CPAC.longitudinal_pipeline.longitudinal_workflow import (
28-
anat_longitudinal_wf,
29-
func_preproc_longitudinal_wf,
30-
func_longitudinal_template_wf
31-
)
27+
from CPAC.longitudinal_pipeline.longitudinal_workflow import \
28+
anat_longitudinal_wf
3229
from CPAC.utils.yaml_template import upgrade_pipeline_to_1_8
3330

3431

@@ -324,10 +321,11 @@ def run(subject_list_file, config_file=None, p_name=None, plugin=None,
324321
warnings.warn("We recommend that the working directory full path "
325322
"should have less then 70 characters. "
326323
"Long paths might not work in your operating system.")
327-
warnings.warn("Current working directory: %s" % c.pipeline_setup['working_directory']['path'])
324+
warnings.warn("Current working directory: "
325+
f"{c.pipeline_setup['working_directory']['path']}")
328326

329327
# Get the pipeline name
330-
p_name = p_name or c.pipeline_setup['pipeline_name']
328+
p_name = check_pname(p_name, c)
331329

332330
# Load in subject list
333331
try:
@@ -577,82 +575,89 @@ def replace_index(target1, target2, file_path):
577575
p_name, plugin, plugin_args, test_config)
578576
except Exception as exception: # pylint: disable=broad-except
579577
exitcode = 1
580-
failed_to_start(c['pipeline_setup', 'log_directory',
581-
'path'], exception)
578+
failed_to_start(set_subject(sub, c)[2], exception)
582579
return exitcode
583580

584-
pid = open(os.path.join(
585-
c.pipeline_setup['working_directory']['path'], 'pid.txt'
586-
), 'w')
587-
588581
# Init job queue
589582
job_queue = []
590583

591584
# Allocate processes
592-
processes = [
593-
Process(target=run_workflow,
594-
args=(sub, c, True, pipeline_timing_info,
595-
p_name, plugin, plugin_args, test_config))
596-
for sub in sublist
597-
]
598-
599-
# If we're allocating more processes than are subjects, run them all
600-
if len(sublist) <= c.pipeline_setup['system_config']['num_participants_at_once']:
601-
for p in processes:
602-
try:
603-
p.start()
604-
print(p.pid, file=pid)
605-
except Exception: # pylint: disable=broad-except
606-
exitcode = 1
607-
failed_to_start(c['pipeline_setup', 'log_directory',
608-
'path'])
609-
610-
# Otherwise manage resources to run processes incrementally
611-
else:
612-
idx = 0
613-
while idx < len(sublist):
614-
# If the job queue is empty and we haven't started indexing
615-
if len(job_queue) == 0 and idx == 0:
616-
# Init subject process index
617-
idc = idx
618-
# Launch processes (one for each subject)
619-
for p in processes[idc: idc + c.pipeline_setup[
620-
'system_config']['num_participants_at_once']]:
621-
try:
622-
p.start()
623-
print(p.pid, file=pid)
624-
job_queue.append(p)
625-
idx += 1
626-
except Exception: # pylint: disable=broad-except
627-
exitcode = 1
628-
failed_to_start(c['pipeline_setup',
629-
'log_directory', 'path'])
630-
# Otherwise, jobs are running - check them
631-
else:
632-
# Check every job in the queue's status
633-
for job in job_queue:
634-
# If the job is not alive
635-
if not job.is_alive():
636-
# Find job and delete it from queue
637-
print('found dead job ', job)
638-
loc = job_queue.index(job)
639-
del job_queue[loc]
640-
# ...and start the next available process
641-
# (subject)
585+
processes = [Process(target=run_workflow,
586+
args=(sub, c, True, pipeline_timing_info, p_name,
587+
plugin, plugin_args, test_config)) for
588+
sub in sublist]
589+
working_dir = os.path.join(c['pipeline_setup', 'working_directory',
590+
'path'], p_name)
591+
# Create pipeline-specific working dir if not exists
592+
if not os.path.exists(working_dir):
593+
os.makedirs(working_dir)
594+
# Set PID context to pipeline-specific file
595+
with open(os.path.join(working_dir, 'pid.txt'), 'w', encoding='utf-8'
596+
) as pid:
597+
# If we're allocating more processes than are subjects, run
598+
# them all
599+
if len(sublist) <= c.pipeline_setup['system_config'][
600+
'num_participants_at_once']:
601+
for i, _p in enumerate(processes):
602+
try:
603+
_p.start()
604+
print(_p.pid, file=pid)
605+
# pylint: disable=broad-except
606+
except Exception as exception:
607+
exitcode = 1
608+
failed_to_start(set_subject(sublist[i], c)[2],
609+
exception)
610+
# Otherwise manage resources to run processes incrementally
611+
else:
612+
idx = 0
613+
while idx < len(sublist):
614+
# If the job queue is empty and we haven't started indexing
615+
if len(job_queue) == 0 and idx == 0:
616+
# Init subject process index
617+
idc = idx
618+
# Launch processes (one for each subject)
619+
for _p in processes[idc: idc + c.pipeline_setup[
620+
'system_config']['num_participants_at_once']]:
642621
try:
643-
processes[idx].start()
644-
# Append this to job queue and increment index
645-
job_queue.append(processes[idx])
622+
_p.start()
623+
print(_p.pid, file=pid)
624+
job_queue.append(_p)
646625
idx += 1
647-
except Exception: # pylint: disable=broad-except
626+
# pylint: disable=broad-except
627+
except Exception as exception:
648628
exitcode = 1
649-
failed_to_start(c['pipeline_setup',
650-
'log_directory', 'path'])
651-
# Add sleep so while loop isn't consuming 100% of CPU
652-
time.sleep(2)
653-
# set exitcode to 1 if any exception
654-
if hasattr(pid, 'exitcode'):
655-
exitcode = exitcode or pid.exitcode
656-
# Close PID txt file to indicate finish
657-
pid.close()
629+
failed_to_start(set_subject(sublist[idx],
630+
c)[2], exception)
631+
# Otherwise, jobs are running - check them
632+
else:
633+
# Check every job in the queue's status
634+
for job in job_queue:
635+
# If the job is not alive
636+
if not job.is_alive():
637+
# Find job and delete it from queue
638+
print('found dead job ', job)
639+
loc = job_queue.index(job)
640+
del job_queue[loc]
641+
# ...and start the next available
642+
# process (subject)
643+
try:
644+
processes[idx].start()
645+
# Append this to job queue and
646+
# increment index
647+
# pylint: disable=modified-iterating-list
648+
job_queue.append(processes[idx])
649+
idx += 1
650+
# pylint: disable=broad-except
651+
except Exception as exception:
652+
exitcode = 1
653+
failed_to_start(set_subject(sublist[idx],
654+
c)[2],
655+
exception)
656+
# Add sleep so while loop isn't consuming 100% of CPU
657+
time.sleep(2)
658+
# set exitcode to 1 if any exception
659+
if hasattr(pid, 'exitcode'):
660+
exitcode = exitcode or pid.exitcode
661+
# Close PID txt file to indicate finish
662+
pid.close()
658663
sys.exit(exitcode)

CPAC/pipeline/random_state/seed.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
from nipype.interfaces.fsl.maths import MathsCommand
1111
from nipype.interfaces.fsl.utils import ImageMaths
1212

13-
from CPAC.registration.utils import hardcoded_reg
1413
from CPAC.utils.interfaces.ants import AI
1514
from CPAC.utils.monitoring.custom_logging import set_up_logger
1615

@@ -92,6 +91,7 @@ def random_seed_flags():
9291
... 'functions', 'interfaces']])
9392
True
9493
'''
94+
from CPAC.registration.utils import hardcoded_reg
9595
seed = random_seed()
9696
if seed is None:
9797
return {'functions': {}, 'interfaces': {}}

CPAC/utils/__init__.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
from .interfaces import function, masktool
66
from .extract_data import run
77
from .datatypes import ListFromItem
8-
from .configuration import Configuration
8+
from .configuration import check_pname, Configuration, set_subject
9+
from .strategy import Strategy
10+
from .outputs import Outputs
911

1012
from .utils import (
1113
get_zscore,
@@ -35,6 +37,5 @@
3537
repickle,
3638
)
3739

38-
__all__ = [
39-
'function', 'ListFromItem'
40-
]
40+
__all__ = ['check_pname', 'Configuration', 'function', 'ListFromItem',
41+
'set_subject']

CPAC/utils/configuration/__init__.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
1717
You should have received a copy of the GNU Lesser General Public
1818
License along with C-PAC. If not, see <https://www.gnu.org/licenses/>."""
19-
from .configuration import Configuration, DEFAULT_PIPELINE_FILE, \
20-
Preconfiguration
19+
from .configuration import check_pname, Configuration, DEFAULT_PIPELINE_FILE, \
20+
Preconfiguration, set_subject
2121
from . import configuration, diff
2222

23-
__all__ = ['Configuration', 'configuration', 'DEFAULT_PIPELINE_FILE', 'diff',
24-
'Preconfiguration']
23+
__all__ = ['check_pname', 'Configuration', 'configuration',
24+
'DEFAULT_PIPELINE_FILE', 'diff', 'Preconfiguration', 'set_subject']

0 commit comments

Comments
 (0)