Skip to content

Commit e1f09e7

Browse files
committed
feat (ReplicateAndRegister): add activeReplicas cache
1 parent 78ebe2a commit e1f09e7

File tree

2 files changed

+425
-188
lines changed

2 files changed

+425
-188
lines changed

src/DIRAC/DataManagementSystem/Agent/RequestOperations/ReplicateAndRegister.py

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@
4646
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
4747

4848

49-
def filterReplicas(opFile, logger=None, dataManager=None, opSources=None):
49+
def filterReplicas(opFile, logger=None, dataManager=None, opSources=None, activeReplicas=None):
5050
"""filter out banned/invalid source SEs
5151
52-
:param opSources list: list of SE names to which limit the possible sources
52+
:param list opSources: list of SE names to which limit the possible sources
53+
:param dict activeReplicas: the result of dm.getActiveReplicas(*)["Value"]. Used as a cache
5354
5455
:returns: Valid list of SEs valid as source
5556
@@ -63,19 +64,21 @@ def filterReplicas(opFile, logger=None, dataManager=None, opSources=None):
6364
log = logger.getSubLogger("filterReplicas")
6465
result = defaultdict(list)
6566

66-
replicas = dataManager.getActiveReplicas(opFile.LFN, getUrl=False, preferDisk=True)
67-
if not replicas["OK"]:
68-
log.error("Failed to get active replicas", replicas["Message"])
69-
return replicas
67+
if not activeReplicas:
68+
res = dataManager.getActiveReplicas(opFile.LFN, getUrl=False, preferDisk=True)
69+
if not res["OK"]:
70+
log.error("Failed to get active replicas", res["Message"])
71+
return res
72+
activeReplicas = res["Value"]
73+
7074
reNotExists = re.compile(r".*such file.*")
71-
replicas = replicas["Value"]
72-
failed = replicas["Failed"].get(opFile.LFN, "")
75+
failed = activeReplicas["Failed"].get(opFile.LFN, "")
7376
if reNotExists.match(failed.lower()):
7477
opFile.Status = "Failed"
7578
opFile.Error = failed
7679
return S_ERROR(failed)
7780

78-
replicas = replicas["Successful"].get(opFile.LFN, {})
81+
replicas = activeReplicas["Successful"].get(opFile.LFN, {})
7982

8083
# If user set sourceSEs, only consider those replicas
8184
if opSources:
@@ -259,9 +262,15 @@ def _addMetadataToFiles(self, toSchedule):
259262

260263
return S_OK(filesToSchedule)
261264

262-
def _filterReplicas(self, opFile):
265+
def _filterReplicas(self, opFile, activeReplicas):
263266
"""filter out banned/invalid source SEs"""
264-
return filterReplicas(opFile, logger=self.log, dataManager=self.dm, opSources=self.operation.sourceSEList)
267+
return filterReplicas(
268+
opFile,
269+
logger=self.log,
270+
dataManager=self.dm,
271+
opSources=self.operation.sourceSEList,
272+
activeReplicas=activeReplicas,
273+
)
265274

266275
def _checkExistingFTS3Operations(self):
267276
"""
@@ -348,13 +357,22 @@ def fts3Transfer(self):
348357
if self.rmsMonitoring:
349358
self.rmsMonitoringReporter.addRecord(self.createRMSRecord("Attempted", len(self.getWaitingFilesList())))
350359

351-
for opFile in self.getWaitingFilesList():
360+
waitingFiles = self.getWaitingFilesList()
361+
362+
allLFNs = [opFile.LFN for opFile in waitingFiles]
363+
res = self.dm.getActiveReplicas(allLFNs, getUrl=False, preferDisk=True)
364+
if not res["OK"]:
365+
self.log.error("Failed to get active replicas", res["Message"])
366+
return res
367+
allActiveReplicas = res["Value"]
368+
369+
for opFile in waitingFiles:
352370
rmsFilesIds[opFile.FileID] = opFile
353371

354372
opFile.Error = ""
355373

356374
# # check replicas
357-
replicas = self._filterReplicas(opFile)
375+
replicas = self._filterReplicas(opFile, allActiveReplicas)
358376
if not replicas["OK"]:
359377
continue
360378
replicas = replicas["Value"]
@@ -510,6 +528,13 @@ def dmTransfer(self, fromFTS=False):
510528
if self.rmsMonitoring:
511529
self.rmsMonitoringReporter.addRecord(self.createRMSRecord("Attempted", len(waitingFiles)))
512530

531+
allLFNs = [opFile.LFN for opFile in waitingFiles]
532+
res = self.dm.getActiveReplicas(allLFNs, getUrl=False, preferDisk=True)
533+
if not res["OK"]:
534+
self.log.error("Failed to get active replicas", res["Message"])
535+
return res
536+
allActiveReplicas = res["Value"]
537+
513538
for opFile in waitingFiles:
514539
if opFile.Error in (
515540
"Couldn't get metadata",
@@ -524,7 +549,7 @@ def dmTransfer(self, fromFTS=False):
524549
lfn = opFile.LFN
525550

526551
# Check if replica is at the specified source
527-
replicas = self._filterReplicas(opFile)
552+
replicas = self._filterReplicas(opFile, allActiveReplicas)
528553
if not replicas["OK"]:
529554
self.log.error("Failed to check replicas", replicas["Message"])
530555
continue

0 commit comments

Comments
 (0)