Commit 64aa874

feat: PoolCE can subdivide RAM

1 parent 46c36bd commit 64aa874

3 files changed: +82 −20 lines changed

src/DIRAC/Interfaces/API/Job.py

Lines changed: 1 addition & 1 deletion

@@ -740,7 +740,7 @@ def setTag(self, tags):
         Example usage:

         >>> job = Job()
-        >>> job.setTag( ['WholeNode','8GB'] )
+        >>> job.setTag( ['WholeNode', '8GB', '16GB_MAX'] )

         :param tags: single tag string or a list of tags
         :type tags: str or python:list
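For context on the new tag pair: an 'NGB' tag declares the memory the job requests, and the new 'NGB_MAX' form raises the allocation to the larger of the two values when the slot has enough free RAM, mirroring the numberOfProcessors / maxNumberOfProcessors pair. A minimal submission sketch using the standard DIRAC client API (the payload script name is illustrative):

    from DIRAC.Interfaces.API.Dirac import Dirac
    from DIRAC.Interfaces.API.Job import Job

    job = Job()
    job.setExecutable("payload.sh")  # illustrative payload
    # Request at least 8 GB; allow up to 16 GB when the slot has it to spare
    job.setTag(["WholeNode", "8GB", "16GB_MAX"])

    result = Dirac().submitJob(job)  # returns an S_OK/S_ERROR structure
    if not result["OK"]:
        print("Submission failed:", result["Message"])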

src/DIRAC/Resources/Computing/PoolComputingElement.py

Lines changed: 54 additions & 2 deletions

@@ -61,6 +61,8 @@ def __init__(self, ceUniqueID):
         self.taskID = 0
         self.processorsPerTask = {}
         self.userNumberPerTask = {}
+        self.ram = 1024  # Default RAM in GB (an arbitrarily large value used when no limit is set)
+        self.ramPerTask = {}

         # This CE will effectively submit to another "Inner"CE
         # (by default to the InProcess CE)

@@ -74,6 +76,10 @@ def _reset(self):

         self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
         self.ceParameters["MaxTotalJobs"] = self.processors
+        max_ram = int(self.ceParameters.get("MaxRAM", 0))
+        if max_ram > 0:
+            self.ram = max_ram // 1024  # Convert from MB to GB
+        self.ceParameters["MaxRAM"] = self.ram
         # Indicates that the submission is done asynchronously
         # The result is not immediately available
         self.ceParameters["AsyncSubmission"] = True

@@ -90,6 +96,16 @@ def getProcessorsInUse(self):
             processorsInUse += self.processorsPerTask[future]
         return processorsInUse

+    def getRAMInUse(self):
+        """Get the amount of RAM (in GB) in use by the currently running jobs
+
+        :return: amount of RAM in use
+        """
+        ramInUse = 0
+        for future in self.ramPerTask:
+            ramInUse += self.ramPerTask[future]
+        return ramInUse
+
     #############################################################################
     def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         """Method to submit job.

@@ -112,15 +128,22 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
             self.taskID += 1
             return S_OK(taskID)

-        # Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
+        memoryForJob = self._getMemoryForJobs(kwargs)
+        if memoryForJob is None:
+            self.taskResults[self.taskID] = S_ERROR("Not enough memory for the job")
+            taskID = self.taskID
+            self.taskID += 1
+            return S_OK(taskID)
+
+        # Now persisting the job limits for later use in pilot.cfg file
         cd = ConfigurationData(loadDefaultCFG=False)
         res = cd.loadFile("pilot.cfg")
         if not res["OK"]:
             self.log.error("Could not load pilot.cfg", res["Message"])
         else:
-            # only NumberOfProcessors for now, but RAM (or other stuff) can also be added
             jobID = int(kwargs.get("jobDesc", {}).get("jobID", 0))
             cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/NumberOfProcessors" % jobID, processorsForJob)
+            cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/MaxRAM" % jobID, memoryForJob)
             res = cd.dumpLocalCFGToFile("pilot.cfg")
             if not res["OK"]:
                 self.log.error("Could not dump cfg to pilot.cfg", res["Message"])

@@ -132,6 +155,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         # Submission
         future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
         self.processorsPerTask[future] = processorsForJob
+        self.ramPerTask[future] = memoryForJob
         future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))

         taskID = self.taskID

@@ -178,6 +202,34 @@ def _getProcessorsForJobs(self, kwargs):

         return requestedProcessors

+    def _getMemoryForJobs(self, kwargs):
+        """Helper function to get the memory that will be allocated for the job
+
+        :param kwargs: job parameters
+        :return: memory in GB, or None if there is not enough memory available
+        """
+        # Job requirements
+        requestedMemory = 0
+
+        # If there's a memory tag, it is taken as the memory requested by the job
+        for job_args in kwargs:
+            if job_args.endswith("GB"):
+                requestedMemory = int(job_args.replace("GB", ""))
+                break
+        for job_args in kwargs:
+            if job_args.endswith("GB_MAX"):
+                requestedMemory = max(int(job_args.replace("GB_MAX", "")), requestedMemory)
+                break
+
+        # Now check what the slot can provide: do we have enough memory?
+        availableMemory = self.ram - self.getRAMInUse()
+        if availableMemory < requestedMemory:
+            return None
+
+        return requestedMemory
+
     def finalizeJob(self, taskID, future):
         """Finalize the job by updating the process utilisation counters
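To make the accounting above concrete: with self.ram = 32 and one running task holding 4 GB, getRAMInUse() returns 4, so a '16GB' job fits in the remaining 28 GB while a '32GB' job is refused. A self-contained sketch of the same tag parsing and arithmetic (illustration only, not the real CE class; note that 'NGB_MAX' does not match the first loop because it does not end in "GB"):

    def memory_for_job(tags, total_ram_gb, ram_in_use_gb):
        # Same convention as _getMemoryForJobs: "NGB" requests N GB,
        # "NGB_MAX" raises the request to N GB if that is larger.
        requested = 0
        for tag in tags:
            if tag.endswith("GB"):
                requested = int(tag.replace("GB", ""))
                break
        for tag in tags:
            if tag.endswith("GB_MAX"):
                requested = max(int(tag.replace("GB_MAX", "")), requested)
                break
        available = total_ram_gb - ram_in_use_gb
        return None if available < requested else requested

    print(memory_for_job(["16GB"], 32, 4))            # 16 -> fits in the 28 GB left
    print(memory_for_job(["32GB"], 32, 4))            # None -> only 28 GB available
    print(memory_for_job(["4GB", "8GB_MAX"], 32, 4))  # 8 -> the *_MAX value wins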

src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py

Lines changed: 27 additions & 17 deletions

@@ -83,7 +83,7 @@ def createAndDelete():
 def test_submit_and_shutdown(createAndDelete):
     time.sleep(0.5)

-    ceParameters = {"WholeNode": True, "NumberOfProcessors": 4}
+    ceParameters = {"WholeNode": True, "NumberOfProcessors": 4, "MaxRAM": 4}
     ce = PoolComputingElement("TestPoolCE")
     ce.setParameters(ceParameters)

@@ -371,28 +371,38 @@ def test_executeJob_WholeNodeJobs(createAndDelete):

 @pytest.mark.parametrize(
-    "processorsPerTask, kwargs, expected",
+    "processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory",
     [
-        (None, {}, 1),
-        (None, {"mpTag": False}, 1),
-        (None, {"mpTag": True}, 1),
-        (None, {"mpTag": True, "wholeNode": True}, 16),
-        (None, {"mpTag": True, "wholeNode": False}, 1),
-        (None, {"mpTag": True, "numberOfProcessors": 4}, 4),
-        (None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8),
-        (None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16),
-        ({1: 4}, {"mpTag": True, "wholeNode": True}, 0),
-        ({1: 4}, {"mpTag": True, "wholeNode": False}, 1),
-        ({1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2),
-        ({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2}, 2),
-        ({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16}, 12),
+        (None, None, {}, 1, 0),
+        (None, None, {"mpTag": False}, 1, 0),
+        (None, None, {"mpTag": True, "8GB": True}, 1, 8),
+        (None, None, {"mpTag": True, "wholeNode": True}, 16, 0),
+        (None, None, {"mpTag": True, "wholeNode": False}, 1, 0),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "4GB": True}, 4, 4),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8, 0),
+        (None, None, {"mpTag": True, "4GB": True, "8GB_MAX": True}, 1, 8),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": True}, 0, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": False}, 1, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2, "8GB": True}, 2, 8),
+        ({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 16, "12GB": True}, 0, 12),
+        ({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2, "16GB": True}, 2, 16),
+        ({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16, "32GB": True}, 12, None),
+        ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2, 0),
+        ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 4, 0),
+        ({1: 4, 2: 8, 3: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 0, 0),
     ],
 )
-def test__getProcessorsForJobs(processorsPerTask, kwargs, expected):
+def test__getLimitsForJobs(processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory):
     ce = PoolComputingElement("TestPoolCE")
     ce.processors = 16
+    ce.ram = 32

     if processorsPerTask:
         ce.processorsPerTask = processorsPerTask
+    if ramPerTask:
+        ce.ramPerTask = ramPerTask
     res = ce._getProcessorsForJobs(kwargs)
-    assert res == expected
+    assert res == expected_processors
+    res = ce._getMemoryForJobs(kwargs)
+    assert res == expected_memory
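As a quick cross-check of the new expectations, with ce.processors = 16 and ce.ram = 32 and one running task holding 4 processors and 4 GB (a hedged arithmetic sketch, not part of the test file):

    free_cores, free_ram = 16 - 4, 32 - 4  # 12 cores, 28 GB left
    assert free_cores == 12  # so maxNumberOfProcessors=16 is capped at 12
    assert free_ram >= 16    # a "16GB" request is granted in full
    assert free_ram < 32     # a "32GB" request yields expected_memory of None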
