Skip to content

Commit 4949af4

Browse files
authored
Merge pull request #8242 from chaen/v9.0_feat_cleanTransfoCache
[9.0] Clean TransformationAgentCache and speedups
2 parents 1532612 + d9d540c commit 4949af4

File tree

5 files changed

+74
-12
lines changed

5 files changed

+74
-12
lines changed

src/DIRAC/ConfigurationSystem/Client/Helpers/Registry.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,53 @@
1-
""" Helper for /Registry section
2-
"""
1+
"""Helper for /Registry section"""
32

43
import errno
4+
import inspect
5+
import sys
56

67
from threading import Lock
8+
from collections.abc import Iterable
79

810
from cachetools import TTLCache, cached
11+
from cachetools.keys import hashkey
912

1013

14+
from typing import Optional
15+
from collections.abc import Iterable
16+
1117
from DIRAC import S_OK, S_ERROR
1218
from DIRAC.ConfigurationSystem.Client.Config import gConfig
1319
from DIRAC.ConfigurationSystem.Client.Helpers.CSGlobals import getVO
1420

1521
ID_DN_PREFIX = "/O=DIRAC/CN="
1622

23+
# 300 is the default CS refresh time
24+
25+
CACHE_REFRESH_TIME = 300
1726
# pylint: disable=missing-docstring
1827

1928
gBaseRegistrySection = "/Registry"
2029

2130

31+
def reset_all_caches():
32+
"""This method is called to clear all caches.
33+
It is necessary to reinitialize them after the central CS
34+
has been loaded
35+
"""
36+
for cache in [
37+
obj
38+
for name, obj in inspect.getmembers(sys.modules[__name__])
39+
if (inspect.isfunction(obj) and hasattr(obj, "cache_clear"))
40+
]:
41+
cache.cache_clear()
42+
43+
44+
def get_username_for_dn_key(dn: str, userList: Optional[Iterable[str]] = None):
45+
if userList:
46+
return hashkey(dn, *sorted(userList))
47+
return hashkey(dn)
48+
49+
50+
@cached(TTLCache(maxsize=1000, ttl=CACHE_REFRESH_TIME), lock=Lock(), key=get_username_for_dn_key)
2251
def getUsernameForDN(dn, usersList=None):
2352
"""Find DIRAC user for DN
2453
@@ -39,6 +68,7 @@ def getUsernameForDN(dn, usersList=None):
3968
return S_ERROR(f"No username found for dn {dn}")
4069

4170

71+
@cached(TTLCache(maxsize=1000, ttl=CACHE_REFRESH_TIME), lock=Lock())
4272
def getDNForUsername(username):
4373
"""Get user DN for user
4474
@@ -419,6 +449,7 @@ def getBannedIPs():
419449
return gConfig.getValue(f"{gBaseRegistrySection}/BannedIPs", [])
420450

421451

452+
@cached(TTLCache(maxsize=1000, ttl=CACHE_REFRESH_TIME), lock=Lock())
422453
def getVOForGroup(group):
423454
"""Search VO name for group
424455
@@ -634,10 +665,7 @@ def getDNProperty(userDN, value, defaultValue=None):
634665
return S_OK(defaultValue)
635666

636667

637-
_cache_getProxyProvidersForDN = TTLCache(maxsize=1000, ttl=60)
638-
639-
640-
@cached(_cache_getProxyProvidersForDN, lock=Lock())
668+
@cached(TTLCache(maxsize=1000, ttl=CACHE_REFRESH_TIME), lock=Lock())
641669
def getProxyProvidersForDN(userDN):
642670
"""Get proxy providers by user DN
643671

src/DIRAC/ConfigurationSystem/Client/LocalConfiguration.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -568,6 +568,9 @@ def enableCS(self):
568568
objLoader = ObjectLoader()
569569
objLoader.reloadRootModules()
570570
self.__initLogger(self.componentName, self.loggingSection, forceInit=True)
571+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import reset_all_caches
572+
573+
reset_all_caches()
571574
return res
572575

573576
def isCSEnabled(self):

src/DIRAC/Core/Security/ProxyInfo.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
"""
2-
Set of utilities to retrieve Information from proxy
2+
Set of utilities to retrieve Information from proxy
33
"""
4+
45
import base64
56

67
from DIRAC import S_ERROR, S_OK, gLogger
78
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
9+
from DIRAC.ConfigurationSystem.Client.Helpers.CSGlobals import getVO
10+
811
from DIRAC.Core.Security import Locations
912
from DIRAC.Core.Security.DiracX import diracxTokenFromPEM
1013
from DIRAC.Core.Security.VOMS import VOMS
@@ -207,10 +210,11 @@ def getVOfromProxyGroup():
207210
"""
208211
Return the VO associated to the group in the proxy
209212
"""
210-
voName = Registry.getVOForGroup("NoneExistingGroup")
213+
211214
ret = getProxyInfo(disableVOMS=True)
212-
if not ret["OK"]:
213-
return S_OK(voName)
214-
if "group" in ret["Value"]:
215+
if not ret["OK"] or "group" not in ret["Value"]:
216+
voName = getVO()
217+
else:
215218
voName = Registry.getVOForGroup(ret["Value"]["group"])
219+
216220
return S_OK(voName)

src/DIRAC/TransformationSystem/Agent/TransformationAgent.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" TransformationAgent processes transformations found in the transformation database.
1+
"""TransformationAgent processes transformations found in the transformation database.
22
33
The following options can be set for the TransformationAgent.
44
@@ -8,13 +8,15 @@
88
:dedent: 2
99
:caption: TransformationAgent options
1010
"""
11+
1112
from importlib import import_module
1213

1314
import time
1415
import os
1516
import datetime
1617
import pickle
1718
import concurrent.futures
19+
from pathlib import Path
1820

1921
from DIRAC import S_OK, S_ERROR
2022
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
@@ -127,6 +129,9 @@ def execute(self):
127129
if not res["OK"]:
128130
self._logError("Failed to obtain transformations:", res["Message"])
129131
return S_OK()
132+
133+
active_trans_ids = [t["TransformationID"] for t in res["Value"]]
134+
self.cleanOldTransformationCache(active_trans_ids)
130135
# Process the transformations
131136
count = 0
132137
future_to_transID = {}
@@ -164,6 +169,22 @@ def execute(self):
164169

165170
return S_OK()
166171

172+
def cleanOldTransformationCache(self, active_trans_ids: list[int]):
173+
cache_filenames = {Path(self.__cacheFile(tid)) for tid in active_trans_ids}
174+
existing_caches = set(Path(self.workDirectory).glob("*.pkl"))
175+
useless_cache_files = existing_caches - cache_filenames
176+
177+
if useless_cache_files:
178+
self._logInfo(f"Found potentially {len(useless_cache_files)} useless cache files")
179+
180+
# Since idle transformations aren't in active_trans_ids, let's filter it more
181+
# and take only files that haven't been touched for 2 month
182+
last_update_threshold = (datetime.datetime.utcnow() - datetime.timedelta(days=60)).timestamp()
183+
184+
for cache_file in useless_cache_files:
185+
if Path(cache_file).stat().st_mtime < last_update_threshold:
186+
cache_file.unlink()
187+
167188
def getTransformations(self):
168189
"""Obtain the transformations to be executed - this is executed at the start of every loop (it's really the
169190
only real thing in the execute()

src/DIRAC/TransformationSystem/Client/Utilities.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import ast
1010
import random
1111

12+
from cachetools import LRUCache, cached
13+
from cachetools.keys import hashkey
1214
from DIRAC import S_OK, S_ERROR, gLogger
1315

1416
from DIRAC.Core.Utilities.List import breakListIntoChunks
@@ -400,6 +402,10 @@ def isSameSE(self, se1, se2):
400402

401403
return StorageElement(se1).isSameSE(StorageElement(se2))
402404

405+
@cached(
406+
LRUCache(maxsize=1024),
407+
key=lambda _, a, b: hashkey(a, *sorted(b)),
408+
)
403409
def isSameSEInList(self, se1, seList):
404410
"""Check if an SE is the same as any in a list"""
405411
if se1 in seList:

0 commit comments

Comments
 (0)