2
2
"""
3
3
4
4
import os
5
+ import stat
5
6
from time import sleep
6
7
import subprocess
7
8
import json
@@ -26,6 +27,7 @@ class OARPlugin(SGELikeBatchManagerBase):
26
27
27
28
# Addtional class variables
28
29
_max_jobname_len = 15
30
+ _oarsub_args = ''
29
31
30
32
def __init__ (self , ** kwargs ):
31
33
template = """
@@ -53,9 +55,10 @@ def _is_pending(self, taskid):
53
55
stderr = subprocess .PIPE
54
56
)
55
57
o , e = proc .communicate ()
58
+ parsed_result = json .loads (o )[taskid ].lower ()
59
+ is_pending = 'error' not in parsed_result
56
60
57
- parsed_result = json .loads (o )[taskid ]
58
- return 'error' not in parsed_result
61
+ return is_pending
59
62
60
63
def _submit_batchtask (self , scriptfile , node ):
61
64
cmd = CommandLine ('oarsub' , environ = os .environ .data ,
@@ -72,10 +75,7 @@ def _submit_batchtask(self, scriptfile, node):
72
75
oarsubargs = node .plugin_args ['oarsub_args' ]
73
76
else :
74
77
oarsubargs += (" " + node .plugin_args ['oarsub_args' ])
75
- if '-o' not in oarsubargs :
76
- oarsubargs = '%s -o %s' % (oarsubargs , path )
77
- if '-E' not in oarsubargs :
78
- oarsubargs = '%s -E %s' % (oarsubargs , path )
78
+
79
79
if node ._hierarchy :
80
80
jobname = '.' .join ((os .environ .data ['LOGNAME' ],
81
81
node ._hierarchy ,
@@ -87,6 +87,21 @@ def _submit_batchtask(self, scriptfile, node):
87
87
jobnameitems .reverse ()
88
88
jobname = '.' .join (jobnameitems )
89
89
jobname = jobname [0 :self ._max_jobname_len ]
90
+
91
+ if '-O' not in oarsubargs :
92
+ oarsubargs = '%s -O %s' % (
93
+ oarsubargs ,
94
+ os .path .join (path , jobname + '.stdout' )
95
+ )
96
+ if '-E' not in oarsubargs :
97
+ oarsubargs = '%s -E %s' % (
98
+ oarsubargs ,
99
+ os .path .join (path , jobname + '.stderr' )
100
+ )
101
+ if '-J' not in oarsubargs :
102
+ oarsubargs = '%s -J' % (oarsubargs )
103
+
104
+ os .chmod (scriptfile , stat .S_IEXEC | stat .S_IREAD | stat .S_IWRITE )
90
105
cmd .inputs .args = '%s -n %s -S %s' % (
91
106
oarsubargs ,
92
107
jobname ,
@@ -106,7 +121,7 @@ def _submit_batchtask(self, scriptfile, node):
106
121
# sleep 2 seconds and try again.
107
122
else :
108
123
iflogger .setLevel (oldlevel )
109
- raise RuntimeError ('\n ' .join ((('Could not submit pbs task'
124
+ raise RuntimeError ('\n ' .join ((('Could not submit OAR task'
110
125
' for node %s' ) % node ._id ,
111
126
str (e ))))
112
127
else :
@@ -126,5 +141,4 @@ def _submit_batchtask(self, scriptfile, node):
126
141
taskid = json .loads (o )['job_id' ]
127
142
self ._pending [taskid ] = node .output_dir ()
128
143
logger .debug ('submitted OAR task: %s for node %s' % (taskid , node ._id ))
129
-
130
144
return taskid
0 commit comments