Skip to content

Commit 92635c0

Browse files
committed
Initial work on SLURMGraph plugin
1 parent 3fa6f3e commit 92635c0

File tree

2 files changed

+162
-0
lines changed

2 files changed

+162
-0
lines changed

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,4 @@
1515
from .sgegraph import SGEGraphPlugin
1616
from .lsf import LSFPlugin
1717
from .slurm import SLURMPlugin
18+
from .slurmgraph import SLURMGraphPlugin

nipype/pipeline/plugins/slurmgraph.py

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""Parallel workflow execution via SGE
2+
"""
3+
4+
import os
5+
import sys
6+
7+
from .base import (GraphPluginBase, logger)
8+
9+
from ...interfaces.base import CommandLine
10+
11+
12+
def node_completed_status(checknode):
    """
    Determine whether a node has previously completed its work.

    :param checknode: The node whose run status is checked
    :return: True if the node does not need to be run again
    """
    # TODO: place this in the base.py file and refactor
    # A node may be skipped only when overwriting was not requested:
    # either overwrite is explicitly False, or it is unset (None) and the
    # interface is not flagged to always run.
    node_state_does_not_require_overwrite = (
        checknode.overwrite is False or
        (checknode.overwrite is None and
         not checknode._interface.always_run)
    )
    hash_exists = False
    try:
        hash_exists, _, _, _ = checknode.hash_exists()
    except Exception:
        # Any failure while probing the hash means completion cannot be
        # proven, so treat the node as not yet done (best-effort check).
        hash_exists = False
    return hash_exists and node_state_does_not_require_overwrite
29+
30+
31+
class SLURMGraphPlugin(GraphPluginBase):
    """Execute workflows via SLURM batch submission (sbatch).

    The plugin_args input to run can be used to control the SLURM execution.
    Currently supported options are:

    - template : template to use for batch job submission (inline text or
      a path to a file containing the template)
    - sbatch_args : arguments to be prepended to the job execution script
      in the sbatch call
    - dont_resubmit_completed_jobs : when True, nodes whose results already
      exist are not resubmitted to the queue

    """
    _template = """
#!/bin/bash
"""

    def __init__(self, **kwargs):
        # Defaults; plugin_args entries below may override them.  They must
        # be set unconditionally because _submit_graph reads both.
        self._sbatch_args = ''
        self._dont_resubmit_completed_jobs = False
        if 'plugin_args' in kwargs and kwargs['plugin_args']:
            plugin_args = kwargs['plugin_args']
            if 'retry_timeout' in plugin_args:
                self._retry_timeout = plugin_args['retry_timeout']
            if 'max_tries' in plugin_args:
                self._max_tries = plugin_args['max_tries']
            if 'template' in plugin_args:
                self._template = plugin_args['template']
                # The template may be given either inline or as a file path.
                if os.path.isfile(self._template):
                    with open(self._template) as template_fd:
                        self._template = template_fd.read()
            if 'sbatch_args' in plugin_args:
                self._sbatch_args = plugin_args['sbatch_args']
            if 'dont_resubmit_completed_jobs' in plugin_args:
                self._dont_resubmit_completed_jobs = \
                    plugin_args['dont_resubmit_completed_jobs']
        super(SLURMGraphPlugin, self).__init__(**kwargs)

    def _submit_graph(self, pyfiles, dependencies, nodes):
        """Write a single bash script that sbatch-es one batch script per
        node, wiring SLURM ``--dependency=afterok`` chains between them,
        then run that script.

        :param pyfiles: per-node python scripts to execute
        :param dependencies: dict mapping node index -> indices it depends on
        :param nodes: the workflow nodes, parallel to pyfiles
        """
        def make_job_name(jobnumber, nodeslist):
            """
            - jobnumber: The index number of the job to create
            - nodeslist: The list of nodes being processed
            - return: A string naming this job, usable as a bash identifier
            """
            job_name = 'j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
            # Condition job_name to be a valid bash identifier (i.e. - is invalid)
            job_name = job_name.replace('-', '_').replace('.', '_').replace(':', '_')
            return job_name

        batch_dir, _ = os.path.split(pyfiles[0])
        submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')

        cache_doneness_per_node = dict()
        if self._dont_resubmit_completed_jobs:
            for idx, pyscript in enumerate(pyfiles):
                node = nodes[idx]
                node_status_done = node_completed_status(node)

                # If a node has no dependencies, and it is requested to
                # run_without_submitting, then run this node in place
                if (not node_status_done and not dependencies[idx]
                        and node.run_without_submitting):
                    try:
                        node.run()
                    except Exception:
                        node._clean_queue(idx, nodes)
                    node_status_done = True  # if successfully run locally, then claim true

                # If the node itself claims done, then check to ensure all
                # dependencies are also done
                if node_status_done and idx in dependencies:
                    for child_idx in dependencies[idx]:
                        if child_idx in cache_doneness_per_node:
                            child_status_done = cache_doneness_per_node[child_idx]
                        else:
                            child_status_done = node_completed_status(nodes[child_idx])
                        node_status_done = node_status_done and child_status_done

                cache_doneness_per_node[idx] = node_status_done

        with open(submitjobsfile, 'wt') as fp:
            fp.writelines('#!/usr/bin/env bash\n')
            fp.writelines('# Condense format attempted\n')
            for idx, pyscript in enumerate(pyfiles):
                node = nodes[idx]
                # Skip nodes already proven complete above.
                if cache_doneness_per_node.get(idx, False):
                    continue
                template, sbatch_args = self._get_args(
                    node, ["template", "sbatch_args"])

                batch_dir, name = os.path.split(pyscript)
                name = '.'.join(name.split('.')[:-1])
                batchscript = '\n'.join((template,
                                         '%s %s' % (sys.executable, pyscript)))
                batchscriptfile = os.path.join(batch_dir,
                                               'batchscript_%s.sh' % name)

                batchscriptoutfile = batchscriptfile + '.o'
                batchscripterrfile = batchscriptfile + '.e'

                with open(batchscriptfile, 'wt') as batchfp:
                    batchfp.writelines(batchscript)

                deps = ''
                if idx in dependencies:
                    values = ''
                    for jobid in dependencies[idx]:
                        # Avoid dependencies on jobs that are already done
                        # (use .get: the cache is empty unless
                        # dont_resubmit_completed_jobs was requested).
                        if not cache_doneness_per_node.get(jobid, False):
                            values += "${{{0}}}:".format(make_job_name(jobid, nodes))
                    if values:  # i.e. if some jobs were added to dependency list
                        # Entries are ':'-joined; drop the trailing separator.
                        deps = '--dependency=afterok:%s' % values.rstrip(':')
                jobname = make_job_name(idx, nodes)
                # Do not use default output locations if they are set in self._sbatch_args
                stderrFile = ''
                if self._sbatch_args.count('-e ') == 0:
                    stderrFile = '-e {errFile}'.format(
                        errFile=batchscripterrfile)
                stdoutFile = ''
                if self._sbatch_args.count('-o ') == 0:
                    stdoutFile = '-o {outFile}'.format(
                        outFile=batchscriptoutfile)
                # Capture the job id sbatch prints ("Submitted batch job NNN")
                # into a shell variable named after the job.
                full_line = '{jobNm}=$(sbatch {outFileOption} {errFileOption} {extraSBatchArgs} {dependantIndex} -J {jobNm} {batchscript} | awk \'{{print $4}}\')\n'.format(
                    jobNm=jobname,
                    outFileOption=stdoutFile,
                    errFileOption=stderrFile,
                    extraSBatchArgs=sbatch_args,
                    dependantIndex=deps,
                    batchscript=batchscriptfile)
                fp.writelines(full_line)
        # NOTE(review): os.environ.data is a Python-2-era attribute — confirm
        # target interpreter, or use dict(os.environ) on Python 3.
        cmd = CommandLine('bash', environ=os.environ.data,
                          terminal_output='allatonce')
        cmd.inputs.args = '%s' % submitjobsfile
        cmd.run()
        logger.info('submitted all jobs to queue')

0 commit comments

Comments
 (0)