17
17
getGroupedPilotSummary()
18
18
19
19
"""
20
- import threading
21
20
import datetime
22
21
import decimal
22
+ import threading
23
23
24
- from DIRAC import S_OK , S_ERROR
25
- from DIRAC .Core .Base .DB import DB
26
24
import DIRAC .Core .Utilities .TimeUtilities as TimeUtilities
27
- from DIRAC .Core .Utilities import DErrno
25
+ from DIRAC import S_ERROR , S_OK
26
+ from DIRAC .ConfigurationSystem .Client .Helpers .Registry import getDNForUsername , getUsernameForDN , getVOForGroup
28
27
from DIRAC .ConfigurationSystem .Client .Helpers .Resources import getCESiteMapping
29
- from DIRAC .ConfigurationSystem . Client . Helpers . Registry import getUsernameForDN , getDNForUsername , getVOForGroup
30
- from DIRAC .ResourceStatusSystem . Client . SiteStatus import SiteStatus
28
+ from DIRAC .Core . Base . DB import DB
29
+ from DIRAC .Core . Utilities import DErrno
31
30
from DIRAC .Core .Utilities .MySQL import _quotedList
31
+ from DIRAC .ResourceStatusSystem .Client .SiteStatus import SiteStatus
32
32
from DIRAC .WorkloadManagementSystem .Client import PilotStatus
33
33
34
34
@@ -112,12 +112,8 @@ def setPilotStatus(
112
112
setList .append (f"GridSite='{ res ['Value' ][destination ]} '" )
113
113
114
114
set_string = "," .join (setList )
115
- req = "UPDATE PilotAgents SET " + set_string + f" WHERE PilotJobReference='{ pilotRef } '"
116
- result = self ._update (req , conn = conn )
117
- if not result ["OK" ]:
118
- return result
119
-
120
- return S_OK ()
115
+ req = f"UPDATE PilotAgents SET { set_string } WHERE PilotJobReference='{ pilotRef } '"
116
+ return self ._update (req , conn = conn )
121
117
122
118
# ###########################################################################################
123
119
# FIXME: this can't work ATM because of how the DB table is made. Maybe it would be useful later.
@@ -330,9 +326,9 @@ def getPilotInfo(self, pilotRef=False, parentId=False, conn=False, paramNames=[]
330
326
pilotIDs = []
331
327
for row in result ["Value" ]:
332
328
pilotDict = {}
333
- for i in range ( len ( parameters ) ):
334
- pilotDict [parameters [ i ] ] = row [i ]
335
- if parameters [ i ] == "PilotID" :
329
+ for i , par in enumerate ( parameters ):
330
+ pilotDict [par ] = row [i ]
331
+ if par == "PilotID" :
336
332
pilotIDs .append (row [i ])
337
333
resDict [row [0 ]] = pilotDict
338
334
@@ -341,8 +337,7 @@ def getPilotInfo(self, pilotRef=False, parentId=False, conn=False, paramNames=[]
341
337
return S_OK (resDict )
342
338
343
339
jobsDict = result ["Value" ]
344
- for pilotRef in resDict :
345
- pilotInfo = resDict [pilotRef ]
340
+ for pilotRef , pilotInfo in resDict .items ():
346
341
pilotID = pilotInfo ["PilotID" ]
347
342
if pilotID in jobsDict :
348
343
pilotInfo ["Jobs" ] = jobsDict [pilotID ]
@@ -367,16 +362,14 @@ def setPilotBenchmark(self, pilotRef, mark):
367
362
"""Set the pilot agent benchmark"""
368
363
369
364
req = f"UPDATE PilotAgents SET BenchMark='{ mark :f} ' WHERE PilotJobReference='{ pilotRef } '"
370
- result = self ._update (req )
371
- return result
365
+ return self ._update (req )
372
366
373
367
##########################################################################################
374
368
def setAccountingFlag (self , pilotRef , mark = "True" ):
375
369
"""Set the pilot AccountingSent flag"""
376
370
377
371
req = f"UPDATE PilotAgents SET AccountingSent='{ mark } ' WHERE PilotJobReference='{ pilotRef } '"
378
- result = self ._update (req )
379
- return result
372
+ return self ._update (req )
380
373
381
374
##########################################################################################
382
375
def storePilotOutput (self , pilotRef , output , error ):
@@ -409,21 +402,19 @@ def getPilotOutput(self, pilotRef):
409
402
result = self ._query (req )
410
403
if not result ["OK" ]:
411
404
return result
412
- else :
413
- if result ["Value" ]:
414
- try :
415
- stdout = result ["Value" ][0 ][0 ].decode () # account for the use of BLOBs
416
- error = result ["Value" ][0 ][1 ].decode ()
417
- except AttributeError :
418
- stdout = result ["Value" ][0 ][0 ]
419
- error = result ["Value" ][0 ][1 ]
420
- if stdout == '""' :
421
- stdout = ""
422
- if error == '""' :
423
- error = ""
424
- return S_OK ({"StdOut" : stdout , "StdErr" : error })
425
- else :
426
- return S_ERROR ("PilotJobReference " + pilotRef + " not found" )
405
+ if not result ["Value" ]:
406
+ return S_ERROR (f"PilotJobReference { pilotRef } not found" )
407
+ try :
408
+ stdout = result ["Value" ][0 ][0 ].decode () # account for the use of BLOBs
409
+ error = result ["Value" ][0 ][1 ].decode ()
410
+ except AttributeError :
411
+ stdout = result ["Value" ][0 ][0 ]
412
+ error = result ["Value" ][0 ][1 ]
413
+ if stdout == '""' :
414
+ stdout = ""
415
+ if error == '""' :
416
+ error = ""
417
+ return S_OK ({"StdOut" : stdout , "StdErr" : error })
427
418
428
419
##########################################################################################
429
420
def __getPilotID (self , pilotRef ):
@@ -434,19 +425,17 @@ def __getPilotID(self, pilotRef):
434
425
result = self ._query (req )
435
426
if not result ["OK" ]:
436
427
return 0
437
- else :
438
- if result ["Value" ]:
439
- return int (result ["Value" ][0 ][0 ])
440
- return 0
441
- else :
442
- refString = "," .join (["'" + ref + "'" for ref in pilotRef ])
443
- req = f"SELECT PilotID from PilotAgents WHERE PilotJobReference in ( { refString } )"
444
- result = self ._query (req )
445
- if not result ["OK" ]:
446
- return []
447
428
if result ["Value" ]:
448
- return [x [0 ] for x in result ["Value" ]]
429
+ return int (result ["Value" ][0 ][0 ])
430
+ return 0
431
+ refString = "," .join (["'" + ref + "'" for ref in pilotRef ])
432
+ req = f"SELECT PilotID from PilotAgents WHERE PilotJobReference in ( { refString } )"
433
+ result = self ._query (req )
434
+ if not result ["OK" ]:
449
435
return []
436
+ if result ["Value" ]:
437
+ return [x [0 ] for x in result ["Value" ]]
438
+ return []
450
439
451
440
##########################################################################################
452
441
def setJobForPilot (self , jobID , pilotRef , site = None , updateStatus = True ):
@@ -455,17 +444,13 @@ def setJobForPilot(self, jobID, pilotRef, site=None, updateStatus=True):
455
444
pilotID = self .__getPilotID (pilotRef )
456
445
if pilotID :
457
446
if updateStatus :
458
- reason = "Report from job %d" % int ( jobID )
447
+ reason = f "Report from job { jobID } "
459
448
result = self .setPilotStatus (pilotRef , status = PilotStatus .RUNNING , statusReason = reason , gridSite = site )
460
449
if not result ["OK" ]:
461
450
return result
462
- req = "INSERT INTO JobToPilotMapping (PilotID,JobID,StartTime) VALUES (%d,%d,UTC_TIMESTAMP())" % (
463
- pilotID ,
464
- jobID ,
465
- )
451
+ req = f"INSERT INTO JobToPilotMapping (PilotID,JobID,StartTime) VALUES ({ int (pilotID )} , { int (jobID )} , UTC_TIMESTAMP())"
466
452
return self ._update (req )
467
- else :
468
- return S_ERROR ("PilotJobReference " + pilotRef + " not found" )
453
+ return S_ERROR (f"PilotJobReference { pilotRef } not found" )
469
454
470
455
##########################################################################################
471
456
def setCurrentJobID (self , pilotRef , jobID ):
@@ -568,16 +553,15 @@ def getPilotSummary(self, startdate="", enddate=""):
568
553
result = self ._query (req )
569
554
if not result ["OK" ]:
570
555
return result
571
- else :
572
- if result ["Value" ]:
573
- for res in result ["Value" ]:
574
- site = res [0 ]
575
- count = res [1 ]
576
- if site :
577
- if site not in summary_dict :
578
- summary_dict [site ] = {}
579
- summary_dict [site ][st ] = int (count )
580
- summary_dict ["Total" ][st ] += int (count )
556
+ if result ["Value" ]:
557
+ for res in result ["Value" ]:
558
+ site = res [0 ]
559
+ count = res [1 ]
560
+ if site :
561
+ if site not in summary_dict :
562
+ summary_dict [site ] = {}
563
+ summary_dict [site ][st ] = int (count )
564
+ summary_dict ["Total" ][st ] += int (count )
581
565
582
566
# Get aborted pilots in the last hour, day
583
567
req = "SELECT DestinationSite,count(DestinationSite) FROM PilotAgents WHERE Status='Aborted' AND "
@@ -723,14 +707,12 @@ def _getElementStatus(self, total, eff):
723
707
if total > 10 :
724
708
if eff < 25.0 :
725
709
return "Bad"
726
- elif eff < 60.0 :
710
+ if eff < 60.0 :
727
711
return "Poor"
728
- elif eff < 85.0 :
712
+ if eff < 85.0 :
729
713
return "Fair"
730
- else :
731
- return "Good"
732
- else :
733
- return "Idle"
714
+ return "Good"
715
+ return "Idle"
734
716
735
717
def getPilotSummaryWeb (self , selectDict , sortList , startItem , maxItems ):
736
718
"""Get summary of the pilot jobs status by CE/site in a standard structure"""
@@ -861,26 +843,26 @@ def getPilotSummaryWeb(self, selectDict, sortList, startItem, maxItems):
861
843
862
844
records = []
863
845
siteSumDict = {}
864
- for site in resultDict :
846
+ for site , ces in resultDict . items () :
865
847
sumDict = {}
866
848
for state in allStateNames :
867
849
if state not in sumDict :
868
850
sumDict [state ] = 0
869
851
sumDict ["Total" ] = 0
870
- for ce in resultDict [ site ] :
852
+ for ce , ceDict in ces . items () :
871
853
itemList = [site , ce ]
872
854
total = 0
873
855
for state in allStateNames :
874
- itemList .append (resultDict [ site ][ ce ] [state ])
875
- sumDict [state ] += resultDict [ site ][ ce ] [state ]
856
+ itemList .append (ceDict [state ])
857
+ sumDict [state ] += ceDict [state ]
876
858
if state == PilotStatus .DONE :
877
- done = resultDict [ site ][ ce ] [state ]
859
+ done = ceDict [state ]
878
860
if state == "Done_Empty" :
879
- empty = resultDict [ site ][ ce ] [state ]
861
+ empty = ceDict [state ]
880
862
if state == PilotStatus .ABORTED :
881
- aborted = resultDict [ site ][ ce ] [state ]
882
- if state != "Aborted_Hour" and state != "Done_Empty" :
883
- total += resultDict [ site ][ ce ] [state ]
863
+ aborted = ceDict [state ]
864
+ if state not in ( "Aborted_Hour" , "Done_Empty" ) :
865
+ total += ceDict [state ]
884
866
885
867
sumDict ["Total" ] += total
886
868
# Add the total number of pilots seen in the last day
@@ -915,10 +897,10 @@ def getPilotSummaryWeb(self, selectDict, sortList, startItem, maxItems):
915
897
else :
916
898
itemList .append ("Idle" )
917
899
918
- if len (resultDict [ site ] ) == 1 or expand_site :
900
+ if len (ces ) == 1 or expand_site :
919
901
records .append (itemList )
920
902
921
- if len (resultDict [ site ] ) > 1 and not expand_site :
903
+ if len (ces ) > 1 and not expand_site :
922
904
itemList = [site , "Multiple" ]
923
905
for state in allStateNames + ["Total" ]:
924
906
if state in sumDict :
@@ -1210,7 +1192,7 @@ def __init__(self, columnList):
1210
1192
1211
1193
self ._columns += self .pstates # MySQL._query() does not give us column names, sadly.
1212
1194
1213
- def buildSQL (self , selectDict = None ):
1195
+ def buildSQL (self ):
1214
1196
"""
1215
1197
Build an SQL query to create a table with all status counts in one row, ("pivoted")
1216
1198
grouped by columns in the column list.
0 commit comments