23
23
from DIRAC .Core .Base .DB import DB
24
24
from DIRAC .Core .Utilities import DErrno
25
25
from DIRAC .Core .Utilities .ClassAd .ClassAdLight import ClassAd
26
- from DIRAC .Core .Utilities .ReturnValues import S_OK , S_ERROR
27
- from DIRAC .Core .Utilities .DErrno import EWMSSUBM , EWMSJMAN
26
+ from DIRAC .Core .Utilities .ReturnValues import S_OK , S_ERROR , convertToReturnValue
27
+ from DIRAC .Core .Utilities .DErrno import EWMSSUBM , EWMSJMAN , cmpError
28
28
from DIRAC .Core .Utilities .ObjectLoader import ObjectLoader
29
29
from DIRAC .ResourceStatusSystem .Client .SiteStatus import SiteStatus
30
30
from DIRAC .WorkloadManagementSystem .Client .JobState .JobManifest import JobManifest
31
31
from DIRAC .WorkloadManagementSystem .Client import JobStatus
32
32
from DIRAC .WorkloadManagementSystem .Client import JobMinorStatus
33
33
from DIRAC .WorkloadManagementSystem .Client .JobMonitoringClient import JobMonitoringClient
34
-
35
- #############################################################################
36
- # utility functions
37
-
38
-
39
- def compressJDL (jdl ):
40
- """Return compressed JDL string."""
41
- return base64 .b64encode (zlib .compress (jdl .encode (), - 1 )).decode ()
42
-
43
-
44
- def extractJDL (compressedJDL ):
45
- """Return decompressed JDL string."""
46
- # the starting bracket is guaranteeed by JobManager.submitJob
47
- # we need the check to be backward compatible
48
- if isinstance (compressedJDL , bytes ):
49
- if compressedJDL .startswith (b"[" ):
50
- return compressedJDL .decode ()
51
- else :
52
- if compressedJDL .startswith ("[" ):
53
- return compressedJDL
54
- return zlib .decompress (base64 .b64decode (compressedJDL )).decode ()
55
-
56
-
57
- #############################################################################
34
+ from DIRAC .WorkloadManagementSystem .DB .JobDBUtils import (
35
+ checkAndAddOwner ,
36
+ fixJDL ,
37
+ checkAndPrepareJob ,
38
+ createJDLWithInitialStatus ,
39
+ compressJDL ,
40
+ extractJDL ,
41
+ )
58
42
59
43
60
44
class JobDB (DB ):
@@ -70,10 +54,6 @@ def __init__(self, parentLogger=None):
70
54
self .maxRescheduling = self .getCSOption ("MaxRescheduling" , 3 )
71
55
72
56
# loading the function that will be used to determine the platform (it can be VO specific)
73
- res = ObjectLoader ().loadObject ("ConfigurationSystem.Client.Helpers.Resources" , "getDIRACPlatform" )
74
- if not res ["OK" ]:
75
- self .log .fatal (res ["Message" ])
76
- self .getDIRACPlatform = res ["Value" ]
77
57
78
58
self .jobAttributeNames = []
79
59
@@ -923,42 +903,28 @@ def insertNewJobIntoDB(
923
903
:param str initialMinorStatus: optional initial minor job status
924
904
:return: new job ID
925
905
"""
926
- jobManifest = JobManifest ()
927
- result = jobManifest .load (jdl )
928
- if not result ["OK" ]:
929
- return result
930
- jobManifest .setOptionsFromDict ({"Owner" : owner , "OwnerGroup" : ownerGroup })
931
- result = jobManifest .check ()
906
+ jobAttrs = {
907
+ "LastUpdateTime" : str (datetime .datetime .utcnow ()),
908
+ "SubmissionTime" : str (datetime .datetime .utcnow ()),
909
+ "Owner" : owner ,
910
+ "OwnerGroup" : ownerGroup ,
911
+ }
912
+
913
+ result = checkAndAddOwner (jdl , owner , ownerGroup )
932
914
if not result ["OK" ]:
933
915
return result
934
- jobAttrNames = [ ]
935
- jobAttrValues = []
916
+ jobManifest = result [ "Value" ]
917
+ jdl = fixJDL ( jdl )
936
918
937
- # 1.- insert original JDL on DB and get new JobID
938
- # Fix the possible lack of the brackets in the JDL
939
- if jdl .strip ()[0 ].find ("[" ) != 0 :
940
- jdl = "[" + jdl + "]"
941
919
result = self .__insertNewJDL (jdl )
942
920
if not result ["OK" ]:
943
921
return S_ERROR (EWMSSUBM , "Failed to insert JDL in to DB" )
922
+
944
923
jobID = result ["Value" ]
945
924
946
925
jobManifest .setOption ("JobID" , jobID )
947
926
948
- jobAttrNames .append ("JobID" )
949
- jobAttrValues .append (jobID )
950
-
951
- jobAttrNames .append ("LastUpdateTime" )
952
- jobAttrValues .append (str (datetime .datetime .utcnow ()))
953
-
954
- jobAttrNames .append ("SubmissionTime" )
955
- jobAttrValues .append (str (datetime .datetime .utcnow ()))
956
-
957
- jobAttrNames .append ("Owner" )
958
- jobAttrValues .append (owner )
959
-
960
- jobAttrNames .append ("OwnerGroup" )
961
- jobAttrValues .append (ownerGroup )
927
+ jobAttrs ["JobID" ] = jobID
962
928
963
929
# 2.- Check JDL and Prepare DIRAC JDL
964
930
jobJDL = jobManifest .dumpAsJDL ()
@@ -972,13 +938,11 @@ def insertNewJobIntoDB(
972
938
retVal = S_OK (jobID )
973
939
retVal ["JobID" ] = jobID
974
940
if not classAdJob .isOK ():
975
- jobAttrNames .append ("Status" )
976
- jobAttrValues .append (JobStatus .FAILED )
941
+ jobAttrs ["Status" ] = JobStatus .FAILED
977
942
978
- jobAttrNames .append ("MinorStatus" )
979
- jobAttrValues .append ("Error in JDL syntax" )
943
+ jobAttrs ["MinorStatus" ] = "Error in JDL syntax"
980
944
981
- result = self .insertFields ("Jobs" , jobAttrNames , jobAttrValues )
945
+ result = self .insertFields ("Jobs" , inDict = jobAttrs )
982
946
if not result ["OK" ]:
983
947
return result
984
948
@@ -987,53 +951,21 @@ def insertNewJobIntoDB(
987
951
return retVal
988
952
989
953
classAdJob .insertAttributeInt ("JobID" , jobID )
990
- result = self .__checkAndPrepareJob (
991
- jobID , classAdJob , classAdReq , owner , ownerGroup , jobAttrNames , jobAttrValues
992
- )
954
+ vo = getVOForGroup (ownerGroup )
955
+ result = self .__checkAndPrepareJob (jobID , classAdJob , classAdReq , owner , ownerGroup , jobAttrs , vo )
993
956
if not result ["OK" ]:
994
957
return result
995
958
996
- priority = classAdJob .getAttributeInt ("Priority" )
997
- if priority is None :
998
- priority = 0
999
- jobAttrNames .append ("UserPriority" )
1000
- jobAttrValues .append (priority )
1001
-
1002
- for jdlName in self .jdl2DBParameters :
1003
- # Defaults are set by the DB.
1004
- jdlValue = classAdJob .getAttributeString (jdlName )
1005
- if jdlValue :
1006
- jobAttrNames .append (jdlName )
1007
- jobAttrValues .append (jdlValue )
1008
-
1009
- jdlValue = classAdJob .getAttributeString ("Site" )
1010
- if jdlValue :
1011
- jobAttrNames .append ("Site" )
1012
- if jdlValue .find ("," ) != - 1 :
1013
- jobAttrValues .append ("Multiple" )
1014
- else :
1015
- jobAttrValues .append (jdlValue )
1016
-
1017
- jobAttrNames .append ("VerifiedFlag" )
1018
- jobAttrValues .append ("True" )
1019
-
1020
- jobAttrNames .append ("Status" )
1021
- jobAttrValues .append (initialStatus )
1022
-
1023
- jobAttrNames .append ("MinorStatus" )
1024
- jobAttrValues .append (initialMinorStatus )
1025
-
1026
- reqJDL = classAdReq .asJDL ()
1027
- classAdJob .insertAttributeInt ("JobRequirements" , reqJDL )
1028
-
1029
- jobJDL = classAdJob .asJDL ()
959
+ jobJDL = createJDLWithInitialStatus (
960
+ classAdJob , classAdReq , self .jdl2DBParameters , jobAttrs , initialStatus , initialMinorStatus
961
+ )
1030
962
1031
963
result = self .setJobJDL (jobID , jobJDL )
1032
964
if not result ["OK" ]:
1033
965
return result
1034
966
1035
967
# Adding the job in the Jobs table
1036
- result = self .insertFields ("Jobs" , jobAttrNames , jobAttrValues )
968
+ result = self .insertFields ("Jobs" , inDict = jobAttrs )
1037
969
if not result ["OK" ]:
1038
970
return result
1039
971
@@ -1076,79 +1008,22 @@ def insertNewJobIntoDB(
1076
1008
1077
1009
return retVal
1078
1010
1079
- def __checkAndPrepareJob (self , jobID , classAdJob , classAdReq , owner , ownerGroup , jobAttrNames , jobAttrValues ):
1011
+ def __checkAndPrepareJob (self , jobID , classAdJob , classAdReq , owner , ownerGroup , jobAttrs , vo ):
1080
1012
"""
1081
1013
Check Consistency of Submitted JDL and set some defaults
1082
1014
Prepare subJDL with Job Requirements
1083
1015
"""
1084
- error = ""
1085
- vo = getVOForGroup (ownerGroup )
1086
-
1087
- jdlOwner = classAdJob .getAttributeString ("Owner" )
1088
- jdlOwnerGroup = classAdJob .getAttributeString ("OwnerGroup" )
1089
- jdlVO = classAdJob .getAttributeString ("VirtualOrganization" )
1090
-
1091
- if jdlOwner and jdlOwner != owner :
1092
- error = "Wrong Owner in JDL"
1093
- elif jdlOwnerGroup and jdlOwnerGroup != ownerGroup :
1094
- error = "Wrong Owner Group in JDL"
1095
- elif jdlVO and jdlVO != vo :
1096
- error = "Wrong Virtual Organization in JDL"
1097
-
1098
- classAdJob .insertAttributeString ("Owner" , owner )
1099
- classAdJob .insertAttributeString ("OwnerGroup" , ownerGroup )
1100
-
1101
- if vo :
1102
- classAdJob .insertAttributeString ("VirtualOrganization" , vo )
1103
-
1104
- classAdReq .insertAttributeString ("OwnerGroup" , ownerGroup )
1105
- if vo :
1106
- classAdReq .insertAttributeString ("VirtualOrganization" , vo )
1016
+ retVal = checkAndPrepareJob (jobID , classAdJob , classAdReq , owner , ownerGroup , jobAttrs , vo )
1107
1017
1108
- inputDataPolicy = Operations (vo = vo ).getValue ("InputDataPolicy/InputDataModule" )
1109
- if inputDataPolicy and not classAdJob .lookupAttribute ("InputDataModule" ):
1110
- classAdJob .insertAttributeString ("InputDataModule" , inputDataPolicy )
1018
+ if not retVal ["OK" ]:
1019
+ if cmpError (retVal , EWMSSUBM ):
1020
+ resultInsert = self .setJobAttributes (jobID , list (jobAttrs ), list (jobAttrs .values ()))
1021
+ if not resultInsert ["OK" ]:
1022
+ retVal ["MinorStatus" ] += f"; { resultInsert ['Message' ]} "
1111
1023
1112
- # priority
1113
- priority = classAdJob .getAttributeInt ("Priority" )
1114
- if priority is None :
1115
- priority = 0
1116
- classAdReq .insertAttributeInt ("UserPriority" , priority )
1117
-
1118
- # CPU time
1119
- cpuTime = classAdJob .getAttributeInt ("CPUTime" )
1120
- if cpuTime is None :
1121
- opsHelper = Operations (group = ownerGroup )
1122
- cpuTime = opsHelper .getValue ("JobDescription/DefaultCPUTime" , 86400 )
1123
- classAdReq .insertAttributeInt ("CPUTime" , cpuTime )
1124
-
1125
- # platform(s)
1126
- platformList = classAdJob .getListFromExpression ("Platform" )
1127
- if platformList :
1128
- result = self .getDIRACPlatform (platformList )
1129
- if not result ["OK" ]:
1130
- return result
1131
- if result ["Value" ]:
1132
- classAdReq .insertAttributeVectorString ("Platforms" , result ["Value" ])
1024
+ return retVal
1133
1025
else :
1134
- error = "OS compatibility info not found"
1135
-
1136
- if error :
1137
- retVal = S_ERROR (EWMSSUBM , error )
1138
- retVal ["JobId" ] = jobID
1139
- retVal ["Status" ] = JobStatus .FAILED
1140
- retVal ["MinorStatus" ] = error
1141
-
1142
- jobAttrNames .append ("Status" )
1143
- jobAttrValues .append (JobStatus .FAILED )
1144
-
1145
- jobAttrNames .append ("MinorStatus" )
1146
- jobAttrValues .append (error )
1147
- resultInsert = self .setJobAttributes (jobID , jobAttrNames , jobAttrValues )
1148
- if not resultInsert ["OK" ]:
1149
- retVal ["MinorStatus" ] += f"; { resultInsert ['Message' ]} "
1150
-
1151
- return retVal
1026
+ return retVal
1152
1027
1153
1028
return S_OK ()
1154
1029
@@ -1237,11 +1112,7 @@ def rescheduleJob(self, jobID):
1237
1112
return res
1238
1113
return S_ERROR (f"Maximum number of reschedulings is reached: { self .maxRescheduling } " )
1239
1114
1240
- jobAttrNames = []
1241
- jobAttrValues = []
1242
-
1243
- jobAttrNames .append ("RescheduleCounter" )
1244
- jobAttrValues .append (rescheduleCounter )
1115
+ jobAttrs = {"RescheduleCounter" : rescheduleCounter }
1245
1116
1246
1117
# Save the job parameters for later debugging
1247
1118
result = JobMonitoringClient ().getJobParameters (jobID )
@@ -1281,14 +1152,15 @@ def rescheduleJob(self, jobID):
1281
1152
retVal ["JobID" ] = jobID
1282
1153
1283
1154
classAdJob .insertAttributeInt ("JobID" , jobID )
1155
+
1284
1156
result = self .__checkAndPrepareJob (
1285
1157
jobID ,
1286
1158
classAdJob ,
1287
1159
classAdReq ,
1288
1160
resultDict ["Owner" ],
1289
1161
resultDict ["OwnerGroup" ],
1290
- jobAttrNames ,
1291
- jobAttrValues ,
1162
+ jobAttrs ,
1163
+ getVOForGroup ( resultDict [ "OwnerGroup" ]) ,
1292
1164
)
1293
1165
1294
1166
if not result ["OK" ]:
@@ -1297,8 +1169,7 @@ def rescheduleJob(self, jobID):
1297
1169
priority = classAdJob .getAttributeInt ("Priority" )
1298
1170
if priority is None :
1299
1171
priority = 0
1300
- jobAttrNames .append ("UserPriority" )
1301
- jobAttrValues .append (priority )
1172
+ jobAttrs ["UserPriority" ] = priority
1302
1173
1303
1174
siteList = classAdJob .getListFromExpression ("Site" )
1304
1175
if not siteList :
@@ -1308,26 +1179,19 @@ def rescheduleJob(self, jobID):
1308
1179
else :
1309
1180
site = siteList [0 ]
1310
1181
1311
- jobAttrNames .append ("Site" )
1312
- jobAttrValues .append (site )
1182
+ jobAttrs ["Site" ] = site
1313
1183
1314
- jobAttrNames .append ("Status" )
1315
- jobAttrValues .append (JobStatus .RECEIVED )
1184
+ jobAttrs ["Status" ] = JobStatus .RECEIVED
1316
1185
1317
- jobAttrNames .append ("MinorStatus" )
1318
- jobAttrValues .append (JobMinorStatus .RESCHEDULED )
1186
+ jobAttrs ["MinorStatus" ] = JobMinorStatus .RESCHEDULED
1319
1187
1320
- jobAttrNames .append ("ApplicationStatus" )
1321
- jobAttrValues .append ("Unknown" )
1188
+ jobAttrs ["ApplicationStatus" ] = "Unknown"
1322
1189
1323
- jobAttrNames .append ("ApplicationNumStatus" )
1324
- jobAttrValues .append (0 )
1190
+ jobAttrs ["ApplicationNumStatus" ] = 0
1325
1191
1326
- jobAttrNames .append ("LastUpdateTime" )
1327
- jobAttrValues .append (str (datetime .datetime .utcnow ()))
1192
+ jobAttrs ["LastUpdateTime" ] = str (datetime .datetime .utcnow ())
1328
1193
1329
- jobAttrNames .append ("RescheduleTime" )
1330
- jobAttrValues .append (str (datetime .datetime .utcnow ()))
1194
+ jobAttrs ["RescheduleTime" ] = str (datetime .datetime .utcnow ())
1331
1195
1332
1196
reqJDL = classAdReq .asJDL ()
1333
1197
classAdJob .insertAttributeInt ("JobRequirements" , reqJDL )
@@ -1346,7 +1210,7 @@ def rescheduleJob(self, jobID):
1346
1210
if not result ["OK" ]:
1347
1211
return result
1348
1212
1349
- result = self .setJobAttributes (jobID , jobAttrNames , jobAttrValues , force = True )
1213
+ result = self .setJobAttributes (jobID , list ( jobAttrs ), list ( jobAttrs . values ()) , force = True )
1350
1214
if not result ["OK" ]:
1351
1215
return result
1352
1216
0 commit comments