Skip to content

Commit 6b2a460

Browse files
authored
Merge pull request #328 from mgxd/fix/slurm
RF: improve support for queue args
2 parents 3467341 + 4621423 commit 6b2a460

File tree

7 files changed

+171
-90
lines changed

7 files changed

+171
-90
lines changed

heudiconv/bids.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from random import sample
1212
from glob import glob
1313

14-
from heudiconv.external.pydicom import dcm
14+
from .external.pydicom import dcm
1515

1616
from .parser import find_files
1717
from .utils import (

heudiconv/cli/run.py

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def get_parser():
142142
group.add_argument('--files', nargs='*',
143143
help='Files (tarballs, dicoms) or directories '
144144
'containing files to process. Cannot be provided if '
145-
'using --dicom_dir_template or --subjects')
145+
'using --dicom_dir_template.')
146146
parser.add_argument('-s', '--subjects', dest='subjs', type=str, nargs='*',
147147
help='list of subjects - required for dicom template. '
148148
'If not provided, DICOMS would first be "sorted" and '
@@ -173,8 +173,6 @@ def get_parser():
173173
'single argument and return a single anonymized ID. '
174174
'Also see --conv-outdir')
175175
parser.add_argument('-f', '--heuristic', dest='heuristic',
176-
# some commands might not need heuristic
177-
# required=True,
178176
help='Name of a known heuristic or path to the Python'
179177
'script containing heuristic')
180178
parser.add_argument('-p', '--with-prov', action='store_true',
@@ -221,7 +219,9 @@ def get_parser():
221219
default=None,
222220
help='batch system to submit jobs in parallel')
223221
submission.add_argument('--queue-args', dest='queue_args', default=None,
224-
help='Additional queue arguments')
222+
help='Additional queue arguments passed as '
223+
'single string of Argument=Value pairs space '
224+
'separated.')
225225
return parser
226226

227227

@@ -246,6 +246,13 @@ def process_args(args):
246246
if not args.heuristic:
247247
raise RuntimeError("No heuristic specified - add to arguments and rerun")
248248

249+
if args.queue:
250+
lgr.info("Queuing %s conversion", args.queue)
251+
iterarg, iterables = ("files", len(args.files)) if args.files else \
252+
("subjects", len(args.subjs))
253+
queue_conversion(args.queue, iterarg, iterables, args.queue_args)
254+
sys.exit(0)
255+
249256
heuristic = load_heuristic(args.heuristic)
250257

251258
study_sessions = get_study_sessions(args.dicom_dir_template, args.files,
@@ -281,31 +288,6 @@ def process_args(args):
281288
lgr.warning("Skipping unknown locator dataset")
282289
continue
283290

284-
if args.queue:
285-
# if seqinfo and not dicoms:
286-
# # flatten them all and provide into batching, which again
287-
# # would group them... heh
288-
# dicoms = sum(seqinfo.values(), [])
289-
# raise NotImplementedError(
290-
# "we already grouped them so need to add a switch to avoid "
291-
# "any grouping, so no outdir prefix doubled etc")
292-
293-
pyscript = op.abspath(inspect.getfile(inspect.currentframe()))
294-
295-
studyid = sid
296-
if session:
297-
studyid += "-%s" % session
298-
if locator:
299-
studyid += "-%s" % locator
300-
# remove any separators
301-
studyid = studyid.replace(op.sep, '_')
302-
303-
queue_conversion(pyscript,
304-
args.queue,
305-
studyid,
306-
args.queue_args)
307-
continue
308-
309291
anon_sid = anonymize_sid(sid, args.anon_cmd) if args.anon_cmd else None
310292
if args.anon_cmd:
311293
lgr.info('Anonymized {} to {}'.format(sid, anon_sid))

heudiconv/dicoms.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
import logging
55
from collections import OrderedDict
66
import tarfile
7-
from heudiconv.external.pydicom import dcm
87

8+
from .external.pydicom import dcm
99
from .utils import SeqInfo, load_json, set_readonly
1010

1111
lgr = logging.getLogger(__name__)
@@ -55,10 +55,10 @@ def group_dicoms_into_seqinfos(files, file_filter, dcmfilter, grouping):
5555
lgr.info('Filtering out {0} dicoms based on their filename'.format(
5656
nfl_before-nfl_after))
5757
for fidx, filename in enumerate(files):
58-
from heudiconv.external.dcmstack import ds
58+
import nibabel.nicom.dicomwrappers as dw
5959
# TODO after getting a regression test check if the same behavior
6060
# with stop_before_pixels=True
61-
mw = ds.wrapper_from_data(dcm.read_file(filename, force=True))
61+
mw = dw.wrapper_from_data(dcm.read_file(filename, force=True))
6262

6363
for sig in ('iop', 'ICE_Dims', 'SequenceName'):
6464
try:
@@ -385,7 +385,7 @@ def embed_nifti(dcmfiles, niftifile, infofile, bids_info, min_meta):
385385
import re
386386

387387
if not min_meta:
388-
import dcmstack as ds
388+
from heudiconv.external.dcmstack import ds
389389
stack = ds.parse_and_stack(dcmfiles, force=True).values()
390390
if len(stack) > 1:
391391
raise ValueError('Found multiple series')

heudiconv/external/dcmstack.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
try:
88
import dcmstack as ds
99
except ImportError as e:
10-
from heudiconv import lgr
10+
from .. import lgr
1111
# looks different between py2 and 3 so we go for very rudimentary matching
1212
e_str = str(e)
1313
# there were changes from how

heudiconv/queue.py

Lines changed: 97 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,112 @@
11
import subprocess
22
import sys
33
import os
4-
54
import logging
65

6+
from .utils import which
7+
78
lgr = logging.getLogger(__name__)
89

9-
def queue_conversion(pyscript, queue, studyid, queue_args=None):
10-
"""
11-
Write out conversion arguments to file and submit to a job scheduler.
12-
Parses `sys.argv` for heudiconv arguments.
13-
14-
Parameters
15-
----------
16-
pyscript: file
17-
path to `heudiconv` script
18-
queue: string
19-
batch scheduler to use
20-
studyid: string
21-
identifier for conversion
22-
queue_args: string (optional)
23-
additional queue arguments for job submission
24-
25-
Returns
26-
-------
27-
proc: int
28-
Queue submission exit code
29-
"""
30-
31-
SUPPORTED_QUEUES = {'SLURM': 'sbatch'}
32-
if queue not in SUPPORTED_QUEUES:
33-
raise NotImplementedError("Queuing with %s is not supported", queue)
34-
35-
args = sys.argv[1:]
36-
# search args for queue flag
37-
for i, arg in enumerate(args):
38-
if arg in ["-q", "--queue"]:
39-
break
40-
if i == len(args) - 1:
41-
raise RuntimeError(
42-
"Queue flag not found (must be provided as a command-line arg)"
43-
)
44-
# remove queue flag and value
45-
del args[i:i+2]
46-
47-
# make arguments executable again
48-
args.insert(0, pyscript)
49-
pypath = sys.executable or "python"
50-
args.insert(0, pypath)
10+
def queue_conversion(queue, iterarg, iterables, queue_args=None):
11+
"""
12+
Write out conversion arguments to file and submit to a job scheduler.
13+
Parses `sys.argv` for heudiconv arguments.
14+
15+
Parameters
16+
----------
17+
queue: string
18+
Batch scheduler to use
19+
iterarg: str
20+
Multi-argument to index (`subjects` OR `files`)
21+
iterables: int
22+
Number of `iterarg` arguments
23+
queue_args: string (optional)
24+
Additional queue arguments for job submission
25+
26+
"""
27+
28+
SUPPORTED_QUEUES = {'SLURM': 'sbatch'}
29+
if queue not in SUPPORTED_QUEUES:
30+
raise NotImplementedError("Queuing with %s is not supported", queue)
31+
32+
for i in range(iterables):
33+
args = clean_args(sys.argv[1:], iterarg, i)
34+
# make arguments executable
35+
heudiconv_exec = which("heudiconv") or "heudiconv"
36+
args.insert(0, heudiconv_exec)
5137
convertcmd = " ".join(args)
5238

5339
# will overwrite across subjects
5440
queue_file = os.path.abspath('heudiconv-%s.sh' % queue)
5541
with open(queue_file, 'wt') as fp:
56-
fp.writelines(['#!/bin/bash\n', convertcmd, '\n'])
42+
fp.write("#!/bin/bash\n")
43+
if queue_args:
44+
for qarg in queue_args.split():
45+
fp.write("#SBATCH %s\n" % qarg)
46+
fp.write(convertcmd + "\n")
5747

5848
cmd = [SUPPORTED_QUEUES[queue], queue_file]
59-
if queue_args:
60-
cmd.insert(1, queue_args)
6149
proc = subprocess.call(cmd)
62-
return proc
50+
lgr.info("Submitted %d jobs", iterables)
51+
52+
def clean_args(hargs, iterarg, iteridx):
53+
"""
54+
Filters arguments for batch submission.
55+
56+
Parameters
57+
----------
58+
hargs: list
59+
Command-line arguments
60+
iterarg: str
61+
Multi-argument to index (`subjects` OR `files`)
62+
iteridx: int
63+
`iterarg` index to submit
64+
65+
Returns
66+
-------
67+
cmdargs : list
68+
Filtered arguments for batch submission
69+
70+
Example
71+
--------
72+
>>> from heudiconv.queue import clean_args
73+
>>> cmd = ['heudiconv', '-d', '/some/{subject}/path',
74+
... '-q', 'SLURM',
75+
... '-s', 'sub-1', 'sub-2', 'sub-3', 'sub-4']
76+
>>> clean_args(cmd, 'subjects', 0)
77+
['heudiconv', '-d', '/some/{subject}/path', '-s', 'sub-1']
78+
"""
79+
80+
if iterarg == "subjects":
81+
iterarg = ['-s', '--subjects']
82+
elif iterarg == "files":
83+
iterarg = ['--files']
84+
else:
85+
raise ValueError("Cannot index %s" % iterarg)
86+
87+
# remove these or cause an infinite loop
88+
queue_args = ['-q', '--queue', '--queue-args']
89+
90+
# control variables for multi-argument parsing
91+
is_iterarg = False
92+
itercount = 0
93+
94+
indicies = []
95+
cmdargs = hargs[:]
96+
97+
for i, arg in enumerate(hargs):
98+
if arg.startswith('-') and is_iterarg:
99+
# moving on to another argument
100+
is_iterarg = False
101+
if is_iterarg:
102+
if iteridx != itercount:
103+
indicies.append(i)
104+
itercount += 1
105+
if arg in iterarg:
106+
is_iterarg = True
107+
if arg in queue_args:
108+
indicies.extend([i, i+1])
109+
110+
for j in sorted(indicies, reverse=True):
111+
del cmdargs[j]
112+
return cmdargs

heudiconv/tests/test_queue.py

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@
33
import subprocess
44

55
from heudiconv.cli.run import main as runner
6+
from heudiconv.queue import clean_args, which
67
from .utils import TESTS_DATA_PATH
78
import pytest
8-
from nipype.utils.filemanip import which
99

1010
@pytest.mark.skipif(which("sbatch"), reason="skip a real slurm call")
1111
@pytest.mark.parametrize(
@@ -23,7 +23,7 @@ def test_queue_no_slurm(tmpdir, invocation):
2323
sys.argv = ['heudiconv'] + hargs
2424

2525
try:
26-
with pytest.raises(OSError):
26+
with pytest.raises(OSError): # SLURM should not be installed
2727
runner(hargs)
2828
# should have generated a slurm submission script
2929
slurm_cmd_file = (tmpdir / 'heudiconv-SLURM.sh').strpath
@@ -44,3 +44,50 @@ def test_queue_no_slurm(tmpdir, invocation):
4444
finally:
4545
# revert before breaking something
4646
sys.argv = _sys_args
47+
48+
def test_argument_filtering(tmpdir):
49+
cmd_files = [
50+
'heudiconv',
51+
'--files',
52+
'/fake/path/to/files',
53+
'/another/fake/path',
54+
'-f',
55+
'convertall',
56+
'-q',
57+
'SLURM',
58+
'--queue-args',
59+
'--cpus-per-task=4 --contiguous --time=10'
60+
]
61+
filtered = [
62+
'heudiconv',
63+
'--files',
64+
'/another/fake/path',
65+
'-f',
66+
'convertall',
67+
]
68+
assert clean_args(cmd_files, 'files', 1) == filtered
69+
70+
cmd_subjects = [
71+
'heudiconv',
72+
'-d',
73+
'/some/{subject}/path',
74+
'--queue',
75+
'SLURM',
76+
'--subjects',
77+
'sub1',
78+
'sub2',
79+
'sub3',
80+
'sub4',
81+
'-f',
82+
'convertall'
83+
]
84+
filtered = [
85+
'heudiconv',
86+
'-d',
87+
'/some/{subject}/path',
88+
'--subjects',
89+
'sub3',
90+
'-f',
91+
'convertall'
92+
]
93+
assert clean_args(cmd_subjects, 'subjects', 2) == filtered

heudiconv/utils.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
from pathlib import Path
1313
from collections import namedtuple
1414
from glob import glob
15+
from subprocess import check_output
16+
17+
from nipype.utils.filemanip import which
1518

1619
import logging
1720
lgr = logging.getLogger(__name__)
@@ -103,18 +106,17 @@ def dec(obj):
103106

104107

105108
def anonymize_sid(sid, anon_sid_cmd):
106-
import sys
107-
from subprocess import check_output
108-
109+
109110
cmd = [anon_sid_cmd, sid]
110111
shell_return = check_output(cmd)
111112

112-
### Handle subprocess returning a bytes literal string to a python3 interpreter
113-
if all([sys.version_info[0] > 2, isinstance(shell_return, bytes), isinstance(sid, str)]):
113+
if all([sys.version_info[0] > 2,
114+
isinstance(shell_return, bytes),
115+
isinstance(sid, str)]):
114116
anon_sid = shell_return.decode()
115117
else:
116118
anon_sid = shell_return
117-
119+
118120
return anon_sid.strip()
119121

120122

0 commit comments

Comments
 (0)