Skip to content

Commit f058c31

Browse files
committed
Merge pull request #496 from chrisfilo/enh/graph
Respect custom execution parameters in graph-based models.
2 parents 03e64b8 + b67b1dd commit f058c31

File tree

5 files changed

+88
-68
lines changed

5 files changed

+88
-68
lines changed

nipype/pipeline/plugins/base.py

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def run(self, graph, config, updatehash=False):
218218
# setup polling - TODO: change to threaded model
219219
notrun = []
220220
while np.any(self.proc_done == False) | \
221-
np.any(self.proc_pending == True):
221+
np.any(self.proc_pending == True):
222222
toappend = []
223223
# trigger callbacks for any pending results
224224
while self.pending_tasks:
@@ -297,11 +297,12 @@ def _submit_mapnode(self, jobid):
297297
self.procs.extend(mapnodesubids)
298298
self.depidx = ssp.vstack((self.depidx,
299299
ssp.lil_matrix(np.zeros((numnodes,
300-
self.depidx.shape[1])))),
300+
self.depidx.shape[1])))),
301301
'lil')
302302
self.depidx = ssp.hstack((self.depidx,
303-
ssp.lil_matrix(np.zeros((self.depidx.shape[0],
304-
numnodes)))),
303+
ssp.lil_matrix(
304+
np.zeros((self.depidx.shape[0],
305+
numnodes)))),
305306
'lil')
306307
self.depidx[-numnodes:, jobid] = 1
307308
self.proc_done = np.concatenate((self.proc_done,
@@ -315,7 +316,7 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
315316
"""
316317
while np.any(self.proc_done == False):
317318
# Check to see if a job is available
318-
jobids = np.flatnonzero((self.proc_done == False) & \
319+
jobids = np.flatnonzero((self.proc_done == False) &
319320
(self.depidx.sum(axis=0) == 0).__array__())
320321
if len(jobids) > 0:
321322
# send all available jobs
@@ -336,20 +337,21 @@ def _send_procs_to_workers(self, updatehash=False, slots=None, graph=None):
336337
self.proc_done[jobid] = True
337338
self.proc_pending[jobid] = True
338339
# Send job to task manager and add to pending tasks
339-
logger.info('Executing: %s ID: %d' % \
340-
(self.procs[jobid]._id, jobid))
340+
logger.info('Executing: %s ID: %d' %
341+
(self.procs[jobid]._id, jobid))
341342
if self._status_callback:
342343
self._status_callback(self.procs[jobid], 'start')
343344
continue_with_submission = True
344345
if str2bool(self.procs[jobid].config['execution']['local_hash_check']):
345346
logger.debug('checking hash locally')
346347
try:
347-
hash_exists, _, _, _ = self.procs[jobid].hash_exists()
348+
hash_exists, _, _, _ = self.procs[
349+
jobid].hash_exists()
348350
logger.debug('Hash exists %s' % str(hash_exists))
349351
if (hash_exists and
350-
(self.procs[jobid].overwrite == False or
351-
(self.procs[jobid].overwrite == None and
352-
not self.procs[jobid]._interface.always_run))):
352+
(self.procs[jobid].overwrite == False or
353+
(self.procs[jobid].overwrite == None and
354+
not self.procs[jobid]._interface.always_run))):
353355
continue_with_submission = False
354356
self._task_finished_cb(jobid)
355357
self._remove_node_dirs()
@@ -385,7 +387,7 @@ def _task_finished_cb(self, jobid):
385387
386388
This is called when a job is completed.
387389
"""
388-
logger.info('[Job finished] jobname: %s jobid: %d' % \
390+
logger.info('[Job finished] jobname: %s jobid: %d' %
389391
(self.procs[jobid]._id, jobid))
390392
if self._status_callback:
391393
self._status_callback(self.procs[jobid], 'end')
@@ -431,7 +433,7 @@ def _remove_node_dirs(self):
431433
self.refidx[idx, idx] = -1
432434
outdir = self.procs[idx]._output_directory()
433435
logger.info(('[node dependencies finished] '
434-
'removing node: %s from directory %s') % \
436+
'removing node: %s from directory %s') %
435437
(self.procs[idx]._id, outdir))
436438
shutil.rmtree(outdir)
437439

@@ -562,9 +564,26 @@ def run(self, graph, config, updatehash=False):
562564
store_exception=False))
563565
dependencies[idx] = [nodes.index(prevnode) for prevnode in
564566
graph.predecessors(node)]
565-
self._submit_graph(pyfiles, dependencies)
567+
self._submit_graph(pyfiles, dependencies, nodes)
568+
569+
def _get_args(self, node, keywords):
570+
values = ()
571+
for keyword in keywords:
572+
value = getattr(self, "_" + keyword)
573+
if keyword == "template" and os.path.isfile(value):
574+
value = open(value).read()
575+
if hasattr(node, "plugin_args") and isinstance(node.plugin_args, dict) and keyword in node.plugin_args:
576+
if keyword == "template" and os.path.isfile(node.plugin_args[keyword]):
577+
tmp_value = open(node.plugin_args[keyword]).read()
578+
if 'overwrite' in node.plugin_args and node.plugin_args['overwrite']:
579+
value = tmp_value
580+
else:
581+
value += tmp_value
582+
else:
583+
values += (value, )
584+
return values
566585

567-
def _submit_graph(self, pyfiles, dependencies):
586+
def _submit_graph(self, pyfiles, dependencies, nodes):
568587
"""
569588
pyfiles: list of files corresponding to a topological sort
570589
dependencies: dictionary of dependencies based on the topological sort

nipype/pipeline/plugins/dagman.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ def __init__(self, **kwargs):
4949
self._dagman_args = plugin_args['dagman_args']
5050
super(CondorDAGManPlugin, self).__init__(**kwargs)
5151

52-
def _submit_graph(self, pyfiles, dependencies):
52+
def _submit_graph(self, pyfiles, dependencies, nodes):
5353
# location of all scripts, place dagman output in here too
5454
batch_dir, _ = os.path.split(pyfiles[0])
5555
# DAG description filename
@@ -58,27 +58,30 @@ def _submit_graph(self, pyfiles, dependencies):
5858
# loop over all scripts, create submit files, and define them
5959
# as jobs in the DAG
6060
for idx, pyscript in enumerate(pyfiles):
61+
node = nodes[idx]
62+
template, submit_specs = self._get_args(
63+
node, ["template", "submit_specs"])
6164
# XXX redundant with previous value? or could it change between
6265
# scripts?
6366
batch_dir, name = os.path.split(pyscript)
6467
name = '.'.join(name.split('.')[:-1])
6568
submitspec = '\n'.join(
66-
(self._template,
67-
'executable = %s' % sys.executable,
68-
'arguments = %s' % pyscript,
69-
'output = %s' % os.path.join(batch_dir,
70-
'%s.out' % name),
71-
'error = %s' % os.path.join(batch_dir,
72-
'%s.err' % name),
73-
'log = %s' % os.path.join(batch_dir,
74-
'%s.log' % name),
75-
'getenv = True',
76-
self._submit_specs,
77-
'queue'
78-
))
69+
(template,
70+
'executable = %s' % sys.executable,
71+
'arguments = %s' % pyscript,
72+
'output = %s' % os.path.join(batch_dir,
73+
'%s.out' % name),
74+
'error = %s' % os.path.join(batch_dir,
75+
'%s.err' % name),
76+
'log = %s' % os.path.join(batch_dir,
77+
'%s.log' % name),
78+
'getenv = True',
79+
submit_specs,
80+
'queue'
81+
))
7982
# write submit spec for this job
8083
submitfile = os.path.join(batch_dir,
81-
'%s.submit' % name)
84+
'%s.submit' % name)
8285
with open(submitfile, 'wt') as submitfileprt:
8386
submitfileprt.writelines(submitspec)
8487
submitfileprt.close()
@@ -98,4 +101,3 @@ def _submit_graph(self, pyfiles, dependencies):
98101
self._dagman_args)
99102
cmd.run()
100103
logger.info('submitted all jobs to Condor DAGMan')
101-

nipype/pipeline/plugins/pbsgraph.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77
from .base import (GraphPluginBase, logger)
88

99
from ...interfaces.base import CommandLine
10+
from .sgegraph import SGEGraphPlugin
1011

1112

12-
class PBSGraphPlugin(GraphPluginBase):
13+
class PBSGraphPlugin(SGEGraphPlugin):
1314
"""Execute using PBS/Torque
1415
1516
The plugin_args input to run can be used to control the SGE execution.
@@ -20,31 +21,23 @@ class PBSGraphPlugin(GraphPluginBase):
2021
qsub call
2122
2223
"""
23-
24-
def __init__(self, **kwargs):
25-
self._template = """
24+
_template = """
2625
#PBS -V
27-
"""
28-
self._qsub_args = None
29-
if 'plugin_args' in kwargs:
30-
plugin_args = kwargs['plugin_args']
31-
if 'template' in plugin_args:
32-
self._template = plugin_args['template']
33-
if os.path.isfile(self._template):
34-
self._template = open(self._template).read()
35-
if 'qsub_args' in plugin_args:
36-
self._qsub_args = plugin_args['qsub_args']
37-
super(PBSGraphPlugin, self).__init__(**kwargs)
26+
"""
3827

39-
def _submit_graph(self, pyfiles, dependencies):
28+
def _submit_graph(self, pyfiles, dependencies, nodes):
4029
batch_dir, _ = os.path.split(pyfiles[0])
4130
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
4231
with open(submitjobsfile, 'wt') as fp:
4332
fp.writelines('#!/usr/bin/env sh\n')
4433
for idx, pyscript in enumerate(pyfiles):
34+
node = nodes[idx]
35+
template, qsub_args = self._get_args(
36+
node, ["template", "qsub_args"])
37+
4538
batch_dir, name = os.path.split(pyscript)
4639
name = '.'.join(name.split('.')[:-1])
47-
batchscript = '\n'.join((self._template,
40+
batchscript = '\n'.join((template,
4841
'%s %s' % (sys.executable, pyscript)))
4942
batchscriptfile = os.path.join(batch_dir,
5043
'batchscript_%s.sh' % name)
@@ -53,14 +46,14 @@ def _submit_graph(self, pyfiles, dependencies):
5346
batchfp.close()
5447
deps = ''
5548
if idx in dependencies:
56-
values = ['$job%05d' % jobid for jobid in dependencies[idx]]
49+
values = ['$job%05d' %
50+
jobid for jobid in dependencies[idx]]
5751
if len(values):
5852
deps = '-W depend=afterok:%s' % ':'.join(values)
5953
fp.writelines('job%05d=`qsub %s %s %s`\n' % (idx, deps,
60-
self._qsub_args,
54+
qsub_args,
6155
batchscriptfile))
6256
cmd = CommandLine('sh', environ=os.environ.data)
6357
cmd.inputs.args = '%s' % submitjobsfile
6458
cmd.run()
6559
logger.info('submitted all jobs to queue')
66-

nipype/pipeline/plugins/sgegraph.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ class SGEGraphPlugin(GraphPluginBase):
2020
qsub call
2121
2222
"""
23-
24-
def __init__(self, **kwargs):
25-
self._template = """
23+
_template = """
2624
#!/bin/bash
2725
#$ -V
2826
#$ -S /bin/bash
29-
"""
27+
"""
28+
29+
def __init__(self, **kwargs):
3030
self._qsub_args = ''
3131
if 'plugin_args' in kwargs:
3232
plugin_args = kwargs['plugin_args']
@@ -38,15 +38,19 @@ def __init__(self, **kwargs):
3838
self._qsub_args = plugin_args['qsub_args']
3939
super(SGEGraphPlugin, self).__init__(**kwargs)
4040

41-
def _submit_graph(self, pyfiles, dependencies):
41+
def _submit_graph(self, pyfiles, dependencies, nodes):
4242
batch_dir, _ = os.path.split(pyfiles[0])
4343
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
4444
with open(submitjobsfile, 'wt') as fp:
4545
fp.writelines('#!/usr/bin/env bash\n')
4646
for idx, pyscript in enumerate(pyfiles):
47+
node = nodes[idx]
48+
template, qsub_args = self._get_args(
49+
node, ["template", "qsub_args"])
50+
4751
batch_dir, name = os.path.split(pyscript)
4852
name = '.'.join(name.split('.')[:-1])
49-
batchscript = '\n'.join((self._template,
53+
batchscript = '\n'.join((template,
5054
'%s %s' % (sys.executable, pyscript)))
5155
batchscriptfile = os.path.join(batch_dir,
5256
'batchscript_%s.sh' % name)
@@ -65,22 +69,24 @@ def _submit_graph(self, pyfiles, dependencies):
6569
if 'job' in values:
6670
values = values.rstrip(',')
6771
deps = '-hold_jid%s' % values
68-
jobname = 'job%05d' % ( idx )
72+
jobname = 'job%05d' % (idx)
6973
## Do not use default output locations if they are set in self._qsub_args
7074
stderrFile = ''
7175
if self._qsub_args.count('-e ') == 0:
72-
stderrFile='-e {errFile}'.format(errFile=batchscripterrfile)
76+
stderrFile = '-e {errFile}'.format(
77+
errFile=batchscripterrfile)
7378
stdoutFile = ''
7479
if self._qsub_args.count('-o ') == 0:
75-
stdoutFile='-o {outFile}'.format(outFile=batchscriptoutfile)
80+
stdoutFile = '-o {outFile}'.format(
81+
outFile=batchscriptoutfile)
7682
full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript})\n'.format(
77-
jobNm=jobname,
78-
outFileOption=stdoutFile,
79-
errFileOption=stderrFile,
80-
extraQSubArgs=self._qsub_args,
81-
dependantIndex=deps,
82-
batchscript=batchscriptfile)
83-
fp.writelines( full_line )
83+
jobNm=jobname,
84+
outFileOption=stdoutFile,
85+
errFileOption=stderrFile,
86+
extraQSubArgs=qsub_args,
87+
dependantIndex=deps,
88+
batchscript=batchscriptfile)
89+
fp.writelines(full_line)
8490

8591
cmd = CommandLine('bash', environ=os.environ.data)
8692
cmd.inputs.args = '%s' % submitjobsfile

nipype/pipeline/plugins/somaflow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def __init__(self, plugin_args=None):
2323
raise ImportError('SomaFlow could not be imported')
2424
super(SomaFlowPlugin, self).__init__(plugin_args=plugin_args)
2525

26-
def _submit_graph(self, pyfiles, dependencies):
26+
def _submit_graph(self, pyfiles, dependencies, nodes):
2727
jobs = []
2828
soma_deps = []
2929
for idx, fname in enumerate(pyfiles):

0 commit comments

Comments
 (0)