Skip to content

Commit a7fd06c

Browse files
committed
Merge branch 'pr/629'
Conflicts: CHANGES nipype/workflows/fmri/fsl/preprocess.py
2 parents 832b5f3 + 4016e01 commit a7fd06c

File tree

6 files changed

+174
-1
lines changed

6 files changed

+174
-1
lines changed

CHANGES

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Next release
55
* ENH: New interfaces: spm.ResliceToReference, FuzzyOverlap, afni.AFNItoNIFTI
66
spm.DicomImport, P2PDistance
77
* ENH: W3C PROV support with optional RDF export built into Nipype
8+
* ENH: Added support for Simple Linux Utility Resource Management (SLURM)
89

910
* ENH: Several new interfaces related to Camino were added:
1011
- camino.SFPICOCalibData

doc/users/plugins.rst

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ available plugins allow local and distributed execution of workflows and
99
debugging. Each available plugin is described below.
1010

1111
Current plugins are available for Linear, Multiprocessing, IPython_ distributed
12-
processing platforms and for direct processing on SGE_, PBS_, HTCondor_, and LSF_. We
12+
processing platforms and for direct processing on SGE_, PBS_, HTCondor_, LSF_, and SLURM_. We
1313
anticipate future plugins for the Soma_ workflow.
1414

1515
.. note::
@@ -270,3 +270,5 @@ Optional arguments::
270270
.. _DAGMan: http://research.cs.wisc.edu/htcondor/dagman/dagman.html
271271
.. _HTCondor documentation: http://research.cs.wisc.edu/htcondor/manual
272272
.. _DMTCP: http://dmtcp.sourceforge.net
273+
.. _SLURM: http://slurm.schedmd.com/
274+

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,4 @@
1414
from .pbsgraph import PBSGraphPlugin
1515
from .sgegraph import SGEGraphPlugin
1616
from .lsf import LSFPlugin
17+
from .slurm import SLURMPlugin

nipype/pipeline/plugins/base.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import numpy as np
1818
import scipy.sparse as ssp
1919

20+
2021
from ..utils import (nx, dfs_preorder)
2122
from ..engine import (MapNode, str2bool)
2223

@@ -597,3 +598,35 @@ def _submit_graph(self, pyfiles, dependencies, nodes):
597598
dependencies: dictionary of dependencies based on the toplogical sort
598599
"""
599600
raise NotImplementedError
601+
602+
603+
604+
def _get_result(self, taskid):
    """Collect the stored result of a finished batch task.

    Parameters
    ----------
    taskid : int
        Identifier previously returned by ``_submit_batchtask``; key into
        ``self._pending`` (maps taskid -> node output directory).

    Returns
    -------
    dict or None
        ``None`` while the task is still queued/running. Otherwise a dict
        with keys ``result`` and ``traceback`` (plus ``hostname`` when the
        stored pickle carries one).

    Raises
    ------
    Exception
        If ``taskid`` was never submitted through this plugin.
    IndexError
        If the node directory contains no ``result_*.pklz`` file.
    """
    if taskid not in self._pending:
        raise Exception('Task %d not found' % taskid)
    if self._is_pending(taskid):
        return None
    node_dir = self._pending[taskid]

    # Log what the scheduler actually left behind in (and one level above)
    # the node directory -- helps debugging stale/missing result files.
    logger.debug(os.listdir(os.path.realpath(os.path.join(node_dir, '..'))))
    logger.debug(os.listdir(node_dir))

    # BUG FIX: the original globbed twice -- once only to ``.pop()`` and
    # discard the value, then again to take ``[0]``. Glob once; an empty
    # match still raises IndexError here, exactly as before.
    results_file = glob(os.path.join(node_dir, 'result_*.pklz'))[0]
    result_data = loadpkl(results_file)
    result_out = dict(result=None, traceback=None)

    if isinstance(result_data, dict):
        # A dict payload carries a traceback/hostname -- presumably a crash
        # record (TODO confirm against the writer side); move the file into
        # the crashstore so it is not picked up again.
        result_out['result'] = result_data['result']
        result_out['traceback'] = result_data['traceback']
        result_out['hostname'] = result_data['hostname']
        if results_file:
            crash_file = os.path.join(node_dir, 'crashstore.pklz')
            os.rename(results_file, crash_file)
    else:
        result_out['result'] = result_data

    return result_out
632+

nipype/pipeline/plugins/slurm.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
'''
2+
Created on Aug 2, 2013
3+
4+
@author: chadcumba
5+
6+
Parallel workflow execution with SLURM
7+
'''
8+
9+
import os
10+
import re
11+
import subprocess
12+
from time import sleep
13+
14+
from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)
15+
16+
from nipype.interfaces.base import CommandLine
17+
18+
19+
20+
21+
class SLURMPlugin(SGELikeBatchManagerBase):
    '''
    Execute using SLURM

    The plugin_args input to run can be used to control the SLURM execution.
    Currently supported options are:

    - template : template to use for batch job submission
    - sbatch_args : arguments to prepend to the sbatch call

    '''

    def __init__(self, **kwargs):
        """Read retry behavior, the submission template and extra sbatch
        arguments out of ``plugin_args``, then delegate to the SGE-like
        batch-manager base class."""
        template = "#!/bin/bash"

        self._retry_timeout = 2
        self._max_tries = 2
        self._template = template
        self._sbatch_args = None

        if 'plugin_args' in kwargs and kwargs['plugin_args']:
            plugin_args = kwargs['plugin_args']
            if 'retry_timeout' in plugin_args:
                self._retry_timeout = plugin_args['retry_timeout']
            if 'max_tries' in plugin_args:
                self._max_tries = plugin_args['max_tries']
            if 'template' in plugin_args:
                self._template = plugin_args['template']
                # A filesystem path may be given instead of the template
                # text itself; read it in that case (and close the handle,
                # which the original leaked).
                if os.path.isfile(self._template):
                    with open(self._template) as tpl:
                        self._template = tpl.read()
            if 'sbatch_args' in plugin_args:
                self._sbatch_args = plugin_args['sbatch_args']
        self._pending = {}
        # BUG FIX: the original passed the *default* ``template`` here, so a
        # template supplied through plugin_args was silently discarded by
        # the base-class initializer. Pass the configured one instead.
        super(SLURMPlugin, self).__init__(self._template, **kwargs)

    def _is_pending(self, taskid):
        """Return True while ``taskid`` still appears in the user's queue.

        NOTE(review): ``showq`` is a Maui/Moab front-end that is not
        available on every SLURM site; the native command is ``squeue``.
        Kept as-is to preserve behavior on systems that provide it --
        confirm before deploying elsewhere.
        """
        proc = subprocess.Popen(["showq", '-u'],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
        o, _ = proc.communicate()
        # Pending as long as the job id shows up anywhere in the listing.
        return o.find(str(taskid)) > -1

    def _submit_batchtask(self, scriptfile, node):
        """Submit ``scriptfile`` for ``node`` via ``sbatch`` and return the
        integer SLURM job id.

        This is more or less the _submit_batchtask from sge.py with flipped
        variable names, different command line switches, and different
        output formatting/processing.
        """
        # BUG FIX: ``os.environ.data`` is a CPython-2 implementation detail
        # that does not exist on Python 3; ``dict(os.environ)`` is portable.
        cmd = CommandLine('sbatch', environ=dict(os.environ),
                          terminal_output='allatonce')
        path = os.path.dirname(scriptfile)

        sbatch_args = ''
        if self._sbatch_args:
            sbatch_args = self._sbatch_args
        if 'sbatch_args' in node.plugin_args:
            if 'overwrite' in node.plugin_args and \
                    node.plugin_args['overwrite']:
                sbatch_args = node.plugin_args['sbatch_args']
            else:
                sbatch_args += (" " + node.plugin_args['sbatch_args'])
        # Fill in defaults for any switch the user did not supply; stdout
        # and stderr deliberately share the same %j (job-id) file.
        if '-o' not in sbatch_args:
            sbatch_args = '%s -o %s' % (sbatch_args,
                                        os.path.join(path, 'slurm-%j.out'))
        if '-e' not in sbatch_args:
            sbatch_args = '%s -e %s' % (sbatch_args,
                                        os.path.join(path, 'slurm-%j.out'))
        if '-p' not in sbatch_args:
            sbatch_args = '%s -p normal' % (sbatch_args)
        if '-n' not in sbatch_args:
            sbatch_args = '%s -n 16' % (sbatch_args)
        if '-t' not in sbatch_args:
            sbatch_args = '%s -t 1:00:00' % (sbatch_args)
        # Build a job name of the form <id>.<hierarchy>.<user> so that the
        # most specific component comes first in queue listings.
        if node._hierarchy:
            jobname = '.'.join((os.environ['LOGNAME'],
                                node._hierarchy,
                                node._id))
        else:
            jobname = '.'.join((os.environ['LOGNAME'],
                                node._id))
        jobnameitems = jobname.split('.')
        jobnameitems.reverse()
        jobname = '.'.join(jobnameitems)
        cmd.inputs.args = '%s -J %s %s' % (sbatch_args,
                                           jobname,
                                           scriptfile)
        oldlevel = iflogger.level
        # Silence interface logging during the (possibly retried) submit.
        iflogger.setLevel(logging.getLevelName('CRITICAL'))
        tries = 0
        while True:
            try:
                result = cmd.run()
            # BUG FIX: 'except Exception, e:' is Python-2-only syntax;
            # 'as e' is valid on Python 2.6+ and required on Python 3.
            except Exception as e:
                if tries < self._max_tries:
                    tries += 1
                    sleep(self._retry_timeout)  # wait, then try again
                else:
                    iflogger.setLevel(oldlevel)
                    raise RuntimeError('\n'.join((('Could not submit sbatch task'
                                                   ' for node %s') % node._id,
                                                  str(e))))
            else:
                break
        logger.debug('Ran command ({0})'.format(cmd.cmdline))
        iflogger.setLevel(oldlevel)
        # sbatch prints "Submitted batch job <id>"; parse the id out of the
        # last non-empty stdout line.
        lines = [line for line in result.runtime.stdout.split('\n') if line]
        taskid = int(re.match("Submitted batch job ([0-9]*)",
                              lines[-1]).groups()[0])
        self._pending[taskid] = node.output_dir()
        logger.debug('submitted sbatch task: %d for node %s' % (taskid,
                                                                node._id))
        return taskid

nipype/workflows/fmri/fsl/preprocess.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import nipype.interfaces.fsl as fsl # fsl
66
import nipype.interfaces.utility as util # utility
77
import nipype.pipeline.engine as pe # pypeline engine
8+
import nipype.interfaces.freesurfer as fs # freesurfer
9+
import nipype.interfaces.spm as spm
810

911
from ...smri.freesurfer.utils import create_getmask_flow
1012

0 commit comments

Comments
 (0)