Skip to content

Commit 4d153b5

Browse files
authored
Merge pull request #8402 from chaen/v9.0_feat_speedupGetReplicas
feat (DataManager): batch getReplicas at the FC level for speedup
2 parents 83e0ec6 + d61c9ba commit 4d153b5

File tree

2 files changed: 23 additions and 17 deletions

src/DIRAC/DataManagementSystem/Client/DataManager.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
from DIRAC.Core.Utilities import DErrno
2323
from DIRAC.Core.Utilities.Adler import fileAdler, compareAdler
2424
from DIRAC.Core.Utilities.File import makeGuid, getSize
25-
from DIRAC.Core.Utilities.List import randomize, breakListIntoChunks
25+
from DIRAC.Core.Utilities.List import randomize
2626
from DIRAC.Core.Utilities.ReturnValues import returnSingleResult
2727
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
2828
from DIRAC.Core.Security.ProxyInfo import getVOfromProxyGroup
@@ -1677,18 +1677,15 @@ def getReplicas(
16771677
"""get replicas from catalogue and filter if requested
16781678
Warning: all filters are independent, hence active and preferDisk should be set if using forJobs
16791679
"""
1680-
catalogReplicas = {}
1681-
failed = {}
1680+
16821681
if not protocol:
16831682
protocol = self.registrationProtocol
16841683

1685-
for lfnChunk in breakListIntoChunks(lfns, 1000):
1686-
res = self.fileCatalog.getReplicas(lfnChunk, allStatus=allStatus)
1687-
if res["OK"]:
1688-
catalogReplicas.update(res["Value"]["Successful"])
1689-
failed.update(res["Value"]["Failed"])
1690-
else:
1691-
return res
1684+
res = self.fileCatalog.getReplicas(lfns, allStatus=allStatus)
1685+
if not res["OK"]:
1686+
return res
1687+
catalogReplicas = res["Value"]["Successful"]
1688+
failed = res["Value"]["Failed"]
16921689
if not getUrl:
16931690
for lfn in catalogReplicas:
16941691
catalogReplicas[lfn] = dict.fromkeys(catalogReplicas[lfn], True)

src/DIRAC/Resources/Catalog/FileCatalogClient.py

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1-
""" The FileCatalogClient is a class representing the client of the DIRAC File Catalog
2-
"""
1+
"""The FileCatalogClient is a class representing the client of the DIRAC File Catalog"""
2+
33
import json
44
import os
55

66
from DIRAC import S_OK, S_ERROR
7+
from DIRAC.Core.Utilities.List import breakListIntoChunks
78
from DIRAC.Core.Tornado.Client.ClientSelector import TransferClientSelector as TransferClient
89

910
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOMSAttributeForGroup, getDNForUsername
1011
from DIRAC.Resources.Catalog.Utilities import checkCatalogArguments
1112
from DIRAC.Resources.Catalog.FileCatalogClientBase import FileCatalogClientBase
1213

14+
GET_REPLICAS_CHUNK_SIZE = 10_000
15+
1316

1417
class FileCatalogClient(FileCatalogClientBase):
1518
"""Client code to the DIRAC File Catalogue"""
@@ -135,14 +138,20 @@ def __init__(self, url=None, **kwargs):
135138
@checkCatalogArguments
136139
def getReplicas(self, lfns, allStatus=False, timeout=120):
137140
"""Get the replicas of the given files"""
138-
rpcClient = self._getRPC(timeout=timeout)
139-
result = rpcClient.getReplicas(lfns, allStatus)
141+
successful = {}
142+
failed = {}
140143

141-
if not result["OK"]:
142-
return result
144+
for chunk in breakListIntoChunks(lfns, GET_REPLICAS_CHUNK_SIZE):
145+
rpcClient = self._getRPC(timeout=timeout)
146+
result = rpcClient.getReplicas(chunk, allStatus)
147+
148+
if not result["OK"]:
149+
return result
150+
successful.update(result["Value"]["Successful"])
151+
failed.update(result["Value"]["Failed"])
143152

144153
# If there is no PFN returned, just set the LFN instead
145-
lfnDict = result["Value"]
154+
lfnDict = {"Successful": successful, "Failed": failed}
146155
for lfn in lfnDict["Successful"]:
147156
for se in lfnDict["Successful"][lfn]:
148157
if not lfnDict["Successful"][lfn][se]:

0 commit comments

Comments
 (0)