|
46 | 46 |
|
47 | 47 | # # agent name
|
48 | 48 | AGENT_NAME = "RequestManagement/RequestExecutingAgent"
|
| 49 | +# # requests/cycle |
| 50 | +REQUESTSPERCYCLE = 100 |
| 51 | +# # minimal nb of subprocess running |
| 52 | +MINPROCESS = 20 |
| 53 | +# # maximal nb of subprocess executed same time |
| 54 | +MAXPROCESS = 20 |
| 55 | +# # ProcessPool queue size |
| 56 | +QUEUESIZE = 20 |
| 57 | +# # file timeout |
| 58 | +FILETIMEOUT = 300 |
| 59 | +# # operation timeout |
| 60 | +OPERATIONTIMEOUT = 300 |
| 61 | +# # ProcessPool finalization timeout |
| 62 | +POOLTIMEOUT = 900 |
| 63 | +# # ProcessPool sleep time |
| 64 | +POOLSLEEP = 5 |
49 | 65 |
|
50 | 66 |
|
51 | 67 | class AgentConfigError(Exception):
|
@@ -77,34 +93,23 @@ class RequestExecutingAgent(AgentModule):
|
77 | 93 | def __init__(self, *args, **kwargs):
|
78 | 94 | """c'tor"""
|
79 | 95 | # # call base class ctor
|
80 |
| - super().__init__(*args, **kwargs) |
81 |
| - |
82 |
| - # # process pool |
83 |
| - self.__processPool = None |
84 |
| - # # request cache |
85 |
| - self.__requestCache = {} |
86 |
| - # # requests/cycle |
87 |
| - self.__requestsPerCycle = 100 |
88 |
| - # # minimal nb of subprocess running |
89 |
| - self.__minProcess = 20 |
90 |
| - # # maximal nb of subprocess executed same time |
91 |
| - self.__maxProcess = 20 |
92 |
| - # # ProcessPool queue size |
93 |
| - self.__queueSize = 20 |
94 |
| - # # file timeout |
95 |
| - self.__fileTimeout = 300 |
96 |
| - # # operation timeout |
97 |
| - self.__operationTimeout = 300 |
98 |
| - # # ProcessPool finalization timeout |
99 |
| - self.__poolTimeout = 900 |
100 |
| - # # ProcessPool sleep time |
101 |
| - self.__poolSleep = 5 |
102 |
| - # # placeholder for RequestClient instance |
103 |
| - self.__requestClient = None |
104 |
| - # # Size of the bulk if use of getRequests. If 0, use getRequest |
105 |
| - self.__bulkRequest = 0 |
106 |
| - # # Send the monitoring data to ES rather than the Framework/Monitoring |
107 |
| - self.__rmsMonitoring = False |
| 96 | + super().__init__(*args, **kwargs) |
| 97 | + |
| 98 | + self.__processPool = None |
| 99 | + self.__requestCache = {} |
| 100 | + self.__requestsPerCycle = REQUESTSPERCYCLE |
| 101 | + self.__minProcess = MINPROCESS |
| 102 | + self.__maxProcess = MAXPROCESS |
| 103 | + self.__queueSize = QUEUESIZE |
| 104 | + self.__fileTimeout = FILETIMEOUT |
| 105 | + self.__operationTimeout = OPERATIONTIMEOUT |
| 106 | + self.__poolTimeout = POOLTIMEOUT |
| 107 | + self.__poolSleep = POOLSLEEP |
| 108 | + self.__requestClient = None |
| 109 | + # Size of the bulk if use of getRequests. If 0, use getRequest |
| 110 | + self.__bulkRequest = 0 |
| 111 | + # Send the monitoring data to ES rather than the Framework/Monitoring |
| 112 | + self.__rmsMonitoring = False |
108 | 113 |
|
109 | 114 | def processPool(self):
|
110 | 115 | """facade for ProcessPool"""
|
@@ -196,79 +201,79 @@ def putAllRequests(self):
|
196 | 201 | def initialize(self):
|
197 | 202 | """initialize agent"""
|
198 | 203 |
|
199 |
| - # # ProcessPool related stuff |
200 |
| - self.__requestsPerCycle = self.am_getOption("RequestsPerCycle", self.__requestsPerCycle) |
201 |
| - self.log.info("Requests/cycle = %d" % self.__requestsPerCycle) |
202 |
| - self.__minProcess = self.am_getOption("MinProcess", self.__minProcess) |
203 |
| - self.log.info("ProcessPool min process = %d" % self.__minProcess) |
204 |
| - self.__maxProcess = self.am_getOption("MaxProcess", self.__maxProcess) |
205 |
| - self.log.info("ProcessPool max process = %d" % self.__maxProcess) |
206 |
| - self.__queueSize = self.am_getOption("ProcessPoolQueueSize", self.__queueSize) |
207 |
| - self.log.info("ProcessPool queue size = %d" % self.__queueSize) |
208 |
| - self.__poolTimeout = int(self.am_getOption("ProcessPoolTimeout", self.__poolTimeout)) |
209 |
| - self.log.info("ProcessPool timeout = %d seconds" % self.__poolTimeout) |
210 |
| - self.__poolSleep = int(self.am_getOption("ProcessPoolSleep", self.__poolSleep)) |
211 |
| - self.log.info("ProcessPool sleep time = %d seconds" % self.__poolSleep) |
212 |
| - self.__bulkRequest = self.am_getOption("BulkRequest", self.__bulkRequest) |
213 |
| - self.log.info("Bulk request size = %d" % self.__bulkRequest) |
214 |
| - self.__rmsMonitoring = self.am_getOption("EnableRMSMonitoring", self.__rmsMonitoring) |
215 |
| - self.log.info("Enable ES RMS Monitoring = %s" % self.__rmsMonitoring) |
216 |
| - |
217 |
| - # # keep config path and agent name |
218 |
| - self.agentName = self.am_getModuleParam("fullName") |
219 |
| - self.__configPath = PathFinder.getAgentSection(self.agentName) |
220 |
| - |
221 |
| - # # operation handlers over here |
222 |
| - opHandlersPath = "%s/%s" % (self.__configPath, "OperationHandlers") |
223 |
| - opHandlers = gConfig.getSections(opHandlersPath) |
224 |
| - if not opHandlers["OK"]: |
225 |
| - self.log.error(opHandlers["Message"]) |
226 |
| - raise AgentConfigError("OperationHandlers section not found in CS under %s" % self.__configPath) |
227 |
| - opHandlers = opHandlers["Value"] |
228 |
| - |
229 |
| - self.timeOuts = dict() |
230 |
| - |
231 |
| - # # handlers dict |
232 |
| - self.handlersDict = dict() |
233 |
| - for opHandler in opHandlers: |
234 |
| - opHandlerPath = "%s/%s/Location" % (opHandlersPath, opHandler) |
235 |
| - opLocation = gConfig.getValue(opHandlerPath, "") |
236 |
| - if not opLocation: |
237 |
| - self.log.error("%s not set for %s operation handler" % (opHandlerPath, opHandler)) |
238 |
| - continue |
239 |
| - self.timeOuts[opHandler] = {"PerFile": self.__fileTimeout, "PerOperation": self.__operationTimeout} |
240 |
| - |
241 |
| - opTimeout = gConfig.getValue("%s/%s/TimeOut" % (opHandlersPath, opHandler), 0) |
242 |
| - if opTimeout: |
243 |
| - self.timeOuts[opHandler]["PerOperation"] = opTimeout |
244 |
| - fileTimeout = gConfig.getValue("%s/%s/TimeOutPerFile" % (opHandlersPath, opHandler), 0) |
245 |
| - if fileTimeout: |
246 |
| - self.timeOuts[opHandler]["PerFile"] = fileTimeout |
247 |
| - |
248 |
| - self.handlersDict[opHandler] = opLocation |
249 |
| - |
250 |
| - self.log.info("Operation handlers:") |
251 |
| - for item in enumerate(self.handlersDict.items()): |
252 |
| - opHandler = item[1][0] |
253 |
| - self.log.info( |
254 |
| - "[%s] %s: %s (timeout: %d s + %d s per file)" |
255 |
| - % ( |
256 |
| - item[0], |
257 |
| - item[1][0], |
258 |
| - item[1][1], |
259 |
| - self.timeOuts[opHandler]["PerOperation"], |
260 |
| - self.timeOuts[opHandler]["PerFile"], |
261 |
| - ) |
262 |
| - ) |
263 |
| - |
264 |
| - if self.__rmsMonitoring: |
265 |
| - self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring") |
266 |
| - gThreadScheduler.addPeriodicTask(100, self.__rmsMonitoringReporting) |
267 |
| - |
268 |
| - # # create request dict |
269 |
| - self.__requestCache = dict() |
270 |
| - |
271 |
| - return S_OK() |
| 204 | + # # ProcessPool related stuff |
| 205 | + self.__requestsPerCycle = self.am_getOption("RequestsPerCycle", self.__requestsPerCycle) |
| 206 | + self.log.info("Requests/cycle = %d" % self.__requestsPerCycle) |
| 207 | + self.__minProcess = self.am_getOption("MinProcess", self.__minProcess) |
| 208 | + self.log.info("ProcessPool min process = %d" % self.__minProcess) |
| 209 | + self.__maxProcess = self.am_getOption("MaxProcess", self.__maxProcess) |
| 210 | + self.log.info("ProcessPool max process = %d" % self.__maxProcess) |
| 211 | + self.__queueSize = self.am_getOption("ProcessPoolQueueSize", self.__queueSize) |
| 212 | + self.log.info("ProcessPool queue size = %d" % self.__queueSize) |
| 213 | + self.__poolTimeout = int(self.am_getOption("ProcessPoolTimeout", self.__poolTimeout)) |
| 214 | + self.log.info("ProcessPool timeout = %d seconds" % self.__poolTimeout) |
| 215 | + self.__poolSleep = int(self.am_getOption("ProcessPoolSleep", self.__poolSleep)) |
| 216 | + self.log.info("ProcessPool sleep time = %d seconds" % self.__poolSleep) |
| 217 | + self.__bulkRequest = self.am_getOption("BulkRequest", self.__bulkRequest) |
| 218 | + self.log.info("Bulk request size = %d" % self.__bulkRequest) |
| 219 | + self.__rmsMonitoring = self.am_getOption("EnableRMSMonitoring", self.__rmsMonitoring) |
| 220 | + self.log.info("Enable ES RMS Monitoring = %s" % self.__rmsMonitoring) |
| 221 | + |
| 222 | + # # keep config path and agent name |
| 223 | + self.agentName = self.am_getModuleParam("fullName") |
| 224 | + self.__configPath = PathFinder.getAgentSection(self.agentName) |
| 225 | + |
| 226 | + # # operation handlers over here |
| 227 | + opHandlersPath = "%s/%s" % (self.__configPath, "OperationHandlers") |
| 228 | + opHandlers = gConfig.getSections(opHandlersPath) |
| 229 | + if not opHandlers["OK"]: |
| 230 | + self.log.error(opHandlers["Message"]) |
| 231 | + raise AgentConfigError("OperationHandlers section not found in CS under %s" % self.__configPath) |
| 232 | + opHandlers = opHandlers["Value"] |
| 233 | + |
| 234 | + self.timeOuts = dict() |
| 235 | + |
| 236 | + # # handlers dict |
| 237 | + self.handlersDict = dict() |
| 238 | + for opHandler in opHandlers: |
| 239 | + opHandlerPath = "%s/%s/Location" % (opHandlersPath, opHandler) |
| 240 | + opLocation = gConfig.getValue(opHandlerPath, "") |
| 241 | + if not opLocation: |
| 242 | + self.log.error("%s not set for %s operation handler" % (opHandlerPath, opHandler)) |
| 243 | + continue |
| 244 | + self.timeOuts[opHandler] = {"PerFile": self.__fileTimeout, "PerOperation": self.__operationTimeout} |
| 245 | + |
| 246 | + opTimeout = gConfig.getValue("%s/%s/TimeOut" % (opHandlersPath, opHandler), 0) |
| 247 | + if opTimeout: |
| 248 | + self.timeOuts[opHandler]["PerOperation"] = opTimeout |
| 249 | + fileTimeout = gConfig.getValue("%s/%s/TimeOutPerFile" % (opHandlersPath, opHandler), 0) |
| 250 | + if fileTimeout: |
| 251 | + self.timeOuts[opHandler]["PerFile"] = fileTimeout |
| 252 | + |
| 253 | + self.handlersDict[opHandler] = opLocation |
| 254 | + |
| 255 | + self.log.info("Operation handlers:") |
| 256 | + for item in enumerate(self.handlersDict.items()): |
| 257 | + opHandler = item[1][0] |
| 258 | + self.log.info( |
| 259 | + "[%s] %s: %s (timeout: %d s + %d s per file)" |
| 260 | + % ( |
| 261 | + item[0], |
| 262 | + item[1][0], |
| 263 | + item[1][1], |
| 264 | + self.timeOuts[opHandler]["PerOperation"], |
| 265 | + self.timeOuts[opHandler]["PerFile"], |
| 266 | + ) |
| 267 | + ) |
| 268 | + |
| 269 | + if self.__rmsMonitoring: |
| 270 | + self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring") |
| 271 | + gThreadScheduler.addPeriodicTask(100, self.__rmsMonitoringReporting) |
| 272 | + |
| 273 | + # # create request dict |
| 274 | + self.__requestCache = dict() |
| 275 | + |
| 276 | + return S_OK() |
272 | 277 |
|
273 | 278 | def execute(self):
|
274 | 279 | """read requests from RequestClient and enqueue them into ProcessPool"""
|
|
0 commit comments