Skip to content

Commit fefc26b

Browse files
committed
rf: queue support
1 parent b25a7ae commit fefc26b

File tree

3 files changed

+132
-56
lines changed

3 files changed

+132
-56
lines changed

heudiconv/cli/run.py

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
#!/usr/bin/env python
2+
13
import os
24
import os.path as op
35
from argparse import ArgumentParser
@@ -215,12 +217,11 @@ def get_parser():
215217
parser.add_argument('--dcmconfig', default=None,
216218
help='JSON file for additional dcm2niix configuration')
217219
submission = parser.add_argument_group('Conversion submission options')
218-
submission.add_argument('-q', '--queue', default=None,
219-
help='select batch system to submit jobs to instead'
220-
' of running the conversion serially')
221-
submission.add_argument('--sbargs', dest='sbatch_args', default=None,
222-
help='Additional sbatch arguments if running with '
223-
'queue arg')
220+
submission.add_argument('-q', '--queue', choices=("SLURM", None),
221+
default=None,
222+
help='batch system to submit jobs in parallel')
223+
submission.add_argument('--queue-args', dest='queue_args', default=None,
224+
help='Additional queue arguments')
224225
return parser
225226

226227

@@ -281,27 +282,28 @@ def process_args(args):
281282
continue
282283

283284
if args.queue:
284-
if seqinfo and not dicoms:
285-
# flatten them all and provide into batching, which again
286-
# would group them... heh
287-
dicoms = sum(seqinfo.values(), [])
288-
raise NotImplementedError(
289-
"we already grouped them so need to add a switch to avoid "
290-
"any grouping, so no outdir prefix doubled etc")
291-
292-
progname = op.abspath(inspect.getfile(inspect.currentframe()))
293-
294-
queue_conversion(progname,
285+
# if seqinfo and not dicoms:
286+
# # flatten them all and provide into batching, which again
287+
# # would group them... heh
288+
# dicoms = sum(seqinfo.values(), [])
289+
# raise NotImplementedError(
290+
# "we already grouped them so need to add a switch to avoid "
291+
# "any grouping, so no outdir prefix doubled etc")
292+
293+
pyscript = op.abspath(inspect.getfile(inspect.currentframe()))
294+
295+
studyid = sid
296+
if session:
297+
studyid += "-%s" % session
298+
if locator:
299+
studyid += "-%s" % locator
300+
# remove any separators
301+
studyid = studyid.replace(op.sep, '_')
302+
303+
queue_conversion(pyscript,
295304
args.queue,
296-
outdir,
297-
heuristic.filename,
298-
dicoms,
299-
sid,
300-
args.anon_cmd,
301-
args.converter,
302-
session,
303-
args.with_prov,
304-
args.bids)
305+
studyid,
306+
args.queue_args)
305307
continue
306308

307309
anon_sid = anonymize_sid(sid, args.anon_cmd) if args.anon_cmd else None

heudiconv/queue.py

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,62 @@
1-
import os
2-
import os.path as op
1+
import subprocess
2+
import sys
33

44
import logging
55

66
lgr = logging.getLogger(__name__)
77

8-
# start with SLURM but extend past that #TODO
9-
def queue_conversion(progname, queue, outdir, heuristic, dicoms, sid,
10-
anon_cmd, converter, session,with_prov, bids):
11-
12-
# Rework this...
13-
convertcmd = ' '.join(['python', progname,
14-
'-o', outdir,
15-
'-f', heuristic,
16-
'-s', sid,
17-
'--anon-cmd', anon_cmd,
18-
'-c', converter])
19-
if session:
20-
convertcmd += " --ses '%s'" % session
21-
if with_prov:
22-
convertcmd += " --with-prov"
23-
if bids:
24-
convertcmd += " --bids"
25-
if dicoms:
26-
convertcmd += " --files"
27-
convertcmd += [" '%s'" % f for f in dicoms]
28-
29-
script_file = 'dicom-%s.sh' % sid
30-
with open(script_file, 'wt') as fp:
31-
fp.writelines(['#!/bin/bash\n', convertcmd])
32-
outcmd = 'sbatch -J dicom-%s -p %s -N1 -c2 --mem=20G %s' \
33-
% (sid, queue, script_file)
34-
35-
os.system(outcmd)
8+
def queue_conversion(pyscript, queue, studyid, queue_args=None):
9+
"""
10+
Write out conversion arguments to file and submit to a job scheduler.
11+
Parses `sys.argv` for heudiconv arguments.
12+
13+
Parameters
14+
----------
15+
pyscript: file
16+
path to `heudiconv` script
17+
queue: string
18+
batch scheduler to use
19+
studyid: string
20+
identifier for conversion
21+
queue_args: string (optional)
22+
additional queue arguments for job submission
23+
24+
Returns
25+
-------
26+
proc: int
27+
Queue submission exit code
28+
"""
29+
30+
SUPPORTED_QUEUES = {'SLURM': 'sbatch'}
31+
if queue not in SUPPORTED_QUEUES:
32+
raise NotImplementedError("Queuing with %s is not supported", queue)
33+
34+
args = sys.argv[1:]
35+
print(sys.argv)
36+
# search args for queue flag
37+
for i, arg in enumerate(args):
38+
if arg in ["-q", "--queue"]:
39+
break
40+
if i == len(args) - 1:
41+
raise RuntimeError(
42+
"Queue flag not found (must be provided as a command-line arg)"
43+
)
44+
# remove queue flag and value
45+
del args[i:i+2]
46+
47+
# make arguments executable again
48+
args.insert(0, pyscript)
49+
pypath = sys.executable or "python"
50+
args.insert(0, pypath)
51+
convertcmd = " ".join(args)
52+
53+
# will overwrite across subjects
54+
queue_file = 'heudiconv-%s.sh' % queue
55+
with open(queue_file, 'wt') as fp:
56+
fp.writelines(['#!/bin/bash\n', convertcmd, '\n'])
57+
58+
cmd = [SUPPORTED_QUEUES[queue], queue_file]
59+
if queue_args:
60+
cmd.insert(1, queue_args)
61+
proc = subprocess.call(cmd)
62+
return proc

heudiconv/tests/test_queue.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import os
2+
import sys
3+
import subprocess
4+
5+
from heudiconv.cli.run import main as runner
6+
from .utils import TESTS_DATA_PATH
7+
import pytest
8+
from nipype.utils.filemanip import which
9+
10+
@pytest.mark.skipif(which("sbatch"), reason="skip a real slurm call")
11+
@pytest.mark.parametrize(
12+
'invocation', [
13+
"--files %s/01-fmap_acq-3mm" % TESTS_DATA_PATH, # our new way with automated groupping
14+
"-d %s/{subject}/* -s 01-fmap_acq-3mm" % TESTS_DATA_PATH # "old" way specifying subject
15+
])
16+
def test_queue_no_slurm(tmpdir, invocation):
17+
tmpdir.chdir()
18+
hargs = invocation.split(" ")
19+
hargs.extend(["-f", "reproin", "-b", "--minmeta", "--queue", "SLURM"])
20+
print(hargs)
21+
22+
# simulate command-line call
23+
_sys_args = sys.argv
24+
sys.argv = ['heudiconv'] + hargs
25+
26+
try:
27+
with pytest.raises(FileNotFoundError):
28+
runner(hargs)
29+
# should have generated a slurm submission script
30+
slurm_cmd_file = tmpdir / 'heudiconv-SLURM.sh'
31+
assert slurm_cmd_file
32+
# check contents and ensure args match
33+
with open(slurm_cmd_file) as fp:
34+
lines = fp.readlines()
35+
assert lines[0] == "#!/bin/bash\n"
36+
cmd = lines[1]
37+
38+
# check that all flags we gave still being called
39+
for arg in hargs:
40+
# except --queue <queue>
41+
if arg in ['--queue', 'SLURM']:
42+
assert arg not in cmd
43+
else:
44+
assert arg in cmd
45+
finally:
46+
# revert before breaking something
47+
sys.argv = _sys_args

0 commit comments

Comments
 (0)