Skip to content

Commit 828ecff

Browse files
authored
Merge pull request #5807 from rupozzi/fix-dataop
[integration] Move DataOperation to Monitoring
2 parents dce4220 + d2ff899 commit 828ecff

File tree

11 files changed

+669
-129
lines changed

11 files changed

+669
-129
lines changed

docs/source/AdministratorGuide/Systems/MonitoringSystem/index.rst

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ Monitoring System
1010
Overview
1111
=========
1212

13-
The Monitoring system is used to monitor various components of DIRAC. Currently, we have three monitoring types:
13+
The Monitoring system is used to monitor various components of DIRAC. Currently, we have five monitoring types:
1414

1515
- WMSHistory: for monitoring the DIRAC WMS
1616
- Component Monitoring: for monitoring DIRAC components such as services, agents, etc.
1717
- RMS Monitoring: for monitoring the DIRAC RequestManagement System (mostly the Request Executing Agent).
18+
- PilotSubmission Monitoring: for monitoring the DIRAC pilot submission statistics from SiteDirector agents
19+
- DataOperation Monitoring: for monitoring the DIRAC data operation statistics
1820

1921
It is based on Elasticsearch distributed search and analytics NoSQL database.
2022
If you want to use it, you have to install the Monitoring service and connect it to an Elasticsearch instance.
@@ -160,6 +162,13 @@ Enable Pilot Submission Monitoring
160162
To have the pilot submission statistics sent to the ES backend (by default they are sent to Accounting), you need to set
161163
``sendPilotSubmissionMonitoring = True`` for this option in WorkloadManagement/SiteDirector.
162164

165+
Enable Data Operation Monitoring
166+
==================================
167+
168+
To have data operation records sent to the ES backend, add ``Monitoring`` to the ``MonitoringBackends`` option in Operations/DataManagement.
169+
170+
171+
163172
Accessing the Monitoring information
164173
=====================================
165174

src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424

2525
from DIRAC import S_OK, S_ERROR
2626

27-
from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation
27+
28+
from DIRAC.MonitoringSystem.Client.DataOperationSender import DataOperationSender
2829
from DIRAC.Core.Base.AgentModule import AgentModule
2930
from DIRAC.Core.Utilities.DErrno import cmpError
3031
from DIRAC.Core.Utilities.DictCache import DictCache
31-
3232
from DIRAC.Core.Utilities.Time import fromString
3333
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getFTS3ServerDict
3434
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations as opHelper
@@ -40,7 +40,6 @@
4040
from DIRAC.DataManagementSystem.Client.FTS3Job import FTS3Job
4141
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
4242

43-
4443
# pylint: disable=attribute-defined-outside-init
4544

4645
AGENT_NAME = "DataManagement/FTS3Agent"
@@ -99,7 +98,6 @@ def initialize(self):
9998
10099
:return: S_OK()/S_ERROR()
101100
"""
102-
103101
self._globalContextCache = {}
104102

105103
# name that will be used in DB for assignment tag
@@ -120,6 +118,7 @@ def beginExecution(self):
120118
121119
:return: S_OK()/S_ERROR()
122120
"""
121+
self.dataOpSender = DataOperationSender()
123122
return self.__readConf()
124123

125124
def getFTS3Context(self, username, group, ftsServer, threadID):
@@ -630,16 +629,14 @@ def execute(self):
630629

631630
return S_OK()
632631

633-
@staticmethod
634-
def __sendAccounting(ftsJob):
635-
"""prepare and send DataOperation to AccountingDB
636-
637-
:param ftsJob: the FTS3Job from which we send the accounting info
638-
"""
632+
def endExecution(self):
    """Conclude the sending of data-operation records at the end of a cycle.

    Calls ``concludeSending()`` on the ``DataOperationSender`` instance that
    ``beginExecution`` stores in ``self.dataOpSender``. Records are handed to
    that sender by ``__sendAccounting`` with ``delayedCommit=True``.

    :return: S_OK()/S_ERROR()
    """
    result = self.dataOpSender.concludeSending()

    # NOTE(review): assumes concludeSending() follows the S_OK/S_ERROR dict
    # convention - confirm. Any other return value is treated as success.
    if isinstance(result, dict) and result.get("OK") is False:
        return result

    # Always hand back a result structure, like the other lifecycle hooks of
    # this agent (initialize, beginExecution), instead of an implicit None.
    return S_OK()
639634

640-
dataOp = DataOperation()
641-
dataOp.setStartTime(fromString(ftsJob.submitTime))
642-
dataOp.setEndTime(fromString(ftsJob.lastUpdate))
635+
def __sendAccounting(self, ftsJob):
643636

644-
dataOp.setValuesFromDict(ftsJob.accountingDict)
645-
dataOp.delayedCommit()
637+
self.dataOpSender.sendData(
638+
ftsJob.accountingDict,
639+
delayedCommit=True,
640+
startTime=fromString(ftsJob.submitTime),
641+
endTime=fromString(ftsJob.lastUpdate),
642+
)

src/DIRAC/DataManagementSystem/Client/DataManager.py

Lines changed: 80 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@
2020
import DIRAC
2121
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
2222
from DIRAC.Core.Utilities import DErrno
23+
from DIRAC.Core.Utilities import Time
2324
from DIRAC.Core.Utilities.Adler import fileAdler, compareAdler
2425
from DIRAC.Core.Utilities.File import makeGuid, getSize
2526
from DIRAC.Core.Utilities.List import randomize, breakListIntoChunks
2627
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
2728
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
2829
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
29-
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient
30-
from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation
30+
from DIRAC.MonitoringSystem.Client.DataOperationSender import DataOperationSender
3131
from DIRAC.DataManagementSystem.Utilities.DMSHelpers import DMSHelpers
3232
from DIRAC.Resources.Catalog.FileCatalog import FileCatalog
3333
from DIRAC.Resources.Storage.StorageElement import StorageElement
@@ -46,8 +46,8 @@ def _isOlderThan(stringTime, days):
4646
return False
4747

4848

49-
def _initialiseAccountingObject(operation, se, files):
50-
"""create accouting record"""
49+
def _initialiseAccountingDict(operation, se, files):
50+
"""create Accounting/Monitoring record"""
5151
accountingDict = {}
5252
accountingDict["OperationType"] = operation
5353
result = getProxyInfo()
@@ -67,9 +67,7 @@ def _initialiseAccountingObject(operation, se, files):
6767
accountingDict["TransferTime"] = 0.0
6868
accountingDict["FinalStatus"] = "Successful"
6969
accountingDict["Source"] = DIRAC.siteName()
70-
oDataOperation = DataOperation()
71-
oDataOperation.setValuesFromDict(accountingDict)
72-
return oDataOperation
70+
return accountingDict
7371

7472

7573
class DataManager(object):
@@ -104,6 +102,7 @@ def __init__(self, catalogs=None, masterCatalogOnly=False, vo=False):
104102
self.dmsHelper = DMSHelpers(vo=vo)
105103
self.registrationProtocol = self.dmsHelper.getRegistrationProtocols()
106104
self.thirdPartyProtocols = self.dmsHelper.getThirdPartyProtocols()
105+
self.dataOpSender = DataOperationSender()
107106

108107
def setAccountingClient(self, client):
109108
"""Set Accounting Client instance"""
@@ -540,24 +539,32 @@ def putAndRegister(self, lfn, fileName, diracSE, guid=None, path=None, checksum=
540539
failed = {}
541540
##########################################################
542541
# Perform the put here.
543-
oDataOperation = _initialiseAccountingObject("putAndRegister", diracSE, 1)
544-
oDataOperation.setStartTime()
545-
oDataOperation.setValueByKey("TransferSize", size)
546-
startTime = time.time()
542+
startTime = Time.dateTime()
543+
transferStartTime = time.time()
547544
res = returnSingleResult(storageElement.putFile(fileDict))
548-
putTime = time.time() - startTime
549-
oDataOperation.setValueByKey("TransferTime", putTime)
550-
if not res["OK"]:
545+
putTime = time.time() - transferStartTime
551546

547+
accountingDict = _initialiseAccountingDict("putAndRegister", diracSE, 1)
548+
accountingDict["TransferSize"] = size
549+
accountingDict["TransferTime"] = putTime
550+
551+
if not res["OK"]:
552552
# We don't consider it a failure if the SE is not valid
553553
if not DErrno.cmpError(res, errno.EACCES):
554-
oDataOperation.setValueByKey("TransferOK", 0)
555-
oDataOperation.setValueByKey("FinalStatus", "Failed")
556-
oDataOperation.setEndTime()
557-
gDataStoreClient.addRegister(oDataOperation)
558-
gDataStoreClient.commit()
559-
startTime = time.time()
560-
log.debug("putAndRegister: Sending accounting took %.1f seconds" % (time.time() - startTime))
554+
555+
accountingDict["TransferOK"] = 0
556+
accountingDict["FinalStatus"] = "Failed"
557+
sendingResult = self.dataOpSender.sendData(
558+
accountingDict, commitFlag=True, startTime=startTime, endTime=Time.dateTime()
559+
)
560+
561+
log.verbose("Committing data operation")
562+
if not sendingResult["OK"]:
563+
log.error("Couldn't commit data operation", sendingResult["Message"])
564+
return sendingResult
565+
log.verbose("Done committing")
566+
log.debug("putAndRegister: Sending took %.1f seconds" % (time.time() - transferStartTime))
567+
561568
errStr = "Failed to put file to Storage Element."
562569
log.debug(errStr, "%s: %s" % (fileName, res["Message"]))
563570
return S_ERROR("%s %s" % (errStr, res["Message"]))
@@ -572,7 +579,6 @@ def putAndRegister(self, lfn, fileName, diracSE, guid=None, path=None, checksum=
572579
log.debug(errStr, res["Message"])
573580
return S_ERROR("%s %s" % (errStr, res["Message"]))
574581
destUrl = res["Value"]
575-
oDataOperation.setValueByKey("RegistrationTotal", 1)
576582

577583
fileTuple = (lfn, destUrl, size, destinationSE, guid, checksum)
578584
registerDict = {
@@ -586,25 +592,35 @@ def putAndRegister(self, lfn, fileName, diracSE, guid=None, path=None, checksum=
586592
startTime = time.time()
587593
res = self.registerFile(fileTuple)
588594
registerTime = time.time() - startTime
589-
oDataOperation.setValueByKey("RegistrationTime", registerTime)
595+
596+
accountingDict["RegistrationTotal"] = 1
597+
accountingDict["RegistrationTime"] = registerTime
598+
590599
if not res["OK"]:
591600
errStr = "Completely failed to register file."
592601
log.debug(errStr, res["Message"])
593602
failed[lfn] = {"register": registerDict}
594-
oDataOperation.setValueByKey("FinalStatus", "Failed")
603+
accountingDict["FinalStatus"] = "Failed"
604+
595605
elif lfn in res["Value"]["Failed"]:
596606
errStr = "Failed to register file."
597607
log.debug(errStr, "%s %s" % (lfn, res["Value"]["Failed"][lfn]))
598-
oDataOperation.setValueByKey("FinalStatus", "Failed")
608+
accountingDict["FinalStatus"] = "Failed"
599609
failed[lfn] = {"register": registerDict}
600610
else:
601611
successful[lfn]["register"] = registerTime
602-
oDataOperation.setValueByKey("RegistrationOK", 1)
603-
oDataOperation.setEndTime()
604-
gDataStoreClient.addRegister(oDataOperation)
612+
accountingDict["RegistrationOK"] = 1
613+
614+
# Send to Monitoring/Accounting
605615
startTime = time.time()
606-
gDataStoreClient.commit()
607-
log.debug("Sending accounting took %.1f seconds" % (time.time() - startTime))
616+
sendingResult = self.dataOpSender.sendData(accountingDict, commitFlag=True)
617+
log.verbose("Committing data operation")
618+
if not sendingResult["OK"]:
619+
log.error("Couldn't commit data operation", sendingResult["Message"])
620+
return sendingResult
621+
log.verbose("Done committing")
622+
log.debug("putAndRegister: Sending took %.1f seconds" % (time.time() - startTime))
623+
608624
return S_OK({"Successful": successful, "Failed": failed})
609625

610626
def replicateAndRegister(self, lfn, destSE, sourceSE="", destPath="", localCache="", catalog=""):
@@ -1186,7 +1202,7 @@ def removeFile(self, lfn, force=None):
11861202
failed.update(res["Value"]["Failed"])
11871203
successful.update(res["Value"]["Successful"])
11881204

1189-
gDataStoreClient.commit()
1205+
self.dataOpSender.concludeSending()
11901206
return S_OK({"Successful": successful, "Failed": failed})
11911207

11921208
def __removeFile(self, lfnDict):
@@ -1286,7 +1302,7 @@ def removeReplica(self, storageElementName, lfn):
12861302
return res
12871303
failed.update(res["Value"]["Failed"])
12881304
successful.update(res["Value"]["Successful"])
1289-
gDataStoreClient.commit()
1305+
self.dataOpSender.concludeSending()
12901306
return S_OK({"Successful": successful, "Failed": failed})
12911307

12921308
def __removeReplica(self, storageElementName, lfns, replicaDict=None):
@@ -1399,20 +1415,23 @@ def __removeCatalogReplica(self, replicaTuples):
13991415
:param replicaTuples : list of (lfn, catalogPFN, se)
14001416
"""
14011417
log = self.log.getSubLogger("__removeCatalogReplica")
1402-
oDataOperation = _initialiseAccountingObject("removeCatalogReplica", "", len(replicaTuples))
1403-
oDataOperation.setStartTime()
1404-
start = time.time()
1418+
1419+
startTime = Time.dateTime()
1420+
registrationStartTime = time.time()
14051421
# HACK!
14061422
replicaDict = {}
14071423
for lfn, pfn, se in replicaTuples:
14081424
replicaDict[lfn] = {"SE": se, "PFN": pfn}
14091425
res = self.fileCatalog.removeReplica(replicaDict)
1410-
oDataOperation.setEndTime()
1411-
oDataOperation.setValueByKey("RegistrationTime", time.time() - start)
1426+
endTime = Time.dateTime()
1427+
accountingDict = _initialiseAccountingDict("removeCatalogReplica", "", len(replicaTuples))
1428+
accountingDict["RegistrationTime"] = time.time() - registrationStartTime
1429+
14121430
if not res["OK"]:
1413-
oDataOperation.setValueByKey("RegistrationOK", 0)
1414-
oDataOperation.setValueByKey("FinalStatus", "Failed")
1415-
gDataStoreClient.addRegister(oDataOperation)
1431+
accountingDict["RegistrationOK"] = 0
1432+
accountingDict["FinalStatus"] = "Failed"
1433+
self.dataOpSender.sendData(accountingDict, startTime=startTime, endTime=endTime)
1434+
14161435
errStr = "Completely failed to remove replica: "
14171436
log.debug(errStr, res["Message"])
14181437
return S_ERROR("%s %s" % (errStr, res["Message"]))
@@ -1437,8 +1456,9 @@ def __removeCatalogReplica(self, replicaTuples):
14371456
for lfn in success:
14381457
log.debug("Successfully removed replica.", lfn)
14391458

1440-
oDataOperation.setValueByKey("RegistrationOK", len(success))
1441-
gDataStoreClient.addRegister(oDataOperation)
1459+
accountingDict["RegistrationOK"] = len(success)
1460+
self.dataOpSender.sendData(accountingDict, startTime=startTime, endTime=endTime)
1461+
14421462
return res
14431463

14441464
def __removePhysicalReplica(self, storageElementName, lfnsToRemove, replicaDict=None):
@@ -1456,19 +1476,22 @@ def __removePhysicalReplica(self, storageElementName, lfnsToRemove, replicaDict=
14561476
errStr = "The storage element is not currently valid."
14571477
log.verbose(errStr, "%s %s" % (storageElementName, res["Message"]))
14581478
return S_ERROR("%s %s" % (errStr, res["Message"]))
1459-
oDataOperation = _initialiseAccountingObject("removePhysicalReplica", storageElementName, len(lfnsToRemove))
1460-
oDataOperation.setStartTime()
1461-
start = time.time()
1479+
1480+
startTime = Time.dateTime()
1481+
transferStartTime = time.time()
14621482
lfnsToRemove = list(lfnsToRemove)
14631483
ret = storageElement.getFileSize(lfnsToRemove, replicaDict=replicaDict)
14641484
deletedSizes = ret.get("Value", {}).get("Successful", {})
14651485
res = storageElement.removeFile(lfnsToRemove, replicaDict=replicaDict)
1466-
oDataOperation.setEndTime()
1467-
oDataOperation.setValueByKey("TransferTime", time.time() - start)
1486+
endTime = Time.dateTime()
1487+
accountingDict = _initialiseAccountingDict("removePhysicalReplica", storageElementName, len(lfnsToRemove))
1488+
accountingDict["TransferTime"] = time.time() - transferStartTime
1489+
14681490
if not res["OK"]:
1469-
oDataOperation.setValueByKey("TransferOK", 0)
1470-
oDataOperation.setValueByKey("FinalStatus", "Failed")
1471-
gDataStoreClient.addRegister(oDataOperation)
1491+
accountingDict["TransferOK"] = 0
1492+
accountingDict["FinalStatus"] = "Failed"
1493+
self.dataOpSender.sendData(accountingDict, startTime=startTime, endTime=endTime)
1494+
14721495
log.debug("Failed to remove replicas.", res["Message"])
14731496
else:
14741497
for lfn, value in list(res["Value"]["Failed"].items()):
@@ -1479,10 +1502,11 @@ def __removePhysicalReplica(self, storageElementName, lfnsToRemove, replicaDict=
14791502
res["Value"]["Successful"][lfn] = True
14801503

14811504
deletedSize = sum(deletedSizes.get(lfn, 0) for lfn in res["Value"]["Successful"])
1482-
oDataOperation.setValueByKey("TransferSize", deletedSize)
1483-
oDataOperation.setValueByKey("TransferOK", len(res["Value"]["Successful"]))
14841505

1485-
gDataStoreClient.addRegister(oDataOperation)
1506+
accountingDict["TransferSize"] = deletedSize
1507+
accountingDict["TransferOK"] = len(res["Value"]["Successful"])
1508+
self.dataOpSender.sendData(accountingDict, startTime=startTime, endTime=endTime)
1509+
14861510
infoStr = "Successfully issued accounting removal request."
14871511
log.debug(infoStr)
14881512
return res
@@ -1851,3 +1875,6 @@ def getReplica(self, lfn, storageElementName, localPath=False):
18511875
:param bool singleFile: execute for the first LFN only
18521876
"""
18531877
return self.__executeIfReplicaExists(storageElementName, lfn, "getFile", localPath=localPath)
1878+
1879+
def __del__(self):
    """Conclude any pending data-operation sending when the object is collected.

    ``dataOpSender`` is assigned at the very end of ``__init__``; if the
    constructor raised before reaching that line the attribute does not
    exist, so it is looked up defensively rather than dereferenced directly.
    """
    sender = getattr(self, "dataOpSender", None)
    if sender is not None:
        sender.concludeSending()

0 commit comments

Comments
 (0)