Skip to content

Commit 26c72ac

Browse files
authored
Merge pull request #1582 from satra/fix/slurm_variant
fix: slurm job submission and result checking
2 parents 304ffa4 + 00f07b3 commit 26c72ac

File tree

3 files changed

+21
-16
lines changed

3 files changed

+21
-16
lines changed

doc/users/plugins.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ Optional arguments::
178178

179179
template: custom template file to use
180180
sbatch_args: any other command line args to be passed to bsub.
181+
jobid_re: regular expression for custom job submission id search
181182

182183

183184
SLURMGraph

nipype/pipeline/engine/nodes.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@
5757
from ...interfaces.base import (traits, InputMultiPath, CommandLine,
5858
Undefined, TraitedSpec, DynamicTraitedSpec,
5959
Bunch, InterfaceResult, md5, Interface,
60-
TraitDictObject, TraitListObject, isdefined)
60+
TraitDictObject, TraitListObject, isdefined,
61+
runtime_profile)
6162
from ...utils.misc import (getsource, create_function_from_source,
6263
flatten, unflatten)
6364
from ...utils.filemanip import (save_json, FileNotFoundError,
@@ -743,11 +744,12 @@ def write_report(self, report_type=None, cwd=None):
743744
rst_dict = {'hostname' : self.result.runtime.hostname,
744745
'duration' : self.result.runtime.duration}
745746
# Try and insert memory/threads usage if available
746-
try:
747-
rst_dict['runtime_memory_gb'] = self.result.runtime.runtime_memory_gb
748-
rst_dict['runtime_threads'] = self.result.runtime.runtime_threads
749-
except AttributeError:
750-
logger.info('Runtime memory and threads stats unavailable')
747+
if runtime_profile:
748+
try:
749+
rst_dict['runtime_memory_gb'] = self.result.runtime.runtime_memory_gb
750+
rst_dict['runtime_threads'] = self.result.runtime.runtime_threads
751+
except AttributeError:
752+
logger.info('Runtime memory and threads stats unavailable')
751753
if hasattr(self.result.runtime, 'cmdline'):
752754
rst_dict['command'] = self.result.runtime.cmdline
753755
fp.writelines(write_rst_dict(rst_dict))

nipype/pipeline/plugins/slurm.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)
1515

16-
from nipype.interfaces.base import CommandLine
16+
from ...interfaces.base import CommandLine
1717

1818

1919
class SLURMPlugin(SGELikeBatchManagerBase):
@@ -38,12 +38,15 @@ def __init__(self, **kwargs):
3838
self._max_tries = 2
3939
self._template = template
4040
self._sbatch_args = None
41+
self._jobid_re = "Submitted batch job ([0-9]*)"
4142

4243
if 'plugin_args' in kwargs and kwargs['plugin_args']:
4344
if 'retry_timeout' in kwargs['plugin_args']:
4445
self._retry_timeout = kwargs['plugin_args']['retry_timeout']
4546
if 'max_tries' in kwargs['plugin_args']:
4647
self._max_tries = kwargs['plugin_args']['max_tries']
48+
if 'jobid_re' in kwargs['plugin_args']:
49+
self._jobid_re = kwargs['plugin_args']['jobid_re']
4750
if 'template' in kwargs['plugin_args']:
4851
self._template = kwargs['plugin_args']['template']
4952
if os.path.isfile(self._template):
@@ -55,17 +58,16 @@ def __init__(self, **kwargs):
5558

5659
def _is_pending(self, taskid):
5760
# subprocess.Popen requires taskid to be a string
58-
proc = subprocess.Popen(["squeue", '-j', '%s' % taskid],
59-
stdout=subprocess.PIPE,
60-
stderr=subprocess.PIPE)
61-
o, _ = proc.communicate()
62-
63-
return o.find(str(taskid)) > -1
61+
res = CommandLine('squeue',
62+
args=' '.join(['-j', '%s' % taskid]),
63+
terminal_output='allatonce').run()
64+
return res.runtime.stdout.find(str(taskid)) > -1
6465

6566
def _submit_batchtask(self, scriptfile, node):
6667
"""
67-
This is more or less the _submit_batchtask from sge.py with flipped variable
68-
names, different command line switches, and different output formatting/processing
68+
This is more or less the _submit_batchtask from sge.py with flipped
69+
variable names, different command line switches, and different output
70+
formatting/processing
6971
"""
7072
cmd = CommandLine('sbatch', environ=dict(os.environ),
7173
terminal_output='allatonce')
@@ -118,7 +120,7 @@ def _submit_batchtask(self, scriptfile, node):
118120
iflogger.setLevel(oldlevel)
119121
# retrieve taskid
120122
lines = [line for line in result.runtime.stdout.split('\n') if line]
121-
taskid = int(re.match("Submitted batch job ([0-9]*)",
123+
taskid = int(re.match(self._jobid_re,
122124
lines[-1]).groups()[0])
123125
self._pending[taskid] = node.output_dir()
124126
logger.debug('submitted sbatch task: %d for node %s' % (taskid, node._id))

0 commit comments

Comments
 (0)