21 changes: 6 additions & 15 deletions docs/source/AdministratorGuide/Resources/computingelements.rst
@@ -57,25 +57,22 @@ of the *ComputingElement* is located inside the corresponding site section in the
# Site administrative domain
LCG
{
# Site section
# Site section. This is the DIRAC site name.
LCG.CNAF.it
{
# Site name
# Alternative site name (e.g. site name in GOC DB)
Name = CNAF

# List of valid CEs on the site
CE = ce01.infn.it, ce02.infn.it

# Section describing each CE
CEs
{
# Specific CE description section
# Specific CE description section. This CE name is unique.
ce01.infn.it
{
# Type of the CE
# Type of the CE. "HTCondorCE", "AREX", and "SSH" are the most common types.
CEType = HTCondorCE

# Section to describe various queue in the CE
# Section to describe various (logical) queues in the CE.
Queues
{
long
@@ -93,7 +90,6 @@ of the *ComputingElement* is located inside the corresponding site section in the

This is the general structure in which specific CE descriptions are inserted.
The CE configuration is part of the general DIRAC configuration.
It can be placed in the general Configuration Service or in the local configuration of the DIRAC installation.
Examples of the configuration can be found in the :ref:`full_configuration_example`, in the *Resources/Computing* section.
You can find the options of a specific CE in the code documentation: :mod:`DIRAC.Resources.Computing`.
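The queue example above is collapsed before its options appear; as a minimal sketch, a complete queue section can look like the following (the option names are commonly used queue options, but the exact set, units, and values are illustrative only)::

    Queues
    {
      long
      {
        # Maximum CPU time granted by the queue (illustrative value; check the CE documentation for units)
        maxCPUTime = 2880
        # Throttling limits used by the SiteDirector (illustrative values)
        MaxTotalJobs = 100
        MaxWaitingJobs = 10
      }
    }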

@@ -114,7 +110,7 @@ configuration.

Interacting with Grid Sites
@@@@@@@@@@@@@@@@@@@@@@@@@@@
The :mod:`~DIRAC.Resources.Computing.HTCondorCEComputingElement` and the :mod:`~DIRAC.Resources.Computing.ARCComputingElement` eases
The :mod:`~DIRAC.Resources.Computing.HTCondorCEComputingElement` and the :mod:`~DIRAC.Resources.Computing.AREXComputingElement` ease
the interactions with grid sites by managing pilots through the underlying batch systems.
Instances of such CEs are generally set up by the site administrators.

@@ -132,11 +128,6 @@ The :mod:`~DIRAC.Resources.Computing.CloudComputingElement` allows submission to
(via the standard SiteDirector agent). The instances are contextualised using cloud-init.


Delegating to BOINC (Volunteering Computing)
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
There exists a :mod:`~DIRAC.Resources.Computing.BOINCComputingElement` to submit pilots to a BOINC server.


Computing Elements within allocated computing resources
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
The :mod:`~DIRAC.Resources.Computing.InProcessComputingElement` is usually invoked by a Pilot-Job (JobAgent agent) to execute user
@@ -345,8 +345,6 @@ using the "setNumberOfProcessors" method of the API::
Calling ``Job().setNumberOfProcessors()`` with a value bigger than 1
will also add the "MultiProcessor" tag to the job description.

.. versionadded:: v6r20p5

Users can specify the NumberOfProcessors and WholeNode parameters in their job descriptions, e.g.::

NumberOfProcessors = 16;
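The JDL example continues in the collapsed part of the diff; as a sketch, the equivalent request through the Job API looks roughly like this::

    from DIRAC.Interfaces.API.Job import Job

    job = Job()
    # Equivalent of "NumberOfProcessors = 16;" above; any value > 1
    # also adds the "MultiProcessor" tag to the job description
    job.setNumberOfProcessors(numberOfProcessors=16)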
15 changes: 14 additions & 1 deletion src/DIRAC/Interfaces/API/Job.py
@@ -522,6 +522,19 @@ def setDestination(self, destination):
return S_OK()

#############################################################################
def setRAMRequirements(self, ramRequired: int = 0):
"""Helper function.
Specify the RAM requirements for the job in GB. 0 (default) means no specific requirements.
"""
if ramRequired:
self._addParameter(
self.workflow,
"MaxRAM",
"JDL",
ramRequired,
"GBs of RAM requested",
)

def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=None, maxNumberOfProcessors=None):
"""Helper function.

@@ -740,7 +753,7 @@ def setTag(self, tags):
Example usage:

>>> job = Job()
>>> job.setTag( ['WholeNode','8GBMemory'] )
>>> job.setTag( ['WholeNode','8GB'] )

:param tags: single tag string or a list of tags
:type tags: str or python:list
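(Editorial sketch: a usage example for the setRAMRequirements helper added above. The method name and the GB unit come from the diff; the job content is hypothetical.)::

    from DIRAC.Interfaces.API.Job import Job

    job = Job()
    # Request 8 GB of RAM for the payload; 0 (the default) adds no requirement
    job.setRAMRequirements(8)
    # Tags can still be combined with the RAM requirement as before
    job.setTag(["WholeNode"])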
5 changes: 4 additions & 1 deletion src/DIRAC/Resources/Computing/ComputingElement.py
@@ -57,6 +57,7 @@
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import (
getNumberOfProcessors,
getNumberOfGPUs,
getAvailableRAM,
)

INTEGER_PARAMETERS = ["CPUTime", "NumberOfProcessors", "NumberOfPayloadProcessors", "MaxRAM"]
@@ -235,12 +236,14 @@ def setParameters(self, ceOptions):
generalCEDict.update(self.ceParameters)
self.ceParameters = generalCEDict

# If NumberOfProcessors/GPUs is present in the description but is equal to zero
# If NumberOfProcessors/GPUs/RAM is present in the description but is equal to zero
# interpret it as needing local evaluation
if self.ceParameters.get("NumberOfProcessors", -1) == 0:
self.ceParameters["NumberOfProcessors"] = getNumberOfProcessors()
if self.ceParameters.get("NumberOfGPUs", -1) == 0:
self.ceParameters["NumberOfGPUs"] = getNumberOfGPUs()
if self.ceParameters.get("RAM", -1) == 0:
self.ceParameters["RAM"] = getAvailableRAM()

for key in ceOptions:
if key in INTEGER_PARAMETERS:
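(Editorial sketch: getAvailableRAM is imported from JobParameters but its body is not shown in this diff. A local probe could look like the following; the GB unit and the sysconf approach are assumptions, not the actual DIRAC implementation.)::

    import os

    def getAvailableRAM() -> int:
        """Total RAM of the host in GB, probed from the OS (sketch only)."""
        # On Linux, total memory in bytes = page size * number of physical pages
        pageSize = os.sysconf("SC_PAGE_SIZE")
        nPages = os.sysconf("SC_PHYS_PAGES")
        return (pageSize * nPages) // (1024**3)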
71 changes: 40 additions & 31 deletions src/DIRAC/Resources/Computing/PoolComputingElement.py
@@ -10,7 +10,7 @@
LocalCEType = Pool

The Pool Computing Element is specific: it embeds an additional "inner" CE
(`InProcess` by default, `Sudo`, `Singularity`). The "inner" CE can be specified such as::
(`InProcess` by default, or `Singularity`). The "inner" CE can be specified such as::

LocalCEType = Pool/Singularity

@@ -19,24 +19,18 @@

**Code Documentation**
"""
import functools
import os
import concurrent.futures
import functools

from DIRAC import S_OK, S_ERROR
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.private.ConfigurationData import ConfigurationData

from DIRAC.Resources.Computing.ComputingElement import ComputingElement

from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
from DIRAC.Resources.Computing.SingularityComputingElement import SingularityComputingElement

# Number of unix users to run job payloads with sudo
MAX_NUMBER_OF_SUDO_UNIX_USERS = 32


def executeJob(executableFile, proxy, taskID, inputs, **kwargs):
"""wrapper around ce.submitJob: decides which CE to use (Sudo or InProcess or Singularity)
"""wrapper around ce.submitJob: decides which CE to use (InProcess or Singularity)

:param str executableFile: location of the executable file
:param str proxy: proxy file location to be used for job submission
@@ -67,6 +61,8 @@ def __init__(self, ceUniqueID):
self.taskID = 0
self.processorsPerTask = {}
self.userNumberPerTask = {}
self.ram = 1024  # Default RAM in GB (an arbitrarily large value used when no limit is set)
self.ramPerTask = {}

# This CE will effectively submit to another "Inner"CE
# (by default to the InProcess CE)
@@ -80,22 +76,16 @@ def _reset(self):

self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
self.ceParameters["MaxTotalJobs"] = self.processors
max_ram = int(self.ceParameters.get("MaxRAM", 0))
Review comment (Member): Defining this at the CE level isn't going to work very well (e.g. the HLTFarm has several different hardware configurations with wildly different RAM per core). Can we inspect the host to have a sensible default?

Review comment (Contributor Author): This is already at the "inner CE" level (this is a very common confusion). So, basically, it is at the level of the Worker Node.

if max_ram > 0:
self.ram = max_ram // 1024 # Convert from MB to GB
Review comment (Contributor): Why don't we specify MaxRAM in GB directly?

Review comment (Contributor Author): The specification comes from BDII2CSAgent. I guess just "historical". Maybe MB should be used everywhere...?

Review comment (Contributor): I see, thanks! Good question; maybe it would be easier to have a single unit indeed.

Review comment (Contributor): Maybe better GB everywhere. BDII2CSAgent can be updated if necessary.

self.ceParameters["MaxRAM"] = self.ram
# Indicates that the submission is done asynchronously
# The result is not immediately available
self.ceParameters["AsyncSubmission"] = True
self.innerCESubmissionType = self.ceParameters.get("InnerCESubmissionType", self.innerCESubmissionType)
return S_OK()

def getProcessorsInUse(self):
"""Get the number of currently allocated processor cores

:return: number of processors in use
"""
processorsInUse = 0
for future in self.processorsPerTask:
processorsInUse += self.processorsPerTask[future]
return processorsInUse

#############################################################################
def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
"""Method to submit job.
@@ -118,33 +108,34 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
self.taskID += 1
return S_OK(taskID)

# Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
memoryForJob = self._getMemoryForJobs(kwargs)
if memoryForJob is None:
self.taskResults[self.taskID] = S_ERROR("Not enough memory for the job")
taskID = self.taskID
self.taskID += 1
return S_OK(taskID)

# Now persisting the job limits for later use in pilot.cfg file
cd = ConfigurationData(loadDefaultCFG=False)
res = cd.loadFile("pilot.cfg")
if not res["OK"]:
self.log.error("Could not load pilot.cfg", res["Message"])
else:
# NumberOfProcessors and MaxRAM for now, but other limits can also be added
jobID = int(kwargs.get("jobDesc", {}).get("jobID", 0))
cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/NumberOfProcessors" % jobID, processorsForJob)
cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/MaxRAM" % jobID, memoryForJob)
Review comment (Member): Is the intention to read this value in the innerCE to pass it to the CG2Manager? (It should just be a case of setting MemoryLimitMB in (a copy of) the ceParameters dictionary just before it's given to systemCall.)

Review comment (Contributor Author): Yes. But I do not fully understand the rest of the phrase.

res = cd.dumpLocalCFGToFile("pilot.cfg")
if not res["OK"]:
self.log.error("Could not dump cfg to pilot.cfg", res["Message"])

# Here we define task kwargs: adding complex objects like thread.Lock can trigger errors in the task
taskKwargs = {"InnerCESubmissionType": self.innerCESubmissionType}
taskKwargs["jobDesc"] = kwargs.get("jobDesc", {})
if self.innerCESubmissionType == "Sudo":
for nUser in range(MAX_NUMBER_OF_SUDO_UNIX_USERS):
if nUser not in self.userNumberPerTask.values():
break
taskKwargs["NUser"] = nUser
if "USER" in os.environ:
taskKwargs["PayloadUser"] = os.environ["USER"] + f"p{str(nUser).zfill(2)}"

# Submission
future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
self.processorsPerTask[future] = processorsForJob
self.ramPerTask[future] = memoryForJob
future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))

taskID = self.taskID
@@ -154,7 +145,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):

def _getProcessorsForJobs(self, kwargs):
"""helper function"""
processorsInUse = self.getProcessorsInUse()
processorsInUse = sum(self.processorsPerTask.values())
availableProcessors = self.processors - processorsInUse

self.log.verbose(
@@ -191,6 +182,24 @@ def _getProcessorsForJobs(self, kwargs):

return requestedProcessors

def _getMemoryForJobs(self, kwargs):
"""helper function to get the memory that will be allocated for the job

:param kwargs: job parameters
:return: memory in GB or None if not enough memory
"""

# job requirements
requestedMemory = kwargs.get("MaxRAM", 0)

# now check what the slot can provide
# Do we have enough memory?
availableMemory = self.ram - sum(self.ramPerTask.values())
if availableMemory < requestedMemory:
return None

return requestedMemory
Review comment (Contributor): I might have overlooked something: do we specify a default RAM value for the jobs? Because if I understand correctly, getRAMInUse() depends on the value returned by _getMemoryForJobs(), which would be 0 if the tag *GB or *GB_MAX is not specified. So if we have n jobs with no "RAM" tag, we will always have getRAMInUse = 0.

Review comment (Contributor Author): There's no obvious default value for RAM (for processors we specify 1, which is a slightly more obvious default value). So yes, as it stands the default value for RAM is 0.

Review comment (Contributor): RAM and NumberOfProcessors should be treated in exactly the same way: these are resources with integer values, having default values and summing up for jobs on the same node. Maybe common methods (or a class) can be introduced for those, and other tags can be added eventually, e.g. disk space.

Review comment (Member): Having RAM in GBs isn't fine-grained enough for an integer value; having less than 1 GB per core is sensible on some systems and jobs.

Review comment (Member): Could this be made MB instead?

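(Editorial sketch: the bookkeeping discussed in this thread, with illustrative numbers.)::

    # ce.ram = 32 GB in total; two running tasks already hold 4 + 8 GB
    ram_total = 32
    ram_per_task = {"task1": 4, "task2": 8}

    available = ram_total - sum(ram_per_task.values())  # 32 - 12 = 20 GB
    # A job asking for MaxRAM = 16 fits (16 <= 20) and gets 16;
    # MaxRAM = 24 does not, so _getMemoryForJobs returns None and the job is refused
    requested = 16
    allocation = requested if available >= requested else None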

def finalizeJob(self, taskID, future):
"""Finalize the job by updating the process utilisation counters

@@ -222,7 +231,7 @@ def getCEStatus(self):
result["WaitingJobs"] = 0

# dealing with processors
processorsInUse = self.getProcessorsInUse()
processorsInUse = sum(self.processorsPerTask.values())
result["UsedProcessors"] = processorsInUse
result["AvailableProcessors"] = self.processors - processorsInUse
return result
43 changes: 26 additions & 17 deletions src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
@@ -83,7 +83,7 @@ def createAndDelete():
def test_submit_and_shutdown(createAndDelete):
time.sleep(0.5)

ceParameters = {"WholeNode": True, "NumberOfProcessors": 4}
ceParameters = {"WholeNode": True, "NumberOfProcessors": 4, "MaxRAM": 4}
ce = PoolComputingElement("TestPoolCE")
ce.setParameters(ceParameters)

@@ -371,28 +371,37 @@ def test_executeJob_WholeNodeJobs(createAndDelete):


@pytest.mark.parametrize(
"processorsPerTask, kwargs, expected",
"processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory",
[
(None, {}, 1),
(None, {"mpTag": False}, 1),
(None, {"mpTag": True}, 1),
(None, {"mpTag": True, "wholeNode": True}, 16),
(None, {"mpTag": True, "wholeNode": False}, 1),
(None, {"mpTag": True, "numberOfProcessors": 4}, 4),
(None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8),
(None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16),
({1: 4}, {"mpTag": True, "wholeNode": True}, 0),
({1: 4}, {"mpTag": True, "wholeNode": False}, 1),
({1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2),
({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2}, 2),
({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16}, 12),
(None, None, {}, 1, 0),
(None, None, {"mpTag": False}, 1, 0),
(None, None, {"mpTag": True, "MaxRAM": 8}, 1, 8),
(None, None, {"mpTag": True, "wholeNode": True}, 16, 0),
(None, None, {"mpTag": True, "wholeNode": False}, 1, 0),
(None, None, {"mpTag": True, "numberOfProcessors": 4, "MaxRAM": 4}, 4, 4),
(None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8, 0),
(None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16, 0),
({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": True}, 0, 0),
({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": False}, 1, 0),
({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2, "MaxRAM": 8}, 2, 8),
({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 16, "MaxRAM": 12}, 0, 12),
({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2, "MaxRAM": 16}, 2, 16),
({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16, "MaxRAM": 32}, 12, None),
({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2, 0),
({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 4, 0),
({1: 4, 2: 8, 3: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 0, 0),
],
)
def test__getProcessorsForJobs(processorsPerTask, kwargs, expected):
def test__getLimitsForJobs(processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory):
ce = PoolComputingElement("TestPoolCE")
ce.processors = 16
ce.ram = 32

if processorsPerTask:
ce.processorsPerTask = processorsPerTask
if ramPerTask:
ce.ramPerTask = ramPerTask
res = ce._getProcessorsForJobs(kwargs)
assert res == expected
assert res == expected_processors
res = ce._getMemoryForJobs(kwargs)
assert res == expected_memory
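(Editorial note: the renamed parametrized test can be run on its own with pytest, e.g.)::

    pytest src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py -k test__getLimitsForJobs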