Skip to content

Commit e33c0bb

Browse files
committed
fix: fixes to StatesAccountingAgent for new fields
1 parent 21b5ff8 commit e33c0bb

File tree

2 files changed

+21
-17
lines changed

2 files changed

+21
-17
lines changed

src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ def setValuesFromDict(self, dataDict):
9797
if errKeys:
9898
return S_ERROR(f"Key(s) {', '.join(errKeys)} are not valid")
9999
for key in dataDict:
100-
self.setValueByKey(key, dataDict[key])
100+
res = self.setValueByKey(key, dataDict[key])
101+
if not res["OK"]:
102+
return res
101103
return S_OK()
102104

103105
def getValue(self, key):

src/DIRAC/WorkloadManagementSystem/Agent/StatesAccountingAgent.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,22 @@ class StatesAccountingAgent(AgentModule):
3131
__summaryKeyFieldsMapping = [
3232
"Status",
3333
"Site",
34-
"User",
35-
"UserGroup",
34+
"Owner",
35+
"OwnerGroup",
3636
"JobGroup",
3737
"VO",
3838
"JobType",
3939
"ApplicationStatus",
4040
"MinorStatus",
4141
]
42-
__summaryDefinedFields = [("ApplicationStatus", "unset"), ("MinorStatus", "unset")]
43-
__summaryValueFieldsMapping = ["Jobs", "Reschedules"]
44-
__renameFieldsMapping = {"JobType": "JobSplitType"}
42+
__summaryValueFieldsMapping = ["JobCount", "RescheduleSum"]
43+
__renameFieldsMapping = {
44+
"Owner": "User",
45+
"OwnerGroup": "UserGroup",
46+
"JobType": "JobSplitType",
47+
"JobCount": "Jobs",
48+
"RescheduleSum": "Reschedules",
49+
}
4550

4651
# PilotsHistory fields
4752
__pilotsMapping = ["GridSite", "ComputingElement", "GridType", "Status", "VO", "NumOfPilots"]
@@ -95,7 +100,7 @@ def execute(self):
95100
f"{result['Message']}: won't commit PilotsHistory at this cycle",
96101
)
97102

98-
values = result["Value"][1]
103+
values = result["Value"]
99104
for record in values:
100105
rD = {}
101106
for iP, _ in enumerate(self.__pilotsMapping):
@@ -111,32 +116,29 @@ def execute(self):
111116

112117
# WMSHistory to Monitoring or Accounting
113118
self.log.info(f"Committing WMSHistory to {'and '.join(self.jobMonitoringOption)} backend")
114-
result = JobDB()._query("SELECT * FROM JobsHistorySummary")
119+
result = JobDB()._query(
120+
f"SELECT {','.join(self.__summaryKeyFieldsMapping + self.__summaryValueFieldsMapping)} FROM JobsHistorySummary ORDER BY {','.join(self.__summaryKeyFieldsMapping)}"
121+
)
115122
if not result["OK"]:
116123
self.log.error("Can't get the JobDB summary", f"{result['Message']}: won't commit WMSHistory at this cycle")
117124
return S_ERROR()
118125

119-
values = result["Value"][1]
120-
126+
values = result["Value"]
121127
now = datetime.datetime.utcnow()
122128
self.log.info("Start sending WMSHistory records")
123129
for record in values:
124130
rD = {}
125-
for fV in self.__summaryDefinedFields:
126-
rD[fV[0]] = fV[1]
127-
for iP, _ in enumerate(self.__summaryKeyFieldsMapping):
128-
fieldName = self.__summaryKeyFieldsMapping[iP]
131+
for iP, fieldName in enumerate(self.__summaryKeyFieldsMapping + self.__summaryValueFieldsMapping):
129132
rD[self.__renameFieldsMapping.get(fieldName, fieldName)] = record[iP]
130-
record = record[len(self.__summaryKeyFieldsMapping) :]
131-
for iP, _ in enumerate(self.__summaryValueFieldsMapping):
132-
rD[self.__summaryValueFieldsMapping[iP]] = int(record[iP])
133133

134134
for backend in self.datastores:
135135
if backend.lower() == "monitoring":
136136
rD["timestamp"] = int(TimeUtilities.toEpochMilliSeconds(now))
137137
self.datastores["Monitoring"].addRecord(rD)
138138

139139
elif backend.lower() == "accounting":
140+
rD.pop("VO") # Remove VO field for Accounting
141+
rD.pop("ApplicationStatus") # Remove ApplicationStatus for Accounting
140142
acWMS = WMSHistory()
141143
acWMS.setStartTime(now)
142144
acWMS.setEndTime(now)

0 commit comments

Comments
 (0)