Skip to content

Commit c52d9fd

Browse files
authored
Merge pull request #6854 from chaen/v8.0_FIX_repAndRegisterCheckSEs
[8.0] ReplicateAndRegister with FTS checks SE status
2 parents 1fa28d1 + 2247c6f commit c52d9fd

File tree

2 files changed

+57
-32
lines changed

2 files changed

+57
-32
lines changed

src/DIRAC/DataManagementSystem/Agent/FTS3Agent.py

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
:caption: FTS3Agent options
1313
1414
"""
15+
import datetime
1516
import errno
17+
import os
18+
from urllib import parse
1619
import time
1720

1821
# from threading import current_thread
@@ -52,6 +55,10 @@
5255
# when running multiple agents, we rather do it in steps
5356
JOB_MONITORING_BATCH_SIZE = 20
5457

58+
# We do not monitor a job more often
59+
# than MONITORING_DELAY in minutes
60+
MONITORING_DELAY = 10
61+
5562

5663
class FTS3Agent(AgentModule):
5764
"""
@@ -108,6 +115,8 @@ def initialize(self):
108115
# name that will be used in DB for assignment tag
109116
self.assignmentTag = gethostname().split(".")[0]
110117

118+
self.workDirectory = self.am_getWorkDirectory()
119+
111120
res = self.__readConf()
112121

113122
# We multiply by two because of the two threadPools
@@ -165,13 +174,19 @@ def getFTS3Context(self, username, group, ftsServer, threadID):
165174

166175
log.debug(f"UserDN {userDN}")
167176

177+
# Chose a meaningful proxy name for easier debugging
178+
srvName = parse.urlparse(ftsServer).netloc.split(":")[0]
179+
proxyFile = os.path.join(
180+
self.workDirectory, f"{int(time.time())}_{username}_{group}_{srvName}_{threadID}.pem"
181+
)
182+
168183
# We dump the proxy to a file.
169184
# It has to have a lifetime of self.proxyLifetime
170185
# Because the FTS3 servers cache it for 2/3rd of the lifetime
171186
# we should make our cache a bit less than 2/3rd of the lifetime
172187
cacheTime = int(2 * self.proxyLifetime / 3) - 600
173188
res = gProxyManager.downloadVOMSProxyToFile(
174-
userDN, group, requiredTimeLeft=self.proxyLifetime, cacheTime=cacheTime
189+
userDN, group, requiredTimeLeft=self.proxyLifetime, cacheTime=cacheTime, filePath=proxyFile
175190
)
176191
if not res["OK"]:
177192
return res
@@ -289,18 +304,29 @@ def monitorJobsLoop(self):
289304
if mod:
290305
nbOfLoops += 1
291306

307+
# Not only is it pointless to monitor right after submission
308+
# but also we would end up fetching multiple time the same job otherwise
309+
# as we call getActiveJobs by batch
310+
lastMonitor = datetime.datetime.utcnow() - datetime.timedelta(minutes=MONITORING_DELAY)
311+
292312
log.debug("Getting active jobs")
293313

294314
for loopId in range(nbOfLoops):
295315
log.info("Getting next batch of jobs to monitor", f"{loopId}/{nbOfLoops}")
296316
# get jobs from DB
297-
res = self.fts3db.getActiveJobs(limit=JOB_MONITORING_BATCH_SIZE, jobAssignmentTag=self.assignmentTag)
317+
res = self.fts3db.getActiveJobs(
318+
limit=JOB_MONITORING_BATCH_SIZE, lastMonitor=lastMonitor, jobAssignmentTag=self.assignmentTag
319+
)
298320

299321
if not res["OK"]:
300322
log.error("Could not retrieve ftsJobs from the DB", res)
301323
return res
302324

303325
activeJobs = res["Value"]
326+
if not activeJobs:
327+
log.info("No more jobs to monitor")
328+
break
329+
304330
log.info("Jobs queued for monitoring", len(activeJobs))
305331

306332
# We store here the AsyncResult object on which we are going to wait

src/DIRAC/DataManagementSystem/Agent/RequestOperations/ReplicateAndRegister.py

Lines changed: 29 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,35 @@ def __call__(self):
184184
if self.rmsMonitoring:
185185
self.rmsMonitoringReporter = MonitoringReporter(monitoringType="RMSMonitoring")
186186

187+
sourceSE = self.operation.SourceSE if self.operation.SourceSE else None
188+
if sourceSE:
189+
# check sourceSE for read
190+
bannedSource = self.checkSEsRSS(sourceSE, "ReadAccess")
191+
if not bannedSource["OK"]:
192+
if self.rmsMonitoring:
193+
for status in ["Attempted", "Failed"]:
194+
self.rmsMonitoringReporter.addRecord(self.createRMSRecord(status, len(self.operation)))
195+
self.rmsMonitoringReporter.commit()
196+
return bannedSource
197+
198+
if bannedSource["Value"]:
199+
self.operation.Error = f"SourceSE {sourceSE} is banned for reading"
200+
self.log.info(self.operation.Error)
201+
return S_OK(self.operation.Error)
202+
203+
# check targetSEs for write
204+
bannedTargets = self.checkSEsRSS()
205+
if not bannedTargets["OK"]:
206+
if self.rmsMonitoring:
207+
for status in ["Attempted", "Failed"]:
208+
self.rmsMonitoringReporter.addRecord(self.createRMSRecord(status, len(self.operation)))
209+
self.rmsMonitoringReporter.commit()
210+
return bannedTargets
211+
212+
if bannedTargets["Value"]:
213+
self.operation.Error = f"{','.join(bannedTargets['Value'])} targets are banned for writing"
214+
return S_OK(self.operation.Error)
215+
187216
# # check replicas first
188217
checkReplicas = self.__checkReplicas()
189218
if not checkReplicas["OK"]:
@@ -480,36 +509,6 @@ def fts3Transfer(self):
480509

481510
def dmTransfer(self, fromFTS=False):
482511
"""replicate and register using dataManager"""
483-
# # get waiting files. If none just return
484-
# # source SE
485-
sourceSE = self.operation.SourceSE if self.operation.SourceSE else None
486-
if sourceSE:
487-
# # check source se for read
488-
bannedSource = self.checkSEsRSS(sourceSE, "ReadAccess")
489-
if not bannedSource["OK"]:
490-
if self.rmsMonitoring:
491-
for status in ["Attempted", "Failed"]:
492-
self.rmsMonitoringReporter.addRecord(self.createRMSRecord(status, len(self.operation)))
493-
self.rmsMonitoringReporter.commit()
494-
return bannedSource
495-
496-
if bannedSource["Value"]:
497-
self.operation.Error = f"SourceSE {sourceSE} is banned for reading"
498-
self.log.info(self.operation.Error)
499-
return S_OK(self.operation.Error)
500-
501-
# # check targetSEs for write
502-
bannedTargets = self.checkSEsRSS()
503-
if not bannedTargets["OK"]:
504-
if self.rmsMonitoring:
505-
for status in ["Attempted", "Failed"]:
506-
self.rmsMonitoringReporter.addRecord(self.createRMSRecord(status, len(self.operation)))
507-
self.rmsMonitoringReporter.commit()
508-
return bannedTargets
509-
510-
if bannedTargets["Value"]:
511-
self.operation.Error = f"{','.join(bannedTargets['Value'])} targets are banned for writing"
512-
return S_OK(self.operation.Error)
513512

514513
# Can continue now
515514
self.log.verbose("No targets banned for writing")

0 commit comments

Comments
 (0)