diff --git a/docs/source/AdministratorGuide/Resources/computingelements.rst b/docs/source/AdministratorGuide/Resources/computingelements.rst
index 76d2874c901..f33497dc7d2 100644
--- a/docs/source/AdministratorGuide/Resources/computingelements.rst
+++ b/docs/source/AdministratorGuide/Resources/computingelements.rst
@@ -57,25 +57,22 @@ of the *ComputingElement* is located inside the corresponding site section in th
   # Site administrative domain
   LCG
   {
-    # Site section
+    # Site section. This is the DIRAC site name.
     LCG.CNAF.it
     {
-      # Site name
+      # Alternative site name (e.g. site name in GOC DB)
       Name = CNAF
-      # List of valid CEs on the site
-      CE = ce01.infn.it, ce02.infn.it
-      # Section describing each CE
      CEs
      {
-        # Specific CE description section
+        # Specific CE description section. The CE name must be unique.
        ce01.infn.it
        {
-          # Type of the CE
+          # Type of the CE. "HTCondorCE", "AREX" and "SSH" are the most common types.
          CEType = HTCondorCE
-          # Section to describe various queue in the CE
+          # Section to describe various (logical) queues in the CE.
          Queues
          {
            long
@@ -93,7 +90,6 @@ of the *ComputingElement* is located inside the corresponding site section in th
 
 This is the general structure in which specific CE descriptions are inserted.
 The CE configuration is part of the general DIRAC configuration
-It can be placed in the general Configuration Service or in the local configuration of the DIRAC installation.
 
 Examples of the configuration can be found in the :ref:`full_configuration_example`, in the *Resources/Computing* section.
 You can find the options of a specific CE in the code documentation: :mod:`DIRAC.Resources.Computing`.
@@ -114,7 +110,7 @@ configuration.
 
 Interacting with Grid Sites
 @@@@@@@@@@@@@@@@@@@@@@@@@@@
 
-The :mod:`~DIRAC.Resources.Computing.HTCondorCEComputingElement` and the :mod:`~DIRAC.Resources.Computing.ARCComputingElement` eases
+The :mod:`~DIRAC.Resources.Computing.HTCondorCEComputingElement` and the :mod:`~DIRAC.Resources.Computing.AREXComputingElement` ease
 the interactions with grid sites, by managing pilots using the underlying batch systems.
 Instances of such CEs are generally setup by the site administrators.
@@ -132,11 +128,6 @@ The :mod:`~DIRAC.Resources.Computing.CloudComputingElement` allows submission to
 (via the standard SiteDirector agent). The instances are contextualised using cloud-init.
 
-Delegating to BOINC (Volunteering Computing)
-@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
-There exists a :mod:`~DIRAC.Resources.Computing.BOINCComputingElement` to submit pilots to a BOINC server.
-
-
 Computing Elements within allocated computing resources
 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
 The :mod:`~DIRAC.Resources.Computing.InProcessComputingElement` is usually invoked by a Pilot-Job (JobAgent agent) to execute user
diff --git a/docs/source/UserGuide/Tutorials/JobManagementAdvanced/index.rst b/docs/source/UserGuide/Tutorials/JobManagementAdvanced/index.rst
index 0b70d79c050..a939114b72a 100644
--- a/docs/source/UserGuide/Tutorials/JobManagementAdvanced/index.rst
+++ b/docs/source/UserGuide/Tutorials/JobManagementAdvanced/index.rst
@@ -345,8 +345,6 @@ using the "setNumberOfProcessors" method of the API::
 Calling ``Job().setNumberOfProcessors()``, with a value bigger than 1, will translate into adding also the
 "MultiProcessor" tag to the job description.
 
-.. versionadded:: v6r20p5
-
 Users can specify in the job descriptions NumberOfProcessors and WholeNode parameters, e.g.::
 
     NumberOfProcessors = 16;
diff --git a/src/DIRAC/Interfaces/API/Job.py b/src/DIRAC/Interfaces/API/Job.py
index 870c49b62e6..17d8af0550e 100755
--- a/src/DIRAC/Interfaces/API/Job.py
+++ b/src/DIRAC/Interfaces/API/Job.py
@@ -522,6 +522,19 @@ def setDestination(self, destination):
         return S_OK()
 
     #############################################################################
+    def setRAMRequirements(self, ramRequired: int = 0):
+        """Helper function.
+        Specify the RAM requirements for the job in GB. 0 (default) means no specific requirements.
+        """
+        if ramRequired:
+            self._addParameter(
+                self.workflow,
+                "MaxRAM",
+                "JDL",
+                ramRequired,
+                "GBs of RAM requested",
+            )
+
     def setNumberOfProcessors(self, numberOfProcessors=None, minNumberOfProcessors=None, maxNumberOfProcessors=None):
         """Helper function.
 
@@ -740,7 +753,7 @@ def setTag(self, tags):
         Example usage:
 
         >>> job = Job()
-        >>> job.setTag( ['WholeNode','8GBMemory'] )
+        >>> job.setTag( ['WholeNode','8GB'] )
 
         :param tags: single tag string or a list of tags
         :type tags: str or python:list
diff --git a/src/DIRAC/Resources/Computing/ComputingElement.py b/src/DIRAC/Resources/Computing/ComputingElement.py
index 4b73626eba9..a11e3466830 100755
--- a/src/DIRAC/Resources/Computing/ComputingElement.py
+++ b/src/DIRAC/Resources/Computing/ComputingElement.py
@@ -57,6 +57,7 @@
 from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import (
     getNumberOfProcessors,
     getNumberOfGPUs,
+    getAvailableRAM,
 )
 
 INTEGER_PARAMETERS = ["CPUTime", "NumberOfProcessors", "NumberOfPayloadProcessors", "MaxRAM"]
@@ -235,12 +236,14 @@ def setParameters(self, ceOptions):
         generalCEDict.update(self.ceParameters)
         self.ceParameters = generalCEDict
 
-        # If NumberOfProcessors/GPUs is present in the description but is equal to zero
+        # If NumberOfProcessors/GPUs/RAM is present in the description but is equal to zero
         # interpret it as needing local evaluation
         if self.ceParameters.get("NumberOfProcessors", -1) == 0:
             self.ceParameters["NumberOfProcessors"] = getNumberOfProcessors()
         if self.ceParameters.get("NumberOfGPUs", -1) == 0:
             self.ceParameters["NumberOfGPUs"] = getNumberOfGPUs()
+        if self.ceParameters.get("RAM", -1) == 0:
+            self.ceParameters["RAM"] = getAvailableRAM()
 
         for key in ceOptions:
             if key in INTEGER_PARAMETERS:
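The ``setRAMRequirements`` helper added above only records a ``MaxRAM`` JDL parameter (in GB) on the job workflow. A minimal usage sketch; the executable name is borrowed from the test utilities touched later in this patch, and ``Dirac().submitJob()`` is the standard submission API rather than something introduced here::

    # Sketch only: request 2 processors and 4 GB of RAM for a user job.
    # (The usual Script.parseCommandLine() client initialisation is omitted.)
    from DIRAC.Interfaces.API.Dirac import Dirac
    from DIRAC.Interfaces.API.Job import Job

    job = Job()
    job.setExecutable("mpTest.py")
    job.setNumberOfProcessors(numberOfProcessors=2)  # also adds the "MultiProcessor" tag
    job.setRAMRequirements(4)  # stored as the "MaxRAM" JDL parameter, in GB
    print(Dirac().submitJob(job))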
diff --git a/src/DIRAC/Resources/Computing/PoolComputingElement.py b/src/DIRAC/Resources/Computing/PoolComputingElement.py
index 231136d995b..b3f4fb26c39 100644
--- a/src/DIRAC/Resources/Computing/PoolComputingElement.py
+++ b/src/DIRAC/Resources/Computing/PoolComputingElement.py
@@ -10,7 +10,7 @@
     LocalCEType = Pool
 
     The Pool Computing Element is specific: it embeds an additional "inner" CE
-    (`InProcess` by default, `Sudo`, `Singularity`). The "inner" CE can be specified such as::
+    (`InProcess` by default, or `Singularity`). The "inner" CE can be specified such as::
 
         LocalCEType = Pool/Singularity
 
@@ -19,24 +19,18 @@
 **Code Documentation**
 """
-import functools
-import os
 import concurrent.futures
+import functools
 
-from DIRAC import S_OK, S_ERROR
+from DIRAC import S_ERROR, S_OK
 from DIRAC.ConfigurationSystem.private.ConfigurationData import ConfigurationData
-
 from DIRAC.Resources.Computing.ComputingElement import ComputingElement
-
 from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
 from DIRAC.Resources.Computing.SingularityComputingElement import SingularityComputingElement
 
-# Number of unix users to run job payloads with sudo
-MAX_NUMBER_OF_SUDO_UNIX_USERS = 32
-
 
 def executeJob(executableFile, proxy, taskID, inputs, **kwargs):
-    """wrapper around ce.submitJob: decides which CE to use (Sudo or InProcess or Singularity)
+    """wrapper around ce.submitJob: decides which CE to use (InProcess or Singularity)
 
     :param str executableFile: location of the executable file
     :param str proxy: proxy file location to be used for job submission
@@ -67,6 +61,8 @@ def __init__(self, ceUniqueID):
         self.taskID = 0
         self.processorsPerTask = {}
         self.userNumberPerTask = {}
+        self.ram = 1024  # Default RAM in GB (this is an arbitrarily large value in case of no limit)
+        self.ramPerTask = {}
 
         # This CE will effectively submit to another "Inner"CE
         # (by default to the InProcess CE)
@@ -80,22 +76,16 @@ def _reset(self):
         self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
         self.ceParameters["MaxTotalJobs"] = self.processors
+        max_ram = int(self.ceParameters.get("MaxRAM", 0))
+        if max_ram > 0:
+            self.ram = max_ram // 1024  # Convert from MB to GB
+        self.ceParameters["MaxRAM"] = self.ram
 
         # Indicates that the submission is done asynchronously
         # The result is not immediately available
         self.ceParameters["AsyncSubmission"] = True
         self.innerCESubmissionType = self.ceParameters.get("InnerCESubmissionType", self.innerCESubmissionType)
         return S_OK()
 
-    def getProcessorsInUse(self):
-        """Get the number of currently allocated processor cores
-
-        :return: number of processors in use
-        """
-        processorsInUse = 0
-        for future in self.processorsPerTask:
-            processorsInUse += self.processorsPerTask[future]
-        return processorsInUse
-
     #############################################################################
     def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         """Method to submit job.
@@ -118,15 +108,22 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
             self.taskID += 1
             return S_OK(taskID)
 
-        # Now persisting the job limits for later use in pilot.cfg file (pilot 3 default)
+        memoryForJob = self._getMemoryForJobs(kwargs)
+        if memoryForJob is None:
+            self.taskResults[self.taskID] = S_ERROR("Not enough memory for the job")
+            taskID = self.taskID
+            self.taskID += 1
+            return S_OK(taskID)
+
+        # Now persisting the job limits for later use in pilot.cfg file
         cd = ConfigurationData(loadDefaultCFG=False)
         res = cd.loadFile("pilot.cfg")
         if not res["OK"]:
             self.log.error("Could not load pilot.cfg", res["Message"])
         else:
-            # only NumberOfProcessors for now, but RAM (or other stuff) can also be added
             jobID = int(kwargs.get("jobDesc", {}).get("jobID", 0))
             cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/NumberOfProcessors" % jobID, processorsForJob)
+            cd.setOptionInCFG("/Resources/Computing/JobLimits/%d/MaxRAM" % jobID, memoryForJob)
             res = cd.dumpLocalCFGToFile("pilot.cfg")
             if not res["OK"]:
                 self.log.error("Could not dump cfg to pilot.cfg", res["Message"])
@@ -134,17 +131,11 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
         # Here we define task kwargs: adding complex objects like thread.Lock can trigger errors in the task
         taskKwargs = {"InnerCESubmissionType": self.innerCESubmissionType}
         taskKwargs["jobDesc"] = kwargs.get("jobDesc", {})
-        if self.innerCESubmissionType == "Sudo":
-            for nUser in range(MAX_NUMBER_OF_SUDO_UNIX_USERS):
-                if nUser not in self.userNumberPerTask.values():
-                    break
-            taskKwargs["NUser"] = nUser
-            if "USER" in os.environ:
-                taskKwargs["PayloadUser"] = os.environ["USER"] + f"p{str(nUser).zfill(2)}"
 
         # Submission
         future = self.pPool.submit(executeJob, executableFile, proxy, self.taskID, inputs, **taskKwargs)
         self.processorsPerTask[future] = processorsForJob
+        self.ramPerTask[future] = memoryForJob
         future.add_done_callback(functools.partial(self.finalizeJob, self.taskID))
 
         taskID = self.taskID
@@ -154,7 +145,7 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
     def _getProcessorsForJobs(self, kwargs):
         """helper function"""
 
-        processorsInUse = self.getProcessorsInUse()
+        processorsInUse = sum(self.processorsPerTask.values())
         availableProcessors = self.processors - processorsInUse
 
         self.log.verbose(
@@ -191,6 +182,24 @@
         return requestedProcessors
 
+    def _getMemoryForJobs(self, kwargs):
+        """helper function to get the memory that will be allocated for the job
+
+        :param kwargs: job parameters
+        :return: memory in GB or None if not enough memory
+        """
+
+        # # job requirements
+        requestedMemory = kwargs.get("MaxRAM", 0)
+
+        # # now check what the slot can provide
+        # Do we have enough memory?
+        availableMemory = self.ram - sum(self.ramPerTask.values())
+        if availableMemory < requestedMemory:
+            return None
+
+        return requestedMemory
+
     def finalizeJob(self, taskID, future):
         """Finalize the job by updating the process utilisation counters
 
@@ -222,7 +231,7 @@ def getCEStatus(self):
         result["WaitingJobs"] = 0
 
         # dealing with processors
-        processorsInUse = self.getProcessorsInUse()
+        processorsInUse = sum(self.processorsPerTask.values())
         result["UsedProcessors"] = processorsInUse
         result["AvailableProcessors"] = self.processors - processorsInUse
         return result
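The slot accounting introduced above is deliberately simple: each running task keeps its reserved processors in ``processorsPerTask`` and its reserved memory in ``ramPerTask``, and a new payload is refused when the remainder is not sufficient. A standalone sketch of the same arithmetic (plain Python, not the DIRAC API; values assumed to be in GB)::

    def remaining_ram(total_ram_gb, ram_per_task):
        """Free memory left in the slot, mirroring the ramPerTask bookkeeping above."""
        return total_ram_gb - sum(ram_per_task.values())

    def ram_for_job(requested_gb, total_ram_gb, ram_per_task):
        """GB to reserve for a new payload, or None when the slot cannot host it."""
        if remaining_ram(total_ram_gb, ram_per_task) < requested_gb:
            return None
        return requested_gb

    # Example: a 16 GB slot already running tasks that reserved 4 GB and 8 GB
    print(ram_for_job(8, 16, {1: 4, 2: 8}))  # None, only 4 GB left
    print(ram_for_job(4, 16, {1: 4, 2: 8}))  # 4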
diff --git a/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
index 57aab6b4a86..ef40888a02e 100644
--- a/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
+++ b/src/DIRAC/Resources/Computing/test/Test_PoolComputingElement.py
@@ -83,7 +83,7 @@ def createAndDelete():
 def test_submit_and_shutdown(createAndDelete):
     time.sleep(0.5)
 
-    ceParameters = {"WholeNode": True, "NumberOfProcessors": 4}
+    ceParameters = {"WholeNode": True, "NumberOfProcessors": 4, "MaxRAM": 4}
     ce = PoolComputingElement("TestPoolCE")
     ce.setParameters(ceParameters)
 
@@ -371,28 +371,37 @@ def test_executeJob_WholeNodeJobs(createAndDelete):
 
 @pytest.mark.parametrize(
-    "processorsPerTask, kwargs, expected",
+    "processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory",
     [
-        (None, {}, 1),
-        (None, {"mpTag": False}, 1),
-        (None, {"mpTag": True}, 1),
-        (None, {"mpTag": True, "wholeNode": True}, 16),
-        (None, {"mpTag": True, "wholeNode": False}, 1),
-        (None, {"mpTag": True, "numberOfProcessors": 4}, 4),
-        (None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8),
-        (None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16),
-        ({1: 4}, {"mpTag": True, "wholeNode": True}, 0),
-        ({1: 4}, {"mpTag": True, "wholeNode": False}, 1),
-        ({1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2),
-        ({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2}, 2),
-        ({1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16}, 12),
+        (None, None, {}, 1, 0),
+        (None, None, {"mpTag": False}, 1, 0),
+        (None, None, {"mpTag": True, "MaxRAM": 8}, 1, 8),
+        (None, None, {"mpTag": True, "wholeNode": True}, 16, 0),
+        (None, None, {"mpTag": True, "wholeNode": False}, 1, 0),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "MaxRAM": 4}, 4, 4),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 8}, 8, 0),
+        (None, None, {"mpTag": True, "numberOfProcessors": 4, "maxNumberOfProcessors": 32}, 16, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": True}, 0, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "wholeNode": False}, 1, 0),
+        ({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2, "MaxRAM": 8}, 2, 8),
+        ({1: 4}, {1: 4}, {"mpTag": True, "numberOfProcessors": 16, "MaxRAM": 12}, 0, 12),
+        ({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 2, "MaxRAM": 16}, 2, 16),
+        ({1: 4}, {1: 4}, {"mpTag": True, "maxNumberOfProcessors": 16, "MaxRAM": 32}, 12, None),
+        ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 2}, 2, 0),
+        ({1: 4, 2: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 4, 0),
+        ({1: 4, 2: 8, 3: 8}, {1: 4}, {"mpTag": True, "numberOfProcessors": 4}, 0, 0),
     ],
 )
-def test__getProcessorsForJobs(processorsPerTask, kwargs, expected):
+def test__getLimitsForJobs(processorsPerTask, ramPerTask, kwargs, expected_processors, expected_memory):
     ce = PoolComputingElement("TestPoolCE")
     ce.processors = 16
+    ce.ram = 32
     if processorsPerTask:
         ce.processorsPerTask = processorsPerTask
+    if ramPerTask:
+        ce.ramPerTask = ramPerTask
     res = ce._getProcessorsForJobs(kwargs)
-    assert res == expected
+    assert res == expected_processors
+    res = ce._getMemoryForJobs(kwargs)
+    assert res == expected_memory
diff --git a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py
index 774456f8ba9..f5dbd2626e4 100644
--- a/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py
+++ b/src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py
@@ -1,10 +1,10 @@
 """ DIRAC Workload Management System utility module to get available memory and processors from mjf
 """
-import os
 import multiprocessing
+import os
 from urllib.request import urlopen
 
-from DIRAC import gLogger, gConfig
+from DIRAC import gConfig, gLogger
 from DIRAC.Core.Utilities.List import fromChar
 
@@ -34,31 +34,7 @@ def getJobFeatures():
     return features
 
 
-def getProcessorFromMJF():
-    jobFeatures = getJobFeatures()
-    if jobFeatures:
-        try:
-            return int(jobFeatures["allocated_cpu"])
-        except KeyError:
-            gLogger.error(
-                "MJF is available but allocated_cpu is not an integer", repr(jobFeatures.get("allocated_cpu"))
-            )
-    return None
-
-
-def getMemoryFromMJF():
-    jobFeatures = getJobFeatures()
-    if jobFeatures:
-        try:
-            return int(jobFeatures["max_rss_bytes"])
-        except KeyError:
-            gLogger.error(
-                "MJF is available but max_rss_bytes is not an integer", repr(jobFeatures.get("max_rss_bytes"))
-            )
-    return None
-
-
-def getMemoryFromProc():
+def _getMemoryFromProc():
     meminfo = {i.split()[0].rstrip(":"): int(i.split()[1]) for i in open("/proc/meminfo").readlines()}
     maxRAM = meminfo["MemTotal"]
     if maxRAM:
@@ -72,7 +48,6 @@ def getNumberOfProcessors(siteName=None, gridCE=None, queue=None):
     Tries to find it in this order:
     1) from the /Resources/Computing/CEDefaults/NumberOfProcessors (which is what the pilot fills up)
-    2) if not present from JobFeatures
     3) if not present looks in CS for "NumberOfProcessors" Queue or CE option
     4) if not present but there's WholeNode tag, look what the WN provides using multiprocessing.cpu_count()
     5) return 1
@@ -84,14 +59,7 @@
     if numberOfProcessors:
         return numberOfProcessors
 
-    # 2) from MJF
-    gLogger.info("Getting numberOfProcessors from MJF")
-    numberOfProcessors = getProcessorFromMJF()
-    if numberOfProcessors:
-        return numberOfProcessors
-    gLogger.info("NumberOfProcessors could not be found in MJF")
-
-    # 3) looks in CS for "NumberOfProcessors" Queue or CE or site option
+    # 2) looks in CS for "NumberOfProcessors" Queue or CE or site option
     if not siteName:
         siteName = gConfig.getValue("/LocalSite/Site", "")
     if not gridCE:
@@ -116,8 +84,8 @@
     if numberOfProcessors:
         return numberOfProcessors
 
-    # 4) looks in CS for tags
-    gLogger.info(f"Getting tagsfor {siteName}: {gridCE}: {queue}")
+    # 3) looks in CS for tags
+    gLogger.info(f"Getting tags for {siteName}: {gridCE}: {queue}")
     # Tags of the CE
     tags = fromChar(
         gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Tag", "")
@@ -237,3 +205,65 @@ def getNumberOfGPUs(siteName=None, gridCE=None, queue=None):
     # 3) return 0
     gLogger.info("NumberOfGPUs could not be found in CS")
     return 0
+
+
+def getAvailableRAM(siteName=None, gridCE=None, queue=None):
+    """Gets the available RAM on a certain CE/queue/node (what the pilot administers)
+
+    The siteName/gridCE/queue parameters are normally not necessary.
+
+    Tries to find it in this order:
+    1) from the /Resources/Computing/CEDefaults/MaxRAM (which is what the pilot might fill up)
+    2) if not present looks in CS for "MaxRAM" Queue or CE option
+    3) if not present but there's WholeNode tag, look what the WN provides using _getMemoryFromProc()
+    4) return 0
+    """
+
+    # 1) from /Resources/Computing/CEDefaults/MaxRAM
+    gLogger.info("Getting MaxRAM from /Resources/Computing/CEDefaults/MaxRAM")
+    availableRAM = gConfig.getValue("/Resources/Computing/CEDefaults/MaxRAM", None)
+    if availableRAM:
+        return availableRAM
+
+    # 2) looks in CS for "MaxRAM" Queue or CE or site option
+    if not siteName:
+        siteName = gConfig.getValue("/LocalSite/Site", "")
+    if not gridCE:
+        gridCE = gConfig.getValue("/LocalSite/GridCE", "")
+    if not queue:
+        queue = gConfig.getValue("/LocalSite/CEQueue", "")
+    if not (siteName and gridCE and queue):
+        gLogger.error("Could not find MaxRAM: missing siteName or gridCE or queue. Returning 0")
+        return 0
+
+    grid = siteName.split(".")[0]
+    csPaths = [
+        f"/Resources/Sites/{grid}/{siteName}/CEs/{gridCE}/Queues/{queue}/MaxRAM",
+        f"/Resources/Sites/{grid}/{siteName}/CEs/{gridCE}/MaxRAM",
+        f"/Resources/Sites/{grid}/{siteName}/MaxRAM",
+    ]
+    for csPath in csPaths:
+        gLogger.info("Looking in", csPath)
+        availableRAM = gConfig.getValue(csPath, None)
+        if availableRAM:
+            return availableRAM
+
+    # 3) checks if 'WholeNode' is one of the used tags
+    # Tags of the CE
+    tags = fromChar(
+        gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Tag", "")
+    ) + fromChar(gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/Cloud/{gridCE}/Tag", ""))
+    # Tags of the Queue
+    tags += fromChar(
+        gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues/{queue}/Tag", "")
+    ) + fromChar(
+        gConfig.getValue(f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/Cloud/{gridCE}/VMTypes/{queue}/Tag", "")
+    )
+
+    if "WholeNode" in tags:
+        gLogger.info("Found WholeNode tag, using _getMemoryFromProc()")
+        return _getMemoryFromProc()
+
+    # 4) return 0
+    gLogger.info("MaxRAM could not be found in CS, and WholeNode tag not found")
+    return 0
diff --git a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py
index 07a1042f88a..5b3b2497b6c 100755
--- a/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py
+++ b/src/DIRAC/WorkloadManagementSystem/scripts/dirac_wms_get_wn_parameters.py
@@ -39,15 +39,12 @@ def main():
     gLogger.info("Getting number of processors")
     numberOfProcessor = JobParameters.getNumberOfProcessors(Site, ceName, Queue)
 
-    gLogger.info("Getting memory (RAM) from MJF")
-    maxRAM = JobParameters.getMemoryFromMJF()
-    if not maxRAM:
-        gLogger.info("maxRAM could not be found in MJF, using JobParameters.getMemoryFromProc()")
-        maxRAM = JobParameters.getMemoryFromProc()
-
     gLogger.info("Getting number of GPUs")
     numberOfGPUs = JobParameters.getNumberOfGPUs(Site, ceName, Queue)
 
+    gLogger.info("Getting maximum RAM")
+    maxRAM = JobParameters.getAvailableRAM(Site, ceName, Queue)
+
     # just communicating it back
     gLogger.notice(" ".join(str(wnPar) for wnPar in [numberOfProcessor, maxRAM, numberOfGPUs]))
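For a site administrator, the lookup order of ``getAvailableRAM()`` above means the memory limit can be advertised at queue, CE or site level, or in ``/Resources/Computing/CEDefaults``. A hedged configuration sketch, reusing the example site from the documentation change above; the value and its MB unit follow the ``MaxRAM`` convention assumed by ``PoolComputingElement._reset()`` and should be adapted to the actual site::

    Resources
    {
      Sites
      {
        LCG
        {
          LCG.CNAF.it
          {
            CEs
            {
              ce01.infn.it
              {
                Queues
                {
                  long
                  {
                    # Picked up via .../Queues/long/MaxRAM by getAvailableRAM()
                    MaxRAM = 4096
                  }
                }
              }
            }
          }
        }
      }
    }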
diff --git a/src/DIRAC/tests/Utilities/testJobDefinitions.py b/src/DIRAC/tests/Utilities/testJobDefinitions.py
index a4df06c23a6..e31be9fbd04 100644
--- a/src/DIRAC/tests/Utilities/testJobDefinitions.py
+++ b/src/DIRAC/tests/Utilities/testJobDefinitions.py
@@ -232,9 +232,9 @@ def mpJob():
 
 
 def mp3Job():
-    """simple hello world job, with 2 to 4 processors"""
+    """simple hello world job, with 3 processors"""
 
-    J = baseToAllJobs("min2max4Job")
+    J = baseToAllJobs("min3Job")
     try:
         J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
     except IndexError:
@@ -282,6 +282,42 @@ def wholeNodeJob():
     return endOfAllJobs(J)
 
 
+def memory_4GB():
+    """simple hello world job, with a memory requirement of 4 GB and MultiProcessor tags"""
+
+    J = baseToAllJobs("memory_4GB")
+    try:
+        J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
+    except IndexError:
+        try:
+            J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]])
+        except IndexError:  # we are in Jenkins
+            J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]])
+
+    J.setExecutable("mpTest.py")
+    J.setNumberOfProcessors(numberOfProcessors=2)
+    J.setTag("4GB")
+    return endOfAllJobs(J)
+
+
+def memory_2_to4GB():
+    """simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags"""
+
+    J = baseToAllJobs("memory_2_to_4GB")
+    try:
+        J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
+    except IndexError:
+        try:
+            J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]])
+        except IndexError:  # we are in Jenkins
+            J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]])
+
+    J.setExecutable("mpTest.py")
+    J.setNumberOfProcessors(numberOfProcessors=2)
+    J.setTag(["2GB", "4GB_MAX"])
+    return endOfAllJobs(J)
+
+
 def parametricJob():
     """Creates a parametric job with 3 subjobs which are simple hello world jobs"""
 
diff --git a/tests/System/unitTestUserJobs.py b/tests/System/unitTestUserJobs.py
index e529e441c95..f67708559be 100644
--- a/tests/System/unitTestUserJobs.py
+++ b/tests/System/unitTestUserJobs.py
@@ -106,6 +106,14 @@ def test_submit(self):
         self.assertTrue(res["OK"])
         jobsSubmittedList.append(res["Value"])
 
+        res = memory_4GB()
+        self.assertTrue(res["OK"])
+        jobsSubmittedList.append(res["Value"])
+
+        res = memory_2_to4GB()
+        self.assertTrue(res["OK"])
+        jobsSubmittedList.append(res["Value"])
+
         res = parametricJob()
         self.assertTrue(res["OK"])
         jobsSubmittedList.append(res["Value"])
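End to end, a job's memory request travels from the JDL/tags into the ``JobLimits`` section that ``PoolComputingElement.submitJob()`` writes to the local ``pilot.cfg``. A minimal sketch of reading those limits back on the worker node; the job ID and the current-directory location of ``pilot.cfg`` are illustrative assumptions::

    from DIRAC.ConfigurationSystem.private.ConfigurationData import ConfigurationData

    cd = ConfigurationData(loadDefaultCFG=False)
    res = cd.loadFile("pilot.cfg")  # file produced by the Pool CE at submission time
    if res["OK"]:
        jobID = 123  # hypothetical job ID
        nProc = cd.extractOptionFromCFG(f"/Resources/Computing/JobLimits/{jobID}/NumberOfProcessors")
        maxRAM = cd.extractOptionFromCFG(f"/Resources/Computing/JobLimits/{jobID}/MaxRAM")
        print(nProc, maxRAM)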