Skip to content

Commit 56ad81d

Browse files
committed
refactor: moved initialization of the agent inside the initialize() method
1 parent d5e65d0 commit 56ad81d

File tree

1 file changed

+119
-108
lines changed

1 file changed

+119
-108
lines changed

src/DIRAC/RequestManagementSystem/Agent/RequestExecutingAgent.py

Lines changed: 119 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,17 @@
3333

3434
# # from DIRAC
3535
from DIRAC import S_OK, S_ERROR, gConfig
36-
from DIRAC.Core.Base.AgentModule import AgentModule
3736
from DIRAC.ConfigurationSystem.Client import PathFinder
37+
from DIRAC.Core.Base.AgentModule import AgentModule
38+
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
39+
from DIRAC.Core.Utilities import Time, Network
40+
from DIRAC.Core.Utilities.DErrno import cmpError
3841
from DIRAC.Core.Utilities.ProcessPool import ProcessPool
42+
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
43+
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
3944
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
4045
from DIRAC.RequestManagementSystem.private.RequestTask import RequestTask
4146

42-
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
43-
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
44-
from DIRAC.Core.Utilities import Time, Network
45-
46-
from DIRAC.Core.Utilities.DErrno import cmpError
4747

4848
# # agent name
4949
AGENT_NAME = "RequestManagement/RequestExecutingAgent"
@@ -75,111 +75,37 @@ class RequestExecutingAgent(AgentModule):
7575
request processing agent using ProcessPool, Operation handlers and RequestTask
7676
"""
7777

78-
# # process pool
79-
__processPool = None
80-
# # request cache
81-
__requestCache = {}
82-
# # requests/cycle
83-
__requestsPerCycle = 100
84-
# # minimal nb of subprocess running
85-
__minProcess = 20
86-
# # maximal nb of subprocess executed same time
87-
__maxProcess = 20
88-
# # ProcessPool queue size
89-
__queueSize = 20
90-
# # file timeout
91-
__fileTimeout = 300
92-
# # operation timeout
93-
__operationTimeout = 300
94-
# # ProcessPool finalization timeout
95-
__poolTimeout = 900
96-
# # ProcessPool sleep time
97-
__poolSleep = 5
98-
# # placeholder for RequestClient instance
99-
__requestClient = None
100-
# # Size of the bulk if use of getRequests. If 0, use getRequest
101-
__bulkRequest = 0
102-
# # Send the monitoring data to ES rather than the Framework/Monitoring
103-
__rmsMonitoring = False
104-
10578
def __init__(self, *args, **kwargs):
10679
"""c'tor"""
10780
# # call base class ctor
108-
AgentModule.__init__(self, *args, **kwargs)
109-
# # ProcessPool related stuff
110-
self.__requestsPerCycle = self.am_getOption("RequestsPerCycle", self.__requestsPerCycle)
111-
self.log.info("Requests/cycle = %d" % self.__requestsPerCycle)
112-
self.__minProcess = self.am_getOption("MinProcess", self.__minProcess)
113-
self.log.info("ProcessPool min process = %d" % self.__minProcess)
114-
self.__maxProcess = self.am_getOption("MaxProcess", self.__maxProcess)
115-
self.log.info("ProcessPool max process = %d" % self.__maxProcess)
116-
self.__queueSize = self.am_getOption("ProcessPoolQueueSize", self.__queueSize)
117-
self.log.info("ProcessPool queue size = %d" % self.__queueSize)
118-
self.__poolTimeout = int(self.am_getOption("ProcessPoolTimeout", self.__poolTimeout))
119-
self.log.info("ProcessPool timeout = %d seconds" % self.__poolTimeout)
120-
self.__poolSleep = int(self.am_getOption("ProcessPoolSleep", self.__poolSleep))
121-
self.log.info("ProcessPool sleep time = %d seconds" % self.__poolSleep)
122-
self.__bulkRequest = self.am_getOption("BulkRequest", self.__bulkRequest)
123-
self.log.info("Bulk request size = %d" % self.__bulkRequest)
124-
self.__rmsMonitoring = self.am_getOption("EnableRMSMonitoring", self.__rmsMonitoring)
125-
self.log.info("Enable ES RMS Monitoring = %s" % self.__rmsMonitoring)
126-
127-
# # keep config path and agent name
128-
self.agentName = self.am_getModuleParam("fullName")
129-
self.__configPath = PathFinder.getAgentSection(self.agentName)
130-
131-
# # operation handlers over here
132-
opHandlersPath = "%s/%s" % (self.__configPath, "OperationHandlers")
133-
opHandlers = gConfig.getSections(opHandlersPath)
134-
if not opHandlers["OK"]:
135-
self.log.error(opHandlers["Message"])
136-
raise AgentConfigError("OperationHandlers section not found in CS under %s" % self.__configPath)
137-
opHandlers = opHandlers["Value"]
138-
139-
self.timeOuts = dict()
140-
141-
# # handlers dict
142-
self.handlersDict = dict()
143-
for opHandler in opHandlers:
144-
opHandlerPath = "%s/%s/Location" % (opHandlersPath, opHandler)
145-
opLocation = gConfig.getValue(opHandlerPath, "")
146-
if not opLocation:
147-
self.log.error("%s not set for %s operation handler" % (opHandlerPath, opHandler))
148-
continue
149-
self.timeOuts[opHandler] = {"PerFile": self.__fileTimeout, "PerOperation": self.__operationTimeout}
150-
151-
opTimeout = gConfig.getValue("%s/%s/TimeOut" % (opHandlersPath, opHandler), 0)
152-
if opTimeout:
153-
self.timeOuts[opHandler]["PerOperation"] = opTimeout
154-
fileTimeout = gConfig.getValue("%s/%s/TimeOutPerFile" % (opHandlersPath, opHandler), 0)
155-
if fileTimeout:
156-
self.timeOuts[opHandler]["PerFile"] = fileTimeout
157-
158-
self.handlersDict[opHandler] = opLocation
159-
160-
self.log.info("Operation handlers:")
161-
for item in enumerate(self.handlersDict.items()):
162-
opHandler = item[1][0]
163-
self.log.info(
164-
"[%s] %s: %s (timeout: %d s + %d s per file)"
165-
% (
166-
item[0],
167-
item[1][0],
168-
item[1][1],
169-
self.timeOuts[opHandler]["PerOperation"],
170-
self.timeOuts[opHandler]["PerFile"],
171-
)
172-
)
173-
174-
if self.__rmsMonitoring:
175-
self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring")
176-
gThreadScheduler.addPeriodicTask(100, self.__rmsMonitoringReporting)
177-
178-
# # create request dict
179-
self.__requestCache = dict()
180-
181-
# ?? Probably should be removed
182-
self.FTSMode = self.am_getOption("FTSMode", False)
81+
super().__init__(*args, **kwargs)
82+
83+
# # process pool
84+
self.__processPool = None
85+
# # request cache
86+
self.__requestCache = {}
87+
# # requests/cycle
88+
self.__requestsPerCycle = 100
89+
# # minimal nb of subprocess running
90+
self.__minProcess = 20
91+
# # maximal nb of subprocess executed same time
92+
self.__maxProcess = 20
93+
# # ProcessPool queue size
94+
self.__queueSize = 20
95+
# # file timeout
96+
self.__fileTimeout = 300
97+
# # operation timeout
98+
self.__operationTimeout = 300
99+
# # ProcessPool finalization timeout
100+
self.__poolTimeout = 900
101+
# # ProcessPool sleep time
102+
self.__poolSleep = 5
103+
# # placeholder for RequestClient instance
104+
self.__requestClient = None
105+
# # Size of the bulk if use of getRequests. If 0, use getRequest
106+
self.__bulkRequest = 0
107+
# # Send the monitoring data to ES rather than the Framework/Monitoring
108+
self.__rmsMonitoring = False
183109

184110
def processPool(self):
185111
"""facade for ProcessPool"""
@@ -270,6 +196,91 @@ def putAllRequests(self):
270196

271197
def initialize(self):
272198
"""initialize agent"""
199+
200+
# # ProcessPool related stuff
201+
self.__requestsPerCycle = self.am_getOption("RequestsPerCycle", self.__requestsPerCycle)
202+
self.log.info("Requests/cycle = %d" % self.__requestsPerCycle)
203+
self.__minProcess = self.am_getOption("MinProcess", self.__minProcess)
204+
self.log.info("ProcessPool min process = %d" % self.__minProcess)
205+
self.__maxProcess = self.am_getOption("MaxProcess", self.__maxProcess)
206+
self.log.info("ProcessPool max process = %d" % self.__maxProcess)
207+
self.__queueSize = self.am_getOption("ProcessPoolQueueSize", self.__queueSize)
208+
self.log.info("ProcessPool queue size = %d" % self.__queueSize)
209+
self.__poolTimeout = int(self.am_getOption("ProcessPoolTimeout", self.__poolTimeout))
210+
self.log.info("ProcessPool timeout = %d seconds" % self.__poolTimeout)
211+
self.__poolSleep = int(self.am_getOption("ProcessPoolSleep", self.__poolSleep))
212+
self.log.info("ProcessPool sleep time = %d seconds" % self.__poolSleep)
213+
self.__bulkRequest = self.am_getOption("BulkRequest", self.__bulkRequest)
214+
self.log.info("Bulk request size = %d" % self.__bulkRequest)
215+
self.__rmsMonitoring = self.am_getOption("EnableRMSMonitoring", self.__rmsMonitoring)
216+
self.log.info("Enable ES RMS Monitoring = %s" % self.__rmsMonitoring)
217+
218+
# # keep config path and agent name
219+
self.agentName = self.am_getModuleParam("fullName")
220+
self.__configPath = PathFinder.getAgentSection(self.agentName)
221+
222+
# # operation handlers over here
223+
opHandlersPath = "%s/%s" % (self.__configPath, "OperationHandlers")
224+
opHandlers = gConfig.getSections(opHandlersPath)
225+
if not opHandlers["OK"]:
226+
self.log.error(opHandlers["Message"])
227+
raise AgentConfigError("OperationHandlers section not found in CS under %s" % self.__configPath)
228+
opHandlers = opHandlers["Value"]
229+
230+
self.timeOuts = dict()
231+
232+
# # handlers dict
233+
self.handlersDict = dict()
234+
for opHandler in opHandlers:
235+
opHandlerPath = "%s/%s/Location" % (opHandlersPath, opHandler)
236+
opLocation = gConfig.getValue(opHandlerPath, "")
237+
if not opLocation:
238+
self.log.error("%s not set for %s operation handler" % (opHandlerPath, opHandler))
239+
continue
240+
self.timeOuts[opHandler] = {"PerFile": self.__fileTimeout, "PerOperation": self.__operationTimeout}
241+
242+
opTimeout = gConfig.getValue("%s/%s/TimeOut" % (opHandlersPath, opHandler), 0)
243+
if opTimeout:
244+
self.timeOuts[opHandler]["PerOperation"] = opTimeout
245+
fileTimeout = gConfig.getValue("%s/%s/TimeOutPerFile" % (opHandlersPath, opHandler), 0)
246+
if fileTimeout:
247+
self.timeOuts[opHandler]["PerFile"] = fileTimeout
248+
249+
self.handlersDict[opHandler] = opLocation
250+
251+
self.log.info("Operation handlers:")
252+
for item in enumerate(self.handlersDict.items()):
253+
opHandler = item[1][0]
254+
self.log.info(
255+
"[%s] %s: %s (timeout: %d s + %d s per file)"
256+
% (
257+
item[0],
258+
item[1][0],
259+
item[1][1],
260+
self.timeOuts[opHandler]["PerOperation"],
261+
self.timeOuts[opHandler]["PerFile"],
262+
)
263+
)
264+
265+
if self.__rmsMonitoring:
266+
self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring")
267+
gThreadScheduler.addPeriodicTask(100, self.__rmsMonitoringReporting)
268+
else:
269+
# # common monitor activity
270+
gMonitor.registerActivity("Iteration", "Agent Loops", "RequestExecutingAgent", "Loops/min", gMonitor.OP_SUM)
271+
gMonitor.registerActivity(
272+
"Processed", "Request Processed", "RequestExecutingAgent", "Requests/min", gMonitor.OP_SUM
273+
)
274+
gMonitor.registerActivity(
275+
"Done", "Request Completed", "RequestExecutingAgent", "Requests/min", gMonitor.OP_SUM
276+
)
277+
278+
# # create request dict
279+
self.__requestCache = dict()
280+
281+
# ?? Probably should be removed
282+
self.FTSMode = self.am_getOption("FTSMode", False)
283+
273284
return S_OK()
274285

275286
def execute(self):

0 commit comments

Comments
 (0)