Skip to content

Commit efe4d1f

Browse files
authored
Merge pull request #5960 from fstagni/80_fixesMonitoring
[8.0] moved REA initialization inside the initialize() method
2 parents 1e5cb5f + a1d267b commit efe4d1f

File tree

9 files changed

+119
-129
lines changed

9 files changed

+119
-129
lines changed

src/DIRAC/Core/Base/AgentModule.py

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,7 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties={}):
9898
the configuration Option 'shifterProxy' must be set, a default may be given
9999
in the initialize() method.
100100
"""
101-
if baseAgentName and agentName == baseAgentName:
102-
self.log = gLogger
103-
standaloneModule = True
104-
else:
105-
self.log = gLogger.getSubLogger(agentName, child=False)
106-
standaloneModule = False
101+
self.log = gLogger.getSubLogger(agentName, child=False)
107102

108103
self.__basePath = gConfig.getValue("/LocalSite/InstancePath", rootPath)
109104
self.__agentModule = None
@@ -115,7 +110,6 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties={}):
115110
"loadName": loadName,
116111
"section": PathFinder.getAgentSection(agentName),
117112
"loadSection": PathFinder.getAgentSection(loadName),
118-
"standalone": standaloneModule,
119113
"cyclesDone": 0,
120114
"totalElapsedTime": 0,
121115
"setup": gConfig.getValue("/DIRAC/Setup", "Unknown"),
@@ -171,6 +165,9 @@ def am_initialize(self, *initArgs):
171165
This is called by the AgentReactor, should not be overridden.
172166
"""
173167
agentName = self.am_getModuleParam("fullName")
168+
169+
self.__initializeMonitor()
170+
174171
result = self.initialize(*initArgs)
175172
if not isReturnStructure(result):
176173
return S_ERROR("initialize must return S_OK/S_ERROR")
@@ -182,8 +179,6 @@ def am_initialize(self, *initArgs):
182179
# Set the work directory in an environment variable available to subprocesses if needed
183180
os.environ["AGENT_WORKDIRECTORY"] = workDirectory
184181

185-
self.__initializeMonitor()
186-
187182
self.__moduleProperties["shifterProxy"] = self.am_getOption("shifterProxy")
188183
if len(self.__moduleProperties["executors"]) < 1:
189184
return S_ERROR("At least one executor method has to be defined")
@@ -295,6 +290,7 @@ def __initializeMonitor(self):
295290
# if the "EnableActivityMonitoring" flag in "yes" or "true" in the cfg file.
296291
self.activityMonitoring = Operations().getValue("EnableActivityMonitoring", False)
297292
if self.activityMonitoring:
293+
self.log.debug("Using activity monitoring")
298294
# The import needs to be here because of the CS must be initialized before importing
299295
# this class (see https://github.com/DIRACGrid/DIRAC/issues/4793)
300296
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
@@ -460,5 +456,4 @@ def __activityMonitoringReporting(self):
460456
461457
:return: True / False
462458
"""
463-
result = self.activityMonitoringReporter.commit()
464-
return result["OK"]
459+
return self.activityMonitoringReporter.commit()["OK"]

src/DIRAC/DataManagementSystem/Agent/RequestOperations/PhysicalRemoval.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: PhysicalRemoval.py
43
54
# Date: 2013/04/02 11:56:10

src/DIRAC/DataManagementSystem/Agent/RequestOperations/PutAndRegister.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: PutAndRegister.py
43
54
# Date: 2013/03/25 07:43:24

src/DIRAC/DataManagementSystem/Agent/RequestOperations/ReTransfer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: ReTransfer.py
43
54
# Date: 2013/04/02 14:24:21

src/DIRAC/RequestManagementSystem/Agent/CleanReqDBAgent.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: CleanReqDBAgent.py
43
54
# Date: 2013/05/17 08:31:26
@@ -28,7 +27,6 @@
2827
from DIRAC import S_OK
2928
from DIRAC.Core.Base.AgentModule import AgentModule
3029
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
31-
from DIRAC.RequestManagementSystem.Client.Request import Request
3230

3331
AGENT_NAME = "RequestManagement/CleanReqDBAgent"
3432

src/DIRAC/RequestManagementSystem/Agent/RequestExecutingAgent.py

Lines changed: 111 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -33,20 +33,35 @@
3333

3434
# # from DIRAC
3535
from DIRAC import S_OK, S_ERROR, gConfig
36-
from DIRAC.Core.Base.AgentModule import AgentModule
3736
from DIRAC.ConfigurationSystem.Client import PathFinder
37+
from DIRAC.Core.Base.AgentModule import AgentModule
38+
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
39+
from DIRAC.Core.Utilities import Time, Network
40+
from DIRAC.Core.Utilities.DErrno import cmpError
3841
from DIRAC.Core.Utilities.ProcessPool import ProcessPool
42+
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
3943
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
4044
from DIRAC.RequestManagementSystem.private.RequestTask import RequestTask
4145

42-
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
43-
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
44-
from DIRAC.Core.Utilities import Time, Network
45-
46-
from DIRAC.Core.Utilities.DErrno import cmpError
4746

4847
# # agent name
4948
AGENT_NAME = "RequestManagement/RequestExecutingAgent"
49+
# # requests/cycle
50+
REQUESTSPERCYCLE = 100
51+
# # minimal nb of subprocess running
52+
MINPROCESS = 20
53+
# # maximal nb of subprocess executed same time
54+
MAXPROCESS = 20
55+
# # ProcessPool queue size
56+
QUEUESIZE = 20
57+
# # file timeout
58+
FILETIMEOUT = 300
59+
# # operation timeout
60+
OPERATIONTIMEOUT = 300
61+
# # ProcessPool finalization timeout
62+
POOLTIMEOUT = 900
63+
# # ProcessPool sleep time
64+
POOLSLEEP = 5
5065

5166

5267
class AgentConfigError(Exception):
@@ -75,111 +90,26 @@ class RequestExecutingAgent(AgentModule):
7590
request processing agent using ProcessPool, Operation handlers and RequestTask
7691
"""
7792

78-
# # process pool
79-
__processPool = None
80-
# # request cache
81-
__requestCache = {}
82-
# # requests/cycle
83-
__requestsPerCycle = 100
84-
# # minimal nb of subprocess running
85-
__minProcess = 20
86-
# # maximal nb of subprocess executed same time
87-
__maxProcess = 20
88-
# # ProcessPool queue size
89-
__queueSize = 20
90-
# # file timeout
91-
__fileTimeout = 300
92-
# # operation timeout
93-
__operationTimeout = 300
94-
# # ProcessPool finalization timeout
95-
__poolTimeout = 900
96-
# # ProcessPool sleep time
97-
__poolSleep = 5
98-
# # placeholder for RequestClient instance
99-
__requestClient = None
100-
# # Size of the bulk if use of getRequests. If 0, use getRequest
101-
__bulkRequest = 0
102-
# # Send the monitoring data to ES rather than the Framework/Monitoring
103-
__rmsMonitoring = False
104-
10593
def __init__(self, *args, **kwargs):
10694
"""c'tor"""
10795
# # call base class ctor
108-
AgentModule.__init__(self, *args, **kwargs)
109-
# # ProcessPool related stuff
110-
self.__requestsPerCycle = self.am_getOption("RequestsPerCycle", self.__requestsPerCycle)
111-
self.log.info("Requests/cycle = %d" % self.__requestsPerCycle)
112-
self.__minProcess = self.am_getOption("MinProcess", self.__minProcess)
113-
self.log.info("ProcessPool min process = %d" % self.__minProcess)
114-
self.__maxProcess = self.am_getOption("MaxProcess", self.__maxProcess)
115-
self.log.info("ProcessPool max process = %d" % self.__maxProcess)
116-
self.__queueSize = self.am_getOption("ProcessPoolQueueSize", self.__queueSize)
117-
self.log.info("ProcessPool queue size = %d" % self.__queueSize)
118-
self.__poolTimeout = int(self.am_getOption("ProcessPoolTimeout", self.__poolTimeout))
119-
self.log.info("ProcessPool timeout = %d seconds" % self.__poolTimeout)
120-
self.__poolSleep = int(self.am_getOption("ProcessPoolSleep", self.__poolSleep))
121-
self.log.info("ProcessPool sleep time = %d seconds" % self.__poolSleep)
122-
self.__bulkRequest = self.am_getOption("BulkRequest", self.__bulkRequest)
123-
self.log.info("Bulk request size = %d" % self.__bulkRequest)
124-
self.__rmsMonitoring = self.am_getOption("EnableRMSMonitoring", self.__rmsMonitoring)
125-
self.log.info("Enable ES RMS Monitoring = %s" % self.__rmsMonitoring)
126-
127-
# # keep config path and agent name
128-
self.agentName = self.am_getModuleParam("fullName")
129-
self.__configPath = PathFinder.getAgentSection(self.agentName)
130-
131-
# # operation handlers over here
132-
opHandlersPath = "%s/%s" % (self.__configPath, "OperationHandlers")
133-
opHandlers = gConfig.getSections(opHandlersPath)
134-
if not opHandlers["OK"]:
135-
self.log.error(opHandlers["Message"])
136-
raise AgentConfigError("OperationHandlers section not found in CS under %s" % self.__configPath)
137-
opHandlers = opHandlers["Value"]
138-
139-
self.timeOuts = dict()
140-
141-
# # handlers dict
142-
self.handlersDict = dict()
143-
for opHandler in opHandlers:
144-
opHandlerPath = "%s/%s/Location" % (opHandlersPath, opHandler)
145-
opLocation = gConfig.getValue(opHandlerPath, "")
146-
if not opLocation:
147-
self.log.error("%s not set for %s operation handler" % (opHandlerPath, opHandler))
148-
continue
149-
self.timeOuts[opHandler] = {"PerFile": self.__fileTimeout, "PerOperation": self.__operationTimeout}
150-
151-
opTimeout = gConfig.getValue("%s/%s/TimeOut" % (opHandlersPath, opHandler), 0)
152-
if opTimeout:
153-
self.timeOuts[opHandler]["PerOperation"] = opTimeout
154-
fileTimeout = gConfig.getValue("%s/%s/TimeOutPerFile" % (opHandlersPath, opHandler), 0)
155-
if fileTimeout:
156-
self.timeOuts[opHandler]["PerFile"] = fileTimeout
157-
158-
self.handlersDict[opHandler] = opLocation
159-
160-
self.log.info("Operation handlers:")
161-
for item in enumerate(self.handlersDict.items()):
162-
opHandler = item[1][0]
163-
self.log.info(
164-
"[%s] %s: %s (timeout: %d s + %d s per file)"
165-
% (
166-
item[0],
167-
item[1][0],
168-
item[1][1],
169-
self.timeOuts[opHandler]["PerOperation"],
170-
self.timeOuts[opHandler]["PerFile"],
171-
)
172-
)
173-
174-
if self.__rmsMonitoring:
175-
self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring")
176-
gThreadScheduler.addPeriodicTask(100, self.__rmsMonitoringReporting)
177-
178-
# # create request dict
179-
self.__requestCache = dict()
180-
181-
# ?? Probably should be removed
182-
self.FTSMode = self.am_getOption("FTSMode", False)
96+
super().__init__(*args, **kwargs)
97+
98+
self.__processPool = None
99+
self.__requestCache = {}
100+
self.__requestsPerCycle = REQUESTSPERCYCLE
101+
self.__minProcess = MINPROCESS
102+
self.__maxProcess = MAXPROCESS
103+
self.__queueSize = QUEUESIZE
104+
self.__fileTimeout = FILETIMEOUT
105+
self.__operationTimeout = OPERATIONTIMEOUT
106+
self.__poolTimeout = POOLTIMEOUT
107+
self.__poolSleep = POOLSLEEP
108+
self.__requestClient = None
109+
# Size of the bulk if use of getRequests. If 0, use getRequest
110+
self.__bulkRequest = 0
111+
# Send the monitoring data to ES rather than the Framework/Monitoring
112+
self.__rmsMonitoring = False
183113

184114
def processPool(self):
185115
"""facade for ProcessPool"""
@@ -270,6 +200,79 @@ def putAllRequests(self):
270200

271201
def initialize(self):
272202
"""initialize agent"""
203+
204+
# # ProcessPool related stuff
205+
self.__requestsPerCycle = self.am_getOption("RequestsPerCycle", self.__requestsPerCycle)
206+
self.log.info("Requests/cycle = %d" % self.__requestsPerCycle)
207+
self.__minProcess = self.am_getOption("MinProcess", self.__minProcess)
208+
self.log.info("ProcessPool min process = %d" % self.__minProcess)
209+
self.__maxProcess = self.am_getOption("MaxProcess", self.__maxProcess)
210+
self.log.info("ProcessPool max process = %d" % self.__maxProcess)
211+
self.__queueSize = self.am_getOption("ProcessPoolQueueSize", self.__queueSize)
212+
self.log.info("ProcessPool queue size = %d" % self.__queueSize)
213+
self.__poolTimeout = int(self.am_getOption("ProcessPoolTimeout", self.__poolTimeout))
214+
self.log.info("ProcessPool timeout = %d seconds" % self.__poolTimeout)
215+
self.__poolSleep = int(self.am_getOption("ProcessPoolSleep", self.__poolSleep))
216+
self.log.info("ProcessPool sleep time = %d seconds" % self.__poolSleep)
217+
self.__bulkRequest = self.am_getOption("BulkRequest", self.__bulkRequest)
218+
self.log.info("Bulk request size = %d" % self.__bulkRequest)
219+
self.__rmsMonitoring = self.am_getOption("EnableRMSMonitoring", self.__rmsMonitoring)
220+
self.log.info("Enable ES RMS Monitoring = %s" % self.__rmsMonitoring)
221+
222+
# # keep config path and agent name
223+
self.agentName = self.am_getModuleParam("fullName")
224+
self.__configPath = PathFinder.getAgentSection(self.agentName)
225+
226+
# # operation handlers over here
227+
opHandlersPath = "%s/%s" % (self.__configPath, "OperationHandlers")
228+
opHandlers = gConfig.getSections(opHandlersPath)
229+
if not opHandlers["OK"]:
230+
self.log.error(opHandlers["Message"])
231+
raise AgentConfigError("OperationHandlers section not found in CS under %s" % self.__configPath)
232+
opHandlers = opHandlers["Value"]
233+
234+
self.timeOuts = dict()
235+
236+
# # handlers dict
237+
self.handlersDict = dict()
238+
for opHandler in opHandlers:
239+
opHandlerPath = "%s/%s/Location" % (opHandlersPath, opHandler)
240+
opLocation = gConfig.getValue(opHandlerPath, "")
241+
if not opLocation:
242+
self.log.error("%s not set for %s operation handler" % (opHandlerPath, opHandler))
243+
continue
244+
self.timeOuts[opHandler] = {"PerFile": self.__fileTimeout, "PerOperation": self.__operationTimeout}
245+
246+
opTimeout = gConfig.getValue("%s/%s/TimeOut" % (opHandlersPath, opHandler), 0)
247+
if opTimeout:
248+
self.timeOuts[opHandler]["PerOperation"] = opTimeout
249+
fileTimeout = gConfig.getValue("%s/%s/TimeOutPerFile" % (opHandlersPath, opHandler), 0)
250+
if fileTimeout:
251+
self.timeOuts[opHandler]["PerFile"] = fileTimeout
252+
253+
self.handlersDict[opHandler] = opLocation
254+
255+
self.log.info("Operation handlers:")
256+
for item in enumerate(self.handlersDict.items()):
257+
opHandler = item[1][0]
258+
self.log.info(
259+
"[%s] %s: %s (timeout: %d s + %d s per file)"
260+
% (
261+
item[0],
262+
item[1][0],
263+
item[1][1],
264+
self.timeOuts[opHandler]["PerOperation"],
265+
self.timeOuts[opHandler]["PerFile"],
266+
)
267+
)
268+
269+
if self.__rmsMonitoring:
270+
self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring")
271+
gThreadScheduler.addPeriodicTask(100, self.__rmsMonitoringReporting)
272+
273+
# # create request dict
274+
self.__requestCache = dict()
275+
273276
return S_OK()
274277

275278
def execute(self):

src/DIRAC/RequestManagementSystem/Agent/RequestOperations/ForwardDISET.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: ForwardDISET.py
43
54
# Date: 2013/03/22 12:40:06

src/DIRAC/RequestManagementSystem/Agent/RequestOperations/test/ForwardDISETTests.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: ForwardDISETTests.py
43
54
# Date: 2013/04/18 09:23:05
@@ -31,7 +30,7 @@
3130
# # SUT
3231
from DIRAC.RequestManagementSystem.Agent.RequestOperations.ForwardDISET import ForwardDISET
3332

34-
########################################################################
33+
3534
class ForwardDISETTests(unittest.TestCase):
3635
"""
3736
.. class:: ForwardDISETTests

src/DIRAC/RequestManagementSystem/Service/test/OperationHandlerBaseTests.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
########################################################################
2-
# $HeadURL $
32
# File: OperationHandlerBaseTests.py
43
54
# Date: 2013/03/25 08:09:08
@@ -27,7 +26,7 @@
2726
from DIRAC.RequestManagementSystem.Client.Operation import Operation
2827
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
2928

30-
########################################################################
29+
3130
class OperationHandlerBaseTests(unittest.TestCase):
3231
"""
3332
.. class:: OperationHandlerBaseTests

0 commit comments

Comments
 (0)