Skip to content

Commit d32b8cb

Browse files
authored
Merge pull request #8216 from fstagni/90_accounting_simplif
[9.0] accounting and monitoring simplifications
2 parents b1686ff + 061b3a0 commit d32b8cb

File tree

10 files changed

+0
-319
lines changed

10 files changed

+0
-319
lines changed

src/DIRAC/AccountingSystem/Client/AccountingCLI.py

Lines changed: 0 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -57,99 +57,6 @@ def showTraceback(self):
5757
traceback.print_tb(sys.exc_info()[2])
5858
print("________________________\n")
5959

60-
def do_registerType(self, args):
61-
"""
62-
Registers a new accounting type
63-
Usage : registerType <typeName>
64-
<DIRACRoot>/DIRAC/AccountingSystem/Client/Types/<typeName>
65-
should exist and inherit the base type
66-
"""
67-
try:
68-
argList = args.split()
69-
if argList:
70-
typeName = argList[0].strip()
71-
else:
72-
gLogger.error("No type name specified")
73-
return
74-
# Try to import the type
75-
result = self.objectLoader.loadObject(f"DIRAC.AccountingSystem.Client.Types.{typeName}")
76-
if not result["OK"]:
77-
return result
78-
typeClass = result["Value"]
79-
80-
gLogger.info(f"Loaded type {typeClass.__name__}")
81-
typeDef = typeClass().getDefinition()
82-
acClient = DataStoreClient()
83-
retVal = acClient.registerType(*typeDef)
84-
if retVal["OK"]:
85-
gLogger.info("Type registered successfully")
86-
else:
87-
gLogger.error(f"Error: {retVal['Message']}")
88-
except Exception:
89-
self.showTraceback()
90-
91-
def do_resetBucketLength(self, args):
92-
"""
93-
Set the bucket Length. Will trigger a recalculation of buckets. Can take a while.
94-
Usage : resetBucketLength <typeName>
95-
<DIRACRoot>/DIRAC/AccountingSystem/Client/Types/<typeName>
96-
should exist and inherit the base type
97-
"""
98-
try:
99-
argList = args.split()
100-
if argList:
101-
typeName = argList[0].strip()
102-
else:
103-
gLogger.error("No type name specified")
104-
return
105-
106-
# Try to import the type
107-
result = self.objectLoader.loadObject(f"DIRAC.AccountingSystem.Client.Types.{typeName}")
108-
if not result["OK"]:
109-
return result
110-
typeClass = result["Value"]
111-
gLogger.info(f"Loaded type {typeClass.__name__}")
112-
typeDef = typeClass().getDefinition()
113-
acClient = DataStoreClient()
114-
retVal = acClient.setBucketsLength(typeDef[0], typeDef[3])
115-
if retVal["OK"]:
116-
gLogger.info("Type registered successfully")
117-
else:
118-
gLogger.error(f"Error: {retVal['Message']}")
119-
except Exception:
120-
self.showTraceback()
121-
122-
def do_regenerateBuckets(self, args):
123-
"""
124-
Regenerate buckets for type. Can take a while.
125-
Usage : regenerateBuckets <typeName>
126-
<DIRACRoot>/DIRAC/AccountingSystem/Client/Types/<typeName>
127-
should exist and inherit the base type
128-
"""
129-
try:
130-
argList = args.split()
131-
if argList:
132-
typeName = argList[0].strip()
133-
else:
134-
gLogger.error("No type name specified")
135-
return
136-
137-
# Try to import the type
138-
result = self.objectLoader.loadObject(f"DIRAC.AccountingSystem.Client.Types.{typeName}")
139-
if not result["OK"]:
140-
return result
141-
typeClass = result["Value"]
142-
gLogger.info(f"Loaded type {typeClass.__name__}")
143-
typeDef = typeClass().getDefinition()
144-
acClient = DataStoreClient()
145-
retVal = acClient.regenerateBuckets(typeDef[0])
146-
if retVal["OK"]:
147-
gLogger.info("Buckets recalculated!")
148-
else:
149-
gLogger.error(f"Error: {retVal['Message']}")
150-
except Exception:
151-
self.showTraceback()
152-
15360
def do_showRegisteredTypes(self, args):
15461
"""
15562
Get a list of registered types
@@ -170,50 +77,3 @@ def do_showRegisteredTypes(self, args):
17077
print(" Value fields:\n %s" % "\n ".join(typeList[2]))
17178
except Exception:
17279
self.showTraceback()
173-
174-
def do_deleteType(self, args):
175-
"""
176-
Delete a registered accounting type.
177-
Usage : deleteType <typeName>
178-
WARN! It will delete all data associated to that type! VERY DANGEROUS!
179-
If you screw it, you'll discover a new dimension of pain and doom! :)
180-
"""
181-
try:
182-
argList = args.split()
183-
if argList:
184-
typeName = argList[0].strip()
185-
else:
186-
gLogger.error("No type name specified")
187-
return
188-
189-
choice = input(
190-
f"Are you completely sure you want to delete type {typeName} and all it's data? yes/no [no]: "
191-
)
192-
choice = choice.lower()
193-
if choice not in ("yes", "y"):
194-
print("Delete aborted")
195-
return
196-
197-
acClient = DataStoreClient()
198-
retVal = acClient.deleteType(typeName)
199-
if not retVal["OK"]:
200-
gLogger.error(f"Error: {retVal['Message']}")
201-
return
202-
print("Hope you meant it, because it's done")
203-
except Exception:
204-
self.showTraceback()
205-
206-
def do_compactBuckets(self, args):
207-
"""
208-
Compact buckets table
209-
Usage : compactBuckets
210-
"""
211-
try:
212-
acClient = DataStoreClient()
213-
retVal = acClient.compactDB()
214-
if not retVal["OK"]:
215-
gLogger.error(f"Error: {retVal['Message']}")
216-
return
217-
gLogger.info("Done")
218-
except Exception:
219-
self.showTraceback()

src/DIRAC/AccountingSystem/Client/DataStoreClient.py

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -122,19 +122,6 @@ def delayedCommit(self):
122122

123123
return S_OK()
124124

125-
def remove(self, register):
126-
"""
127-
Remove a Register from the Accounting DataStore
128-
"""
129-
if not self.__checkBaseType(register.__class__):
130-
return S_ERROR("register is not a valid type (has to inherit from BaseAccountingType")
131-
retVal = register.checkValues()
132-
if not retVal["OK"]:
133-
return retVal
134-
if gConfig.getValue("/LocalSite/DisableAccounting", False):
135-
return S_OK()
136-
return self._getRPC().remove(*register.getValues())
137-
138125

139126
def _sendToFailover(rpcStub):
140127
"""Create a ForwardDISET operation for failover"""

src/DIRAC/AccountingSystem/Client/Types/BaseAccountingType.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,6 @@ def getContents(self):
161161
cD[self.fieldsList[iPos]] = self.valuesList[iPos]
162162
return cD
163163

164-
def registerToServer(self):
165-
"""
166-
Register type in server
167-
"""
168-
rpcClient = Client(url="Accounting/DataStore")
169-
return rpcClient.registerType(*self.getDefinition())
170-
171164
def commit(self):
172165
"""
173166
Commit register to server

src/DIRAC/AccountingSystem/ConfigTemplate.cfg

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,6 @@ Services
99
Authorization
1010
{
1111
Default = authenticated
12-
compactDB = ServiceAdministrator
13-
deleteType = ServiceAdministrator
14-
registerType = ServiceAdministrator
15-
setBucketsLength = ServiceAdministrator
16-
regenerateBuckets = ServiceAdministrator
1712
}
1813
}
1914
##END

src/DIRAC/AccountingSystem/Service/DataStoreHandler.py

Lines changed: 0 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from DIRAC import S_ERROR, S_OK
1515
from DIRAC.AccountingSystem.DB.MultiAccountingDB import MultiAccountingDB
1616
from DIRAC.ConfigurationSystem.Client import PathFinder
17-
from DIRAC.Core.Base.Client import Client
1817
from DIRAC.Core.DISET.RequestHandler import RequestHandler, getServiceOption
1918
from DIRAC.Core.Utilities import TimeUtilities
2019
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
@@ -39,30 +38,6 @@ def initializeHandler(cls, svcInfoDict):
3938
gThreadScheduler.addPeriodicTask(60, cls.__acDB.loadPendingRecords)
4039
return S_OK()
4140

42-
types_registerType = [str, list, list, list]
43-
44-
def export_registerType(self, typeName, definitionKeyFields, definitionAccountingFields, bucketsLength):
45-
"""
46-
Register a new type. (Only for all powerful admins)
47-
"""
48-
return self.__acDB.registerType(typeName, definitionKeyFields, definitionAccountingFields, bucketsLength)
49-
50-
types_setBucketsLength = [str, list]
51-
52-
def export_setBucketsLength(self, typeName, bucketsLength):
53-
"""
54-
Change the buckets Length. (Only for all powerful admins)
55-
"""
56-
return self.__acDB.changeBucketsLength(typeName, bucketsLength)
57-
58-
types_regenerateBuckets = [str]
59-
60-
def export_regenerateBuckets(self, typeName):
61-
"""
62-
Recalculate buckets. (Only for all powerful admins)
63-
"""
64-
return self.__acDB.regenerateBuckets(typeName)
65-
6641
types_getRegisteredTypes = []
6742

6843
def export_getRegisteredTypes(self):
@@ -106,51 +81,4 @@ def export_commitRegisters(self, entriesList):
10681
records.append((entry[0], startTime, endTime, entry[3]))
10782
return self.__acDB.insertRecordBundleThroughQueue(records)
10883

109-
types_compactDB = []
110-
111-
def export_compactDB(self):
112-
"""
113-
Compact the db by grouping buckets
114-
"""
115-
# if we are running workers (not only one service) we can redirect the request to the master
116-
# For more information please read the Administrative guide Accounting part!
117-
# ADVICE: If you want to trigger the bucketing, please make sure the bucketing is not running!!!!
118-
if self.runBucketing:
119-
return self.__acDB.compactBuckets()
120-
121-
return Client(url="Accounting/DataStoreMaster").compactDB()
122-
12384
types_remove = [str, datetime.datetime, datetime.datetime, list]
124-
125-
def export_remove(self, typeName, startTime, endTime, valuesList):
126-
"""
127-
Remove a record for a type
128-
"""
129-
startTime = int(TimeUtilities.toEpoch(startTime))
130-
endTime = int(TimeUtilities.toEpoch(endTime))
131-
return self.__acDB.deleteRecord(typeName, startTime, endTime, valuesList)
132-
133-
types_removeRegisters = [list]
134-
135-
def export_removeRegisters(self, entriesList):
136-
"""
137-
Remove a record for a type
138-
"""
139-
expectedTypes = [str, datetime.datetime, datetime.datetime, list]
140-
for entry in entriesList:
141-
if len(entry) != 4:
142-
return S_ERROR("Invalid records")
143-
for i, en in enumerate(entry):
144-
if not isinstance(en, expectedTypes[i]):
145-
return S_ERROR(f"{i} field in the records should be {expectedTypes[i]}")
146-
ok = 0
147-
for entry in entriesList:
148-
startTime = int(TimeUtilities.toEpoch(entry[1]))
149-
endTime = int(TimeUtilities.toEpoch(entry[2]))
150-
record = entry[3]
151-
result = self.__acDB.deleteRecord(entry[0], startTime, endTime, record)
152-
if not result["OK"]:
153-
return S_OK(ok)
154-
ok += 1
155-
156-
return S_OK(ok)

src/DIRAC/MonitoringSystem/DB/MonitoringDB.py

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -447,26 +447,6 @@ def __getRawData(self, typeName, condDict, size=-1):
447447
records.append({paramName: getattr(resObj["_source"], paramName) for paramName in paramNames})
448448
return S_OK(records)
449449

450-
def getLastDayData(self, typeName, condDict):
451-
"""
452-
It returns the last day data for a given monitoring type.
453-
454-
:returns: for example
455-
456-
.. code-block:: python
457-
458-
{'sort': [{'timestamp': {'order': 'desc'}}],
459-
'query': {'bool': {'must': [{'match': {'host': 'dzmathe.cern.ch'}},
460-
{'match': {'component': 'Bookkeeping_BookkeepingManager'}}]}}}
461-
462-
:param str typeName: name of the monitoring type
463-
:param dict condDict: conditions for the query
464-
465-
* key -> name of the field
466-
* value -> list of possible values
467-
"""
468-
return self.__getRawData(typeName, condDict)
469-
470450
def getLimitedData(self, typeName, condDict, size=10):
471451
"""
472452
Returns a list of records for a given selection.

src/DIRAC/MonitoringSystem/Service/MonitoringHandler.py

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -244,24 +244,6 @@ def export_getReport(self, reportRequest):
244244
reportRequest["generatePlot"] = False
245245
return reporter.generate(reportRequest)
246246

247-
types_addMonitoringRecords = [str, list]
248-
249-
def export_addMonitoringRecords(self, monitoringtype, data):
250-
"""
251-
Bulk insert data directly to the given monitoring type.
252-
253-
:param str monitoringtype: monitoring type name
254-
:param list data: list of documents
255-
:returns: S_OK or S_ERROR
256-
"""
257-
258-
retVal = self.__db.getIndexName(monitoringtype)
259-
if not retVal["OK"]:
260-
return retVal
261-
prefix = retVal["Value"]
262-
gLogger.debug("addMonitoringRecords:", prefix)
263-
return self.__db.bulk_index(prefix, data)
264-
265247
types_addRecords = [str, str, list]
266248

267249
def export_addRecords(self, indexname, monitoringType, data):
@@ -290,21 +272,6 @@ def export_deleteIndex(self, indexName):
290272
gLogger.debug("delete index:", indexName)
291273
return self.__db.deleteIndex(indexName)
292274

293-
types_getLastDayData = [str, dict]
294-
295-
def export_getLastDayData(self, typeName, condDict):
296-
"""
297-
It returns the data from the last day index. Note: we create daily indexes.
298-
299-
:param str typeName: name of the monitoring type
300-
:param dict condDict: conditions for the query
301-
302-
* key -> name of the field
303-
* value -> list of possible values
304-
"""
305-
306-
return self.__db.getLastDayData(typeName, condDict)
307-
308275
types_getLimitedDat = [str, dict, int]
309276

310277
def export_getLimitedData(self, typeName, condDict, size):

tests/Integration/AccountingSystem/Test_DataStoreClient.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,3 @@ def test_addAndRemoveStorageOccupancy():
2828
assert res["OK"], res["Message"]
2929
res = gDataStoreClient.commit()
3030
assert res["OK"], res["Message"]
31-
# now removing that record
32-
res = gDataStoreClient.remove(record)
33-
assert res["OK"], res["Message"]

tests/Integration/AccountingSystem/Test_ReportsClient.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,3 @@ def test_addAndRemoveStorageOccupancy():
4949
"StorageElement",
5050
)
5151
assert res["OK"], res["Message"]
52-
53-
# now removing that record
54-
res = gDataStoreClient.remove(record)
55-
assert res["OK"], res["Message"]

0 commit comments

Comments
 (0)