
Commit 518a65c

Chad Cumba committed
Created SLURM launcher for workflow plugins
This hasn't been well tested, but it should work without too many issues. Of particular note are the environment variables we reference: the commands that rely on them can fail without reporting an error and need to be watched closely.
1 parent 1dec142 commit 518a65c
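
For context, a minimal usage sketch (not part of this commit). It assumes the new plugin is exposed under the name 'SLURM' the same way the other batch plugins are, and that the sbatch arguments, paths, and retry settings shown are placeholders rather than recommended values.

    # hypothetical example: submit each node of a toy workflow via sbatch
    import nipype.pipeline.engine as pe
    from nipype.interfaces.utility import Function

    def double(x):
        return 2 * x

    node = pe.Node(Function(input_names=['x'], output_names=['out'],
                            function=double), name='double')
    node.inputs.x = 21

    wf = pe.Workflow(name='slurm_demo', base_dir='/tmp/slurm_demo')  # assumed scratch dir
    wf.add_nodes([node])

    wf.run(plugin='SLURM',
           plugin_args={'sbatch_args': '-p normal -t 00:10:00',  # placeholder partition/walltime
                        'retry_timeout': 5,
                        'max_tries': 3})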

File tree

2 files changed: +161 -0 lines

nipype/pipeline/plugins/base.py

Lines changed: 48 additions & 0 deletions
@@ -17,6 +17,7 @@
 import numpy as np
 import scipy.sparse as ssp
 
+
 from ..utils import (nx, dfs_preorder)
 from ..engine import (MapNode, str2bool)
 
@@ -591,3 +592,50 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
         dependencies: dictionary of dependencies based on the topological sort
         """
         raise NotImplementedError
+
+class SLURMLikeBatchManagerBase(SGELikeBatchManagerBase):
+    """Execute workflow with a SLURM-like batch system
+    """
+
+    def __init__(self, template, plugin_args=None):
+        super(SLURMLikeBatchManagerBase, self).__init__(plugin_args=plugin_args)
+        self._template = template
+        self._sbatch_args = None
+        if plugin_args:
+            if 'template' in plugin_args:
+                self._template = plugin_args['template']
+                if os.path.isfile(self._template):
+                    self._template = open(self._template).read()
+            if 'sbatch_args' in plugin_args:
+                self._sbatch_args = plugin_args['sbatch_args']
+        self._pending = {}
+
+    def _get_result(self, taskid):
+        if taskid not in self._pending:
+            raise Exception('Task %d not found' % taskid)
+        if self._is_pending(taskid):
+            return None
+        node_dir = self._pending[taskid]
+
+
+        logger.debug(os.listdir(os.path.realpath(os.path.join(node_dir,
+                                                               '..'))))
+        logger.debug(os.listdir(node_dir))
+        glob(os.path.join(node_dir, 'result_*.pklz')).pop()
+
+        results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
+        result_data = loadpkl(results_file)
+        result_out = dict(result=None, traceback=None)
+
+        if isinstance(result_data, dict):
+            result_out['result'] = result_data['result']
+            result_out['traceback'] = result_data['traceback']
+            result_out['hostname'] = result_data['hostname']
+            if results_file:
+                crash_file = os.path.join(node_dir, 'crashstore.pklz')
+                os.rename(results_file, crash_file)
+        else:
+            result_out['result'] = result_data
+
+        return result_out
+
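
For reference, a sketch of the result payload that _get_result expects to unpickle from the node directory; the file name pattern and dictionary keys mirror the glob/loadpkl calls above, while the concrete values here are made up.

    # hypothetical contents of <node_dir>/result_<nodename>.pklz after loadpkl
    result_data = {
        'result': None,          # whatever the node produced, or None
        'traceback': 'Traceback (most recent call last): ...',  # filled in when the node crashed
        'hostname': 'c401-101',  # host that executed the job (made up)
    }
    # _get_result copies these keys into its return value and, because the
    # payload is a dict, renames the results file to crashstore.pklz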

nipype/pipeline/plugins/slurm.py

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
+'''
+Created on Aug 2, 2013
+
+@author: chadcumba
+
+Parallel workflow execution with SLURM
+'''
+
+import os
+import re
+import subprocess
+from time import sleep
+
+from .base import (SLURMLikeBatchManagerBase, logger, iflogger, logging)
+
+from nipype.interfaces.base import CommandLine
+
+
+
+
+class SLURMPlugin(SLURMLikeBatchManagerBase):
+    '''
+    Execute using SLURM
+
+    The plugin_args input to run can be used to control the SLURM execution.
+    Currently supported options are:
+
+    - template : template to use for batch job submission
+
+
+    '''
+
+
+    def __init__(self, **kwargs):
+        '''
+        Constructor
+        '''
+        template = """
+#!/bin/sh
+"""
+        self._retry_timeout = 2
+        self._max_tries = 2
+        if 'plugin_args' in kwargs and kwargs['plugin_args']:
+            if 'retry_timeout' in kwargs['plugin_args']:
+                self._retry_timeout = kwargs['plugin_args']['retry_timeout']
+            if 'max_tries' in kwargs['plugin_args']:
+                self._max_tries = kwargs['plugin_args']['max_tries']
+        super(SLURMPlugin, self).__init__(template, **kwargs)
+
+    def _is_pending(self, taskid):
+        # showq output is plain text, so search for the task id as a string
+        proc = subprocess.Popen(["showq", '-u'],
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE)
+        o, _ = proc.communicate()
+        return o.find(str(taskid)) > -1
+
+    def _submit_batchtask(self, scriptfile, node):
+        cmd = CommandLine('sbatch', environ=os.environ.data,
+                          terminal_output='allatonce')
+        path = os.path.dirname(scriptfile)
+
+        slurmargs = ''
+        if self._sbatch_args:
+            slurmargs = self._sbatch_args
+        if 'slurm_args' in node.plugin_args:
+            if 'overwrite' in node.plugin_args and\
+                    node.plugin_args['overwrite']:
+                slurmargs = node.plugin_args['slurm_args']
+            else:
+                slurmargs += (" " + node.plugin_args['slurm_args'])
+        if '-o' not in slurmargs:
+            slurmargs = '%s -o %s' % (slurmargs, path)
+        if '-e' not in slurmargs:
+            slurmargs = '%s -e %s' % (slurmargs, path)
+        if node._hierarchy:
+            jobname = '.'.join((os.environ.data['LOGNAME'],
+                                node._hierarchy,
+                                node._id))
+        else:
+            jobname = '.'.join((os.environ.data['LOGNAME'],
+                                node._id))
+        jobnameitems = jobname.split('.')
+        jobnameitems.reverse()
+        jobname = '.'.join(jobnameitems)
+        cmd.inputs.args = '%s -J %s %s' % (slurmargs,
+                                           jobname,
+                                           scriptfile)
+        oldlevel = iflogger.level
+        iflogger.setLevel(logging.getLevelName('CRITICAL'))
+        tries = 0
+        while True:
+            try:
+                result = cmd.run()
+            except Exception, e:
+                if tries < self._max_tries:
+                    tries += 1
+                    sleep(self._retry_timeout)  # wait retry_timeout seconds and try again
+                else:
+                    iflogger.setLevel(oldlevel)
+                    raise RuntimeError('\n'.join((('Could not submit slurm task'
+                                                   ' for node %s') % node._id,
+                                                  str(e))))
+            else:
+                break
+        iflogger.setLevel(oldlevel)
+        # retrieve the SLURM job id from the sbatch output
+        lines = [line for line in result.runtime.stdout.split('\n') if line]
+        taskid = int(re.match("Submitted batch job ([0-9]*)",
+                              lines[-1]).groups()[0])
+        self._pending[taskid] = node.output_dir()
+        logger.debug('submitted slurm task: %d for node %s' % (taskid, node._id))
+        return taskid
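
The job-id parsing at the end of _submit_batchtask assumes sbatch's usual one-line confirmation on stdout. A standalone check of that assumption (the job id below is made up); if a site wraps sbatch and prints something different, the pattern would need adjusting.

    import re

    stdout = "Submitted batch job 123456"  # typical sbatch confirmation (example id)
    taskid = int(re.match("Submitted batch job ([0-9]*)", stdout).groups()[0])
    assert taskid == 123456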
