Skip to content

Commit 608d57f

Browse files
committed
feat: created a utility function getAvailableRAM
1 parent 66b0165 commit 608d57f

File tree

3 files changed

+71
-41
lines changed

3 files changed

+71
-41
lines changed

src/DIRAC/Resources/Computing/ComputingElement.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import (
5858
getNumberOfProcessors,
5959
getNumberOfGPUs,
60+
getAvailableRAM,
6061
)
6162

6263
INTEGER_PARAMETERS = ["CPUTime", "NumberOfProcessors", "NumberOfPayloadProcessors", "MaxRAM"]
@@ -235,12 +236,14 @@ def setParameters(self, ceOptions):
235236
generalCEDict.update(self.ceParameters)
236237
self.ceParameters = generalCEDict
237238

238-
# If NumberOfProcessors/GPUs is present in the description but is equal to zero
239+
# If NumberOfProcessors/GPUs/RAM is present in the description but is equal to zero
239240
# interpret it as needing local evaluation
240241
if self.ceParameters.get("NumberOfProcessors", -1) == 0:
241242
self.ceParameters["NumberOfProcessors"] = getNumberOfProcessors()
242243
if self.ceParameters.get("NumberOfGPUs", -1) == 0:
243244
self.ceParameters["NumberOfGPUs"] = getNumberOfGPUs()
245+
if self.ceParameters.get("RAM", -1) == 0:
246+
self.ceParameters["RAM"] = getAvailableRAM()
244247

245248
for key in ceOptions:
246249
if key in INTEGER_PARAMETERS:

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 64 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,31 +34,7 @@ def getJobFeatures():
3434
return features
3535

3636

37-
def getProcessorFromMJF():
38-
jobFeatures = getJobFeatures()
39-
if jobFeatures:
40-
try:
41-
return int(jobFeatures["allocated_cpu"])
42-
except KeyError:
43-
gLogger.error(
44-
"MJF is available but allocated_cpu is not an integer", repr(jobFeatures.get("allocated_cpu"))
45-
)
46-
return None
47-
48-
49-
def getMemoryFromMJF():
50-
jobFeatures = getJobFeatures()
51-
if jobFeatures:
52-
try:
53-
return int(jobFeatures["max_rss_bytes"])
54-
except KeyError:
55-
gLogger.error(
56-
"MJF is available but max_rss_bytes is not an integer", repr(jobFeatures.get("max_rss_bytes"))
57-
)
58-
return None
59-
60-
61-
def getMemoryFromProc():
37+
def _getMemoryFromProc():
6238
meminfo = {i.split()[0].rstrip(":"): int(i.split()[1]) for i in open("/proc/meminfo").readlines()}
6339
maxRAM = meminfo["MemTotal"]
6440
if maxRAM:
@@ -72,7 +48,6 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None):
7248
7349
Tries to find it in this order:
7450
1) from the /Resources/Computing/CEDefaults/NumberOfProcessors (which is what the pilot fills up)
75-
2) if not present from JobFeatures
7651
3) if not present looks in CS for "NumberOfProcessors" Queue or CE option
7752
4) if not present but there's WholeNode tag, look what the WN provides using multiprocessing.cpu_count()
7853
5) return 1
@@ -84,14 +59,7 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None):
8459
if numberOfProcessors:
8560
return numberOfProcessors
8661

87-
# 2) from MJF
88-
gLogger.info("Getting numberOfProcessors from MJF")
89-
numberOfProcessors = getProcessorFromMJF()
90-
if numberOfProcessors:
91-
return numberOfProcessors
92-
gLogger.info("NumberOfProcessors could not be found in MJF")
93-
94-
# 3) looks in CS for "NumberOfProcessors" Queue or CE or site option
62+
# 2) looks in CS for "NumberOfProcessors" Queue or CE or site option
9563
if not siteName:
9664
siteName = gConfig.getValue("/LocalSite/Site", "")
9765
if not gridCE:
@@ -237,3 +205,65 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
237205
# 3) return 0
238206
gLogger.info("NumberOfGPUs could not be found in CS")
239207
return 0
208+
209+
210+
def getAvailableRAM(siteName=None, gridCE=None, queue=None):
211+
"""Gets the available RAM on a certain CE/queue/node (what the pilot administers)
212+
213+
The siteName/gridCE/queue parameters are normally not necessary.
214+
215+
Tries to find it in this order:
216+
1) from the /Resources/Computing/CEDefaults/AvailableRAM (which is what the pilot might fill up)
217+
2) if not present looks in CS for "AvailableRAM" Queue or CE option
218+
3) if not present but there's WholeNode tag, look what the WN provides using _getMemoryFromProc()
219+
4) return 0
220+
"""
221+
222+
# 1) from /Resources/Computing/CEDefaults/MaxRAM
223+
gLogger.info("Getting MaxRAM from /Resources/Computing/CEDefaults/MaxRAM")
224+
availableRAM = gConfig.getValue("/Resources/Computing/CEDefaults/MaxRAM", None)
225+
if availableRAM:
226+
return availableRAM
227+
228+
# 2) looks in CS for "MaxRAM" Queue or CE or site option
229+
if not siteName:
230+
siteName = gConfig.getValue("/LocalSite/Site", "")
231+
if not gridCE:
232+
gridCE = gConfig.getValue("/LocalSite/GridCE", "")
233+
if not queue:
234+
queue = gConfig.getValue("/LocalSite/CEQueue", "")
235+
if not (siteName and gridCE and queue):
236+
gLogger.error("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0")
237+
return 0
238+
239+
grid = siteName.split(".")[0]
240+
csPaths = [
241+
f"/Resources/Sites/{grid}/{siteName}/CEs/{gridCE}/Queues/{queue}/MaxRAM",
242+
f"/Resources/Sites/{grid}/{siteName}/CEs/{gridCE}/MaxRAM",
243+
f"/Resources/Sites/{grid}/{siteName}/MaxRAM",
244+
]
245+
for csPath in csPaths:
246+
gLogger.info("Looking in", csPath)
247+
availableRAM = gConfig.getValue(csPath, None)
248+
if availableRAM:
249+
return availableRAM
250+
251+
# 3) checks if 'WholeNode' is one of the used tags
252+
# Tags of the CE
253+
tags = fromChar(
254+
gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Tag", "")
255+
) + fromChar(gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/Cloud/{gridCE}/Tag", ""))
256+
# Tags of the Queue
257+
tags += fromChar(
258+
gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues/{queue}/Tag", "")
259+
) + fromChar(
260+
gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/Cloud/{gridCE}/VMTypes/{queue}/Tag", "")
261+
)
262+
263+
if "WholeNode" in tags:
264+
gLogger.info("Found WholeNode tag, using _getMemoryFromProc()")
265+
return _getMemoryFromProc()
266+
267+
# 4) return 0
268+
gLogger.info("AvailableRAM could not be found in CS, and WholeNode tag not found")
269+
return 0

src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,12 @@ def main():
3939
gLogger.info("Getting number of processors")
4040
numberOfProcessor = JobParameters.getNumberOfProcessors(Site, ceName, Queue)
4141

42-
gLogger.info("Getting memory (RAM) from MJF")
43-
maxRAM = JobParameters.getMemoryFromMJF()
44-
if not maxRAM:
45-
gLogger.info("maxRAM could not be found in MJF, using JobParameters.getMemoryFromProc()")
46-
maxRAM = JobParameters.getMemoryFromProc()
47-
4842
gLogger.info("Getting number of GPUs")
4943
numberOfGPUs = JobParameters.getNumberOfGPUs(Site, ceName, Queue)
5044

45+
gLogger.info("Getting maximum RAM")
46+
maxRAM = JobParameters.getAvailableRAM(Site, ceName, Queue)
47+
5148
# just communicating it back
5249
gLogger.notice(" ".join(str(wnPar) for wnPar in [numberOfProcessor, maxRAM, numberOfGPUs]))
5350

0 commit comments

Comments
 (0)