Skip to content

Commit 9ad37e1

Browse files
authored
Merge pull request #6794 from chaen/v8.0_FIX_speadupOperations
[8.0] Speed up ReplicateAndRegister operation
2 parents 078603d + cb9c374 commit 9ad37e1

File tree

5 files changed

+770
-19
lines changed

5 files changed

+770
-19
lines changed

pyproject.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,31 @@ exclude = [
2828
'/tests/'
2929
]
3030

31+
[tool.pylint.basic]
32+
33+
# We mostly have CamelCase, with a few differences.
34+
# In tests we have quite a lot of snake_case, mostly due to pytest
35+
# We can instruct pylint to understand both, but the problem is that it
36+
# will stick to one style per file (i.e. if the first variable is snake,
37+
# all the following must be snake)
38+
# It's not quite the case yet...
39+
# For the time being, I wrote the regex that best matches our code.
40+
# (except for the services with their export_ and types_ ...)
41+
# We will see about tests later...
42+
# See https://pylint.readthedocs.io/en/latest/user_guide/messages/convention/invalid-name.html#multiple-naming-styles-for-custom-regular-expressions
43+
44+
# Camel case with capital letter first
45+
class-rgx = '([A-Z][a-z]*)+$'
46+
module-rgx = '([A-Z][a-z]*)+$'
47+
48+
# Attributes, variables, functions and methods
49+
# are camelCase, but can start with one or two underscores
50+
attr-rgx = '(?:_*[a-z]+([A-Z][a-z]*)*)$'
51+
variable-rgx = '(?:_*[a-z]+([A-Z][a-z]*)*)$'
52+
function-rgx = '(?:_*[a-z]+([A-Z][a-z]*)*)$'
53+
method-rgx = '(?:_*[a-z]+([A-Z][a-z]*)*)$'
54+
55+
argument-naming-style = "camelCase"
3156

3257
[tool.pylint.main]
3358
# Files or directories to be skipped. They should be base names, not paths.

src/DIRAC/DataManagementSystem/Agent/RequestOperations/RegisterReplica.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,17 @@ def __call__(self):
5555
# # loop over files
5656
registerOperations = {}
5757
successReplicas = 0
58+
59+
targetSE = self.operation.targetSEList[0]
60+
replicaTuples = [(opFile.LFN, opFile.PFN, targetSE) for opFile in waitingFiles]
61+
62+
registerReplica = self.dm.registerReplica(replicaTuples, catalogs)
63+
5864
for opFile in waitingFiles:
5965
# # get LFN
6066
lfn = opFile.LFN
6167
# # and others
62-
targetSE = self.operation.targetSEList[0]
63-
replicaTuple = (lfn, opFile.PFN, targetSE)
64-
# # call ReplicaManager
65-
registerReplica = self.dm.registerReplica(replicaTuple, catalogs)
68+
6669
# # check results
6770
if not registerReplica["OK"] or lfn in registerReplica["Value"]["Failed"]:
6871
# There have been some errors

src/DIRAC/DataManagementSystem/Agent/RequestOperations/ReplicateAndRegister.py

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,15 @@
4646
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
4747

4848

49-
def filterReplicas(opFile, logger=None, dataManager=None, opSources=None):
50-
"""filter out banned/invalid source SEs"""
49+
def filterReplicas(opFile, logger=None, dataManager=None, opSources=None, activeReplicas=None):
50+
"""filter out banned/invalid source SEs
51+
52+
:param list opSources: list of SE names to which the possible sources are limited
53+
:param dict activeReplicas: the result of dm.getActiveReplicas(*)["Value"]. Used as a cache
54+
55+
:returns: List of SEs that are valid as source
56+
57+
"""
5158

5259
if logger is None:
5360
logger = gLogger
@@ -57,19 +64,21 @@ def filterReplicas(opFile, logger=None, dataManager=None, opSources=None):
5764
log = logger.getSubLogger("filterReplicas")
5865
result = defaultdict(list)
5966

60-
replicas = dataManager.getActiveReplicas(opFile.LFN, getUrl=False, preferDisk=True)
61-
if not replicas["OK"]:
62-
log.error("Failed to get active replicas", replicas["Message"])
63-
return replicas
67+
if not activeReplicas:
68+
res = dataManager.getActiveReplicas(opFile.LFN, getUrl=False, preferDisk=True)
69+
if not res["OK"]:
70+
log.error("Failed to get active replicas", res["Message"])
71+
return res
72+
activeReplicas = res["Value"]
73+
6474
reNotExists = re.compile(r".*such file.*")
65-
replicas = replicas["Value"]
66-
failed = replicas["Failed"].get(opFile.LFN, "")
75+
failed = activeReplicas["Failed"].get(opFile.LFN, "")
6776
if reNotExists.match(failed.lower()):
6877
opFile.Status = "Failed"
6978
opFile.Error = failed
7079
return S_ERROR(failed)
7180

72-
replicas = replicas["Successful"].get(opFile.LFN, {})
81+
replicas = activeReplicas["Successful"].get(opFile.LFN, {})
7382

7483
# If user set sourceSEs, only consider those replicas
7584
if opSources:
@@ -253,9 +262,15 @@ def _addMetadataToFiles(self, toSchedule):
253262

254263
return S_OK(filesToSchedule)
255264

256-
def _filterReplicas(self, opFile):
265+
def _filterReplicas(self, opFile, activeReplicas):
257266
"""filter out banned/invalid source SEs"""
258-
return filterReplicas(opFile, logger=self.log, dataManager=self.dm, opSources=self.operation.sourceSEList)
267+
return filterReplicas(
268+
opFile,
269+
logger=self.log,
270+
dataManager=self.dm,
271+
opSources=self.operation.sourceSEList,
272+
activeReplicas=activeReplicas,
273+
)
259274

260275
def _checkExistingFTS3Operations(self):
261276
"""
@@ -342,13 +357,22 @@ def fts3Transfer(self):
342357
if self.rmsMonitoring:
343358
self.rmsMonitoringReporter.addRecord(self.createRMSRecord("Attempted", len(self.getWaitingFilesList())))
344359

345-
for opFile in self.getWaitingFilesList():
360+
waitingFiles = self.getWaitingFilesList()
361+
362+
allLFNs = [opFile.LFN for opFile in waitingFiles]
363+
res = self.dm.getActiveReplicas(allLFNs, getUrl=False, preferDisk=True)
364+
if not res["OK"]:
365+
self.log.error("Failed to get active replicas", res["Message"])
366+
return res
367+
allActiveReplicas = res["Value"]
368+
369+
for opFile in waitingFiles:
346370
rmsFilesIds[opFile.FileID] = opFile
347371

348372
opFile.Error = ""
349373

350374
# # check replicas
351-
replicas = self._filterReplicas(opFile)
375+
replicas = self._filterReplicas(opFile, allActiveReplicas)
352376
if not replicas["OK"]:
353377
continue
354378
replicas = replicas["Value"]
@@ -504,6 +528,13 @@ def dmTransfer(self, fromFTS=False):
504528
if self.rmsMonitoring:
505529
self.rmsMonitoringReporter.addRecord(self.createRMSRecord("Attempted", len(waitingFiles)))
506530

531+
allLFNs = [opFile.LFN for opFile in waitingFiles]
532+
res = self.dm.getActiveReplicas(allLFNs, getUrl=False, preferDisk=True)
533+
if not res["OK"]:
534+
self.log.error("Failed to get active replicas", res["Message"])
535+
return res
536+
allActiveReplicas = res["Value"]
537+
507538
for opFile in waitingFiles:
508539
if opFile.Error in (
509540
"Couldn't get metadata",
@@ -518,7 +549,7 @@ def dmTransfer(self, fromFTS=False):
518549
lfn = opFile.LFN
519550

520551
# Check if replica is at the specified source
521-
replicas = self._filterReplicas(opFile)
552+
replicas = self._filterReplicas(opFile, allActiveReplicas)
522553
if not replicas["OK"]:
523554
self.log.error("Failed to check replicas", replicas["Message"])
524555
continue

0 commit comments

Comments
 (0)