Skip to content

Commit 53c7f5b

Browse files
authored
Merge pull request #5955 from rupozzi/dataop
[integration] Hackaton fix: DataOperation and PilotSubmission Monitoring issues
2 parents efe4d1f + 95e6793 commit 53c7f5b

File tree

15 files changed

+196
-121
lines changed

15 files changed

+196
-121
lines changed

src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ def execute(self):
649649
return S_OK()
650650

651651
def endExecution(self):
652-
self.dataOpSender.concludeSending()
652+
return self.dataOpSender.concludeSending()
653653

654654
def __sendAccounting(self, ftsJob):
655655

src/DIRAC/DataManagementSystem/Client/DataManager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1877,4 +1877,4 @@ def getReplica(self, lfn, storageElementName, localPath=False):
18771877
return self.__executeIfReplicaExists(storageElementName, lfn, "getFile", localPath=localPath)
18781878

18791879
def __del__(self):
1880-
self.dataOpSender.concludeSending()
1880+
return self.dataOpSender.concludeSending()

src/DIRAC/MonitoringSystem/Client/DataOperationSender.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
4141
"""
4242
if "Monitoring" in self.monitoringOption:
4343
baseDict["ExecutionSite"] = DIRAC.siteName()
44+
baseDict["Channel"] = baseDict["Source"] + "->" + baseDict["Destination"]
4445
self.dataOperationReporter.addRecord(baseDict)
4546
if commitFlag or delayedCommit:
4647
result = self.dataOperationReporter.commit()
4748
sLog.debug("Committing data operation to monitoring")
4849
if not result["OK"]:
4950
sLog.error("Could not commit data operation to monitoring", result["Message"])
50-
return result
51-
sLog.debug("Done committing to monitoring")
51+
else:
52+
sLog.debug("Done committing to monitoring")
5253

5354
if "Accounting" in self.monitoringOption:
5455
self.dataOp.setValuesFromDict(baseDict)
@@ -80,13 +81,20 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
8081

8182
return S_OK()
8283

83-
# Call this method in order to commit all records added but not yet committed to Accounting
84+
# Call this method in order to commit all records added but not yet committed to Accounting and Monitoring
8485
def concludeSending(self):
86+
if "Monitoring" in self.monitoringOption:
87+
result = self.dataOperationReporter.commit()
88+
sLog.debug("Committing data operation to monitoring")
89+
if not result["OK"]:
90+
sLog.error("Could not commit data operation to monitoring", result["Message"])
91+
else:
92+
sLog.debug("Done committing to monitoring")
8593
if "Accounting" in self.monitoringOption:
8694
result = gDataStoreClient.commit()
8795
sLog.debug("Concluding the sending and committing data operation to accounting")
8896
if not result["OK"]:
8997
sLog.error("Could not commit data operation to accounting", result["Message"])
9098
return result
91-
sLog.debug("Done committing to accounting")
92-
return S_OK()
99+
sLog.debug("Done committing to accounting")
100+
return result

src/DIRAC/MonitoringSystem/Client/Types/DataOperation.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ def __init__(self):
1818
"Destination",
1919
"Protocol",
2020
"FinalStatus",
21+
"Channel",
2122
]
2223

2324
self.monitoringFields = [

src/DIRAC/MonitoringSystem/Client/Types/PilotSubmissionMonitoring.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
Monitoring Type for Pilot Submission
2+
Monitoring Type for Pilot Submission.
33
"""
44

55
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType
@@ -32,7 +32,7 @@ def __init__(self):
3232
"Queue": {"type": "keyword"},
3333
"Status": {"type": "keyword"},
3434
"NumTotal": {"type": "long"},
35-
"NumSucceded": {"type": "long"},
35+
"NumSucceeded": {"type": "long"},
3636
}
3737
)
3838

src/DIRAC/MonitoringSystem/Service/MonitoringHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def transfer_toClient(self, fileId, token, fileHelper):
109109
if len(fileId) > 5 and fileId[1] == ":":
110110
gLogger.info("Seems the file request is a plot generation request!")
111111
try:
112-
result = self.__generatePlotFromFileId(fileId)
112+
result = self._generatePlotFromFileId(fileId)
113113
except Exception as e: # pylint: disable=broad-except
114114
gLogger.exception("Exception while generating plot", str(e))
115115
result = S_ERROR("Error while generating plot: %s" % str(e))

src/DIRAC/MonitoringSystem/private/DBUtils.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,3 +176,47 @@ def _accumulate(granularity, startEpoch, endEpoch, dataDict):
176176
lastValue += currentDict[timeEpoch]
177177
currentDict[timeEpoch] = lastValue
178178
return dataDict
179+
180+
def stripDataField(self, dataDict, fieldId):
181+
"""
182+
Strip <fieldId> data and sum the rest as it was data from one key
183+
184+
:param dict dataDict: dictionary of the form::
185+
186+
{ 'key' : { <timeEpoch1>: [1, 2, 3],
187+
<timeEpoch2>: [3, 4, 5].. } }
188+
189+
The dataDict is modified in this function and the return structure is:
190+
191+
.. code-block :: python
192+
193+
dataDict : { 'key' : { <timeEpoch1>: 1,
194+
<timeEpoch2>: 3.. } }
195+
196+
:param int fieldId:
197+
198+
:returns: list of dictionaries
199+
200+
.. code-block:: python
201+
202+
[ { <timeEpoch1>: 2, <timeEpoch2>: 4... }
203+
{ <timeEpoch1>: 3, <timeEpoch2>): 5... } ]
204+
205+
:rtype: python:list
206+
207+
"""
208+
remainingData = [{}] # Hack for empty data
209+
for key in dataDict:
210+
for timestamp in dataDict[key]:
211+
for iPos in dataDict[key][timestamp]:
212+
remainingData.append({})
213+
break
214+
break
215+
for key in dataDict:
216+
for timestamp in dataDict[key]:
217+
strippedField = float(dataDict[key][timestamp].pop(fieldId))
218+
for iPos, value in enumerate(dataDict[key][timestamp]):
219+
remainingData[iPos].setdefault(timestamp, 0.0)
220+
remainingData[iPos] += float(value)
221+
dataDict[key][timestamp] = strippedField
222+
return remainingData

src/DIRAC/MonitoringSystem/private/Plotters/BasePlotter.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
It used to create several plots
2+
It is used to create several plots
33
"""
44
import time
55
import copy
@@ -107,7 +107,7 @@ def __init__(self, db, setup, extraArgs=None):
107107

108108
def generate(self, reportRequest):
109109
"""
110-
It retrives the data from the database and create the plot
110+
It retrives the data from the database and creates the plot
111111
112112
:param dict reportRequest: contains the plot attributes
113113
"""
@@ -347,7 +347,7 @@ def __checkThumbnailMetadata(self, metadata):
347347
return False
348348

349349
def __plotData(self, filename, dataDict, metadata, funcToPlot):
350-
"""It create the plot.
350+
"""It creates the plot.
351351
352352
:param str filename: the name of the file which contains the plot
353353
:param dict dataDict: data used to crate the plot
@@ -379,7 +379,7 @@ def _generateTimedStackedBarPlot(self, filename, dataDict, metadata):
379379

380380
def _generateQualityPlot(self, filename, dataDict, metadata):
381381
"""
382-
it create a quality plot
382+
it creates a quality plot
383383
"""
384384
return self.__plotData(filename, dataDict, metadata, generateQualityPlot)
385385

@@ -397,7 +397,7 @@ def _generatePiePlot(self, filename, dataDict, metadata):
397397

398398
def _generateStackedLinePlot(self, filename, dataDict, metadata):
399399
"""
400-
It create a stacked lien plot
400+
It creates a stacked lien plot
401401
"""
402402
return self.__plotData(filename, dataDict, metadata, generateStackedLinePlot)
403403

@@ -413,3 +413,28 @@ def _fillWithZero(self, granularity, startEpoch, endEpoch, dataDict):
413413
if timeEpoch not in currentDict:
414414
currentDict[timeEpoch] = 0
415415
return dataDict
416+
417+
def _calculateEfficiencyDict(self, totDataDict, dataDict):
418+
"""
419+
It returns a dict with efficiency calculated from each key in totDataDict and dataDict
420+
"""
421+
for item, val in dataDict.items():
422+
totVal = totDataDict.get(item, {})
423+
try:
424+
dataDict[item] = {key: (float(val[key]) / float(totVal[key])) * 100 for key in val if key in totVal}
425+
except (ZeroDivisionError, TypeError):
426+
gLogger.warn("Error in ", val)
427+
gLogger.warn("Dividing by zero or using None type field. Skipping the key of this dict item...")
428+
return dataDict
429+
430+
def _sumDictValues(self, dataDict):
431+
"""
432+
Sums the values of each item in `dataDict`.
433+
Returns the dictionary with the same keys and the values replaced by their sum.
434+
"""
435+
for key, values in dataDict.items():
436+
sum = 0
437+
for val in values.values():
438+
sum += val
439+
dataDict[key] = sum
440+
return dataDict

0 commit comments

Comments
 (0)