Skip to content

Commit d1515b1

Browse files
committed
feat: New tests for DataOpSender plus other fixes
1 parent d0d9a97 commit d1515b1

File tree

6 files changed

+191
-68
lines changed

6 files changed

+191
-68
lines changed

src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ def _monitorJob(self, ftsJob):
247247
res = self.fts3db.updateJobStatus(upDict)
248248

249249
if ftsJob.status in ftsJob.FINAL_STATES:
250-
self.__sendAccounting(self, ftsJob)
250+
self.__sendAccounting(ftsJob)
251251

252252
return ftsJob, res
253253

@@ -632,7 +632,6 @@ def execute(self):
632632
def endExecution(self):
633633
self.dataOpSender.concludeSending()
634634

635-
@staticmethod
636635
def __sendAccounting(self, ftsJob):
637636

638637
self.dataOpSender.sendData(
Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,25 @@
1+
"""
2+
This class is being called whenever there is need to send data operation to Accounting or Monitoring, or both.
3+
Created as replacement, or rather semplification, of the MonitoringReporter/gDataStoreClient usage for data operation to handle both cases.
4+
5+
"""
6+
17
import DIRAC
2-
from DIRAC import S_OK, S_ERROR, gLogger
8+
from DIRAC import S_OK, gLogger
39

410
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
511
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient
612
from DIRAC.AccountingSystem.Client.Types.DataOperation import DataOperation
713
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
814

15+
sLog = gLogger.getSubLogger("__name__")
16+
917

1018
class DataOperationSender:
19+
"""
20+
class:: DataOperationSender
21+
It reads the MonitoringBackends option to decide whether send and commit data operation to either Accounting or Monitoring.
22+
"""
1123

1224
# Initialize the object so that the Reporters are created only once
1325
def __init__(self):
@@ -24,17 +36,19 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
2436
:param dict baseDict: contains a key/value pair
2537
:param bool commitFlag: decides whether to commit the record or not.
2638
:param bool delayedCommit: decides whether to commit the record with delay (only for sending to Accounting)
39+
:param int startTime: epoch time, start time of the plot
40+
:param int endTime: epoch time, end time of the plot
2741
"""
2842
if "Monitoring" in self.monitoringOption:
2943
baseDict["ExecutionSite"] = DIRAC.siteName()
3044
self.dataOperationReporter.addRecord(baseDict)
31-
if commitFlag:
45+
if commitFlag or delayedCommit:
3246
result = self.dataOperationReporter.commit()
33-
gLogger.verbose("Committing data operation to monitoring")
47+
sLog.debug("Committing data operation to monitoring")
3448
if not result["OK"]:
35-
gLogger.error("Couldn't commit data operation to monitoring", result["Message"])
49+
sLog.error("Could not commit data operation to monitoring", result["Message"])
3650
return result
37-
gLogger.verbose("Done committing to monitoring")
51+
sLog.debug("Done committing to monitoring")
3852

3953
if "Accounting" in self.monitoringOption:
4054
self.dataOp.setValuesFromDict(baseDict)
@@ -46,22 +60,22 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
4660
self.dataOp.setEndTime()
4761
# Adding only to register
4862
if not commitFlag and not delayedCommit:
49-
result = gDataStoreClient.addRegister(self.dataOp)
50-
return result
63+
return gDataStoreClient.addRegister(self.dataOp)
64+
5165
# Adding to register and committing
5266
if commitFlag and not delayedCommit:
5367
gDataStoreClient.addRegister(self.dataOp)
5468
result = gDataStoreClient.commit()
55-
gLogger.verbose("Committing data operation to accounting")
69+
sLog.debug("Committing data operation to accounting")
5670
if not result["OK"]:
57-
gLogger.error("Couldn't commit data operation to accounting", result["Message"])
71+
sLog.error("Could not commit data operation to accounting", result["Message"])
5872
return result
59-
gLogger.verbose("Done committing to accounting")
73+
sLog.debug("Done committing to accounting")
6074
# Only late committing
6175
else:
6276
result = self.dataOp.delayedCommit()
6377
if not result["OK"]:
64-
gLogger.error("Couldn't delay-commit data operation to accounting")
78+
sLog.error("Could not delay-commit data operation to accounting")
6579
return result
6680

6781
return S_OK()
@@ -70,9 +84,9 @@ def sendData(self, baseDict, commitFlag=False, delayedCommit=False, startTime=Fa
7084
def concludeSending(self):
7185
if "Accounting" in self.monitoringOption:
7286
result = gDataStoreClient.commit()
73-
gLogger.verbose("Concluding the sending and committing data operation to accounting")
87+
sLog.debug("Concluding the sending and committing data operation to accounting")
7488
if not result["OK"]:
75-
gLogger.error("Couldn't commit data operation to accounting", result["Message"])
89+
sLog.error("Could not commit data operation to accounting", result["Message"])
7690
return result
77-
gLogger.verbose("Done committing to accounting")
91+
sLog.debug("Done committing to accounting")
7892
return S_OK()

src/DIRAC/MonitoringSystem/Client/Types/DataOperation.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
"""
44

55
from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType
6-
import DIRAC
76

87

98
class DataOperation(BaseType):
@@ -42,6 +41,13 @@ def __init__(self):
4241
"Destination": {"type": "keyword"},
4342
"Protocol": {"type": "keyword"},
4443
"FinalStatus": {"type": "keyword"},
44+
"TransferSize": {"type": "long"},
45+
"TransferTime": {"type": "long"},
46+
"RegistrationTime": {"type": "long"},
47+
"TransferOK": {"type": "long"},
48+
"TransferTotal": {"type": "long"},
49+
"RegistrationOK": {"type": "long"},
50+
"RegistrationTotal": {"type": "long"},
4551
}
4652
)
4753

src/DIRAC/Resources/Storage/StorageElement.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,13 +1308,12 @@ def __getattr__(self, name):
13081308

13091309
def addAccountingOperation(self, lfns, startDate, elapsedTime, storageParameters, callRes):
13101310
"""
1311-
Generates a DataOperation accounting if needs to be, and adds it to the DataStore client cache
1311+
Generates a DataOperationSender instance and sends the operation data filled in accountingDict.
13121312
13131313
:param lfns: list of lfns on which we attempted the operation
13141314
:param startDate: datetime, start of the operation
13151315
:param elapsedTime: time (seconds) the operation took
13161316
:param storageParameters: the parameters of the plugins used to perform the operation
1317-
:param monitoringOption: option to decide whether to send to Monitoring or Accounting
13181317
:param callRes: the return of the method call, S_OK or S_ERROR
13191318
13201319
The operation is generated with the OperationType "se.methodName"
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import pytest
2+
3+
from DIRAC import gLogger
4+
from DIRAC.MonitoringSystem.Client.DataOperationSender import DataOperationSender
5+
6+
gLogger.setLevel("DEBUG")
7+
8+
dataOpSender = DataOperationSender()
9+
10+
dataOpMonitoringData = [
11+
{
12+
"OperationType": "se.getFile",
13+
"User": "rpozzi",
14+
"ExecutionSite": "",
15+
"Source": "CertificationSandboxSE",
16+
"Destination": "LCG.PIC.es",
17+
"Protocol": "dips",
18+
"FinalStatus": "Successful",
19+
"TransferSize": 3,
20+
"TransferTime": 1458226213,
21+
"RegistrationTime": 1458226213,
22+
"TransferOK": 20,
23+
"TransferTotal": 50,
24+
"RegistrationOK": 10,
25+
"RegistrationTotal": 40,
26+
},
27+
{
28+
"OperationType": "se.getFile",
29+
"User": "fstagni",
30+
"ExecutionSite": "",
31+
"Source": "Failed",
32+
"Destination": "LCG.PIC.es",
33+
"Protocol": "dips",
34+
"FinalStatus": "Failed",
35+
"TransferSize": 343,
36+
"TransferTime": 1458226213,
37+
"RegistrationTime": 1458226213,
38+
"TransferOK": 6,
39+
"TransferTotal": 26,
40+
"RegistrationOK": 3,
41+
"RegistrationTotal": 35,
42+
},
43+
{
44+
"OperationType": "se.getFile",
45+
"User": "fstagni",
46+
"ExecutionSite": "",
47+
"Source": "Failed",
48+
"Destination": "LCG.PIC.es",
49+
"Protocol": "dips",
50+
"FinalStatus": "Failed",
51+
"TransferSize": 35555,
52+
"TransferTime": 1458226213,
53+
"RegistrationTime": 1458226213,
54+
"TransferOK": 1345,
55+
"TransferTotal": 2614,
56+
"RegistrationOK": 31245,
57+
"RegistrationTotal": 351255,
58+
},
59+
{
60+
"OperationType": "se.getFile",
61+
"User": "rpozzi",
62+
"ExecutionSite": "",
63+
"Source": "Failed",
64+
"Destination": "LCG.CNAF.it",
65+
"Protocol": "dips",
66+
"FinalStatus": "Failed",
67+
"TransferSize": 1000,
68+
"TransferTime": 1458222546,
69+
"RegistrationTime": 1458226000,
70+
"TransferOK": 109,
71+
"TransferTotal": 1204,
72+
"RegistrationOK": 321,
73+
"RegistrationTotal": 5000,
74+
},
75+
]
76+
delayedDataOpData = [
77+
{
78+
"OperationType": "se.getFile",
79+
"User": "fstagni",
80+
"ExecutionSite": "",
81+
"Source": "Failed",
82+
"Destination": "LCG.PIC.es",
83+
"Protocol": "dips",
84+
"FinalStatus": "Failed",
85+
"TransferSize": 3,
86+
"TransferTime": 1458226213,
87+
"RegistrationTime": 1458226213,
88+
"TransferOK": 6,
89+
"TransferTotal": 26,
90+
"RegistrationOK": 3,
91+
"RegistrationTotal": 35,
92+
},
93+
{
94+
"OperationType": "se.getFile",
95+
"User": "rpozzi",
96+
"ExecutionSite": "",
97+
"Source": "Failed",
98+
"Destination": "LCG.CNAF.it",
99+
"Protocol": "dips",
100+
"FinalStatus": "Successfull",
101+
"TransferSize": 10,
102+
"TransferTime": 1458226300,
103+
"RegistrationTime": 1458226300,
104+
"TransferOK": 23,
105+
"TransferTotal": 113,
106+
"RegistrationOK": 11,
107+
"RegistrationTotal": 403,
108+
},
109+
{
110+
"OperationType": "se.getFile",
111+
"User": "rpozzi",
112+
"ExecutionSite": "",
113+
"Source": "Failed",
114+
"Destination": "LCG.CNAF.it",
115+
"Protocol": "dips",
116+
"FinalStatus": "Successfull",
117+
"TransferSize": 10,
118+
"TransferTime": 1458226300,
119+
"RegistrationTime": 1458226300,
120+
"TransferOK": 23,
121+
"TransferTotal": 113,
122+
"RegistrationOK": 11,
123+
"RegistrationTotal": 403,
124+
},
125+
]
126+
127+
# fixture to have before the test methods
128+
@pytest.fixture
129+
def addToRegister():
130+
# Add the first set
131+
for record in dataOpMonitoringData:
132+
add_result = dataOpSender.sendData(record, False, False)
133+
assert add_result["OK"]
134+
# Add the second set
135+
for record in delayedDataOpData:
136+
add_result = dataOpSender.sendData(record, False, False)
137+
assert add_result["OK"]
138+
yield addToRegister
139+
140+
141+
# Test all possible options for the class
142+
@pytest.mark.parametrize(("commitFlag, delayedCommit"), [(False, False), (True, False), (True, True), (False, True)])
143+
def test_DataOperationSender(commitFlag, delayedCommit):
144+
for record in dataOpMonitoringData:
145+
result = dataOpSender.sendData(record, commitFlag, delayedCommit)
146+
if not commitFlag and not delayedCommit:
147+
dataOpSender.concludeSending()
148+
assert result["OK"], result["Message"]
149+
150+
151+
def test_delayed_DataOpSender():
152+
# Try to conclude sending of data added to the register by the fixture method addToRegister
153+
result = dataOpSender.concludeSending()
154+
assert result["OK"], result["Message"]

tests/Integration/Monitoring/Test_MonitoringReporter.py

Lines changed: 0 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,12 @@
5757

5858
from DIRAC import gLogger
5959
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
60-
from DIRAC.MonitoringSystem.Client.DataOperationSender import DataOperationSender
6160

6261
gLogger.setLevel("INFO")
6362

6463
wmsMonitoringReporter = MonitoringReporter(monitoringType="WMSHistory")
6564
componentMonitoringReporter = MonitoringReporter(monitoringType="ComponentMonitoring")
6665
pilotMonitoringReporter = MonitoringReporter(monitoringType="PilotSubmissionMonitoring")
67-
<<<<<<< HEAD
68-
=======
69-
dataOpSender = DataOperationSender()
70-
>>>>>>> refactor: Changed DataOperationSender into a class
7166

7267
data = [
7368
{
@@ -857,41 +852,6 @@
857852
},
858853
]
859854

860-
dataOpMonitoringData = [
861-
{
862-
"OperationType": "se.getFile",
863-
"User": "rpozzi",
864-
"ExecutionSite": "",
865-
"Source": "CertificationSandboxSE",
866-
"Destination": "LCG.PIC.es",
867-
"Protocol": "dips",
868-
"FinalStatus": "Successful",
869-
"TransferSize": 3,
870-
"TransferTime": 1458226213,
871-
"RegistrationTime": 1458226213,
872-
"TransferOK": 20,
873-
"TransferTotal": 50,
874-
"RegistrationOK": 10,
875-
"RegistrationTotal": 40,
876-
},
877-
{
878-
"OperationType": "se.getFile",
879-
"User": "fstagni",
880-
"ExecutionSite": "",
881-
"Source": "Failed",
882-
"Destination": "LCG.PIC.es",
883-
"Protocol": "dips",
884-
"FinalStatus": "Failed",
885-
"TransferSize": 3,
886-
"TransferTime": 1458226213,
887-
"RegistrationTime": 1458226213,
888-
"TransferOK": 6,
889-
"TransferTotal": 26,
890-
"RegistrationOK": 3,
891-
"RegistrationTotal": 35,
892-
},
893-
]
894-
895855

896856
def test_addWMSRecords():
897857
for record in data:
@@ -915,12 +875,3 @@ def test_addPilotSubmissionRecords():
915875
result = pilotMonitoringReporter.commit()
916876
assert result["OK"]
917877
assert result["Value"] == len(pilotMonitoringData)
918-
919-
920-
@pytest.mark.parametrize(("commitFlag, delayedCommit"), [(False, False), (True, False), (True, True), (False, True)])
921-
def test_DataOperationSender(commitFlag, delayedCommit):
922-
for record in dataOpMonitoringData:
923-
result = dataOpSender.sendData(record, commitFlag, delayedCommit)
924-
if not commitFlag and not delayedCommit:
925-
dataOpSender.concludeSending()
926-
assert result["OK"], result["Message"]

0 commit comments

Comments
 (0)