|
1 |
| -""" TransformationAgent processes transformations found in the transformation database. |
| 1 | +"""TransformationAgent processes transformations found in the transformation database. |
2 | 2 |
|
3 | 3 | The following options can be set for the TransformationAgent.
|
4 | 4 |
|
|
8 | 8 | :dedent: 2
|
9 | 9 | :caption: TransformationAgent options
|
10 | 10 | """
|
| 11 | + |
11 | 12 | from importlib import import_module
|
12 | 13 |
|
13 | 14 | import time
|
14 | 15 | import os
|
15 | 16 | import datetime
|
16 | 17 | import pickle
|
17 | 18 | import concurrent.futures
|
| 19 | +from pathlib import Path |
18 | 20 |
|
19 | 21 | from DIRAC import S_OK, S_ERROR
|
20 | 22 | from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
|
@@ -127,6 +129,9 @@ def execute(self):
|
127 | 129 | if not res["OK"]:
|
128 | 130 | self._logError("Failed to obtain transformations:", res["Message"])
|
129 | 131 | return S_OK()
|
| 132 | + |
| 133 | + active_trans_ids = [t["TransformationID"] for t in res["Value"]] |
| 134 | + self.cleanOldTransformationCache(active_trans_ids) |
130 | 135 | # Process the transformations
|
131 | 136 | count = 0
|
132 | 137 | future_to_transID = {}
|
@@ -164,6 +169,22 @@ def execute(self):
|
164 | 169 |
|
165 | 170 | return S_OK()
|
166 | 171 |
|
| 172 | + def cleanOldTransformationCache(self, active_trans_ids: list[int]): |
| 173 | + cache_filenames = {Path(self.__cacheFile(tid)) for tid in active_trans_ids} |
| 174 | + existing_caches = set(Path(self.workDirectory).glob("*.pkl")) |
| 175 | + useless_cache_files = existing_caches - cache_filenames |
| 176 | + |
| 177 | + if useless_cache_files: |
| 178 | + self._logInfo(f"Found potentially {len(useless_cache_files)} useless cache files") |
| 179 | + |
| 180 | + # Since idle transformations aren't in active_trans_ids, let's filter it more |
| 181 | + # and take only files that haven't been touched for 2 month |
| 182 | + last_update_threshold = (datetime.datetime.utcnow() - datetime.timedelta(days=60)).timestamp() |
| 183 | + |
| 184 | + for cache_file in useless_cache_files: |
| 185 | + if Path(cache_file).stat().st_mtime < last_update_threshold: |
| 186 | + cache_file.unlink() |
| 187 | + |
167 | 188 | def getTransformations(self):
|
168 | 189 | """Obtain the transformations to be executed - this is executed at the start of every loop (it's really the
|
169 | 190 | only real thing in the execute()
|
|
0 commit comments