Skip to content

Commit ebfa133

Browse files
committed
Merge pull request #7 from jpata/slurm_cleaned
[RFC] Added a SLURM scheduler
2 parents d581a77 + cf34647 commit ebfa133

File tree

1 file changed

+338
-0
lines changed

1 file changed

+338
-0
lines changed
Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
#!/usr/bin/env python
2+
"""
3+
BossLite SLURM interface
4+
5+
Written by joosep.pata@cern.ch, based on modifications by mario@cern.ch
6+
7+
Based on the Pbsv2 interface
8+
9+
"""
10+
11+
# CVS keyword placeholders identifying this revision of the file.
__revision__ = "$Id: "
__version__ = "$Revision: 1.4 $"
13+
14+
import re, os, time, uuid
15+
import tempfile, os.path
16+
import subprocess, re, socket
17+
import shutil
18+
19+
from ProdCommon.BossLite.Scheduler.SchedulerInterface import SchedulerInterface
20+
from ProdCommon.BossLite.Common.Exceptions import SchedulerError
21+
from ProdCommon.BossLite.DbObjects.Job import Job
22+
from ProdCommon.BossLite.DbObjects.Task import Task
23+
from ProdCommon.BossLite.DbObjects.RunningJob import RunningJob
24+
25+
class SchedulerSlurm (SchedulerInterface) :
    """
    BossLite scheduler interface for the SLURM batch system.

    Builds an sbatch wrapper script that copies the input sandbox (and
    optionally the grid proxy) into a per-job scratch directory on the
    worker node, runs the job executable there, and moves the declared
    output files back to the task's output directory.  Based on the
    Pbsv2 interface.
    """

    def __init__( self, **args):
        """
        Configure the scheduler from keyword arguments.

        Required args: jobScriptDir, jobResDir, queue.
        Optional args: workernodebase (worker-node scratch base,
        defaults to /scratch/$LOGNAME), hostname (defaults to the local
        hostname), resources, use_proxy (default True),
        forcetransferfiles (default 0; not implemented).
        """
        super(SchedulerSlurm, self).__init__(**args)
        print("BossLite.SchedulerSlurm.__init__: args = ", args)
        self.jobScriptDir = args['jobScriptDir']
        self.jobResDir = args['jobResDir']
        self.queue = args['queue']
        # Scratch area on the worker node where sandboxes are staged.
        self.workerNodeWorkDir = args.get('workernodebase', '')
        if not self.workerNodeWorkDir:
            self.workerNodeWorkDir = '/scratch/' + os.environ['LOGNAME']
            print("BossLite.SchedulerSlurm.__init__: workernodebase not set, using default = ", self.workerNodeWorkDir)

        self.hostname = args.get('hostname', None)
        if not self.hostname:
            self.hostname = socket.gethostname()
        self.resources = args.get('resources', '')
        self.use_proxy = args.get('use_proxy', True)
        self.forceTransferFiles = args.get('forcetransferfiles', 0)

        self.res_dict = {}
        # Grid proxy: honor $X509_USER_PROXY, else the conventional
        # per-uid location in /tmp.
        self.proxy_location = os.environ.get('X509_USER_PROXY',
                                             '/tmp/x509up_u' + repr(os.getuid()))

        # Map squeue/PBS state codes onto BossLite status codes
        # (R = running, SS = scheduled/submitted, SD = done).
        self.status_map = {'E': 'R',
                           'H': 'SS',
                           'Q': 'SS',
                           'R': 'R',
                           'S': 'R',
                           'T': 'R',
                           'W': 'SS',
                           'PD': 'SS',
                           'Done': 'SD',
                           'C': 'SD',
                           'CG': 'SD'}

    def jobDescription ( self, obj, requirements='', config='', service = '' ):
        """
        Retrieve the scheduler-specific job description as a string.

        Not supported by this scheduler; always raises NotImplementedError.
        """
        raise NotImplementedError

    def submit ( self, obj, requirements='', config='', service = '' ) :
        """
        Set up submission parameters and submit.

        Returns (jobAttributes, bulkId, service):
        - jobAttributes is a map of the format {'name': schedulerId}
        - bulkId is an eventual bulk submission identifier (always None)
        - service is an endpoint to connect with, such as a WMS (always None)

        Raises SchedulerError when obj is neither a Job/RunningJob nor a
        Task (the original silently returned unbound names here).
        """
        if isinstance(obj, (RunningJob, Job)):
            # BUGFIX: the original passed `requirements` positionally into
            # submitJob's `task` parameter slot; pass it by keyword.
            return self.submitJob(obj, requirements=requirements)
        elif isinstance(obj, Task):
            return self.submitTask(obj, requirements)
        raise SchedulerError('wrong argument type', str(type(obj)))

    def submitTask ( self, task, requirements=''):
        """
        Submit every job in *task*.

        Returns (ret_map, taskId, queue) where ret_map merges the
        per-job {'name': schedulerId} maps; taskId and queue are those
        of the last submitted job (None for an empty task).
        """
        ret_map = {}
        # Initialize so an empty task returns (…, None, None) instead of
        # raising NameError on the unbound loop variables.
        taskId = None
        queue = None
        for job in task.getJobs():
            job_map, taskId, queue = self.submitJob(job, task, requirements)
            ret_map.update(job_map)

        return ret_map, taskId, queue

    def submitJob ( self, job, task=None, requirements=''):
        """
        Submit a single job via sbatch.

        Copies the input sandbox to the worker node before submitting:
        the generated wrapper script stages the sandbox files (and the
        proxy when use_proxy is set) into a unique per-job work
        directory, runs the executable, then moves the output files back.

        Returns ({job name: slurm job id}, None, None).
        Raises SchedulerError when the proxy is missing or sbatch fails.
        """
        # NB: we assume an env var SLURM_JOBCOOKIE points to the exec dir
        # on the batch host.
        #
        # mode='w': the scripts are text; the binary default would reject
        # str writes on Python 3.
        slurmScript = tempfile.NamedTemporaryFile(mode='w')
        epilogue = tempfile.NamedTemporaryFile(mode='w', prefix='epilogue.')

        if not self.workerNodeWorkDir:
            self.workerNodeWorkDir = os.path.join(os.getcwd(), 'CRAB-SLURM')
            if not os.path.exists(self.workerNodeWorkDir):
                os.mkdir(self.workerNodeWorkDir)

        self.stageDir = os.path.join(os.getcwd(), 'CRAB-SLURM')
        if not os.path.exists(self.stageDir):
            os.mkdir(self.stageDir)

        # Generate a UUID prefix for transferring input files so staged
        # files from concurrent jobs cannot collide.
        randomPrefix = uuid.uuid4().hex

        # Begin building the SLURM batch script.
        s = []
        s.append('#!/bin/sh')
        s.append('# This script generated by CRAB2 from http://cms.cern.ch')
        s.append('#SBATCH --error %swrapper_%s' % (self.jobResDir, job['standardError']))
        s.append('#SBATCH --output %swrapper_%s' % (self.jobResDir, job['standardOutput']))
        s.append('#SBATCH --job-name CMS_CRAB2')

        # Input sandbox, plus the grid proxy when requested.
        inputFiles = task['globalSandbox'].split(',')
        if self.use_proxy:
            if os.path.exists(self.proxy_location):
                inputFiles.append(self.proxy_location)
            else:
                raise SchedulerError('Proxy Error', "Proxy not found at %s" % self.proxy_location)

        if self.queue:
            s.append('#SBATCH --partition %s' % self.queue)

        s.append('mkdir -p %s' % self.workerNodeWorkDir)

        # Stage each input file into the worker-node work dir under the
        # unique prefix.
        for fname in inputFiles:
            targetFile = os.path.abspath(os.path.join(
                self.workerNodeWorkDir,
                "%s-%s" % (randomPrefix, os.path.basename(fname))))
            if self.forceTransferFiles:
                raise Exception("forceTransferFiles not implemented")
            s.append('cp %s %s' % (os.path.abspath(fname), targetFile))

        # Declare the output files to be moved back to the submit host.
        for fname in job['outputFiles']:
            targetFile = os.path.abspath(os.path.join(
                self.workerNodeWorkDir,
                "%s-%s" % (randomPrefix, os.path.basename(fname))))
            stageFile = os.path.abspath(os.path.join(task['outputDirectory'],
                                                     fname))
            if self.forceTransferFiles:
                raise Exception("forceTransferFiles not implemented")
            s.append('cp -f %s %s' % (targetFile, stageFile))

        s.append('set -x')
        s.append('pwd')
        s.append('ls -lah')
        s.append('echo ***BEGINNING SLURMV2***')
        s.append('CRAB2_OLD_DIRECTORY=`pwd`')
        s.append('CRAB2_SLURM_WORKDIR=%s' % self.workerNodeWorkDir)
        if self.workerNodeWorkDir:
            s.append('cd %s' % self.workerNodeWorkDir)

        # Per-job work directory keyed on the SLURM cookie and job id.
        s.append('CRAB2_WORKDIR=`pwd`/CRAB2-$SLURM_JOBCOOKIE$SLURM_JOBID')
        s.append('if [ ! -d $CRAB2_WORKDIR ] ; then ')
        s.append(' mkdir -p $CRAB2_WORKDIR')
        s.append('fi')
        s.append('cd $CRAB2_WORKDIR')

        # Move the staged files from the shared scratch into the per-job
        # work directory (original names, prefix stripped).
        inputFiles = task['globalSandbox'].split(',')
        if self.use_proxy:
            if os.path.exists(self.proxy_location):
                inputFiles.append(self.proxy_location)
            else:
                # BUGFIX: was Python 2 "raise SchedulerError, msg" syntax;
                # normalized to match the identical check above.
                raise SchedulerError('Proxy Error', "Proxy not found at %s" % self.proxy_location)

        for fname in inputFiles:
            targetFile = "%s-%s" % (randomPrefix, os.path.basename(fname))
            s.append('mv $CRAB2_SLURM_WORKDIR/%s $CRAB2_WORKDIR/%s' %
                     (targetFile, os.path.basename(fname)))

        # Point the job at its private copy of the proxy.
        if self.use_proxy:
            s.append('export X509_USER_PROXY=$CRAB2_WORKDIR/%s' % os.path.basename(self.proxy_location))

        s.append("./%s %s" % (job['executable'], job['arguments']))

        s.append('ls $CRAB2_WORKDIR')
        # Move output files to where SLURM can find them.
        for fname in job['outputFiles']:
            s.append('mv $CRAB2_WORKDIR/%s $CRAB2_SLURM_WORKDIR/%s-%s' % (fname, randomPrefix, fname))

        for fname in job['outputFiles']:
            targetFile = os.path.abspath(os.path.join(
                self.workerNodeWorkDir,
                "%s-%s" % (randomPrefix, os.path.basename(fname))))
            stageFile = os.path.abspath(os.path.join(task['outputDirectory'],
                                                     fname))
            if not self.forceTransferFiles:
                s.append('mv -f %s %s' % (targetFile, stageFile))

        s.append('cd $CRAB2_OLD_DIRECTORY')
        s.append('rm -rf $CRAB2_WORKDIR')
        slurmScript.write('\n'.join(s))
        slurmScript.flush()
        for line in s:
            self.logging.debug(" CONFIG: %s" % line)

        # Epilogue: clean the worker-node scratch and drop a done marker.
        s = []
        s.append('#!/bin/sh')
        if self.workerNodeWorkDir:
            s.append('cd ' + self.workerNodeWorkDir)
        s.append('rm -fr $SLURM_JOBCOOKIE')
        s.append('touch $HOME/done.$1')
        epilogue.write('\n'.join(s))
        epilogue.flush()
        # BUGFIX: 0o700, not decimal 700 (which is octal 0o1274).
        os.chmod(epilogue.name, 0o700)

        # universal_newlines=True so stdout/stderr are str on Python 3.
        p = subprocess.Popen("sbatch %s" % slurmScript.name, shell=True,
                             stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                             universal_newlines=True)
        (child_stdout, child_stderr) = p.communicate()
        slurmScript.close()
        epilogue.close()

        if p.returncode != 0:
            self.logging.error('Error in job submission')
            self.logging.error(child_stderr)
            raise SchedulerError('SLURM error', child_stderr)

        try:
            # sbatch prints "Submitted batch job <id>"; field 3 is the id.
            jobid = int(child_stdout.strip().split(' ')[3])
        except (IndexError, ValueError):
            self.logging.error("SLURM could not submit job: %s" % (child_stdout))
            self.logging.error(child_stderr)
            raise SchedulerError('SLURM error', child_stderr)

        return {job['name']: jobid}, None, None

    def query(self, obj, service='', objType='node') :
        """
        Query status and eventually other scheduler related information.

        Runs squeue once per valid running job and updates the job's
        'statusScheduler', 'status' and 'destination' fields in place.

        Raises SchedulerError on a wrong argument type or a real squeue
        failure (exit code other than 1, which means "job not known").
        """
        if not isinstance(obj, Task):
            raise SchedulerError('wrong argument type', str(type(obj)))

        for job in obj.jobs:
            if not self.valid(job.runningJob):
                continue
            jobId = str(job.runningJob['schedulerId']).strip()
            # Pseudo-XML output format so the fields can be sliced out
            # below without a real parser.
            p = subprocess.Popen(
                ['squeue', '-h', '-o',
                 '<jobid>%i</jobid><exec_host>%B</exec_host><job_state>%t</job_state>',
                 '-j', jobId],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                universal_newlines=True)
            squeue_output, squeue_error = p.communicate()
            squeue_return = p.returncode

            if squeue_return:
                # Exit code 1 means the job is no longer in the queue;
                # anything else is a genuine error.
                if squeue_return != 1:
                    self.logging.error('Error in job query for ' + jobId)
                    self.logging.error('SLURM stdout: \n %s' % squeue_output)
                    self.logging.error('SLURM stderr: \n %s' % squeue_error)
                    raise SchedulerError('SLURM error', '%s: %s' % (squeue_error, squeue_return))

            host = ''
            # Default to 'Done' so malformed output (missing job_state
            # tag) cannot leave the variable unbound as before.
            slurm_stat = 'Done'
            if len(squeue_output) != 0:
                if squeue_output.find('</exec_host>') >= 0:
                    host = squeue_output[squeue_output.find('<exec_host>') + len('<exec_host>'):
                                         squeue_output.find('</exec_host>')]
                if squeue_output.find('</job_state>') >= 0:
                    slurm_stat = squeue_output[squeue_output.find('<job_state>') + len('<job_state>'):
                                               squeue_output.find('</job_state>')]

            job.runningJob['statusScheduler'] = slurm_stat
            job.runningJob['status'] = self.status_map[slurm_stat]
            job.runningJob['destination'] = host

    def kill(self, obj):
        """
        Cancel every valid running job in *obj* via scancel.

        Raises SchedulerError when scancel exits non-zero.
        """
        for job in obj.jobs:
            if not self.valid(job.runningJob):
                continue
            jobId = str(job.runningJob['schedulerId']).strip()

            # stderr merged into stdout so one string carries the whole
            # error report.
            p = subprocess.Popen(['scancel', jobId], stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
            scancel_output, scancel_error = p.communicate()
            scancel_return = p.returncode

            if scancel_return != 0:
                self.logging.error('Error in job kill for ' + jobId)
                self.logging.error('SLURM Error stdout: %s' % scancel_output)
                raise SchedulerError('SLURM Error in kill', scancel_output)

    def getOutput( self, obj, outdir='' ):
        """
        Retrieve output or just put it in the destination directory.

        Output files are already staged back by the wrapper script at
        job end, so this is a no-op; does not return.
        """
        pass

0 commit comments

Comments
 (0)