Skip to content

Commit ec24b9d

Browse files
committed
fix: adding basic capabilities to InProcessCE
1 parent 334ed4d commit ec24b9d

File tree

2 files changed

+86
-21
lines changed

2 files changed

+86
-21
lines changed

src/DIRAC/Resources/Computing/InProcessComputingElement.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
""" The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot)
1+
"""The simplest of the "inner" CEs (meaning it's used by a jobAgent inside a pilot)
22
3-
A "InProcess" CE instance submits jobs in the current process.
4-
This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE
3+
A "InProcess" CE instance submits jobs in the current process.
4+
This is the standard "inner CE" invoked from the JobAgent, main alternative being the PoolCE
55
"""
6+
67
import os
78
import stat
89

9-
from DIRAC import S_OK, S_ERROR
10-
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
10+
from DIRAC import S_ERROR, S_OK
1111
from DIRAC.Core.Utilities.CGroups2 import CG2Manager
12-
12+
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
1313
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
1414

1515

@@ -21,8 +21,8 @@ def __init__(self, ceUniqueID):
2121
self.submittedJobs = 0
2222
self.runningJobs = 0
2323

24-
self.processors = int(self.ceParameters.get("NumberOfProcessors", 1))
25-
self.maxRAM = int(self.ceParameters.get("MaxRAM", 0))
24+
self.processors = 1
25+
self.maxRAM = 0
2626
self.ceParameters["MaxTotalJobs"] = 1
2727

2828
def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
@@ -34,6 +34,16 @@ def submitJob(self, executableFile, proxy=None, inputs=None, **kwargs):
3434
:param list inputs: dependencies of executableFile
3535
:return: S_OK(payload exit code) / S_ERROR() if submission issue
3636
"""
37+
self.processors = int(self.ceParameters.get("NumberOfProcessors", self.processors))
38+
self.maxRAM = int(self.ceParameters.get("MaxRAM", self.maxRAM))
39+
40+
if "numberOfProcessors" in kwargs:
41+
if self.processors < int(kwargs["numberOfProcessors"]):
42+
return S_ERROR("Requesting processors not available")
43+
if "MaxRAM" in kwargs:
44+
if self.maxRAM < int(kwargs["MaxRAM"]):
45+
return S_ERROR("Requesting RAM not available")
46+
3747
payloadEnv = dict(os.environ)
3848
payloadProxy = ""
3949
renewTask = None

src/DIRAC/Resources/Computing/test/Test_InProcessComputingElement.py

Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,41 @@
88

99
import pytest
1010

11-
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import jobScript, _stopJob
12-
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
13-
1411
# sut
1512
from DIRAC.Resources.Computing.InProcessComputingElement import InProcessComputingElement
13+
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import _stopJob, jobScript
14+
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
1615

1716

1817
@pytest.mark.slow
19-
def test_submitJob():
18+
@pytest.mark.parametrize(
19+
"ce_parameters, available_processors, ram",
20+
[
21+
({}, 1, 0),
22+
({"NumberOfProcessors": 8}, 8, 0),
23+
({"MaxRAM": 2048}, 1, 2048),
24+
({"NumberOfProcessors": 8, "MaxRAM": 2048}, 8, 2048),
25+
],
26+
)
27+
def test_submitJob(ce_parameters, available_processors, ram):
28+
# initialization
29+
ce = InProcessComputingElement("InProcessCE")
30+
ce.ceParameters = ce_parameters
31+
32+
# simple
2033
with open("testJob.py", "w") as execFile:
2134
execFile.write(jobScript % "1")
2235
os.chmod("testJob.py", 0o755)
2336

24-
ce = InProcessComputingElement("InProcessCE")
2537
res = ce.submitJob("testJob.py", None)
2638
assert res["OK"] is True
2739
res = ce.getCEStatus()
2840
assert res["OK"] is True
2941
assert res["SubmittedJobs"] == 1
42+
assert res["RunningJobs"] == 0
43+
assert res["WaitingJobs"] == 0
44+
assert res["AvailableProcessors"] == available_processors
45+
assert res["AvailableRAM"] == ram
3046
_stopJob(1)
3147
for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]:
3248
if os.path.isfile(ff):
@@ -44,29 +60,68 @@ def test_submitJob():
4460

4561
wrapperFile = createJobWrapper(
4662
jobID=2, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG"
47-
)["Value"][
48-
"JobExecutablePath"
49-
] # This is not under test, assuming it works fine
63+
)["Value"]["JobExecutablePath"] # This is not under test, assuming it works fine
5064
res = ce.submitJob(
5165
wrapperFile,
5266
proxy=None,
53-
numberOfProcessors=4,
54-
maxNumberOfProcessors=8,
67+
numberOfProcessors=available_processors,
68+
maxNumberOfProcessors=available_processors,
5569
wholeNode=False,
5670
mpTag=True,
57-
MinRAM=2500,
58-
MaxRAM=4000,
71+
MinRAM=ram,
72+
MaxRAM=ram,
5973
jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams},
6074
)
6175
assert res["OK"] is True
76+
_stopJob(2)
6277

6378
res = ce.getCEStatus()
6479
assert res["OK"] is True
6580
assert res["SubmittedJobs"] == 2
81+
assert res["RunningJobs"] == 0
82+
assert res["WaitingJobs"] == 0
83+
assert res["AvailableProcessors"] == available_processors
84+
assert res["AvailableRAM"] == ram
6685

67-
_stopJob(2)
6886
for ff in ["testJob.py", "stop_job_2", "job.info", "std.out"]:
6987
if os.path.isfile(ff):
7088
os.remove(ff)
7189
if os.path.isdir("job"):
7290
shutil.rmtree("job")
91+
92+
# failing
93+
with open("testJob.py", "w") as execFile:
94+
execFile.write(jobScript % "3")
95+
os.chmod("testJob.py", 0o755)
96+
97+
jobParams = {"JobType": "User", "Executable": "testJob.py"}
98+
resourceParams = {"GridCE": "some_CE"}
99+
optimizerParams = {}
100+
101+
wrapperFile = createJobWrapper(
102+
jobID=3, jobParams=jobParams, resourceParams=resourceParams, optimizerParams=optimizerParams, logLevel="DEBUG"
103+
)["Value"]["JobExecutablePath"] # This is not under test, assuming it works fine
104+
105+
res = ce.submitJob(
106+
wrapperFile,
107+
proxy=None,
108+
numberOfProcessors=4 + available_processors,
109+
maxNumberOfProcessors=8 + available_processors,
110+
wholeNode=False,
111+
mpTag=True,
112+
MinRAM=2500,
113+
MaxRAM=4000,
114+
jobDesc={"jobParams": jobParams, "resourceParams": resourceParams, "optimizerParams": optimizerParams},
115+
)
116+
assert res["OK"] is False
117+
res = ce.getCEStatus()
118+
assert res["OK"] is True
119+
assert res["SubmittedJobs"] == 2
120+
assert res["RunningJobs"] == 0
121+
assert res["WaitingJobs"] == 0
122+
assert res["AvailableProcessors"] == available_processors
123+
assert res["AvailableRAM"] == ram
124+
_stopJob(1)
125+
for ff in ["testJob.py", "stop_job_3", "job.info", "std.out"]:
126+
if os.path.isfile(ff):
127+
os.remove(ff)

0 commit comments

Comments
 (0)