Skip to content

Commit 34c6674

Browse files
committed
ENH: Add support for reduced SGEgraph on re-run
During the re-running of a workflow with SGEgraph, we can greatly reduce the dependancies by determining which nodes have completed prior to generating the submit-graph. Job numbers need to be based on job ID's not on job names. Job names are not unique. BUG: Job names must be valid bash variables.
1 parent f7558b9 commit 34c6674

File tree

1 file changed

+97
-43
lines changed

1 file changed

+97
-43
lines changed

nipype/pipeline/plugins/sgegraph.py

Lines changed: 97 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,25 @@
99
from ...interfaces.base import CommandLine
1010

1111

12+
def node_completed_status( checknode):
13+
"""
14+
A function to determine if a node has previously completed it's work
15+
:param checknode: The node to check the run status
16+
:return: boolean value True indicates that the node does not need to be run.
17+
"""
18+
""" TODO: place this in the base.py file and refactor """
19+
node_state_does_not_require_overwrite = ( checknode.overwrite == False or
20+
(checknode.overwrite == None and
21+
not checknode._interface.always_run )
22+
)
23+
hash_exists = False
24+
try:
25+
hash_exists, _, _, _ = checknode.hash_exists()
26+
except Exception:
27+
hash_exists = False
28+
return (hash_exists and node_state_does_not_require_overwrite )
29+
30+
1231
class SGEGraphPlugin(GraphPluginBase):
1332
"""Execute using SGE
1433
@@ -45,56 +64,91 @@ def make_job_name(jobnumber, nodeslist):
4564
- nodeslist: The name of the node being processed
4665
- return: A string representing this job to be displayed by SGE
4766
"""
48-
return 'j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
67+
job_name='j{0}_{1}'.format(jobnumber, nodeslist[jobnumber]._id)
68+
# Condition job_name to be a valid bash identifier (i.e. - is invalid)
69+
job_name=job_name.replace('-','_').replace('.','_').replace(':','_')
70+
return job_name
4971
batch_dir, _ = os.path.split(pyfiles[0])
5072
submitjobsfile = os.path.join(batch_dir, 'submit_jobs.sh')
73+
74+
cache_doneness_per_node = dict()
75+
if True: ## A future parameter for controlling this behavior could be added here
76+
for idx, pyscript in enumerate(pyfiles):
77+
node = nodes[idx]
78+
node_status_done = node_completed_status(node)
79+
## If a node has no dependencies, and it is requested to run_without_submitting
80+
## then run this node in place
81+
if (not node_status_done) and (len(dependencies[idx]) == 0 ) and (node.run_without_submitting == True):
82+
try:
83+
node.run()
84+
except Exception:
85+
node._clean_queue(idx, nodes)
86+
node_status_done = True # if successfully run locally, then claim true
87+
88+
#if the node itself claims done, then check to ensure all
89+
#dependancies are also done
90+
if node_status_done and idx in dependencies:
91+
for child_idx in dependencies[idx]:
92+
if child_idx in cache_doneness_per_node:
93+
child_status_done = cache_doneness_per_node[child_idx]
94+
else:
95+
child_status_done = node_completed_status(nodes[child_idx])
96+
node_status_done = node_status_done and child_status_done
97+
98+
cache_doneness_per_node[idx] = node_status_done
99+
51100
with open(submitjobsfile, 'wt') as fp:
52101
fp.writelines('#!/usr/bin/env bash\n')
102+
fp.writelines('# Condense format attempted\n')
53103
for idx, pyscript in enumerate(pyfiles):
54104
node = nodes[idx]
55-
template, qsub_args = self._get_args(
56-
node, ["template", "qsub_args"])
57-
58-
batch_dir, name = os.path.split(pyscript)
59-
name = '.'.join(name.split('.')[:-1])
60-
batchscript = '\n'.join((template,
61-
'%s %s' % (sys.executable, pyscript)))
62-
batchscriptfile = os.path.join(batch_dir,
63-
'batchscript_%s.sh' % name)
64-
65-
batchscriptoutfile = batchscriptfile + '.o'
66-
batchscripterrfile = batchscriptfile + '.e'
67-
68-
with open(batchscriptfile, 'wt') as batchfp:
69-
batchfp.writelines(batchscript)
70-
batchfp.close()
71-
deps = ''
72-
if idx in dependencies:
73-
values = ' '
74-
for jobid in dependencies[idx]:
75-
values += make_job_name(jobid, nodes)
76-
if values != ' ': # i.e. if some jobs were added to dependency list
77-
values = values.rstrip(',')
78-
deps = '-hold_jid%s' % values
79-
jobname = make_job_name(idx, nodes)
80-
# Do not use default output locations if they are set in self._qsub_args
81-
stderrFile = ''
82-
if self._qsub_args.count('-e ') == 0:
83-
stderrFile = '-e {errFile}'.format(
84-
errFile=batchscripterrfile)
85-
stdoutFile = ''
86-
if self._qsub_args.count('-o ') == 0:
87-
stdoutFile = '-o {outFile}'.format(
88-
outFile=batchscriptoutfile)
89-
full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript} | awk \'{{print $3}}\')\n'.format(
90-
jobNm=jobname,
91-
outFileOption=stdoutFile,
92-
errFileOption=stderrFile,
93-
extraQSubArgs=qsub_args,
94-
dependantIndex=deps,
95-
batchscript=batchscriptfile)
96-
fp.writelines(full_line)
105+
if cache_doneness_per_node.get(idx,False):
106+
continue
107+
else:
108+
template, qsub_args = self._get_args(
109+
node, ["template", "qsub_args"])
110+
111+
batch_dir, name = os.path.split(pyscript)
112+
name = '.'.join(name.split('.')[:-1])
113+
batchscript = '\n'.join((template,
114+
'%s %s' % (sys.executable, pyscript)))
115+
batchscriptfile = os.path.join(batch_dir,
116+
'batchscript_%s.sh' % name)
117+
118+
batchscriptoutfile = batchscriptfile + '.o'
119+
batchscripterrfile = batchscriptfile + '.e'
97120

121+
with open(batchscriptfile, 'wt') as batchfp:
122+
batchfp.writelines(batchscript)
123+
batchfp.close()
124+
deps = ''
125+
if idx in dependencies:
126+
values = ' '
127+
for jobid in dependencies[idx]:
128+
## Avoid dependancies of done jobs
129+
if cache_doneness_per_node[jobid] == False:
130+
values += "${{{0}}},".format(make_job_name(jobid, nodes))
131+
if values != ' ': # i.e. if some jobs were added to dependency list
132+
values = values.rstrip(',')
133+
deps = '-hold_jid%s' % values
134+
jobname = make_job_name(idx, nodes)
135+
# Do not use default output locations if they are set in self._qsub_args
136+
stderrFile = ''
137+
if self._qsub_args.count('-e ') == 0:
138+
stderrFile = '-e {errFile}'.format(
139+
errFile=batchscripterrfile)
140+
stdoutFile = ''
141+
if self._qsub_args.count('-o ') == 0:
142+
stdoutFile = '-o {outFile}'.format(
143+
outFile=batchscriptoutfile)
144+
full_line = '{jobNm}=$(qsub {outFileOption} {errFileOption} {extraQSubArgs} {dependantIndex} -N {jobNm} {batchscript} | awk \'{{print $3}}\')\n'.format(
145+
jobNm=jobname,
146+
outFileOption=stdoutFile,
147+
errFileOption=stderrFile,
148+
extraQSubArgs=qsub_args,
149+
dependantIndex=deps,
150+
batchscript=batchscriptfile)
151+
fp.writelines(full_line)
98152
cmd = CommandLine('bash', environ=os.environ.data,
99153
terminal_output='allatonce')
100154
cmd.inputs.args = '%s' % submitjobsfile

0 commit comments

Comments
 (0)