Skip to content

Commit 38c6f23

Browse files
committed
feat: Adding channel to monitoring records and fixed the plotters
1 parent a470cbb commit 38c6f23

File tree

7 files changed

+146
-39
lines changed

7 files changed

+146
-39
lines changed

src/DIRAC/MonitoringSystem/Client/DataOperationSender.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
"""
66

7+
from email.mime import base
78
import DIRAC
89
from DIRAC import S_OK, gLogger
910

@@ -41,6 +42,7 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
4142
"""
4243
if "Monitoring" in self.monitoringOption:
4344
baseDict["ExecutionSite"] = DIRAC.siteName()
45+
baseDict["Channel"] = baseDict["Source"] + "->" + baseDict["Destination"]
4446
self.dataOperationReporter.addRecord(baseDict)
4547
if commitFlag or delayedCommit:
4648
result = self.dataOperationReporter.commit()
@@ -80,13 +82,20 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
8082

8183
return S_OK()
8284

83-
# Call this method in order to commit all records added but not yet committed to Accounting
85+
# Call this method in order to commit all records added but not yet committed to Accounting and Monitoring
8486
def concludeSending(self):
87+
if "Monitoring" in self.monitoringOption:
88+
result = self.dataOperationReporter.commit()
89+
sLog.debug("Committing data operation to monitoring")
90+
if not result["OK"]:
91+
sLog.error("Could not commit data operation to monitoring", result["Message"])
92+
else:
93+
sLog.debug("Done committing to monitoring")
8594
if "Accounting" in self.monitoringOption:
8695
result = gDataStoreClient.commit()
8796
sLog.debug("Concluding the sending and committing data operation to accounting")
8897
if not result["OK"]:
8998
sLog.error("Could not commit data operation to accounting", result["Message"])
9099
return result
91-
sLog.debug("Done committing to accounting")
92-
return S_OK()
100+
sLog.debug("Done committing to accounting")
101+
return result

src/DIRAC/MonitoringSystem/Client/Types/PilotSubmissionMonitoring.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
Monitoring Type for Pilot Submission
2+
Monitoring Type for Pilot Submission.
33
"""
44

55
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType
@@ -32,7 +32,7 @@ def __init__(self):
3232
"Queue": {"type": "keyword"},
3333
"Status": {"type": "keyword"},
3434
"NumTotal": {"type": "long"},
35-
"NumSucceded": {"type": "long"},
35+
"NumSucceeded": {"type": "long"},
3636
}
3737
)
3838

src/DIRAC/MonitoringSystem/Service/MonitoringHandler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def transfer_toClient(self, fileId, token, fileHelper):
109109
if len(fileId) > 5 and fileId[1] == ":":
110110
gLogger.info("Seems the file request is a plot generation request!")
111111
try:
112-
result = self.__generatePlotFromFileId(fileId)
112+
result = self._generatePlotFromFileId(fileId)
113113
except Exception as e: # pylint: disable=broad-except
114114
gLogger.exception("Exception while generating plot", str(e))
115115
result = S_ERROR("Error while generating plot: %s" % str(e))

src/DIRAC/MonitoringSystem/private/Plotters/BasePlotter.py

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
It used to create several plots
2+
It is used to creates several plots
33
"""
44
import time
55
import copy
@@ -107,7 +107,7 @@ def __init__(self, db, setup, extraArgs=None):
107107

108108
def generate(self, reportRequest):
109109
"""
110-
It retrives the data from the database and create the plot
110+
It retrives the data from the database and creates the plot
111111
112112
:param dict reportRequest: contains the plot attributes
113113
"""
@@ -347,7 +347,7 @@ def __checkThumbnailMetadata(self, metadata):
347347
return False
348348

349349
def __plotData(self, filename, dataDict, metadata, funcToPlot):
350-
"""It create the plot.
350+
"""It creates the plot.
351351
352352
:param str filename: the name of the file which contains the plot
353353
:param dict dataDict: data used to crate the plot
@@ -379,7 +379,7 @@ def _generateTimedStackedBarPlot(self, filename, dataDict, metadata):
379379

380380
def _generateQualityPlot(self, filename, dataDict, metadata):
381381
"""
382-
it create a quality plot
382+
it creates a quality plot
383383
"""
384384
return self.__plotData(filename, dataDict, metadata, generateQualityPlot)
385385

@@ -397,7 +397,7 @@ def _generatePiePlot(self, filename, dataDict, metadata):
397397

398398
def _generateStackedLinePlot(self, filename, dataDict, metadata):
399399
"""
400-
It create a stacked lien plot
400+
It creates a stacked lien plot
401401
"""
402402
return self.__plotData(filename, dataDict, metadata, generateStackedLinePlot)
403403

@@ -413,3 +413,26 @@ def _fillWithZero(self, granularity, startEpoch, endEpoch, dataDict):
413413
if timeEpoch not in currentDict:
414414
currentDict[timeEpoch] = 0
415415
return dataDict
416+
417+
def _calculateEfficiencyDict(self, totDataDict, dataDict):
418+
"""
419+
It returns a dict with efficiency calculated from each key in totDataDict and dataDict
420+
"""
421+
for (totItem, totVal), (item, val) in zip(totDataDict.items(), dataDict.items()):
422+
try:
423+
dataDict[item] = {key: (float(val[key]) / float(totVal[key])) * 100 for key in val if key in totVal}
424+
except ZeroDivisionError and TypeError:
425+
gLogger.warn("Dividing by zero or using None type field. Skipping the key of this dict item...")
426+
return dataDict
427+
428+
def _sumDictValues(self, dataDict):
429+
"""
430+
Sums all the values of the keys of each item in the dict.
431+
Returns a dict with item-value pair and removes the keys.
432+
"""
433+
for item, key in dataDict.items():
434+
sum = 0
435+
for val in key:
436+
sum += key[val]
437+
dataDict[item] = sum
438+
return dataDict

src/DIRAC/MonitoringSystem/private/Plotters/DataOperationPlotter.py

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
"""
2-
This class is used to define the plot using the plot attributes.
2+
this class is used to define the plot using the plot attributes.
33
"""
44

55
from DIRAC import S_OK
@@ -19,6 +19,7 @@ class DataOperationPlotter(BasePlotter):
1919
"""
2020

2121
_typeName = "DataOperation"
22+
2223
_typeKeyFields = DataOperation().keyFields
2324

2425
_reportSuceededTransfersName = "Successful transfers"
@@ -32,6 +33,11 @@ def _reportFailedTransfers(self, reportRequest):
3233
return self.__reportTransfers(reportRequest, "Failed", ("Succeeded", 1))
3334

3435
def __reportTransfers(self, reportRequest, titleType, togetherFieldsToPlot):
36+
"""It is used to retrieve the data from the database.
37+
38+
:param dict reportRequest: contains attributes used to create the plot.
39+
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
40+
"""
3541
retVal = self._getTimedData(
3642
reportRequest["startTime"],
3743
reportRequest["endTime"],
@@ -68,38 +74,51 @@ def __plotTransfers(self, reportRequest, plotInfo, filename, titleType, together
6874
}
6975
return self._generateTimedStackedBarPlot(filename, plotInfo["graphDataDict"], metadata)
7076

71-
_reportQualityName = "Efficiency by protocol"
77+
_reportQualityName = "Transfer Efficiency"
7278

7379
def _reportQuality(self, reportRequest):
74-
selectFields = ["TransferOK", "TransferTotal"]
80+
"""It is used to retrieve the data from the database.
81+
82+
:param dict reportRequest: contains attributes used to create the plot.
83+
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
84+
"""
85+
# Retrieve the number of succeded transfers
7586
retVal = self._getTimedData(
7687
reportRequest["startTime"],
7788
reportRequest["endTime"],
78-
selectFields,
89+
"TransferOK",
90+
preCondDict=reportRequest["condDict"],
91+
metadataDict=None,
92+
)
93+
# Retrieve the number of total transfers
94+
retTotVal = self._getTimedData(
95+
reportRequest["startTime"],
96+
reportRequest["endTime"],
97+
"TransferTotal",
7998
preCondDict=reportRequest["condDict"],
8099
metadataDict=None,
81100
)
82101
if not retVal["OK"]:
83102
return retVal
103+
if not retTotVal["OK"]:
104+
return retTotVal
84105
dataDict, granularity = retVal["Value"]
85-
if len(dataDict) > 1:
86-
# Get the total for the plot
87-
selectFields = ["TransferOK", "TransferTotal"]
88-
retVal = self._getTimedData(
89-
reportRequest["startTime"],
90-
reportRequest["endTime"],
91-
selectFields,
92-
preCondDict=reportRequest["condDict"],
93-
metadataDict=None,
94-
)
95-
if not retVal["OK"]:
96-
return retVal
97-
totalDict = retVal["Value"][0]
98-
for key in totalDict:
99-
dataDict[key] = totalDict[key]
100-
return S_OK({"data": dataDict, "granularity": granularity})
106+
totDataDict, granularity = retTotVal["Value"]
107+
# Check that the dicts are not empty
108+
if bool(dataDict) and bool(totDataDict):
109+
# Return the efficiency in dataDict
110+
effDict = self._calculateEfficiencyDict(totDataDict, dataDict)
111+
return S_OK({"data": effDict, "granularity": granularity})
101112

102113
def _plotQuality(self, reportRequest, plotInfo, filename):
114+
"""
115+
Make 2 dimensional pilotSubmission efficiency plot
116+
117+
:param dict reportRequest: Condition to select data
118+
:param dict plotInfo: Data for plot.
119+
:param str filename: File name
120+
"""
121+
103122
metadata = {
104123
"title": "Transfer quality by %s" % reportRequest["grouping"],
105124
"starttime": reportRequest["startTime"],
@@ -111,6 +130,11 @@ def _plotQuality(self, reportRequest, plotInfo, filename):
111130
_reportTransferedDataName = "Cumulative transferred data"
112131

113132
def _reportTransferedData(self, reportRequest):
133+
"""It is used to retrieve the data from the database.
134+
135+
:param dict reportRequest: contains attributes used to create the plot.
136+
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
137+
"""
114138
retVal = self._getTimedData(
115139
reportRequest["startTime"],
116140
reportRequest["endTime"],
@@ -131,6 +155,13 @@ def _reportTransferedData(self, reportRequest):
131155
)
132156

133157
def _plotTransferedData(self, reportRequest, plotInfo, filename):
158+
"""It creates the plot.
159+
160+
:param dict reportRequest: plot attributes
161+
:param dict plotInfo: contains all the data which are used to create the plot
162+
:param str filename:
163+
:return: S_OK or S_ERROR { 'plot' : value1, 'thumbnail' : value2 } value1 and value2 are TRUE/FALSE
164+
"""
134165
metadata = {
135166
"title": "Transfered data by %s" % reportRequest["grouping"],
136167
"starttime": reportRequest["startTime"],
@@ -142,6 +173,11 @@ def _plotTransferedData(self, reportRequest, plotInfo, filename):
142173
return self._generateCumulativePlot(filename, plotInfo["graphDataDict"], metadata)
143174

144175
def _reportThroughput(self, reportRequest):
176+
"""It is used to retrieve the data from the database.
177+
178+
:param dict reportRequest: contains attributes used to create the plot.
179+
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
180+
"""
145181
retVal = self._getTimedData(
146182
reportRequest["startTime"],
147183
reportRequest["endTime"],
@@ -162,6 +198,13 @@ def _reportThroughput(self, reportRequest):
162198
)
163199

164200
def _plotThroughput(self, reportRequest, plotInfo, filename):
201+
"""It creates the plot.
202+
203+
:param dict reportRequest: plot attributes
204+
:param dict plotInfo: contains all the data which are used to create the plot
205+
:param str filename:
206+
:return: S_OK or S_ERROR { 'plot' : value1, 'thumbnail' : value2 } value1 and value2 are TRUE/FALSE
207+
"""
165208
metadata = {
166209
"title": "Throughput by %s" % reportRequest["grouping"],
167210
"ylabel": plotInfo["unit"],
@@ -174,21 +217,34 @@ def _plotThroughput(self, reportRequest, plotInfo, filename):
174217
_reportDataTransferedName = "Pie chart of transferred data"
175218

176219
def _reportDataTransfered(self, reportRequest):
177-
retVal = self._getSummaryData(
220+
"""It is used to retrieve the data from the database.
221+
222+
:param dict reportRequest: contains attributes used to create the plot.
223+
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
224+
"""
225+
retVal = self._getTimedData(
178226
reportRequest["startTime"],
179227
reportRequest["endTime"],
180-
"Transfersize",
228+
"TransferSize",
181229
preCondDict=reportRequest["condDict"],
182230
metadataDict=None,
183231
)
184232
if not retVal["OK"]:
185233
return retVal
186-
dataDict = retVal["Value"]
234+
dataDict, granularity = retVal["Value"]
235+
dataDict = self._sumDictValues(dataDict)
187236
for key in dataDict:
188237
dataDict[key] = int(dataDict[key])
189238
return S_OK({"data": dataDict})
190239

191240
def _plotDataTransfered(self, reportRequest, plotInfo, filename):
241+
"""It creates the plot.
242+
243+
:param dict reportRequest: plot attributes
244+
:param dict plotInfo: contains all the data which are used to create the plot
245+
:param str filename:
246+
:return: S_OK or S_ERROR { 'plot' : value1, 'thumbnail' : value2 } value1 and value2 are TRUE/FALSE
247+
"""
192248
metadata = {
193249
"title": "Total data transfered by %s" % reportRequest["grouping"],
194250
"ylabel": "bytes",

src/DIRAC/MonitoringSystem/private/Plotters/PilotSubmissionMonitoringPlotter.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010

1111
class PilotSubmissionMonitoringPlotter(BasePlotter):
12-
1312
"""
1413
.. class:: PilotSubmissionMonitoringPlotter
1514
@@ -22,6 +21,8 @@ class PilotSubmissionMonitoringPlotter(BasePlotter):
2221
_typeName = "PilotSubmissionMonitoring"
2322
_typeKeyFields = PilotSubmissionMonitoring().keyFields
2423

24+
_reportNumberOfSubmissions = "Total Number of Submission"
25+
2526
def _reportNumberOfSubmissions(self, reportRequest):
2627
"""It is used to retrieve the data from the database.
2728
@@ -66,12 +67,15 @@ def _plotNumberOfSubmissions(self, reportRequest, plotInfo, filename):
6667

6768
return self._generateStackedLinePlot(filename=filename, dataDict=plotInfo["data"], metadata=metadata)
6869

70+
_reportNumSucceededName = "Submission Efficiency"
71+
6972
def _reportNumSucceeded(self, reportRequest):
7073
"""It is used to retrieve the data from the database.
7174
7275
:param dict reportRequest: contains attributes used to create the plot.
7376
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
7477
"""
78+
# Retrieve the number of succeded submissions
7579
retVal = self._getTimedData(
7680
startTime=reportRequest["startTime"],
7781
endTime=reportRequest["endTime"],
@@ -81,8 +85,25 @@ def _reportNumSucceeded(self, reportRequest):
8185
)
8286
if not retVal["OK"]:
8387
return retVal
88+
89+
# Retrieve the number of total submissions
90+
retTotVal = self._getTimedData(
91+
startTime=reportRequest["startTime"],
92+
endTime=reportRequest["endTime"],
93+
selectField="NumTotal",
94+
preCondDict=reportRequest["condDict"],
95+
metadataDict=None,
96+
)
97+
if not retTotVal["OK"]:
98+
return retTotVal
99+
84100
dataDict, granularity = retVal["Value"]
85-
return S_OK({"data": dataDict, "granularity": granularity})
101+
totDataDict, granularity = retTotVal["Value"]
102+
# Check that the dicts are not empty
103+
if bool(dataDict) and bool(totDataDict):
104+
# Retrun the efficiency in dataDict
105+
effDict = self._calculateEfficiencyDict(totDataDict, dataDict)
106+
return S_OK({"data": effDict, "granularity": granularity})
86107

87108
def _plotNumSucceeded(self, reportRequest, plotInfo, filename):
88109
"""
@@ -92,12 +113,10 @@ def _plotNumSucceeded(self, reportRequest, plotInfo, filename):
92113
:param dict plotInfo: Data for plot.
93114
:param str filename: File name
94115
"""
95-
96116
metadata = {
97117
"title": "Pilot Submission efficiency by %s" % reportRequest["grouping"],
98118
"starttime": reportRequest["startTime"],
99119
"endtime": reportRequest["endTime"],
100120
"span": plotInfo["granularity"],
101121
}
102-
103122
return self._generateQualityPlot(filename, plotInfo["data"], metadata)

0 commit comments

Comments
 (0)