Skip to content

Commit 12da254

Browse files
committed
sweep: #7093 refactor job submit
1 parent 50af210 commit 12da254

File tree

3 files changed

+219
-191
lines changed

3 files changed

+219
-191
lines changed

src/DIRAC/Core/Utilities/ReturnValues.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def __init__(self, result: DErrorReturnType | str, errCode: int = 0):
200200
self.result = cast(DErrorReturnType, result)
201201

202202

203-
def returnValueOrRaise(result: DReturnType[T]) -> T:
203+
def returnValueOrRaise(result: DReturnType[T], *, errorCode: int = 0) -> T:
204204
"""Unwrap an S_OK/S_ERROR response into a value or Exception
205205
206206
This method assists with using exceptions in DIRAC code by raising
@@ -217,7 +217,7 @@ def returnValueOrRaise(result: DReturnType[T]) -> T:
217217
if "ExecInfo" in result:
218218
raise result["ExecInfo"][0]
219219
else:
220-
raise SErrorException(result)
220+
raise SErrorException(result, errorCode)
221221
return result["Value"]
222222

223223

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 53 additions & 189 deletions
Original file line numberDiff line numberDiff line change
@@ -23,38 +23,22 @@
2323
from DIRAC.Core.Base.DB import DB
2424
from DIRAC.Core.Utilities import DErrno
2525
from DIRAC.Core.Utilities.ClassAd.ClassAdLight import ClassAd
26-
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR
27-
from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN
26+
from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR, convertToReturnValue
27+
from DIRAC.Core.Utilities.DErrno import EWMSSUBM, EWMSJMAN, cmpError
2828
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
2929
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
3030
from DIRAC.WorkloadManagementSystem.Client.JobState.JobManifest import JobManifest
3131
from DIRAC.WorkloadManagementSystem.Client import JobStatus
3232
from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
3333
from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
34-
35-
#############################################################################
36-
# utility functions
37-
38-
39-
def compressJDL(jdl):
40-
"""Return compressed JDL string."""
41-
return base64.b64encode(zlib.compress(jdl.encode(), -1)).decode()
42-
43-
44-
def extractJDL(compressedJDL):
45-
"""Return decompressed JDL string."""
46-
# the starting bracket is guaranteeed by JobManager.submitJob
47-
# we need the check to be backward compatible
48-
if isinstance(compressedJDL, bytes):
49-
if compressedJDL.startswith(b"["):
50-
return compressedJDL.decode()
51-
else:
52-
if compressedJDL.startswith("["):
53-
return compressedJDL
54-
return zlib.decompress(base64.b64decode(compressedJDL)).decode()
55-
56-
57-
#############################################################################
34+
from DIRAC.WorkloadManagementSystem.DB.JobDBUtils import (
35+
checkAndAddOwner,
36+
fixJDL,
37+
checkAndPrepareJob,
38+
createJDLWithInitialStatus,
39+
compressJDL,
40+
extractJDL,
41+
)
5842

5943

6044
class JobDB(DB):
@@ -70,10 +54,6 @@ def __init__(self, parentLogger=None):
7054
self.maxRescheduling = self.getCSOption("MaxRescheduling", 3)
7155

7256
# loading the function that will be used to determine the platform (it can be VO specific)
73-
res = ObjectLoader().loadObject("ConfigurationSystem.Client.Helpers.Resources", "getDIRACPlatform")
74-
if not res["OK"]:
75-
self.log.fatal(res["Message"])
76-
self.getDIRACPlatform = res["Value"]
7757

7858
self.jobAttributeNames = []
7959

@@ -923,42 +903,28 @@ def insertNewJobIntoDB(
923903
:param str initialMinorStatus: optional initial minor job status
924904
:return: new job ID
925905
"""
926-
jobManifest = JobManifest()
927-
result = jobManifest.load(jdl)
928-
if not result["OK"]:
929-
return result
930-
jobManifest.setOptionsFromDict({"Owner": owner, "OwnerGroup": ownerGroup})
931-
result = jobManifest.check()
906+
jobAttrs = {
907+
"LastUpdateTime": str(datetime.datetime.utcnow()),
908+
"SubmissionTime": str(datetime.datetime.utcnow()),
909+
"Owner": owner,
910+
"OwnerGroup": ownerGroup,
911+
}
912+
913+
result = checkAndAddOwner(jdl, owner, ownerGroup)
932914
if not result["OK"]:
933915
return result
934-
jobAttrNames = []
935-
jobAttrValues = []
916+
jobManifest = result["Value"]
917+
jdl = fixJDL(jdl)
936918

937-
# 1.- insert original JDL on DB and get new JobID
938-
# Fix the possible lack of the brackets in the JDL
939-
if jdl.strip()[0].find("[") != 0:
940-
jdl = "[" + jdl + "]"
941919
result = self.__insertNewJDL(jdl)
942920
if not result["OK"]:
943921
return S_ERROR(EWMSSUBM, "Failed to insert JDL in to DB")
922+
944923
jobID = result["Value"]
945924

946925
jobManifest.setOption("JobID", jobID)
947926

948-
jobAttrNames.append("JobID")
949-
jobAttrValues.append(jobID)
950-
951-
jobAttrNames.append("LastUpdateTime")
952-
jobAttrValues.append(str(datetime.datetime.utcnow()))
953-
954-
jobAttrNames.append("SubmissionTime")
955-
jobAttrValues.append(str(datetime.datetime.utcnow()))
956-
957-
jobAttrNames.append("Owner")
958-
jobAttrValues.append(owner)
959-
960-
jobAttrNames.append("OwnerGroup")
961-
jobAttrValues.append(ownerGroup)
927+
jobAttrs["JobID"] = jobID
962928

963929
# 2.- Check JDL and Prepare DIRAC JDL
964930
jobJDL = jobManifest.dumpAsJDL()
@@ -972,13 +938,11 @@ def insertNewJobIntoDB(
972938
retVal = S_OK(jobID)
973939
retVal["JobID"] = jobID
974940
if not classAdJob.isOK():
975-
jobAttrNames.append("Status")
976-
jobAttrValues.append(JobStatus.FAILED)
941+
jobAttrs["Status"] = JobStatus.FAILED
977942

978-
jobAttrNames.append("MinorStatus")
979-
jobAttrValues.append("Error in JDL syntax")
943+
jobAttrs["MinorStatus"] = "Error in JDL syntax"
980944

981-
result = self.insertFields("Jobs", jobAttrNames, jobAttrValues)
945+
result = self.insertFields("Jobs", inDict=jobAttrs)
982946
if not result["OK"]:
983947
return result
984948

@@ -987,53 +951,21 @@ def insertNewJobIntoDB(
987951
return retVal
988952

989953
classAdJob.insertAttributeInt("JobID", jobID)
990-
result = self.__checkAndPrepareJob(
991-
jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrNames, jobAttrValues
992-
)
954+
vo = getVOForGroup(ownerGroup)
955+
result = self.__checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo)
993956
if not result["OK"]:
994957
return result
995958

996-
priority = classAdJob.getAttributeInt("Priority")
997-
if priority is None:
998-
priority = 0
999-
jobAttrNames.append("UserPriority")
1000-
jobAttrValues.append(priority)
1001-
1002-
for jdlName in self.jdl2DBParameters:
1003-
# Defaults are set by the DB.
1004-
jdlValue = classAdJob.getAttributeString(jdlName)
1005-
if jdlValue:
1006-
jobAttrNames.append(jdlName)
1007-
jobAttrValues.append(jdlValue)
1008-
1009-
jdlValue = classAdJob.getAttributeString("Site")
1010-
if jdlValue:
1011-
jobAttrNames.append("Site")
1012-
if jdlValue.find(",") != -1:
1013-
jobAttrValues.append("Multiple")
1014-
else:
1015-
jobAttrValues.append(jdlValue)
1016-
1017-
jobAttrNames.append("VerifiedFlag")
1018-
jobAttrValues.append("True")
1019-
1020-
jobAttrNames.append("Status")
1021-
jobAttrValues.append(initialStatus)
1022-
1023-
jobAttrNames.append("MinorStatus")
1024-
jobAttrValues.append(initialMinorStatus)
1025-
1026-
reqJDL = classAdReq.asJDL()
1027-
classAdJob.insertAttributeInt("JobRequirements", reqJDL)
1028-
1029-
jobJDL = classAdJob.asJDL()
959+
jobJDL = createJDLWithInitialStatus(
960+
classAdJob, classAdReq, self.jdl2DBParameters, jobAttrs, initialStatus, initialMinorStatus
961+
)
1030962

1031963
result = self.setJobJDL(jobID, jobJDL)
1032964
if not result["OK"]:
1033965
return result
1034966

1035967
# Adding the job in the Jobs table
1036-
result = self.insertFields("Jobs", jobAttrNames, jobAttrValues)
968+
result = self.insertFields("Jobs", inDict=jobAttrs)
1037969
if not result["OK"]:
1038970
return result
1039971

@@ -1076,79 +1008,22 @@ def insertNewJobIntoDB(
10761008

10771009
return retVal
10781010

1079-
def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrNames, jobAttrValues):
1011+
def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo):
10801012
"""
10811013
Check Consistency of Submitted JDL and set some defaults
10821014
Prepare subJDL with Job Requirements
10831015
"""
1084-
error = ""
1085-
vo = getVOForGroup(ownerGroup)
1086-
1087-
jdlOwner = classAdJob.getAttributeString("Owner")
1088-
jdlOwnerGroup = classAdJob.getAttributeString("OwnerGroup")
1089-
jdlVO = classAdJob.getAttributeString("VirtualOrganization")
1090-
1091-
if jdlOwner and jdlOwner != owner:
1092-
error = "Wrong Owner in JDL"
1093-
elif jdlOwnerGroup and jdlOwnerGroup != ownerGroup:
1094-
error = "Wrong Owner Group in JDL"
1095-
elif jdlVO and jdlVO != vo:
1096-
error = "Wrong Virtual Organization in JDL"
1097-
1098-
classAdJob.insertAttributeString("Owner", owner)
1099-
classAdJob.insertAttributeString("OwnerGroup", ownerGroup)
1100-
1101-
if vo:
1102-
classAdJob.insertAttributeString("VirtualOrganization", vo)
1103-
1104-
classAdReq.insertAttributeString("OwnerGroup", ownerGroup)
1105-
if vo:
1106-
classAdReq.insertAttributeString("VirtualOrganization", vo)
1016+
retVal = checkAndPrepareJob(jobID, classAdJob, classAdReq, owner, ownerGroup, jobAttrs, vo)
11071017

1108-
inputDataPolicy = Operations(vo=vo).getValue("InputDataPolicy/InputDataModule")
1109-
if inputDataPolicy and not classAdJob.lookupAttribute("InputDataModule"):
1110-
classAdJob.insertAttributeString("InputDataModule", inputDataPolicy)
1018+
if not retVal["OK"]:
1019+
if cmpError(retVal, EWMSSUBM):
1020+
resultInsert = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()))
1021+
if not resultInsert["OK"]:
1022+
retVal["MinorStatus"] += f"; {resultInsert['Message']}"
11111023

1112-
# priority
1113-
priority = classAdJob.getAttributeInt("Priority")
1114-
if priority is None:
1115-
priority = 0
1116-
classAdReq.insertAttributeInt("UserPriority", priority)
1117-
1118-
# CPU time
1119-
cpuTime = classAdJob.getAttributeInt("CPUTime")
1120-
if cpuTime is None:
1121-
opsHelper = Operations(group=ownerGroup)
1122-
cpuTime = opsHelper.getValue("JobDescription/DefaultCPUTime", 86400)
1123-
classAdReq.insertAttributeInt("CPUTime", cpuTime)
1124-
1125-
# platform(s)
1126-
platformList = classAdJob.getListFromExpression("Platform")
1127-
if platformList:
1128-
result = self.getDIRACPlatform(platformList)
1129-
if not result["OK"]:
1130-
return result
1131-
if result["Value"]:
1132-
classAdReq.insertAttributeVectorString("Platforms", result["Value"])
1024+
return retVal
11331025
else:
1134-
error = "OS compatibility info not found"
1135-
1136-
if error:
1137-
retVal = S_ERROR(EWMSSUBM, error)
1138-
retVal["JobId"] = jobID
1139-
retVal["Status"] = JobStatus.FAILED
1140-
retVal["MinorStatus"] = error
1141-
1142-
jobAttrNames.append("Status")
1143-
jobAttrValues.append(JobStatus.FAILED)
1144-
1145-
jobAttrNames.append("MinorStatus")
1146-
jobAttrValues.append(error)
1147-
resultInsert = self.setJobAttributes(jobID, jobAttrNames, jobAttrValues)
1148-
if not resultInsert["OK"]:
1149-
retVal["MinorStatus"] += f"; {resultInsert['Message']}"
1150-
1151-
return retVal
1026+
return retVal
11521027

11531028
return S_OK()
11541029

@@ -1237,11 +1112,7 @@ def rescheduleJob(self, jobID):
12371112
return res
12381113
return S_ERROR(f"Maximum number of reschedulings is reached: {self.maxRescheduling}")
12391114

1240-
jobAttrNames = []
1241-
jobAttrValues = []
1242-
1243-
jobAttrNames.append("RescheduleCounter")
1244-
jobAttrValues.append(rescheduleCounter)
1115+
jobAttrs = {"RescheduleCounter": rescheduleCounter}
12451116

12461117
# Save the job parameters for later debugging
12471118
result = JobMonitoringClient().getJobParameters(jobID)
@@ -1281,14 +1152,15 @@ def rescheduleJob(self, jobID):
12811152
retVal["JobID"] = jobID
12821153

12831154
classAdJob.insertAttributeInt("JobID", jobID)
1155+
12841156
result = self.__checkAndPrepareJob(
12851157
jobID,
12861158
classAdJob,
12871159
classAdReq,
12881160
resultDict["Owner"],
12891161
resultDict["OwnerGroup"],
1290-
jobAttrNames,
1291-
jobAttrValues,
1162+
jobAttrs,
1163+
getVOForGroup(resultDict["OwnerGroup"]),
12921164
)
12931165

12941166
if not result["OK"]:
@@ -1297,8 +1169,7 @@ def rescheduleJob(self, jobID):
12971169
priority = classAdJob.getAttributeInt("Priority")
12981170
if priority is None:
12991171
priority = 0
1300-
jobAttrNames.append("UserPriority")
1301-
jobAttrValues.append(priority)
1172+
jobAttrs["UserPriority"] = priority
13021173

13031174
siteList = classAdJob.getListFromExpression("Site")
13041175
if not siteList:
@@ -1308,26 +1179,19 @@ def rescheduleJob(self, jobID):
13081179
else:
13091180
site = siteList[0]
13101181

1311-
jobAttrNames.append("Site")
1312-
jobAttrValues.append(site)
1182+
jobAttrs["Site"] = site
13131183

1314-
jobAttrNames.append("Status")
1315-
jobAttrValues.append(JobStatus.RECEIVED)
1184+
jobAttrs["Status"] = JobStatus.RECEIVED
13161185

1317-
jobAttrNames.append("MinorStatus")
1318-
jobAttrValues.append(JobMinorStatus.RESCHEDULED)
1186+
jobAttrs["MinorStatus"] = JobMinorStatus.RESCHEDULED
13191187

1320-
jobAttrNames.append("ApplicationStatus")
1321-
jobAttrValues.append("Unknown")
1188+
jobAttrs["ApplicationStatus"] = "Unknown"
13221189

1323-
jobAttrNames.append("ApplicationNumStatus")
1324-
jobAttrValues.append(0)
1190+
jobAttrs["ApplicationNumStatus"] = 0
13251191

1326-
jobAttrNames.append("LastUpdateTime")
1327-
jobAttrValues.append(str(datetime.datetime.utcnow()))
1192+
jobAttrs["LastUpdateTime"] = str(datetime.datetime.utcnow())
13281193

1329-
jobAttrNames.append("RescheduleTime")
1330-
jobAttrValues.append(str(datetime.datetime.utcnow()))
1194+
jobAttrs["RescheduleTime"] = str(datetime.datetime.utcnow())
13311195

13321196
reqJDL = classAdReq.asJDL()
13331197
classAdJob.insertAttributeInt("JobRequirements", reqJDL)
@@ -1346,7 +1210,7 @@ def rescheduleJob(self, jobID):
13461210
if not result["OK"]:
13471211
return result
13481212

1349-
result = self.setJobAttributes(jobID, jobAttrNames, jobAttrValues, force=True)
1213+
result = self.setJobAttributes(jobID, list(jobAttrs), list(jobAttrs.values()), force=True)
13501214
if not result["OK"]:
13511215
return result
13521216

0 commit comments

Comments
 (0)