Commit 4346f29

Merge branch 'enh/slurmgraph'

* enh/slurmgraph:
  extra condition ported from SLURMGraph
  calling non existing variable
  Remove run_without_submitting code
  updated changelog
  Improved docs.
  space!
  fixed whitespace in the dependencies
  fixed template
  fixed caching
  remove SGE remnants
  fixed copypaste bug
  Inintial work on SLURMGraph plugin

2 parents b72578d + 9cfb703 commit 4346f29

File tree

5 files changed: +183 additions, -17 deletions

CHANGES

Lines changed: 1 addition & 0 deletions

@@ -4,6 +4,7 @@ Next release
 * ENH: Added interface to simulate DWIs using the multi-tensor model
   (https://github.com/nipy/nipype/pull/1085)
 * ENH: New interface for FSL fslcpgeom utility (https://github.com/nipy/nipype/pull/1152)
+* ENH: Added SLURMGraph plugin for submitting jobs to SLURM with dependencies (https://github.com/nipy/nipype/pull/1136)
 * FIX: Enable absolute path definitions in DCMStack (https://github.com/nipy/nipype/pull/1089,
   replaced by https://github.com/nipy/nipype/pull/1093)
 * ENH: New mesh.MeshWarpMaths to operate on surface-defined warpings

doc/users/plugins.rst

Lines changed: 30 additions & 8 deletions

@@ -139,9 +139,11 @@ particular node might use more resources than other nodes in a workflow.
 
 SGEGraph
 ~~~~~~~~
-SGEGraph_ is a exuction plugin working with Sun Grid Engine that allows for
+SGEGraph_ is an execution plugin working with Sun Grid Engine that allows for
 submitting entire graph of dependent jobs at once. This way Nipype does not
-need to run a monitoring process - SGE takes care of this.
+need to run a monitoring process - SGE takes care of this. The use of SGEGraph_
+is preferred over SGE_ since the latter adds unnecessary load on the submit
+machine.
 
 .. note::
 
@@ -175,6 +177,26 @@ Optional arguments::
 
     template: custom template file to use
     sbatch_args: any other command line args to be passed to bsub.
+
+
+SLURMGraph
+~~~~~~~~~~
+SLURMGraph_ is an execution plugin working with SLURM that allows for
+submitting entire graph of dependent jobs at once. This way Nipype does not
+need to run a monitoring process - SLURM takes care of this. The use of SLURMGraph_
+plugin is preferred over the vanilla SLURM_ plugin since the latter adds
+unnecessary load on the submit machine.
+
+
+.. note::
+
+   When rerunning unfinished workflows using SLURMGraph you may decide not to
+   submit jobs for Nodes that previously finished running. This can speed up
+   execution, but new or modified inputs that would previously trigger a Node
+   to rerun will be ignored. The following option turns on this functionality::
+
+       workflow.run(plugin='SLURMGraph', plugin_args = {'dont_resubmit_completed_jobs': True})
+
 
 HTCondor
 --------
@@ -183,12 +205,12 @@ DAGMan
 ~~~~~~
 
 With its DAGMan_ component HTCondor_ (previously Condor) allows for submitting
-entire graphs of dependent jobs at once (similar to SGEGraph_). With the ``CondorDAGMan`` plug-in
-Nipype can utilize this functionality to submit complete workflows directly and
-in a single step. Consequently, and in contrast to other plug-ins, workflow
-execution returns almost instantaneously -- Nipype is only used to generate the
-workflow graph, while job scheduling and dependency resolution are entirely
-managed by HTCondor_.
+entire graphs of dependent jobs at once (similar to SGEGraph_ and SLURMGraph_).
+With the ``CondorDAGMan`` plug-in Nipype can utilize this functionality to
+submit complete workflows directly and in a single step. Consequently, and
+in contrast to other plug-ins, workflow execution returns almost
+instantaneously -- Nipype is only used to generate the workflow graph,
+while job scheduling and dependency resolution are entirely managed by HTCondor_.
 
 Please note that although DAGMan_ supports specification of data dependencies
 as well as data provisioning on compute nodes this functionality is currently
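The graph plugins described above let the scheduler itself enforce ordering: each submitted job carries a dependency clause naming its parent jobs, and SLURM starts it only after all parents exit successfully (the `--dependency=afterok:` flag assembled in the slurmgraph.py source further down this commit). A minimal standalone sketch of that assembly; `afterok_clause` is a hypothetical helper for illustration, not part of Nipype:

```python
def afterok_clause(parent_job_ids):
    """Build a SLURM dependency flag from a list of parent job IDs.

    Returns '' when there are no parents, so the flag can be dropped
    from the sbatch line entirely (mirroring the plugin's deps = '' default).
    """
    if not parent_job_ids:
        return ''
    return '--dependency=afterok:' + ':'.join(str(j) for j in parent_job_ids)

# A child of jobs 101 and 102 must wait for both to finish OK:
print(afterok_clause([101, 102]))  # --dependency=afterok:101:102
print(afterok_clause([]))          # empty string: no dependencies
```

In the real plugin the IDs are not known at script-generation time, so the generated `submit_jobs.sh` captures each `sbatch` output into a bash variable and substitutes `${jobname}` into the clause instead.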

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 0 deletions

@@ -15,3 +15,4 @@
 from .sgegraph import SGEGraphPlugin
 from .lsf import LSFPlugin
 from .slurm import SLURMPlugin
+from .slurmgraph import SLURMGraphPlugin

nipype/pipeline/plugins/sgegraph.py

Lines changed: 1 addition & 9 deletions

@@ -80,14 +80,6 @@ def make_job_name(jobnumber, nodeslist):
         for idx, pyscript in enumerate(pyfiles):
             node = nodes[idx]
             node_status_done = node_completed_status(node)
-            ## If a node has no dependencies, and it is requested to run_without_submitting
-            ## then run this node in place
-            if (not node_status_done) and (len(dependencies[idx]) == 0 ) and (node.run_without_submitting == True):
-                try:
-                    node.run()
-                except Exception:
-                    node._clean_queue(idx, nodes)
-                node_status_done = True  # if successfully run locally, then claim true
 
             #if the node itself claims done, then check to ensure all
             #dependancies are also done
@@ -130,7 +122,7 @@ def make_job_name(jobnumber, nodeslist):
                 values = ' '
                 for jobid in dependencies[idx]:
                     ## Avoid dependancies of done jobs
-                    if cache_doneness_per_node[jobid] == False:
+                    if not self._dont_resubmit_completed_jobs or cache_doneness_per_node[jobid] == False:
                         values += "${{{0}}},".format(make_job_name(jobid, nodes))
                 if values != ' ':  # i.e. if some jobs were added to dependency list
                     values = values.rstrip(',')
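The one-line change in the second hunk widens the dependency filter: previously a finished parent was always dropped from the child's hold list, but now it is dropped only when `dont_resubmit_completed_jobs` is enabled (in the default mode every parent is resubmitted, so every parent must be waited on). A standalone sketch of the same condition:

```python
def keep_dependency(dont_resubmit_completed_jobs, parent_is_done):
    """Mirror of the filter in _submit_graph: a parent stays in the
    child's dependency list unless completed jobs are being skipped
    AND this particular parent already completed."""
    return (not dont_resubmit_completed_jobs) or (not parent_is_done)

# Default mode: every parent is submitted, so every parent is waited on.
assert keep_dependency(False, parent_is_done=True) is True
assert keep_dependency(False, parent_is_done=False) is True
# Skip mode: only parents that still need to run are waited on.
assert keep_dependency(True, parent_is_done=True) is False
assert keep_dependency(True, parent_is_done=False) is True
```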

nipype/pipeline/plugins/slurmgraph.py

Lines changed: 150 additions & 0 deletions

@@ -0,0 +1,150 @@
+"""Parallel workflow execution via SLURM
+"""
+
+import os
+import sys
+
+from .base import (GraphPluginBase, logger)
+
+from ...interfaces.base import CommandLine
+
+
+def node_completed_status( checknode):
+    """
+    A function to determine if a node has previously completed it's work
+    :param checknode: The node to check the run status
+    :return: boolean value True indicates that the node does not need to be run.
+    """
+    """ TODO: place this in the base.py file and refactor """
+    node_state_does_not_require_overwrite = ( checknode.overwrite == False or
+                                              (checknode.overwrite == None and
+                                               not checknode._interface.always_run )
+                                            )
+    hash_exists = False
+    try:
+        hash_exists, _, _, _ = checknode.hash_exists()
+    except Exception:
+        hash_exists = False
+    return (hash_exists and node_state_does_not_require_overwrite )
+
+
+class SLURMGraphPlugin(GraphPluginBase):
+    """Execute using SLURM
+
+    The plugin_args input to run can be used to control the SGE execution.
+    Currently supported options are:
+
+    - template : template to use for batch job submission
+    - qsub_args : arguments to be prepended to the job execution script in the
+                  qsub call
+
+    """
+    _template="#!/bin/bash"
+
+    def __init__(self, **kwargs):
+        if 'plugin_args' in kwargs and kwargs['plugin_args']:
+            if 'retry_timeout' in kwargs['plugin_args']:
+                self._retry_timeout = kwargs['plugin_args']['retry_timeout']
+            if 'max_tries' in kwargs['plugin_args']:
+                self._max_tries = kwargs['plugin_args']['max_tries']
+            if 'template' in kwargs['plugin_args']:
+                self._template = kwargs['plugin_args']['template']
+                if os.path.isfile(self._template):
+                    self._template = open(self._template).read()
+            if 'sbatch_args' in kwargs['plugin_args']:
+                self._sbatch_args = kwargs['plugin_args']['sbatch_args']
+            if 'dont_resubmit_completed_jobs' in kwargs['plugin_args']:
+                self._dont_resubmit_completed_jobs = kwargs['plugin_args']['dont_resubmit_completed_jobs']
+            else:
+                self._dont_resubmit_completed_jobs = False
+        super(SLURMGraphPlugin, self).__init__(**kwargs)
+
+    def _submit_graph(self, pyfiles, dependencies, nodes):
+        def make_job_name(jobnumber, nodeslist):
+            """
+            - jobnumber: The index number of the job to create
+            - nodeslist: The name of the node being processed
+            - return: A string representing this job to be displayed by SLURM
+            """
+            job_name='j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
+            # Condition job_name to be a valid bash identifier (i.e. - is invalid)
+            job_name=job_name.replace('-','_').replace('.','_').replace(':','_')
+            return job_name
+        batch_dir, _ = os.path.split(pyfiles[0])
+        submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
+
+        cache_doneness_per_node = dict()
+        if self._dont_resubmit_completed_jobs:  ## A future parameter for controlling this behavior could be added here
+            for idx, pyscript in enumerate(pyfiles):
+                node = nodes[idx]
+                node_status_done = node_completed_status(node)
+
+                #if the node itself claims done, then check to ensure all
+                #dependancies are also done
+                if node_status_done and idx in dependencies:
+                    for child_idx in dependencies[idx]:
+                        if child_idx in cache_doneness_per_node:
+                            child_status_done = cache_doneness_per_node[child_idx]
+                        else:
+                            child_status_done = node_completed_status(nodes[child_idx])
+                        node_status_done = node_status_done and child_status_done
+
+                cache_doneness_per_node[idx] = node_status_done
+
+        with open(submitjobsfile, 'wt') as fp:
+            fp.writelines('#!/usr/bin/env bash\n')
+            fp.writelines('# Condense format attempted\n')
+            for idx, pyscript in enumerate(pyfiles):
+                node = nodes[idx]
+                if cache_doneness_per_node.get(idx,False):
+                    continue
+                else:
+                    template, sbatch_args = self._get_args(
+                        node, ["template", "sbatch_args"])
+
+                    batch_dir, name = os.path.split(pyscript)
+                    name = '.'.join(name.split('.')[:-1])
+                    batchscript = '\n'.join((template,
+                                             '%s %s' % (sys.executable, pyscript)))
+                    batchscriptfile = os.path.join(batch_dir,
+                                                   'batchscript_%s.sh' % name)
+
+                    batchscriptoutfile = batchscriptfile + '.o'
+                    batchscripterrfile = batchscriptfile + '.e'
+
+                    with open(batchscriptfile, 'wt') as batchfp:
+                        batchfp.writelines(batchscript)
+                        batchfp.close()
+                    deps = ''
+                    if idx in dependencies:
+                        values = ''
+                        for jobid in dependencies[idx]:
+                            ## Avoid dependancies of done jobs
+                            if not self._dont_resubmit_completed_jobs or cache_doneness_per_node[jobid] == False:
+                                values += "${{{0}}}:".format(make_job_name(jobid, nodes))
+                        if values != '':  # i.e. if some jobs were added to dependency list
+                            values = values.rstrip(':')
+                            deps = '--dependency=afterok:%s' % values
+                    jobname = make_job_name(idx, nodes)
+                    # Do not use default output locations if they are set in self._sbatch_args
+                    stderrFile = ''
+                    if self._sbatch_args.count('-e ') == 0:
+                        stderrFile = '-e {errFile}'.format(
+                            errFile=batchscripterrfile)
+                    stdoutFile = ''
+                    if self._sbatch_args.count('-o ') == 0:
+                        stdoutFile = '-o {outFile}'.format(
+                            outFile=batchscriptoutfile)
+                    full_line = '{jobNm}=$(sbatch {outFileOption} {errFileOption} {extraSBatchArgs} {dependantIndex} -J {jobNm} {batchscript} | awk \'{{print $4}}\')\n'.format(
+                        jobNm=jobname,
+                        outFileOption=stdoutFile,
+                        errFileOption=stderrFile,
+                        extraSBatchArgs=sbatch_args,
+                        dependantIndex=deps,
+                        batchscript=batchscriptfile)
+                    fp.writelines(full_line)
+        cmd = CommandLine('bash', environ=os.environ.data,
+                          terminal_output='allatonce')
+        cmd.inputs.args = '%s' % submitjobsfile
+        cmd.run()
+        logger.info('submitted all jobs to queue')
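The `cache_doneness_per_node` pass in `_submit_graph` marks a node as skippable only if the node itself hashes as complete and every dependency it lists is also complete, so a stale upstream node forces its downstream nodes to resubmit. A self-contained sketch of that propagation over a toy graph, with node completion given as a plain dict in place of `node_completed_status`:

```python
def compute_doneness(num_nodes, dependencies, completed):
    """dependencies: {node_idx: [upstream_idx, ...]}; completed: {idx: bool}.

    A node is 'done' (safe to skip) only when it and all of its listed
    dependencies are complete, mirroring the caching loop in
    SLURMGraphPlugin._submit_graph.
    """
    cache = {}
    for idx in range(num_nodes):
        done = completed[idx]
        if done and idx in dependencies:
            for upstream in dependencies[idx]:
                done = done and cache.get(upstream, completed[upstream])
        cache[idx] = done
    return cache

# Node 2 finished, but its upstream node 1 did not -> node 2 is resubmitted.
deps = {1: [0], 2: [1]}
completed = {0: True, 1: False, 2: True}
print(compute_doneness(3, deps, completed))  # {0: True, 1: False, 2: False}
```

This is why `dont_resubmit_completed_jobs` can ignore new or modified inputs, as the plugins.rst note warns: doneness is judged from cached hashes, not from whether an upstream rerun would change a node's inputs.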
