Skip to content

Commit 38a2923

Browse files
authored
Merge pull request #5984 from DIRACGridBot/cherry-pick-2-34702c9db-integration
[sweep:integration] Fix race condition in FTS3DB.getNonFinishedOperations
2 parents 956dba1 + 86b3890 commit 38a2923

File tree

3 files changed

+247
-45
lines changed

3 files changed

+247
-45
lines changed

src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,11 @@
4747
# Lifetime in seconds of the proxy we download for submission
4848
PROXY_LIFETIME = 43200 # 12 hours
4949

50+
# Instead of querying many jobs at once,
51+
# which maximizes the possibility of race condition
52+
# when running multiple agents, we rather do it in steps
53+
JOB_MONITORING_BATCH_SIZE = 20
54+
5055

5156
class FTS3Agent(AgentModule):
5257
"""
@@ -279,35 +284,49 @@ def monitorJobsLoop(self):
279284
log = gLogger.getSubLogger("monitorJobs")
280285
log.debug("Size of the context cache %s" % len(self._globalContextCache))
281286

287+
# Find the number of loops
288+
nbOfLoops, mod = divmod(self.jobBulkSize, JOB_MONITORING_BATCH_SIZE)
289+
if mod:
290+
nbOfLoops += 1
291+
282292
log.debug("Getting active jobs")
283-
# get jobs from DB
284-
res = self.fts3db.getActiveJobs(limit=self.jobBulkSize, jobAssignmentTag=self.assignmentTag)
285293

286-
if not res["OK"]:
287-
log.error("Could not retrieve ftsJobs from the DB", res)
288-
return res
294+
for loopId in range(nbOfLoops):
289295

290-
activeJobs = res["Value"]
291-
log.info("%s jobs to queue for monitoring" % len(activeJobs))
296+
log.info("Getting next batch of jobs to monitor", "%s/%s" % (loopId, nbOfLoops))
297+
# get jobs from DB
298+
res = self.fts3db.getActiveJobs(limit=JOB_MONITORING_BATCH_SIZE, jobAssignmentTag=self.assignmentTag)
292299

293-
# We store here the AsyncResult object on which we are going to wait
294-
applyAsyncResults = []
300+
if not res["OK"]:
301+
log.error("Could not retrieve ftsJobs from the DB", res)
302+
return res
295303

296-
# Starting the monitoring threads
297-
for ftsJob in activeJobs:
298-
log.debug("Queuing executing of ftsJob %s" % ftsJob.jobID)
299-
# queue the execution of self._monitorJob( ftsJob ) in the thread pool
300-
# The returned value is passed to _monitorJobCallback
301-
applyAsyncResults.append(
302-
self.jobsThreadPool.apply_async(self._monitorJob, (ftsJob,), callback=self._monitorJobCallback)
303-
)
304+
activeJobs = res["Value"]
305+
log.info("Jobs queued for monitoring", len(activeJobs))
304306

305-
log.debug("All execution queued")
307+
# We store here the AsyncResult object on which we are going to wait
308+
applyAsyncResults = []
306309

307-
# Waiting for all the monitoring to finish
308-
while not all([r.ready() for r in applyAsyncResults]):
309-
log.debug("Not all the tasks are finished")
310-
time.sleep(0.5)
310+
# Starting the monitoring threads
311+
for ftsJob in activeJobs:
312+
log.debug("Queuing executing of ftsJob %s" % ftsJob.jobID)
313+
# queue the execution of self._monitorJob( ftsJob ) in the thread pool
314+
# The returned value is passed to _monitorJobCallback
315+
applyAsyncResults.append(
316+
self.jobsThreadPool.apply_async(self._monitorJob, (ftsJob,), callback=self._monitorJobCallback)
317+
)
318+
319+
log.debug("All execution queued")
320+
321+
# Waiting for all the monitoring to finish
322+
while not all([r.ready() for r in applyAsyncResults]):
323+
log.debug("Not all the tasks are finished")
324+
time.sleep(0.5)
325+
326+
# If we got less to monitor than what we asked,
327+
# stop looping
328+
if len(activeJobs) < JOB_MONITORING_BATCH_SIZE:
329+
break
311330

312331
log.debug("All the tasks have completed")
313332
return S_OK()

src/DIRAC/DataManagementSystem/DB/FTS3DB.py

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,18 @@
3939

4040
metadata = MetaData()
4141

42+
# Define the default utc_timestampfunction.
43+
# We overwrite it in the case of sqlite in the tests
44+
# because sqlite does not know UTC_TIMESTAMP
45+
utc_timestamp = func.utc_timestamp
4246

4347
fts3FileTable = Table(
4448
"Files",
4549
metadata,
4650
Column("fileID", Integer, primary_key=True),
4751
Column("operationID", Integer, ForeignKey("Operations.operationID", ondelete="CASCADE"), nullable=False),
4852
Column("attempt", Integer, server_default="0"),
49-
Column("lastUpdate", DateTime, onupdate=func.utc_timestamp()),
53+
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
5054
Column("rmsFileID", Integer, server_default="0"),
5155
Column("lfn", String(1024)),
5256
Column("checksum", String(255)),
@@ -67,7 +71,7 @@
6771
Column("jobID", Integer, primary_key=True),
6872
Column("operationID", Integer, ForeignKey("Operations.operationID", ondelete="CASCADE"), nullable=False),
6973
Column("submitTime", DateTime),
70-
Column("lastUpdate", DateTime, onupdate=func.utc_timestamp()),
74+
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
7175
Column("lastMonitor", DateTime),
7276
Column("completeness", Float),
7377
# Could be fetched from Operation, but bad for perf
@@ -99,7 +103,7 @@
99103
Column("activity", String(255)),
100104
Column("priority", SmallInteger),
101105
Column("creationTime", DateTime),
102-
Column("lastUpdate", DateTime, onupdate=func.utc_timestamp()),
106+
Column("lastUpdate", DateTime, onupdate=utc_timestamp()),
103107
Column("status", Enum(*FTS3Operation.ALL_STATES), server_default=FTS3Operation.INIT_STATE, index=True),
104108
Column("error", String(1024)),
105109
Column("type", String(255)),
@@ -176,7 +180,7 @@ def __getDBConnectionInfo(self, fullname):
176180
self.dbPass = dbParameters["Password"]
177181
self.dbName = dbParameters["DBName"]
178182

179-
def __init__(self, pool_size=15):
183+
def __init__(self, pool_size=15, url=None):
180184
"""c'tor
181185
182186
:param self: self reference
@@ -185,12 +189,16 @@ def __init__(self, pool_size=15):
185189
"""
186190

187191
self.log = gLogger.getSubLogger("FTS3DB")
188-
# Initialize the connection info
189-
self.__getDBConnectionInfo("DataManagement/FTS3DB")
192+
193+
if not url:
194+
# Initialize the connection info
195+
self.__getDBConnectionInfo("DataManagement/FTS3DB")
196+
197+
url = "mysql://%s:%s@%s:%s/%s" % (self.dbUser, self.dbPass, self.dbHost, self.dbPort, self.dbName)
190198

191199
runDebug = gLogger.getLevel() == "DEBUG"
192200
self.engine = create_engine(
193-
"mysql://%s:%s@%s:%s/%s" % (self.dbUser, self.dbPass, self.dbHost, self.dbPort, self.dbName),
201+
url,
194202
echo=runDebug,
195203
pool_size=pool_size,
196204
pool_recycle=3600,
@@ -221,7 +229,7 @@ def persistOperation(self, operation):
221229
# so that another agent can work on the request
222230
operation.assignment = None
223231
# because of the merge we have to explicitely set lastUpdate
224-
operation.lastUpdate = func.utc_timestamp()
232+
operation.lastUpdate = utc_timestamp()
225233
try:
226234

227235
# Merge it in case it already is in the DB
@@ -286,7 +294,6 @@ def getActiveJobs(self, limit=20, lastMonitor=None, jobAssignmentTag="Assigned")
286294
:returns: list of FTS3Jobs
287295
288296
"""
289-
290297
session = self.dbSession(expire_on_commit=False)
291298

292299
try:
@@ -435,7 +442,7 @@ def updateJobStatus(self, jobStatusDict):
435442
updateDict[FTS3Job.completeness] = valueDict["completeness"]
436443

437444
if valueDict.get("lastMonitor"):
438-
updateDict[FTS3Job.lastMonitor] = func.utc_timestamp()
445+
updateDict[FTS3Job.lastMonitor] = utc_timestamp()
439446

440447
updateDict[FTS3Job.assignment] = None
441448

@@ -530,12 +537,13 @@ def getNonFinishedOperations(self, limit=20, operationAssignmentTag="Assigned"):
530537
ftsOperations = []
531538

532539
# We need to do the select in two times because the join clause that makes the limit difficult
540+
# We get the list of operations ID that have associated jobs assigned
541+
opIDsWithJobAssigned = session.query(FTS3Job.operationID).filter(~FTS3Job.assignment.is_(None)).subquery()
533542
operationIDsQuery = (
534543
session.query(FTS3Operation.operationID)
535-
.outerjoin(FTS3Job)
536544
.filter(FTS3Operation.status.in_(["Active", "Processed"]))
537545
.filter(FTS3Operation.assignment.is_(None))
538-
.filter(FTS3Job.assignment.is_(None))
546+
.filter(~FTS3Operation.operationID.in_(opIDsWithJobAssigned))
539547
.order_by(FTS3Operation.lastUpdate.asc())
540548
.limit(limit)
541549
.distinct()
@@ -591,8 +599,7 @@ def kickStuckOperations(self, limit=20, kickDelay=2):
591599
ftsOps = (
592600
session.query(FTS3Operation.operationID)
593601
.filter(
594-
FTS3Operation.lastUpdate
595-
< (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
602+
FTS3Operation.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
596603
)
597604
.filter(~FTS3Operation.assignment.is_(None))
598605
.limit(limit)
@@ -607,7 +614,7 @@ def kickStuckOperations(self, limit=20, kickDelay=2):
607614
.where(FTS3Operation.operationID.in_(opIDs))
608615
.where(
609616
FTS3Operation.lastUpdate
610-
< (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
617+
< (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
611618
)
612619
.values({"assignment": None})
613620
.execution_options(synchronize_session=False) # see comment about synchronize_session
@@ -641,9 +648,7 @@ def kickStuckJobs(self, limit=20, kickDelay=2):
641648

642649
ftsJobs = (
643650
session.query(FTS3Job.jobID)
644-
.filter(
645-
FTS3Job.lastUpdate < (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
646-
)
651+
.filter(FTS3Job.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay))))
647652
.filter(~FTS3Job.assignment.is_(None))
648653
.limit(limit)
649654
)
@@ -655,9 +660,7 @@ def kickStuckJobs(self, limit=20, kickDelay=2):
655660
result = session.execute(
656661
update(FTS3Job)
657662
.where(FTS3Job.jobID.in_(jobIDs))
658-
.where(
659-
FTS3Job.lastUpdate < (func.date_sub(func.utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay)))
660-
)
663+
.where(FTS3Job.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d HOUR" % kickDelay))))
661664
.values({"assignment": None})
662665
.execution_options(synchronize_session=False) # see comment about synchronize_session
663666
)
@@ -689,8 +692,7 @@ def deleteFinalOperations(self, limit=20, deleteDelay=180):
689692
ftsOps = (
690693
session.query(FTS3Operation.operationID)
691694
.filter(
692-
FTS3Operation.lastUpdate
693-
< (func.date_sub(func.utc_timestamp(), text("INTERVAL %d DAY" % deleteDelay)))
695+
FTS3Operation.lastUpdate < (func.date_sub(utc_timestamp(), text("INTERVAL %d DAY" % deleteDelay)))
694696
)
695697
.filter(FTS3Operation.status.in_(FTS3Operation.FINAL_STATES))
696698
.limit(limit)

0 commit comments

Comments
 (0)