|
12 | 12 | :caption: FTS3Agent options
|
13 | 13 |
|
14 | 14 | """
|
| 15 | +import datetime |
15 | 16 | import errno
|
16 | 17 | import os
|
17 | 18 | from urllib import parse
|
|
54 | 55 | # when running multiple agents, we rather do it in steps
|
55 | 56 | JOB_MONITORING_BATCH_SIZE = 20
|
56 | 57 |
|
| 58 | +# We do not monitor a job more often |
| 59 | +# than MONITORING_DELAY in minutes |
| 60 | +MONITORING_DELAY = 10 |
| 61 | + |
57 | 62 |
|
58 | 63 | class FTS3Agent(AgentModule):
|
59 | 64 | """
|
@@ -299,18 +304,29 @@ def monitorJobsLoop(self):
|
299 | 304 | if mod:
|
300 | 305 | nbOfLoops += 1
|
301 | 306 |
|
| 307 | + # Not only is it pointless to monitor right after submission |
| 308 | + # but also we would end up fetching multiple time the same job otherwise |
| 309 | + # as we call getActiveJobs by batch |
| 310 | + lastMonitor = datetime.datetime.utcnow() - datetime.timedelta(minutes=MONITORING_DELAY) |
| 311 | + |
302 | 312 | log.debug("Getting active jobs")
|
303 | 313 |
|
304 | 314 | for loopId in range(nbOfLoops):
|
305 | 315 | log.info("Getting next batch of jobs to monitor", f"{loopId}/{nbOfLoops}")
|
306 | 316 | # get jobs from DB
|
307 |
| - res = self.fts3db.getActiveJobs(limit=JOB_MONITORING_BATCH_SIZE, jobAssignmentTag=self.assignmentTag) |
| 317 | + res = self.fts3db.getActiveJobs( |
| 318 | + limit=JOB_MONITORING_BATCH_SIZE, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag |
| 319 | + ) |
308 | 320 |
|
309 | 321 | if not res["OK"]:
|
310 | 322 | log.error("Could not retrieve ftsJobs from the DB", res)
|
311 | 323 | return res
|
312 | 324 |
|
313 | 325 | activeJobs = res["Value"]
|
| 326 | + if not activeJobs: |
| 327 | + log.info("No more jobs to monitor") |
| 328 | + break |
| 329 | + |
314 | 330 | log.info("Jobs queued for monitoring", len(activeJobs))
|
315 | 331 |
|
316 | 332 | # We store here the AsyncResult object on which we are going to wait
|
|
0 commit comments