Skip to content

Commit aebc5ac

Browse files
committed
Initial hooks into Jobs table with Dispatcher support
1 parent 4ebd539 commit aebc5ac

File tree

2 files changed

+95
-13
lines changed

2 files changed

+95
-13
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
BATCH_SYSTEM=swif
2+
3+
NCORES=1
4+
PROJECT = gluex
5+
TRACK= simulation
6+
7+
ENVIRONMENT_FILE=/work/halld2/home/tbritton/groupMC/group_build.csh
8+
9+
DISK=5GB
10+
RAM=4GB
11+
TIMELIMIT=24h
12+
OS=centos7

MCwrapper/gluex_MC.py

Lines changed: 83 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@
3737
from subprocess import call
3838
import glob
3939

40-
def swif_add_job(WORKFLOW, RUNNO, FILENO,SCRIPT,COMMAND, VERBOSE,PROJECT,TRACK,NCORES,DISK,RAM,TIMELIMIT,OS,DATA_OUTPUT_BASE_DIR):
40+
dbcnx = mysql.connector.connect(user='mcuser', database='gluex_mc', host='hallddb.jlab.org')
41+
dbcursor = dbcnx.cursor()
42+
43+
def swif_add_job(WORKFLOW, RUNNO, FILENO,SCRIPT,COMMAND, VERBOSE,PROJECT,TRACK,NCORES,DISK,RAM,TIMELIMIT,OS,DATA_OUTPUT_BASE_DIR, PROJECT_ID):
4144

4245

4346
# PREPARE NAMES
@@ -76,8 +79,14 @@ def swif_add_job(WORKFLOW, RUNNO, FILENO,SCRIPT,COMMAND, VERBOSE,PROJECT,TRACK,N
7679
print( "Nice try.....you cannot use ; or &")
7780
exit(1)
7881
status = subprocess.call(add_command.split(" "))
82+
83+
if PROJECT_ID != -1:
84+
recordJob(PROJECT_ID,RUNNO,FILENO)
85+
86+
7987

80-
def qsub_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, MEMLIMIT, QUEUENAME, LOG_DIR ):
88+
89+
def qsub_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, MEMLIMIT, QUEUENAME, LOG_DIR, PROJECT_ID ):
8190
#name
8291
STUBNAME = str(RUNNUM) + "_" + str(FILENUM)
8392
JOBNAME = WORKFLOW + "_" + STUBNAME
@@ -129,8 +138,11 @@ def qsub_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DA
129138
if ( VERBOSE == False ) :
130139
status = subprocess.call("rm MCqsub.submit", shell=True)
131140

141+
if PROJECT_ID != -1:
142+
recordJob(PROJECT_ID,RUNNO,FILENO)
143+
132144

133-
def condor_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR ):
145+
def condor_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, PROJECT_ID ):
134146
STUBNAME = str(RUNNUM) + "_" + str(FILENUM)
135147
JOBNAME = WORKFLOW + "_" + STUBNAME
136148

@@ -154,7 +166,11 @@ def condor_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES,
154166
status = subprocess.call(add_command, shell=True)
155167
status = subprocess.call("rm MCcondor.submit", shell=True)
156168

157-
def OSG_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG ):
169+
if PROJECT_ID != -1:
170+
recordJob(PROJECT_ID,RUNNO,FILENO)
171+
172+
173+
def OSG_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG, PROJECT_ID ):
158174
STUBNAME = str(RUNNUM) + "_" + str(FILENUM)
159175
JOBNAME = WORKFLOW + "_" + STUBNAME
160176

@@ -271,7 +287,48 @@ def OSG_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DAT
271287
status = subprocess.call(add_command, shell=True)
272288
status = subprocess.call("rm MCOSG.submit", shell=True)
273289

290+
if PROJECT_ID != -1:
291+
recordJob(PROJECT_ID,RUNNO,FILENO)
292+
293+
def SLURM_add_job(VERBOSE, WORKFLOW, RUNNUM, FILENUM, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG, PROJECT_ID ):
    """Write the SLURM batch script for one job, then stop (submission is not enabled yet).

    The script is written to ``MCSLURM.submit`` in the current working
    directory and runs ``$MCWRAPPER_CENTRAL/MakeMC.sh`` through ``shifter``
    with COMMAND as its arguments.

    Parameters used by the current implementation:
        WORKFLOW, RUNNUM, FILENUM -- combined into the job name
                                     ``<WORKFLOW>_<RUNNUM>_<FILENUM>``.
        COMMAND    -- argument string appended to the MakeMC.sh invocation.
        NCORES     -- value written to ``--cpus-per-task`` (str or int).
        TIMELIMIT  -- value written to ``--time``.
        PROJECT_ID -- when not -1, the job is recorded via recordJob()
                      (only in the not-yet-enabled submission path).

    VERBOSE, indir, DATA_OUTPUT_BASE_DIR, RUNNING_DIR, ENVFILE, LOG_DIR and
    RANDBGTAG are accepted for signature parity with the other *_add_job
    functions but are not used here.

    Exits the interpreter with status 1 after writing the script; see the
    guard below.
    """
    STUBNAME = str(RUNNUM) + "_" + str(FILENUM)
    JOBNAME = WORKFLOW + "_" + STUBNAME

    # str() keeps this working when NCORES/TIMELIMIT arrive as numbers
    # rather than strings.
    script_lines = [
        "#!/bin/bash -l",
        "#SBATCH -J " + JOBNAME,
        "#SBATCH --image=docker:jeffersonlab/hdrecon:latest",
        "#SBATCH --nodes=1",
        "#SBATCH --time=" + str(TIMELIMIT),
        "#SBATCH --tasks-per-node=1",
        "#SBATCH --cpus-per-task=" + str(NCORES),
        "#SBATCH --qos=regular",
        "#SBATCH -C haswell",
        "#SBATCH -L project",
        # The space before COMMAND matters: without it the first argument
        # is glued onto the script path.
        "shifter $MCWRAPPER_CENTRAL/MakeMC.sh " + COMMAND,
    ]

    # The context manager guarantees the script is flushed and closed even
    # if a write fails.
    with open('MCSLURM.submit', 'w') as f:
        f.write("\n".join(script_lines) + "\n")

    # GUARD: SLURM submission is still work in progress.  The script is
    # generated for inspection and the program stops here.  Remove this
    # exit to enable the submission path below.
    exit(1)

    # ---- not reachable until the guard above is removed ----
    # The command contains no config-derived text, so the ';' / '&'
    # injection check used by the other batch systems is not needed.
    add_command = "sbatch MCSLURM.submit"
    status = subprocess.call(add_command.split(" "))
    status = subprocess.call(["rm", "MCSLURM.submit"])

    if PROJECT_ID != -1:
        recordJob(PROJECT_ID, RUNNUM, FILENUM)
327+
328+
def recordJob(PROJECT_ID,RUNNO,FILENO):
    """Insert one row into the Jobs table for a submitted job and commit it.

    Uses the module-level ``dbcursor`` / ``dbcnx`` connection objects.

    PROJECT_ID -- value stored in Jobs.Project_ID.
    RUNNO      -- value stored in Jobs.RunNumber.
    FILENO     -- value stored in Jobs.FileNumber.

    Creation_Time is set by the database (NOW()) and Status is written as 1.
    """
    # PROJECT_ID originates from a command-line option, so the values are
    # bound as query parameters instead of being spliced into the SQL text;
    # the connector handles quoting and escaping.
    dbcursor.execute(
        "INSERT INTO Jobs (Project_ID, RunNumber, FileNumber, Creation_Time, Status) "
        "VALUES (%s, %s, %s, NOW(), 1)",
        (PROJECT_ID, RUNNO, FILENO),
    )
    dbcnx.commit()
275332

276333
def showhelp():
277334
helpstring= "variation=%s where %s is a valid jana_calib_context variation string (default is \"mc\")\n"
@@ -316,7 +373,7 @@ def main(argv):
316373

317374
print( "*********************************")
318375
print( "Welcome to v1.15 of the MCwrapper")
319-
print( "Thomas Britton 6/25/18")
376+
print( "Thomas Britton 7/25/18")
320377
print( "*********************************")
321378

322379
#load all argument passed in and set default options
@@ -365,6 +422,7 @@ def main(argv):
365422
TIMELIMIT = "300minutes" # Max walltime
366423
OS = "centos7" # Specify CentOS65 machines
367424

425+
PROJECT_ID=-1 #internally used when needed
368426
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
369427
VERSION = "mc"
370428
CALIBTIME="notime"
@@ -579,9 +637,14 @@ def main(argv):
579637
if flag[0]=="logdir":
580638
argfound=1
581639
LOG_DIR=str(flag[1])
640+
if flag[0]=="projid":
641+
argfound=1
642+
PROJECT_ID=str(flag[1])
582643
if argfound==0:
583644
print( "WARNING OPTION: "+argu+" NOT FOUND!")
584645

646+
647+
585648
# if str(GEANTVER)=="3":
586649
# print "!!! Warning: Geant 3 detected! NumThreads has been set to 1"
587650
# print "!!! This is done to ensure efficient use of resources while running and should provide faster job starts."
@@ -727,13 +790,15 @@ def main(argv):
727790
os.system(str(indir)+" "+COMMAND)
728791
else:
729792
if BATCHSYS.upper()=="SWIF":
730-
swif_add_job(WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1,str(indir),COMMAND,VERBOSE,PROJECT,TRACK,NCORES,DISK,RAM,TIMELIMIT,OS,DATA_OUTPUT_BASE_DIR)
793+
swif_add_job(WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1,str(indir),COMMAND,VERBOSE,PROJECT,TRACK,NCORES,DISK,RAM,TIMELIMIT,OS,DATA_OUTPUT_BASE_DIR, PROJECT_ID)
731794
elif BATCHSYS.upper()=="QSUB":
732-
qsub_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, RAM, QUEUENAME, LOG_DIR )
795+
qsub_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, RAM, QUEUENAME, LOG_DIR, PROJECT_ID )
733796
elif BATCHSYS.upper()=="CONDOR":
734-
condor_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR )
797+
condor_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, PROJECT_ID )
735798
elif BATCHSYS.upper()=="OSG":
736-
OSG_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG )
799+
OSG_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG, PROJECT_ID)
800+
elif BATCHSYS.upper()=="SLURM":
801+
SLURM_add_job(VERBOSE, WORKFLOW, runs[0], BASEFILENUM+FILENUM_this_run+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG, PROJECT_ID )
737802
#print "----------------"
738803

739804
else:
@@ -756,20 +821,25 @@ def main(argv):
756821
os.system(str(indir)+" "+COMMAND)
757822
else:
758823
if BATCHSYS.upper()=="SWIF":
759-
swif_add_job(WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1,str(indir),COMMAND,VERBOSE,PROJECT,TRACK,NCORES,DISK,RAM,TIMELIMIT,OS,DATA_OUTPUT_BASE_DIR)
824+
swif_add_job(WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1,str(indir),COMMAND,VERBOSE,PROJECT,TRACK,NCORES,DISK,RAM,TIMELIMIT,OS,DATA_OUTPUT_BASE_DIR, PROJECT_ID)
760825
elif BATCHSYS.upper()=="QSUB":
761-
qsub_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, RAM, QUEUENAME, LOG_DIR )
826+
qsub_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, RAM, QUEUENAME, LOG_DIR, PROJECT_ID )
762827
elif BATCHSYS.upper()=="CONDOR":
763-
condor_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR )
828+
condor_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, PROJECT_ID )
764829
elif BATCHSYS.upper()=="OSG":
765-
OSG_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG )
830+
OSG_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG, PROJECT_ID )
831+
elif BATCHSYS.upper()=="SLURM":
832+
SLURM_add_job(VERBOSE, WORKFLOW, RUNNUM, BASEFILENUM+FILENUM+-1, indir, COMMAND, NCORES, DATA_OUTPUT_BASE_DIR, TIMELIMIT, RUNNING_DIR, ENVFILE, LOG_DIR, RANDBGTAG, PROJECT_ID )
766833

767834

768835
if BATCHRUN == 1 and BATCHSYS.upper() == "SWIF":
769836
print( "All Jobs created. Please call \"swif run "+WORKFLOW+"\" to run")
770837
elif BATCHRUN == 2 and BATCHSYS.upper()=="SWIF":
771838
swifrun = "swif run "+WORKFLOW
772839
subprocess.call(swifrun.split(" "))
840+
841+
842+
dbcnx.close()
773843

774844
if __name__ == "__main__":
775845
main(sys.argv[1:])

0 commit comments

Comments
 (0)