Commit 6e4c9ec

Merge pull request #7943 from fstagni/80_TwoLevelCache

[8.0] Some refactoring

2 parents dd7065e + 53819ea, commit 6e4c9ec

File tree

2 files changed: +154 -154 lines changed

src/DIRAC/Core/Utilities/DictCache.py
src/DIRAC/WorkloadManagementSystem/Client/Limiter.py

src/DIRAC/Core/Utilities/DictCache.py

Lines changed: 105 additions & 1 deletion
@@ -1,9 +1,15 @@
 """
-DictCache.
+DictCache and TwoLevelCache
 """
 import datetime
 import threading
 import weakref
+from collections import defaultdict
+from collections.abc import Callable
+from concurrent.futures import Future, ThreadPoolExecutor, wait
+from typing import Any
+
+from cachetools import TTLCache
 
 # DIRAC
 from DIRAC.Core.Utilities.LockRing import LockRing
@@ -249,3 +255,101 @@ def _purgeAll(lock, cache, deleteFunction):
     finally:
         if lock:
             lock.release()
+
+
+class TwoLevelCache:
+    """A two-level caching system with soft and hard time-to-live (TTL) expiration.
+
+    This cache implements a two-tier caching mechanism to allow for background refresh
+    of cached values. It uses a soft TTL for quick access and a hard TTL as a fallback,
+    which helps in reducing latency and maintaining data freshness.
+
+    Attributes:
+        soft_cache (TTLCache): A cache with a shorter TTL for quick access.
+        hard_cache (TTLCache): A cache with a longer TTL as a fallback.
+        locks (defaultdict): Thread-safe locks for each cache key.
+        futures (dict): Stores ongoing asynchronous population tasks.
+        pool (ThreadPoolExecutor): Thread pool for executing cache population tasks.
+
+    Args:
+        soft_ttl (int): Time-to-live in seconds for the soft cache.
+        hard_ttl (int): Time-to-live in seconds for the hard cache.
+        max_workers (int): Maximum number of workers in the thread pool.
+        max_items (int): Maximum number of items in the cache.
+
+    Example:
+        >>> cache = TwoLevelCache(soft_ttl=60, hard_ttl=300)
+        >>> def populate_func():
+        ...     return "cached_value"
+        >>> value = cache.get("key", populate_func)
+
+    Note:
+        The cache uses a ThreadPoolExecutor with a maximum of 10 workers to
+        handle concurrent cache population requests.
+    """
+
+    def __init__(self, soft_ttl: int, hard_ttl: int, *, max_workers: int = 10, max_items: int = 1_000_000):
+        """Initialize the TwoLevelCache with specified TTLs."""
+        self.soft_cache = TTLCache(max_items, soft_ttl)
+        self.hard_cache = TTLCache(max_items, hard_ttl)
+        self.locks = defaultdict(threading.Lock)
+        self.futures: dict[str, Future] = {}
+        self.pool = ThreadPoolExecutor(max_workers=max_workers)
+
+    def get(self, key: str, populate_func: Callable[[], Any]) -> dict:
+        """Retrieve a value from the cache, populating it if necessary.
+
+        This method first checks the soft cache for the key. If not found,
+        it checks the hard cache while initiating a background refresh.
+        If the key is not in either cache, it waits for the populate_func
+        to complete and stores the result in both caches.
+
+        Locks are used to ensure there is never more than one concurrent
+        population task for a given key.
+
+        Args:
+            key (str): The cache key to retrieve or populate.
+            populate_func (Callable[[], Any]): A function to call to populate the cache
+                if the key is not found.
+
+        Returns:
+            Any: The cached value associated with the key.
+
+        Note:
+            This method is thread-safe and handles concurrent requests for the same key.
+        """
+        if result := self.soft_cache.get(key):
+            return result
+        with self.locks[key]:
+            if key not in self.futures:
+                self.futures[key] = self.pool.submit(self._work, key, populate_func)
+            if result := self.hard_cache.get(key):
+                self.soft_cache[key] = result
+                return result
+        # It is critical that ``future`` is waited for outside of the lock as
+        # _work acquires the lock before filling the caches. This also means
+        # we can guarantee that the future has not yet been removed from the
+        # futures dict.
+        future = self.futures[key]
+        wait([future])
+        return self.hard_cache[key]
+
+    def _work(self, key: str, populate_func: Callable[[], Any]) -> None:
+        """Internal method to execute the populate_func and update caches.
+
+        This method is intended to be run in a separate thread. It calls the
+        populate_func, stores the result in both caches, and cleans up the
+        associated future.
+
+        Args:
+            key (str): The cache key to populate.
+            populate_func (Callable[[], Any]): The function to call to get the value.
+
+        Note:
+            This method is not intended to be called directly by users of the class.
+        """
+        result = populate_func()
+        with self.locks[key]:
+            self.futures.pop(key)
+            self.hard_cache[key] = result
+            self.soft_cache[key] = result
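To make the semantics of the new class concrete, here is a minimal usage sketch (not part of the commit; the key name and the populate function are illustrative):

    import time

    from DIRAC.Core.Utilities.DictCache import TwoLevelCache

    cache = TwoLevelCache(soft_ttl=5, hard_ttl=60)

    def expensive_lookup():
        """Stand-in for a slow query, e.g. a database or configuration call."""
        time.sleep(1)
        return {"JobType": ["User"]}

    # First call: nothing is cached, so get() blocks on expensive_lookup()
    # and stores the result in both tiers.
    value = cache.get("site-conditions", expensive_lookup)

    # Within soft_ttl seconds, the soft cache answers immediately.
    value = cache.get("site-conditions", expensive_lookup)

    # Between soft_ttl and hard_ttl, the stale hard-cache value is returned
    # at once while a pool thread re-runs expensive_lookup() to refresh
    # both tiers.
    value = cache.get("site-conditions", expensive_lookup)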

src/DIRAC/WorkloadManagementSystem/Client/Limiter.py

Lines changed: 49 additions & 153 deletions
@@ -2,121 +2,14 @@
 
 Utilities and classes here are used by the Matcher
 """
-import threading
-from collections import defaultdict
-from collections.abc import Callable
-from concurrent.futures import ThreadPoolExecutor, wait, Future
 from functools import partial
-from typing import Any
 
-from cachetools import TTLCache
-
-from DIRAC import S_OK, S_ERROR
-from DIRAC import gLogger
-
-from DIRAC.Core.Utilities.DictCache import DictCache
-from DIRAC.Core.Utilities.DErrno import cmpError, ESECTION
+from DIRAC import S_ERROR, S_OK, gLogger
 from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
-from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
+from DIRAC.Core.Utilities.DErrno import ESECTION, cmpError
+from DIRAC.Core.Utilities.DictCache import DictCache, TwoLevelCache
 from DIRAC.WorkloadManagementSystem.Client import JobStatus
-
-
-class TwoLevelCache:
-    """A two-level caching system with soft and hard time-to-live (TTL) expiration.
-
-    This cache implements a two-tier caching mechanism to allow for background refresh
-    of cached values. It uses a soft TTL for quick access and a hard TTL as a fallback,
-    which helps in reducing latency and maintaining data freshness.
-
-    Attributes:
-        soft_cache (TTLCache): A cache with a shorter TTL for quick access.
-        hard_cache (TTLCache): A cache with a longer TTL as a fallback.
-        locks (defaultdict): Thread-safe locks for each cache key.
-        futures (dict): Stores ongoing asynchronous population tasks.
-        pool (ThreadPoolExecutor): Thread pool for executing cache population tasks.
-
-    Args:
-        soft_ttl (int): Time-to-live in seconds for the soft cache.
-        hard_ttl (int): Time-to-live in seconds for the hard cache.
-        max_workers (int): Maximum number of workers in the thread pool.
-        max_items (int): Maximum number of items in the cache.
-
-    Example:
-        >>> cache = TwoLevelCache(soft_ttl=60, hard_ttl=300)
-        >>> def populate_func():
-        ...     return "cached_value"
-        >>> value = cache.get("key", populate_func)
-
-    Note:
-        The cache uses a ThreadPoolExecutor with a maximum of 10 workers to
-        handle concurrent cache population requests.
-    """
-
-    def __init__(self, soft_ttl: int, hard_ttl: int, *, max_workers: int = 10, max_items: int = 1_000_000):
-        """Initialize the TwoLevelCache with specified TTLs."""
-        self.soft_cache = TTLCache(max_items, soft_ttl)
-        self.hard_cache = TTLCache(max_items, hard_ttl)
-        self.locks = defaultdict(threading.Lock)
-        self.futures: dict[str, Future] = {}
-        self.pool = ThreadPoolExecutor(max_workers=max_workers)
-
-    def get(self, key: str, populate_func: Callable[[], Any]):
-        """Retrieve a value from the cache, populating it if necessary.
-
-        This method first checks the soft cache for the key. If not found,
-        it checks the hard cache while initiating a background refresh.
-        If the key is not in either cache, it waits for the populate_func
-        to complete and stores the result in both caches.
-
-        Locks are used to ensure there is never more than one concurrent
-        population task for a given key.
-
-        Args:
-            key (str): The cache key to retrieve or populate.
-            populate_func (Callable[[], Any]): A function to call to populate the cache
-                if the key is not found.
-
-        Returns:
-            Any: The cached value associated with the key.
-
-        Note:
-            This method is thread-safe and handles concurrent requests for the same key.
-        """
-        if result := self.soft_cache.get(key):
-            return result
-        with self.locks[key]:
-            if key not in self.futures:
-                self.futures[key] = self.pool.submit(self._work, key, populate_func)
-            if result := self.hard_cache.get(key):
-                self.soft_cache[key] = result
-                return result
-        # It is critical that ``future`` is waited for outside of the lock as
-        # _work aquires the lock before filling the caches. This also means
-        # we can gaurentee that the future has not yet been removed from the
-        # futures dict.
-        future = self.futures[key]
-        wait([future])
-        return self.hard_cache[key]
-
-    def _work(self, key: str, populate_func: Callable[[], Any]) -> None:
-        """Internal method to execute the populate_func and update caches.
-
-        This method is intended to be run in a separate thread. It calls the
-        populate_func, stores the result in both caches, and cleans up the
-        associated future.
-
-        Args:
-            key (str): The cache key to populate.
-            populate_func (Callable[[], Any]): The function to call to get the value.
-
-        Note:
-            This method is not intended to be called directly by users of the class.
-        """
-        result = populate_func()
-        with self.locks[key]:
-            self.futures.pop(key)
-            self.hard_cache[key] = result
-            self.soft_cache[key] = result
+from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
 
 
 class Limiter:
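Note that the class body removed here is the same code this commit adds to DictCache.py above; anything that needs the cache now imports it from the shared utility module, as the new import line in this hunk shows:

    from DIRAC.Core.Utilities.DictCache import DictCache, TwoLevelCache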
@@ -152,72 +45,76 @@ def getNegativeCond(self):
         orCond = self.condCache.get("GLOBAL")
         if orCond:
             return orCond
-        negCond = {}
+        negativeCondition = {}
+
         # Run Limit
         result = self.__opsHelper.getSections(self.__runningLimitSection)
-        sites = []
-        if result["OK"]:
-            sites = result["Value"]
-        for siteName in sites:
+        if not result["OK"]:
+            self.log.error("Issue getting running conditions", result["Message"])
+            sites_with_running_limits = []
+        else:
+            sites_with_running_limits = result["Value"]
+            self.log.verbose(f"Found running conditions for {len(sites_with_running_limits)} sites")
+
+        for siteName in sites_with_running_limits:
             result = self.__getRunningCondition(siteName)
             if not result["OK"]:
-                continue
-            data = result["Value"]
-            if data:
-                negCond[siteName] = data
+                self.log.error("Issue getting running conditions", result["Message"])
+                running_condition = {}
+            else:
+                running_condition = result["Value"]
+            if running_condition:
+                negativeCondition[siteName] = running_condition
+
         # Delay limit
-        result = self.__opsHelper.getSections(self.__matchingDelaySection)
-        sites = []
-        if result["OK"]:
-            sites = result["Value"]
-        for siteName in sites:
-            result = self.__getDelayCondition(siteName)
+        if self.__opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
+            result = self.__opsHelper.getSections(self.__matchingDelaySection)
             if not result["OK"]:
-                continue
-            data = result["Value"]
-            if not data:
-                continue
-            if siteName in negCond:
-                negCond[siteName] = self.__mergeCond(negCond[siteName], data)
+                self.log.error("Issue getting delay conditions", result["Message"])
+                sites_with_matching_delay = []
             else:
-                negCond[siteName] = data
+                sites_with_matching_delay = result["Value"]
+                self.log.verbose(f"Found delay conditions for {len(sites_with_matching_delay)} sites")
+
+            for siteName in sites_with_matching_delay:
+                delay_condition = self.__getDelayCondition(siteName)
+                if siteName in negativeCondition:
+                    negativeCondition[siteName] = self.__mergeCond(negativeCondition[siteName], delay_condition)
+                else:
+                    negativeCondition[siteName] = delay_condition
+
         orCond = []
-        for siteName in negCond:
-            negCond[siteName]["Site"] = siteName
-            orCond.append(negCond[siteName])
+        for siteName in negativeCondition:
+            negativeCondition[siteName]["Site"] = siteName
+            orCond.append(negativeCondition[siteName])
         self.condCache.add("GLOBAL", 10, orCond)
         return orCond
 
     def getNegativeCondForSite(self, siteName, gridCE=None):
         """Generate a negative query based on the limits set on the site"""
         # Check if Limits are imposed onto the site
-        negativeCond = {}
         if self.__opsHelper.getValue("JobScheduling/CheckJobLimits", True):
             result = self.__getRunningCondition(siteName)
             if not result["OK"]:
                 self.log.error("Issue getting running conditions", result["Message"])
+                negativeCond = {}
             else:
                 negativeCond = result["Value"]
-        self.log.verbose(
-            "Negative conditions for site", f"{siteName} after checking limits are: {str(negativeCond)}"
-        )
+            self.log.verbose(
+                "Negative conditions for site", f"{siteName} after checking limits are: {str(negativeCond)}"
+            )
 
         if gridCE:
             result = self.__getRunningCondition(siteName, gridCE)
             if not result["OK"]:
                 self.log.error("Issue getting running conditions", result["Message"])
             else:
-                negativeCondCE = result["Value"]
-                negativeCond = self.__mergeCond(negativeCond, negativeCondCE)
+                negativeCond = self.__mergeCond(negativeCond, result["Value"])
 
         if self.__opsHelper.getValue("JobScheduling/CheckMatchingDelay", True):
-            result = self.__getDelayCondition(siteName)
-            if result["OK"]:
-                delayCond = result["Value"]
-                self.log.verbose(
-                    "Negative conditions for site", f"{siteName} after delay checking are: {str(delayCond)}"
-                )
-                negativeCond = self.__mergeCond(negativeCond, delayCond)
+            delayCond = self.__getDelayCondition(siteName)
+            self.log.verbose("Negative conditions for site", f"{siteName} after delay checking are: {str(delayCond)}")
+            negativeCond = self.__mergeCond(negativeCond, delayCond)
 
         if negativeCond:
             self.log.info("Negative conditions for site", f"{siteName} are: {str(negativeCond)}")
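The refactored getNegativeCond leans on DIRAC's standard return convention: helpers return S_OK(value), i.e. {"OK": True, "Value": value}, or S_ERROR(message), i.e. {"OK": False, "Message": message}. A minimal sketch of the check/log/fall-back pattern used above, with a made-up helper standing in for Operations.getSections:

    from DIRAC import S_ERROR, S_OK

    def fake_get_sections(path):
        """Illustrative stand-in for Operations.getSections()."""
        if path == "RunningLimit":
            return S_OK(["LCG.CERN.ch", "LCG.CNAF.it"])
        return S_ERROR(f"Path {path} does not exist")

    result = fake_get_sections("RunningLimit")
    if not result["OK"]:
        # Log the error and fall back to an empty list instead of aborting.
        sites_with_running_limits = []
    else:
        sites_with_running_limits = result["Value"]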
@@ -337,14 +234,14 @@ def updateDelayCounters(self, siteName, jid):
     def __getDelayCondition(self, siteName):
         """Get extra conditions allowing matching delay"""
         if siteName not in self.delayMem:
-            return S_OK({})
+            return {}
         lastRun = self.delayMem[siteName].getKeys()
         negCond = {}
         for attName, attValue in lastRun:
             if attName not in negCond:
                 negCond[attName] = []
             negCond[attName].append(attValue)
-        return S_OK(negCond)
+        return negCond
 
     def _countsByJobType(self, siteName, attName):
         result = self.jobDB.getCounters(
@@ -354,6 +251,5 @@ def _countsByJobType(self, siteName, attName):
         )
         if not result["OK"]:
             return result
-        data = result["Value"]
-        data = {k[0][attName]: k[1] for k in data}
+        data = {k[0][attName]: k[1] for k in result["Value"]}
         return data
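For reference, the new comprehension in _countsByJobType flattens the (attribute-dict, count) pairs returned by jobDB.getCounters into a plain mapping; with made-up sample data:

    # Illustrative shape of result["Value"] from jobDB.getCounters(...).
    counters = [({"JobType": "User"}, 12), ({"JobType": "MC"}, 3)]
    attName = "JobType"
    data = {k[0][attName]: k[1] for k in counters}
    assert data == {"User": 12, "MC": 3}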
