
Commit 6efae4b

feat: PoolCE can subdivide RAM

1 parent 46c36bd

3 files changed: +77 -32 lines changed

src/DIRAC/Interfaces/API/Job.py

Lines changed: 14 additions & 1 deletion
@@ -522,6 +522,19 @@ def setDestination(self, destination):
         return S_OK()

     #############################################################################
+    def setRAMRequirements(self, ramRequired: int = 0):
+        """Helper function.
+        Specify the RAM requirements for the job in GB. 0 (default) means no specific requirements.
+        """
+        if ramRequired:
+            self._addParameter(
+                self.workflow,
+                "MaxRAM",
+                "JDL",
+                ramRequired,
+                "GBs of RAM requested",
+            )
+
     def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=None, maxNumberOfProcessors=None):
         """Helper function.

@@ -740,7 +753,7 @@ def setTag(self, tags):
         Example usage:

         >>> job = Job()
-        >>> job.setTag( ['WholeNode','8GB'] )
+        >>> job.setTag( ['WholeNode', '8GB', '16GB_MAX'] )

         :param tags: single tag string or a list of tags
         :type tags: str or python:list
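
For orientation, a minimal client-side sketch of how the new helper would be used (the executable name is hypothetical, for illustration only):

from DIRAC.Interfaces.API.Job import Job

job = Job()
job.setExecutable("payload.sh")   # hypothetical payload script, for illustration
job.setRAMRequirements(8)         # request 8 GB of RAM; 0 (the default) means no requirement
job.setTag(["WholeNode", "8GB"])  # tags remain an alternative way to express resource needs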

src/DIRAC/Resources/Computing/PoolComputingElement.py

Lines changed: 36 additions & 14 deletions
@@ -61,6 +61,8 @@ def __init__(self, ceUniqueID):
         self.taskID = 0
         self.processorsPerTask = {}
         self.userNumberPerTask = {}
+        self.ram = 1024  # Default RAM in GB (an arbitrarily large value used when no limit is set)
+        self.ramPerTask = {}

         # This CE will effectively submit to another "Inner"CE
         # (by default to the InProcess CE)
@@ -74,22 +76,16 @@ def _reset(self):

         self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
         self.ceParameters["MaxTotalJobs"] = self.processors
+        max_ram = int(self.ceParameters.get("MaxRAM", 0))
+        if max_ram > 0:
+            self.ram = max_ram // 1024  # Convert from MB to GB
+            self.ceParameters["MaxRAM"] = self.ram
         # Indicates that the submission is done asynchronously
         # The result is not immediately available
         self.ceParameters["AsyncSubmission"] = True
         self.innerCESubmissionType = self.ceParameters.get("InnerCESubmissionType", self.innerCESubmissionType)
         return S_OK()

-    def getProcessorsInUse(self):
-        """Get the number of currently allocated processor cores
-
-        :return: number of processors in use
-        """
-        processorsInUse = 0
-        for future in self.processorsPerTask:
-            processorsInUse += self.processorsPerTask[future]
-        return processorsInUse
-
     #############################################################################
     def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         """Method to submit job.
@@ -112,15 +108,22 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
             self.taskID += 1
             return S_OK(taskID)

-        # Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
+        memoryForJob = self._getMemoryForJobs(kwargs)
+        if memoryForJob is None:
+            self.taskResults[self.taskID] = S_ERROR("Not enough memory for the job")
+            taskID = self.taskID
+            self.taskID += 1
+            return S_OK(taskID)
+
+        # Now persisting the job limits for later use in the pilot.cfg file
         cd = ConfigurationData(loadDefaultCFG=False)
         res = cd.loadFile("pilot.cfg")
         if not res["OK"]:
             self.log.error("Could not load pilot.cfg", res["Message"])
         else:
-            # only NumberOfProcessors for now, but RAM (or other stuff) can also be added
             jobID = int(kwargs.get("jobDesc", {}).get("jobID", 0))
             cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/NumberOfProcessors" % jobID, processorsForJob)
+            cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/MaxRAM" % jobID, memoryForJob)
             res = cd.dumpLocalCFGToFile("pilot.cfg")
             if not res["OK"]:
                 self.log.error("Could not dump cfg to pilot.cfg", res["Message"])
@@ -132,6 +135,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         # Submission
         future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
         self.processorsPerTask[future] = processorsForJob
+        self.ramPerTask[future] = memoryForJob
         future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))

         taskID = self.taskID
@@ -141,7 +145,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):

     def _getProcessorsForJobs(self, kwargs):
         """helper function"""
-        processorsInUse = self.getProcessorsInUse()
+        processorsInUse = sum(self.processorsPerTask.values())
         availableProcessors = self.processors - processorsInUse

         self.log.verbose(
@@ -178,6 +182,24 @@ def _getProcessorsForJobs(self, kwargs):

         return requestedProcessors

+    def _getMemoryForJobs(self, kwargs):
+        """Helper function to get the memory that will be allocated for the job
+
+        :param kwargs: job parameters
+        :return: memory in GB, or None if there is not enough memory
+        """
+
+        # job requirements
+        requestedMemory = kwargs.get("MaxRAM", 0)
+
+        # now check what the slot can provide:
+        # do we have enough memory?
+        availableMemory = self.ram - sum(self.ramPerTask.values())
+        if availableMemory < requestedMemory:
+            return None
+
+        return requestedMemory
+
     def finalizeJob(self, taskID, future):
         """Finalize the job by updating the process utilisation counters

@@ -209,7 +231,7 @@ def getCEStatus(self):
         result["WaitingJobs"] = 0

         # dealing with processors
-        processorsInUse = self.getProcessorsInUse()
+        processorsInUse = sum(self.processorsPerTask.values())
         result["UsedProcessors"] = processorsInUse
         result["AvailableProcessors"] = self.processors - processorsInUse
         return result
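
The bookkeeping behind _getMemoryForJobs is plain subtraction over the per-task allocations. A standalone sketch of the same logic (the names are stand-ins for the CE attributes, not DIRAC API):

# total_ram_gb and ram_per_task mirror self.ram and self.ramPerTask
def memory_for_job(total_ram_gb: int, ram_per_task: dict, requested_gb: int):
    available = total_ram_gb - sum(ram_per_task.values())  # GB still unallocated
    return None if available < requested_gb else requested_gb

assert memory_for_job(32, {1: 4}, 16) == 16    # fits: 28 GB still free
assert memory_for_job(32, {1: 4}, 32) is None  # rejected: only 28 GB free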

src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py

Lines changed: 27 additions & 17 deletions
@@ -83,7 +83,7 @@ def createAndDelete():
 def test_submit_and_shutdown(createAndDelete):
     time.sleep(0.5)

-    ceParameters = {"WholeNode": True, "NumberOfProcessors": 4}
+    ceParameters = {"WholeNode": True, "NumberOfProcessors": 4, "MaxRAM": 4}
     ce = PoolComputingElement("TestPoolCE")
     ce.setParameters(ceParameters)

@@ -371,28 +371,38 @@ def test_executeJob_WholeNodeJobs(createAndDelete):


 @pytest.mark.parametrize(
-    "processorsPerTask, kwargs, expected",
+    "processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory",
     [
-        (None, {}, 1),
-        (None, {"mpTag": False}, 1),
-        (None, {"mpTag": True}, 1),
-        (None, {"mpTag": True, "wholeNode": True}, 16),
-        (None, {"mpTag": True, "wholeNode": False}, 1),
-        (None, {"mpTag": True, "numberOfProcessors": 4}, 4),
-        (None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8),
-        (None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16),
-        ({1: 4}, {"mpTag": True, "wholeNode": True}, 0),
-        ({1: 4}, {"mpTag": True, "wholeNode": False}, 1),
-        ({1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2),
-        ({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2}, 2),
-        ({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16}, 12),
+        (None, None, {}, 1, 0),
+        (None, None, {"mpTag": False}, 1, 0),
+        (None, None, {"mpTag": True, "8GB": True}, 1, 8),
+        (None, None, {"mpTag": True, "wholeNode": True}, 16, 0),
+        (None, None, {"mpTag": True, "wholeNode": False}, 1, 0),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "4GB": True}, 4, 4),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8, 0),
+        (None, None, {"mpTag": True, "4GB": True, "8GB_MAX": True}, 1, 8),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": True}, 0, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": False}, 1, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2, "8GB": True}, 2, 8),
+        ({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 16, "12GB": True}, 0, 12),
+        ({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2, "16GB": True}, 2, 16),
+        ({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16, "32GB": True}, 12, None),
+        ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2, 0),
+        ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 4, 0),
+        ({1: 4, 2: 8, 3: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 0, 0),
     ],
 )
-def test__getProcessorsForJobs(processorsPerTask, kwargs, expected):
+def test__getLimitsForJobs(processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory):
     ce = PoolComputingElement("TestPoolCE")
     ce.processors = 16
+    ce.ram = 32

     if processorsPerTask:
         ce.processorsPerTask = processorsPerTask
+    if ramPerTask:
+        ce.ramPerTask = ramPerTask
     res = ce._getProcessorsForJobs(kwargs)
-    assert res == expected
+    assert res == expected_processors
+    res = ce._getMemoryForJobs(kwargs)
+    assert res == expected_memory
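
One row of the table can also be replayed by hand; a sketch, assuming the module is importable in a configured DIRAC environment:

from DIRAC.Resources.Computing.PoolComputingElement import PoolComputingElement

# Replay the row ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2, 0)
ce = PoolComputingElement("TestPoolCE")
ce.processors = 16
ce.ram = 32
ce.processorsPerTask = {1: 4, 2: 8}  # 12 of 16 cores already allocated
ce.ramPerTask = {1: 4}               # 4 of 32 GB already allocated

kwargs = {"mpTag": True, "numberOfProcessors": 2}
assert ce._getProcessorsForJobs(kwargs) == 2  # 2 cores fit in the remaining 4
assert ce._getMemoryForJobs(kwargs) == 0      # no MaxRAM requested, so 0 is allocated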
