Skip to content

Commit 29a1dfc

Browse files
committed
ENH: Added support for OAR
1 parent e4f6e7e commit 29a1dfc

File tree

2 files changed

+131
-0
lines changed

2 files changed

+131
-0
lines changed

nipype/pipeline/plugins/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from .linear import LinearPlugin
66
from .ipythonx import IPythonXPlugin
77
from .pbs import PBSPlugin
8+
from .oar import OARPlugin
89
from .sge import SGEPlugin
910
from .condor import CondorPlugin
1011
from .dagman import CondorDAGManPlugin

nipype/pipeline/plugins/oar.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
"""Parallel workflow execution via OAR http://oar.imag.fr
2+
"""
3+
4+
import os
5+
from time import sleep
6+
import subprocess
7+
import json
8+
9+
from .base import (SGELikeBatchManagerBase, logger, iflogger, logging)
10+
11+
from nipype.interfaces.base import CommandLine
12+
13+
14+
class OARPlugin(SGELikeBatchManagerBase):
15+
"""Execute using OAR
16+
17+
The plugin_args input to run can be used to control the OAR execution.
18+
Currently supported options are:
19+
20+
- template : template to use for batch job submission
21+
- oarsub_args : arguments to be prepended to the job execution
22+
script in the oarsub call
23+
- max_jobname_len: maximum length of the job name. Default 15.
24+
25+
"""
26+
27+
# Addtional class variables
28+
_max_jobname_len = 15
29+
30+
def __init__(self, **kwargs):
31+
template = """
32+
# oarsub -J
33+
"""
34+
self._retry_timeout = 2
35+
self._max_tries = 2
36+
self._max_jobname_length = 15
37+
if 'plugin_args' in kwargs and kwargs['plugin_args']:
38+
if 'retry_timeout' in kwargs['plugin_args']:
39+
self._retry_timeout = kwargs['plugin_args']['retry_timeout']
40+
if 'max_tries' in kwargs['plugin_args']:
41+
self._max_tries = kwargs['plugin_args']['max_tries']
42+
if 'max_jobname_len' in kwargs['plugin_args']:
43+
self._max_jobname_len = \
44+
kwargs['plugin_args']['max_jobname_len']
45+
super(OARPlugin, self).__init__(template, **kwargs)
46+
47+
def _is_pending(self, taskid):
48+
# subprocess.Popen requires taskid to be a string
49+
proc = subprocess.Popen(
50+
['oarstat', '-J', '-s',
51+
'-j', taskid],
52+
stdout=subprocess.PIPE,
53+
stderr=subprocess.PIPE
54+
)
55+
o, e = proc.communicate()
56+
57+
parsed_result = json.loads(o)[taskid]
58+
return 'error' not in parsed_result
59+
60+
def _submit_batchtask(self, scriptfile, node):
61+
cmd = CommandLine('oarsub', environ=os.environ.data,
62+
terminal_output='allatonce')
63+
path = os.path.dirname(scriptfile)
64+
oarsubargs = ''
65+
if self._oarsub_args:
66+
oarsubargs = self._oarsub_args
67+
if 'oarsub_args' in node.plugin_args:
68+
if (
69+
'overwrite' in node.plugin_args and
70+
node.plugin_args['overwrite']
71+
):
72+
oarsubargs = node.plugin_args['oarsub_args']
73+
else:
74+
oarsubargs += (" " + node.plugin_args['oarsub_args'])
75+
if '-o' not in oarsubargs:
76+
oarsubargs = '%s -o %s' % (oarsubargs, path)
77+
if '-E' not in oarsubargs:
78+
oarsubargs = '%s -E %s' % (oarsubargs, path)
79+
if node._hierarchy:
80+
jobname = '.'.join((os.environ.data['LOGNAME'],
81+
node._hierarchy,
82+
node._id))
83+
else:
84+
jobname = '.'.join((os.environ.data['LOGNAME'],
85+
node._id))
86+
jobnameitems = jobname.split('.')
87+
jobnameitems.reverse()
88+
jobname = '.'.join(jobnameitems)
89+
jobname = jobname[0:self._max_jobname_len]
90+
cmd.inputs.args = '%s -n %s -S %s' % (
91+
oarsubargs,
92+
jobname,
93+
scriptfile
94+
)
95+
96+
oldlevel = iflogger.level
97+
iflogger.setLevel(logging.getLevelName('CRITICAL'))
98+
tries = 0
99+
while True:
100+
try:
101+
result = cmd.run()
102+
except Exception, e:
103+
if tries < self._max_tries:
104+
tries += 1
105+
sleep(self._retry_timeout)
106+
# sleep 2 seconds and try again.
107+
else:
108+
iflogger.setLevel(oldlevel)
109+
raise RuntimeError('\n'.join((('Could not submit pbs task'
110+
' for node %s') % node._id,
111+
str(e))))
112+
else:
113+
break
114+
iflogger.setLevel(oldlevel)
115+
# retrieve OAR taskid
116+
117+
o = ''
118+
add = False
119+
for line in result.runtime.stdout.splitlines():
120+
if line.strip().startswith('{'):
121+
add = True
122+
if add:
123+
o += line + '\n'
124+
if line.strip().startswith('}'):
125+
break
126+
taskid = json.loads(o)['job_id']
127+
self._pending[taskid] = node.output_dir()
128+
logger.debug('submitted OAR task: %s for node %s' % (taskid, node._id))
129+
130+
return taskid

0 commit comments

Comments
 (0)