2020from DIRAC .Core .Utilities .ClassAd .ClassAdLight import ClassAd
2121from DIRAC .Core .Utilities .Decorators import deprecated
2222from DIRAC .Core .Utilities .DErrno import EWMSJMAN , EWMSSUBM , cmpError
23- from DIRAC .Core .Utilities .ReturnValues import S_ERROR , S_OK
23+ from DIRAC .Core .Utilities .ReturnValues import S_ERROR , S_OK , convertToReturnValue , returnValueOrRaise , SErrorException
2424from DIRAC .FrameworkSystem .Client .Logger import contextLogger
2525from DIRAC .ResourceStatusSystem .Client .SiteStatus import SiteStatus
2626from DIRAC .WorkloadManagementSystem .Client import JobMinorStatus , JobStatus
@@ -106,18 +106,7 @@ def getJobParameters(self, jobID, paramList=None):
106106 Returns a dictionary with the Job Parameters.
107107 If parameterList is empty - all the parameters are returned.
108108 """
109-
110- if isinstance (jobID , (str , int )):
111- jobID = [jobID ]
112-
113- jobIDList = []
114- for jID in jobID :
115- ret = self ._escapeString (str (jID ))
116- if not ret ["OK" ]:
117- return ret
118- jobIDList .append (ret ["Value" ])
119-
120- # self.log.debug('JobDB.getParameters: Getting Parameters for jobs %s' % ','.join(jobIDList))
109+ jobIDList = [jobID ] if isinstance (jobID , (str , int )) else jobID
121110
122111 resultDict = {}
123112 if paramList :
@@ -130,7 +119,7 @@ def getJobParameters(self, jobID, paramList=None):
130119 return ret
131120 paramNameList .append (ret ["Value" ])
132121 cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})" .format (
133- "," .join (jobIDList ),
122+ "," .join (str ( int ( j )) for j in jobIDList ),
134123 "," .join (paramNameList ),
135124 )
136125 result = self ._query (cmd )
@@ -207,13 +196,13 @@ def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
207196 return S_ERROR ("JobDB.getAtticJobParameters: failed to retrieve parameters" )
208197
209198 #############################################################################
199+ @convertToReturnValue
210200 def getJobsAttributes (self , jobIDs , attrList = None ):
211201 """Get all Job(s) Attributes for a given list of jobIDs.
212202 Return a dictionary with all Job Attributes as value pairs
213203 """
214-
215204 if not jobIDs :
216- return S_OK ({})
205+ return {}
217206
218207 # If no list of attributes is given, return all attributes
219208 if not attrList :
@@ -229,28 +218,29 @@ def getJobsAttributes(self, jobIDs, attrList=None):
229218
230219 attrNameListS = []
231220 for x in attrList :
232- ret = self ._escapeString (x )
233- if not ret ["OK" ]:
234- return ret
235- x = "`" + ret ["Value" ][1 :- 1 ] + "`"
221+ x = "`" + returnValueOrRaise (self ._escapeString (x ))[1 :- 1 ] + "`"
236222 attrNameListS .append (x )
237223 attrNames = "JobID," + "," .join (attrNameListS )
238224
239- cmd = f"SELECT { attrNames } FROM Jobs WHERE JobID IN ({ ',' .join (str (jobID ) for jobID in jobIDs )} )"
240- res = self ._query (cmd )
241- if not res ["OK" ]:
242- return res
243- if not res ["Value" ]:
244- return S_OK ({})
225+ sqlCmd = "CREATE TEMPORARY TABLE to_select_Jobs (JobID INTEGER NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
226+ returnValueOrRaise (self ._update (sqlCmd ))
227+ try :
228+ sqlCmd = "INSERT INTO to_select_Jobs (JobID) VALUES ( %s )"
229+ returnValueOrRaise (self ._updatemany (sqlCmd , [(int (j ),) for j in jobIDs ]))
230+ sqlCmd = f"SELECT { attrNames } FROM Jobs JOIN to_select_Jobs USING (JobID)"
231+ result = returnValueOrRaise (self ._query (sqlCmd ))
232+ finally :
233+ sqlCmd = "DROP TEMPORARY TABLE to_select_Jobs"
234+ returnValueOrRaise (self ._update (sqlCmd ))
245235
246236 attributes = {}
247- for t_att in res [ "Value" ] :
237+ for t_att in result :
248238 jobID = int (t_att [0 ])
249239 attributes .setdefault (jobID , {})
250240 for tx , ax in zip (t_att [1 :], attrList ):
251241 attributes [jobID ].setdefault (ax , tx )
252242
253- return S_OK ( attributes )
243+ return attributes
254244
255245 #############################################################################
256246 def getJobAttributes (self , jobID , attrList = None ):
@@ -527,12 +517,10 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
527517 if not isinstance (jobID , (list , tuple )):
528518 jobIDList = [jobID ]
529519
530- jIDList = []
531- for jID in jobIDList :
532- ret = self ._escapeString (jID )
533- if not ret ["OK" ]:
534- return ret
535- jIDList .append (ret ["Value" ])
520+ try :
521+ jIDList = [int (jID ) for jID in jobIDList ]
522+ except ValueError as e :
523+ return S_ERROR (f"JobDB.setAttributes: { e } " )
536524
537525 if len (attrNames ) != len (attrValues ):
538526 return S_ERROR ("JobDB.setAttributes: incompatible Argument length" )
@@ -561,7 +549,7 @@ def setJobAttributes(self, jobID, attrNames, attrValues, update=False, myDate=No
561549 if not attr :
562550 return S_ERROR ("JobDB.setAttributes: Nothing to do" )
563551
564- cmd = f"UPDATE Jobs SET { ', ' .join (attr )} WHERE JobID in ( { ', ' .join (jIDList )} )"
552+ cmd = f"UPDATE Jobs SET { ', ' .join (attr )} WHERE JobID in ( { ', ' .join (str ( int ( j )) for j in jIDList )} )"
565553
566554 if myDate :
567555 cmd += f" AND LastUpdateTime < { myDate } "
@@ -987,44 +975,42 @@ def __checkAndPrepareJob(self, jobID, classAdJob, classAdReq, owner, ownerGroup,
987975 return S_OK ()
988976
989977 #############################################################################
978+ @convertToReturnValue
990979 def removeJobFromDB (self , jobIDs ):
991980 """
992981 Remove jobs from the Job DB and clean up all the job related data in various tables
993982 """
994-
995- # ret = self._escapeString(jobID)
996- # if not ret['OK']:
997- # return ret
998- # e_jobID = ret['Value']
999-
1000983 if not jobIDs :
1001- return S_OK ()
1002-
1003- if not isinstance (jobIDs , list ):
1004- jobIDList = [jobIDs ]
1005- else :
1006- jobIDList = jobIDs
984+ return None
985+ jobIDList = jobIDs if isinstance (jobIDs , list ) else [jobIDs ]
1007986
1008987 failedTablesList = []
1009- for table in [
1010- "InputData" ,
1011- "JobParameters" ,
1012- "AtticJobParameters" ,
1013- "HeartBeatLoggingInfo" ,
1014- "OptimizerParameters" ,
1015- "JobCommands" ,
1016- "Jobs" ,
1017- "JobJDLs" ,
1018- ]:
1019- cmd = f"DELETE FROM { table } WHERE JobID in ({ ',' .join (str (j ) for j in jobIDList )} )"
1020- result = self ._update (cmd )
1021- if not result ["OK" ]:
1022- failedTablesList .append (table )
1023988
1024- if failedTablesList :
1025- return S_ERROR (f"Errors while job removal (tables { ',' .join (failedTablesList )} )" )
989+ sqlCmd = "CREATE TEMPORARY TABLE to_delete_Jobs (JobID INT(11) UNSIGNED NOT NULL, PRIMARY KEY (JobID)) ENGINE=MEMORY;"
990+ returnValueOrRaise (self ._update (sqlCmd ))
991+ try :
992+ sqlCmd = "INSERT INTO to_delete_Jobs (JobID) VALUES ( %s )"
993+ returnValueOrRaise (self ._updatemany (sqlCmd , [(j ,) for j in jobIDList ]))
994+
995+ for table in [
996+ "InputData" ,
997+ "JobParameters" ,
998+ "AtticJobParameters" ,
999+ "HeartBeatLoggingInfo" ,
1000+ "OptimizerParameters" ,
1001+ "JobCommands" ,
1002+ "Jobs" ,
1003+ "JobJDLs" ,
1004+ ]:
1005+ sqlCmd = f"DELETE m from `{ table } ` m JOIN to_delete_Jobs t USING (JobID)"
1006+ if not self ._update (sqlCmd )["OK" ]:
1007+ failedTablesList .append (table )
1008+ finally :
1009+ sqlCmd = "DROP TEMPORARY TABLE to_delete_Jobs"
1010+ returnValueOrRaise (self ._update (sqlCmd ))
10261011
1027- return S_OK ()
1012+ if failedTablesList :
1013+ raise SErrorException (f"Errors while job removal (tables { ',' .join (failedTablesList )} )" )
10281014
10291015 #############################################################################
10301016 def rescheduleJob (self , jobID ):
0 commit comments