Skip to content

Commit dc2ad34

Browse files
authored
Merge pull request #8406 from fstagni/90_ram_patches
[9.0] patches for RAM calculations
2 parents bdcf074 + 201ae88 commit dc2ad34

File tree

7 files changed

+148
-63
lines changed

7 files changed

+148
-63
lines changed

.github/workflows/semantic.yml

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# type: docs, feat, fix, refactor, style or test
55
# scope (optional): any extra info, (like DMS or whatever)
66

7-
name: 'Commit Message Check'
7+
name: "Commit Message Check"
88
on: pull_request
99

1010
jobs:
@@ -15,19 +15,19 @@ jobs:
1515
- name: Check Commit Format
1616
uses: gsactions/commit-message-checker@v2
1717
with:
18-
pattern: '^((docs|feat|fix|refactor|style|test|sweep)( ?\(.*\))?: .+|Revert ".+")$'
19-
excludeDescription: 'true' # optional: this excludes the description body of a pull request
20-
excludeTitle: 'true' # optional: this excludes the title of a pull request
21-
checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request
18+
pattern: '^((docs|feat|fix|chore|refactor|style|test|sweep)( ?\(.*\))?: .+|Revert ".+")$'
19+
excludeDescription: "true" # optional: this excludes the description body of a pull request
20+
excludeTitle: "true" # optional: this excludes the title of a pull request
21+
checkAllCommitMessages: "true" # optional: this checks all commits associated with a pull request
2222
accessToken: ${{ secrets.GITHUB_TOKEN }} # github access token is only required if checkAllCommitMessages is true
23-
flags: 'gim'
23+
flags: "gim"
2424
error: 'Your commit has to follow the format "<type>(<scope>): <subject>".'
2525
- name: Check Commit Length
2626
uses: gsactions/commit-message-checker@v2
2727
with:
28-
pattern: '^.{20,150}$'
29-
error: 'Commit messages should be between 20 and 150 chars'
30-
excludeDescription: 'true' # optional: this excludes the description body of a pull request
31-
excludeTitle: 'true' # optional: this excludes the title of a pull request
32-
checkAllCommitMessages: 'true' # optional: this checks all commits associated with a pull request
28+
pattern: "^.{20,150}$"
29+
error: "Commit messages should be between 20 and 150 chars"
30+
excludeDescription: "true" # optional: this excludes the description body of a pull request
31+
excludeTitle: "true" # optional: this excludes the title of a pull request
32+
checkAllCommitMessages: "true" # optional: this checks all commits associated with a pull request
3333
accessToken: ${{ secrets.GITHUB_TOKEN }} # github access token is only required if checkAllCommitMessages is true

AGENTS.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# DIRAC Agent Guidelines
2+
3+
## Build/Lint/Test Commands
4+
- **Build**: `pip install -e .`
5+
- **Lint**: `ruff check src/ && pylint src/`
6+
- **Test**: `pytest tests/`
7+
- **Single test**: `pytest src/DIRAC/path/to/test.py::test_function`
8+
9+
## Code Style Guidelines
10+
- **Formatting**: Use `black` with line length 120 (configured in pyproject.toml)
11+
- **Imports**: Absolute imports only; sort with `isort` (black profile)
12+
- **Naming**: CamelCase for classes, snake_case for functions/variables
13+
- **Types**: Use type hints; run `mypy` for strict checking
14+
- **Error handling**: Return `S_OK(result)` or `S_ERROR(message)` from DIRAC.Core.Utilities.ReturnValues
15+
- **Logging**: Use `gLogger.info/warn/error` (from DIRAC import gLogger)
16+
- **Docstrings**: Follow Google/NumPy style where present
17+
- **Security**: Never log secrets; validate inputs

src/DIRAC/Resources/Computing/InProcessComputingElement.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
""" The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot)
1+
"""The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot)
22
3-
A "InProcess" CE instance submits jobs in the current process.
4-
This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE
3+
A "InProcess" CE instance submits jobs in the current process.
4+
This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE
55
"""
6+
67
import os
78
import stat
89

9-
from DIRAC import S_OK, S_ERROR
10-
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
10+
from DIRAC import S_ERROR, S_OK
1111
from DIRAC.Core.Utilities.CGroups2 import CG2Manager
12-
12+
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
1313
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
1414

1515

@@ -21,8 +21,8 @@ def __init__(self, ceUniqueID):
2121
self.submittedJobs = 0
2222
self.runningJobs = 0
2323

24-
self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
25-
self.maxRAM = int(self.ceParameters.get("MaxRAM", 0))
24+
self.processors = 1
25+
self.maxRAM = 0
2626
self.ceParameters["MaxTotalJobs"] = 1
2727

2828
def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
@@ -34,6 +34,16 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
3434
:param list inputs: dependencies of executableFile
3535
:return: S_OK(payload exit code) / S_ERROR() if submission issue
3636
"""
37+
self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
38+
self.maxRAM = int(self.ceParameters.get("MaxRAM", self.maxRAM))
39+
40+
if "numberOfProcessors" in kwargs:
41+
if self.processors < int(kwargs["numberOfProcessors"]):
42+
return S_ERROR("Requesting processors not available")
43+
if "MaxRAM" in kwargs:
44+
if self.maxRAM < int(kwargs["MaxRAM"]):
45+
return S_ERROR("Requesting RAM not available")
46+
3747
payloadEnv = dict(os.environ)
3848
payloadProxy = ""
3949
renewTask = None

src/DIRAC/Resources/Computing/test/Test_InProcessComputingElement.py

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,41 @@
88

99
import pytest
1010

11-
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import jobScript, _stopJob
12-
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
13-
1411
# sut
1512
from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
13+
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import _stopJob, jobScript
14+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
1615

1716

1817
@pytest.mark.slow
19-
def test_submitJob():
18+
@pytest.mark.parametrize(
19+
"ce_parameters, available_processors, ram",
20+
[
21+
({}, 1, 0),
22+
({"NumberOfProcessors": 8}, 8, 0),
23+
({"MaxRAM": 2048}, 1, 2048),
24+
({"NumberOfProcessors": 8, "MaxRAM": 2048}, 8, 2048),
25+
],
26+
)
27+
def test_submitJob(ce_parameters, available_processors, ram):
28+
# initialization
29+
ce = InProcessComputingElement("InProcessCE")
30+
ce.ceParameters = ce_parameters
31+
32+
# simple
2033
with open("testJob.py", "w") as execFile:
2134
execFile.write(jobScript % "1")
2235
os.chmod("testJob.py", 0o755)
2336

24-
ce = InProcessComputingElement("InProcessCE")
2537
res = ce.submitJob("testJob.py", None)
2638
assert res["OK"] is True
2739
res = ce.getCEStatus()
2840
assert res["OK"] is True
2941
assert res["SubmittedJobs"] == 1
42+
assert res["RunningJobs"] == 0
43+
assert res["WaitingJobs"] == 0
44+
assert res["AvailableProcessors"] == available_processors
45+
assert res["AvailableRAM"] == ram
3046
_stopJob(1)
3147
for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]:
3248
if os.path.isfile(ff):
@@ -50,23 +66,66 @@ def test_submitJob():
5066
res = ce.submitJob(
5167
wrapperFile,
5268
proxy=None,
53-
numberOfProcessors=4,
54-
maxNumberOfProcessors=8,
69+
numberOfProcessors=available_processors,
70+
maxNumberOfProcessors=available_processors,
5571
wholeNode=False,
5672
mpTag=True,
57-
MinRAM=2500,
58-
MaxRAM=4000,
73+
MinRAM=ram,
74+
MaxRAM=ram,
5975
jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams},
6076
)
6177
assert res["OK"] is True
78+
_stopJob(2)
6279

6380
res = ce.getCEStatus()
6481
assert res["OK"] is True
6582
assert res["SubmittedJobs"] == 2
83+
assert res["RunningJobs"] == 0
84+
assert res["WaitingJobs"] == 0
85+
assert res["AvailableProcessors"] == available_processors
86+
assert res["AvailableRAM"] == ram
6687

67-
_stopJob(2)
6888
for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]:
6989
if os.path.isfile(ff):
7090
os.remove(ff)
7191
if os.path.isdir("job"):
7292
shutil.rmtree("job")
93+
94+
# failing
95+
with open("testJob.py", "w") as execFile:
96+
execFile.write(jobScript % "3")
97+
os.chmod("testJob.py", 0o755)
98+
99+
jobParams = {"JobType": "User", "Executable": "testJob.py"}
100+
resourceParams = {"GridCE": "some_CE"}
101+
optimizerParams = {}
102+
103+
wrapperFile = createJobWrapper(
104+
jobID=3, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG"
105+
)["Value"][
106+
"JobExecutablePath"
107+
] # This is not under test, assuming it works fine
108+
109+
res = ce.submitJob(
110+
wrapperFile,
111+
proxy=None,
112+
numberOfProcessors=4 + available_processors,
113+
maxNumberOfProcessors=8 + available_processors,
114+
wholeNode=False,
115+
mpTag=True,
116+
MinRAM=2500,
117+
MaxRAM=4000,
118+
jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams},
119+
)
120+
assert res["OK"] is False
121+
res = ce.getCEStatus()
122+
assert res["OK"] is True
123+
assert res["SubmittedJobs"] == 2
124+
assert res["RunningJobs"] == 0
125+
assert res["WaitingJobs"] == 0
126+
assert res["AvailableProcessors"] == available_processors
127+
assert res["AvailableRAM"] == ram
128+
_stopJob(1)
129+
for ff in ["testJob.py", "stop_job_3", "job.info", "std.out"]:
130+
if os.path.isfile(ff):
131+
os.remove(ff)

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ def getAvailableRAM(siteName=None, gridCE=None, queue=None):
229229
if not queue:
230230
queue = gConfig.getValue("/LocalSite/CEQueue", "")
231231
if not (siteName and gridCE and queue):
232-
gLogger.error("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0")
232+
gLogger.warn("Could not find AvailableRAM: missing siteName or gridCE or queue. Returning 0")
233233
return 0
234234

235235
grid = siteName.split(".")[0]

src/DIRAC/tests/Utilities/testJobDefinitions.py

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -239,22 +239,6 @@ def helloWorldSSHBatch():
239239
return endOfAllJobs(J)
240240

241241

242-
def helloWorldARM():
243-
"""simple hello world job to DIRAC.ARM.ch"""
244-
245-
J = baseToAllJobs("helloWorldARM")
246-
try:
247-
J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]])
248-
except IndexError:
249-
try:
250-
J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]])
251-
except IndexError: # we are in Jenkins
252-
J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]])
253-
J.setExecutable("exe-script.py", "", "helloWorld.log")
254-
J.setDestination("DIRAC.ARM.ch")
255-
return endOfAllJobs(J)
256-
257-
258242
def helloWorldCloudCE():
259243
"""simple hello world job to Cloud at Imperial College using SiteDirector"""
260244

@@ -340,39 +324,54 @@ def wholeNodeJob():
340324

341325

342326
def memory_4GB():
343-
"""simple hello world job, with a memory requirement of 4 GB and MultiProcessor tags"""
327+
"""simple hello world job, with a memory requirement of max 4 GB"""
344328

345329
J = baseToAllJobs("memory_4GB")
346330
try:
347-
J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
331+
J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]])
348332
except IndexError:
349333
try:
350-
J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]])
334+
J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]])
351335
except IndexError: # we are in Jenkins
352-
J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]])
353-
354-
J.setExecutable("mpTest.py")
355-
J.setNumberOfProcessors(numberOfProcessors=2)
356-
J.setRAMRequirements(ramRequired=2500, maxRAM=4000)
336+
J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]])
357337

338+
J.setExecutable("exe-script.py", "", "helloWorld.log")
339+
J.setRAMRequirements(maxRAM=4000)
358340
return endOfAllJobs(J)
359341

360342

361343
def memory_2_to4GB():
362-
"""simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags"""
344+
"""simple hello world job, with a memory requirement of 2 to 4 GB"""
363345

364346
J = baseToAllJobs("memory_2_to_4GB")
347+
try:
348+
J.setInputSandbox([find_all("exe-script.py", rootPath, "DIRAC/tests/Workflow")[0]])
349+
except IndexError:
350+
try:
351+
J.setInputSandbox([find_all("exe-script.py", ".", "DIRAC/tests/Workflow")[0]])
352+
except IndexError: # we are in Jenkins
353+
J.setInputSandbox([find_all("exe-script.py", "/home/dirac", "DIRAC/tests/Workflow")[0]])
354+
355+
J.setExecutable("exe-script.py")
356+
J.setRAMRequirements(ramRequired=2500, maxRAM=4000)
357+
return endOfAllJobs(J)
358+
359+
360+
def memory_2_to4GB_MP():
361+
"""simple hello world job, with a memory requirement of 2 to 4 GB and MultiProcessor tags"""
362+
363+
J = baseToAllJobs("memory_2_to_4GB_MP")
365364
try:
366365
J.setInputSandbox([find_all("mpTest.py", rootPath, "DIRAC/tests/Utilities")[0]])
367366
except IndexError:
368367
try:
369368
J.setInputSandbox([find_all("mpTest.py", ".", "DIRAC/tests/Utilities")[0]])
370369
except IndexError: # we are in Jenkins
371-
J.setInputSandbox([find_all("mpTest.py", os.environ["WORKSPACE"], "DIRAC/tests/Utilities")[0]])
370+
J.setInputSandbox([find_all("mpTest.py", "/home/dirac", "DIRAC/tests/Utilities")[0]])
372371

373372
J.setExecutable("mpTest.py")
374373
J.setNumberOfProcessors(numberOfProcessors=2)
375-
J.setRAMRequirements(ramRequired=4000, maxRAM=4000)
374+
J.setRAMRequirements(ramRequired=2500, maxRAM=4000)
376375
return endOfAllJobs(J)
377376

378377

tests/System/unitTestUserJobs.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
""" Collection of user jobs for testing purposes
2-
"""
1+
"""Collection of user jobs for testing purposes"""
2+
33
# pylint: disable=wrong-import-position, invalid-name
44
import sys
55
import time
@@ -102,10 +102,6 @@ def test_submit(self):
102102
self.assertTrue(res["OK"])
103103
jobsSubmittedList.append(res["Value"])
104104

105-
res = helloWorldARM()
106-
self.assertTrue(res["OK"])
107-
jobsSubmittedList.append(res["Value"])
108-
109105
res = mpJob()
110106
self.assertTrue(res["OK"])
111107
jobsSubmittedList.append(res["Value"])
@@ -130,6 +126,10 @@ def test_submit(self):
130126
self.assertTrue(res["OK"])
131127
jobsSubmittedList.append(res["Value"])
132128

129+
res = memory_2_to4GB_MP()
130+
self.assertTrue(res["OK"])
131+
jobsSubmittedList.append(res["Value"])
132+
133133
res = parametricJob()
134134
self.assertTrue(res["OK"])
135135
jobsSubmittedList.append(res["Value"])

0 commit comments

Comments
 (0)