Skip to content

Commit 53f617c

Browse files
committed
feat: getting batch system info from pilot
1 parent b1f5e2b commit 53f617c

14 files changed

+268
-316
lines changed

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/HTCondorResourceUsage.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@ class HTCondorResourceUsage(ResourceUsage):
1717
allow us to get an estimation of the resources usage.
1818
"""
1919

20-
def __init__(self):
20+
def __init__(self, jobID, parameters):
2121
"""Standard constructor"""
22-
super().__init__("HTCondor", "_CONDOR_JOB_AD")
22+
super().__init__("HTCondor", jobID, parameters)
2323

2424
def getResourceUsage(self):
2525
"""Returns S_OK with a dictionary containing the entries WallClock, WallClockLimit, and Unit for current slot."""
@@ -29,8 +29,7 @@ def getResourceUsage(self):
2929
# only present on some Sites
3030
# - CurrentTime: current time
3131
# - JobCurrentStartDate: start of the job execution
32-
jobDescription = os.environ.get("_CONDOR_JOB_AD")
33-
cmd = f"condor_status -ads {jobDescription} -af MaxRuntime CurrentTime-JobCurrentStartDate"
32+
cmd = f"condor_status -ads {self.info_path} -af MaxRuntime CurrentTime-JobCurrentStartDate"
3433
result = runCommand(cmd)
3534
if not result["OK"]:
3635
return S_ERROR("Current batch system is not supported")

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/LSFResourceUsage.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,14 @@ class LSFResourceUsage(ResourceUsage):
1919
This is the LSF plugin of the TimeLeft Utility
2020
"""
2121

22-
def __init__(self):
22+
def __init__(self, jobID, parameters):
2323
"""Standard constructor"""
24-
super().__init__("LSF", "LSB_JOBID")
24+
super().__init__("LSF", jobID, parameters)
2525

26-
self.queue = os.environ.get("LSB_QUEUE")
27-
self.bin = os.environ.get("LSF_BINDIR")
28-
self.host = os.environ.get("LSB_HOSTS")
2926
self.year = time.strftime("%Y", time.gmtime())
3027
self.log.verbose(
3128
"LSB_JOBID={}, LSB_QUEUE={}, LSF_BINDIR={}, LSB_HOSTS={}".format(
32-
self.jobID, self.queue, self.bin, self.host
29+
self.jobID, self.queue, self.binary_path, self.host
3330
)
3431
)
3532

@@ -39,7 +36,7 @@ def __init__(self):
3936
self.wallClockLimit = None
4037
self.hostNorm = None
4138

42-
cmd = f"{self.bin}/bqueues -l {self.queue}"
39+
cmd = f"{self.binary_path}/bqueues -l {self.queue}"
4340
result = runCommand(cmd)
4441
if not result["OK"]:
4542
return
@@ -73,7 +70,7 @@ def __init__(self):
7370
# Now try to get the CPU_FACTOR for this reference CPU,
7471
# it must be either a Model, a Host or the largest Model
7572

76-
cmd = f"{self.bin}/lshosts -w {self.cpuRef}"
73+
cmd = f"{self.binary_path}/lshosts -w {self.cpuRef}"
7774
result = runCommand(cmd)
7875
if result["OK"]:
7976
# At CERN this command will return an error since there is no host defined
@@ -97,7 +94,7 @@ def __init__(self):
9794

9895
if not self.normRef:
9996
# Check whether there is a model defined with the name of cpuRef
100-
cmd = f"{self.bin}/lsinfo -m"
97+
cmd = f"{self.binary_path}/lsinfo -m"
10198
result = runCommand(cmd)
10299
if result["OK"]:
103100
lines = str(result["Value"]).split("\n")
@@ -120,7 +117,7 @@ def __init__(self):
120117
if not self.normRef:
121118
# Now parse LSF configuration files
122119
if not os.path.isfile("./lsf.sh"):
123-
os.symlink(os.path.join(os.environ["LSF_ENVDIR"], "lsf.conf"), "./lsf.sh")
120+
os.symlink(os.path.join(self.info_path, "lsf.conf"), "./lsf.sh")
124121
# As the variables are not exported, we must force it
125122
ret = sourceEnv(10, ["./lsf", "&& export LSF_CONFDIR"])
126123
if ret["OK"]:
@@ -170,7 +167,7 @@ def __init__(self):
170167

171168
# Now get the Normalization for the current Host
172169
if self.host:
173-
cmd = f"{self.bin}/lshosts -w {self.host}"
170+
cmd = f"{self.binary_path}/lshosts -w {self.host}"
174171
result = runCommand(cmd)
175172
if result["OK"]:
176173
lines = str(result["Value"]).split("\n")
@@ -201,15 +198,15 @@ def getResourceUsage(self):
201198
"""Returns S_OK with a dictionary containing the entries CPU, CPULimit,
202199
WallClock, WallClockLimit, and Unit for current slot.
203200
"""
204-
if not self.bin:
201+
if not self.binary_path:
205202
return S_ERROR("Could not determine bin directory for LSF")
206203
if not self.hostNorm:
207204
return S_ERROR("Could not determine host Norm factor")
208205

209206
cpu = None
210207
wallClock = None
211208

212-
cmd = f"{self.bin}/bjobs -W {self.jobID}"
209+
cmd = f"{self.binary_path}/bjobs -W {self.jobID}"
213210
result = runCommand(cmd)
214211
if not result["OK"]:
215212
return result

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/MJFResourceUsage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ class MJFResourceUsage(ResourceUsage):
1515
"""
1616

1717
#############################################################################
18-
def __init__(self):
18+
def __init__(self, jobID, parameters):
1919
"""Standard constructor"""
20-
super().__init__("MJF", "JOB_ID")
20+
super().__init__("MJF", jobID, parameters)
2121

2222
self.queue = os.environ.get("QUEUE")
2323

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/PBSResourceUsage.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,12 @@ class PBSResourceUsage(ResourceUsage):
1515
This is the PBS plugin of the TimeLeft Utility
1616
"""
1717

18-
def __init__(self):
18+
def __init__(self, jobID, parameters):
1919
"""Standard constructor"""
20-
super().__init__("PBS", "PBS_JOBID")
20+
super().__init__("PBS", jobID, parameters)
2121

22-
self.queue = os.environ.get("PBS_O_QUEUE")
23-
pbsPath = os.environ.get("PBS_O_PATH")
24-
if pbsPath:
25-
os.environ["PATH"] += ":" + pbsPath
22+
if self.binary_path and self.binary_path != "Unknown":
23+
os.environ["PATH"] += ":" + self.binary_path
2624

2725
self.log.verbose(f"PBS_JOBID={self.jobID}, PBS_O_QUEUE={self.queue}")
2826
self.startTime = time.time()

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/ResourceUsage.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,16 @@ class ResourceUsage:
1212
(e.g. getting the time left in a Pilot)
1313
"""
1414

15-
def __init__(self, batchSystemName, jobIdEnvVar):
15+
def __init__(self, batchSystemName, jobID, parameters):
1616
"""Standard constructor"""
1717
self.log = gLogger.getSubLogger(f"{batchSystemName}ResourceUsage")
18-
self.jobID = os.environ.get(jobIdEnvVar)
18+
self.jobID = jobID
19+
20+
# Parameters
21+
self.binary_path = parameters.get("BinaryPath")
22+
self.info_path = parameters.get("InfoPath")
23+
self.host = parameters.get("Host")
24+
self.queue = parameters.get("Queue")
1925

2026
def getResourceUsage(self):
2127
"""Returns S_OK with a dictionary that can contain entries:

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SGEResourceUsage.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@ class SGEResourceUsage(ResourceUsage):
1616
This is the SGE plugin of the TimeLeft Utility
1717
"""
1818

19-
def __init__(self):
19+
def __init__(self, jobID, parameters):
2020
"""Standard constructor"""
21-
super().__init__("SGE", "JOB_ID")
21+
super().__init__("SGE", jobID, parameters)
2222

23-
self.queue = os.environ.get("QUEUE")
24-
sgePath = os.environ.get("SGE_BINARY_PATH")
25-
if sgePath:
26-
os.environ["PATH"] += ":" + sgePath
23+
if self.binary_path and self.binary_path != "Unknown":
24+
os.environ["PATH"] += ":" + self.binary_path
2725

2826
self.log.verbose(f"JOB_ID={self.jobID}, QUEUE={self.queue}")
2927
self.startTime = time.time()

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/SLURMResourceUsage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ class SLURMResourceUsage(ResourceUsage):
1111
This is the SLURM plugin of the TimeLeft Utility
1212
"""
1313

14-
def __init__(self):
14+
def __init__(self, jobID, parameters):
1515
"""Standard constructor"""
16-
super().__init__("SLURM", "SLURM_JOB_ID")
16+
super().__init__("SLURM", jobID, parameters)
1717

1818
self.log.verbose(f"JOB_ID={self.jobID}")
1919

src/DIRAC/Resources/Computing/BatchSystems/TimeLeft/TimeLeft.py

Lines changed: 22 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
With this information the utility can calculate in normalized units the
1010
CPU time remaining for a given slot.
1111
"""
12-
import os
1312
import shlex
1413

1514
import DIRAC
1615

1716
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
17+
from DIRAC.Core.Utilities import DErrno
18+
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1819
from DIRAC.Core.Utilities.Subprocess import systemCall
1920

2021

@@ -29,7 +30,7 @@ def __init__(self):
2930
if not self.cpuPower:
3031
self.log.warn(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")
3132

32-
result = self.__getBatchSystemPlugin()
33+
result = self._getBatchSystemPlugin()
3334
if result["OK"]:
3435
self.batchPlugin = result["Value"]
3536
else:
@@ -65,7 +66,9 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
6566
"""
6667
# Quit if no norm factor available
6768
if not self.cpuPower:
68-
return S_ERROR(f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}")
69+
return S_ERROR(
70+
DErrno.ESECTION, f"/LocalSite/CPUNormalizationFactor not defined for site {DIRAC.siteName()}"
71+
)
6972

7073
if not self.batchPlugin:
7174
return S_ERROR(self.batchError)
@@ -126,54 +129,24 @@ def getTimeLeft(self, cpuConsumed=0.0, processors=1):
126129
self.log.verbose(f"Remaining CPU in normalized units is: {cpuWorkLeft:.02f}")
127130
return S_OK(cpuWorkLeft)
128131

129-
def __getBatchSystemPlugin(self):
132+
def _getBatchSystemPlugin(self):
130133
"""Using the name of the batch system plugin, will return an instance of the plugin class."""
131-
batchSystems = {
132-
"LSF": "LSB_JOBID",
133-
"PBS": "PBS_JOBID",
134-
"BQS": "QSUB_REQNAME",
135-
"SGE": "SGE_TASK_ID",
136-
"SLURM": "SLURM_JOB_ID",
137-
"HTCondor": "_CONDOR_JOB_AD",
138-
} # more to be added later
139-
name = None
140-
for batchSystem, envVar in batchSystems.items():
141-
if envVar in os.environ:
142-
name = batchSystem
143-
break
144-
145-
if name is None and "MACHINEFEATURES" in os.environ and "JOBFEATURES" in os.environ:
146-
# Only use MJF if legacy batch system information not available for now
147-
name = "MJF"
148-
149-
if name is None:
134+
batchSystemInfo = gConfig.getSections("/LocalSite/BatchSystem")
135+
type = batchSystemInfo.get("Type")
136+
jobID = batchSystemInfo.get("JobID")
137+
parameters = batchSystemInfo.get("Parameters")
138+
139+
if not type or type == "Unknown":
150140
self.log.warn(f"Batch system type for site {DIRAC.siteName()} is not currently supported")
151-
return S_ERROR("Current batch system is not supported")
152-
153-
self.log.debug(f"Creating plugin for {name} batch system")
154-
try:
155-
batchSystemName = f"{name}ResourceUsage"
156-
batchPlugin = __import__(
157-
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.%s"
158-
% batchSystemName, # pylint: disable=unused-variable
159-
globals(),
160-
locals(),
161-
[batchSystemName],
162-
)
163-
except ImportError as x:
164-
msg = f"Could not import DIRAC.Resources.Computing.BatchSystems.TimeLeft.{batchSystemName}"
165-
self.log.warn(x)
166-
self.log.warn(msg)
167-
return S_ERROR(msg)
168-
169-
try:
170-
batchStr = f"batchPlugin.{batchSystemName}()"
171-
batchInstance = eval(batchStr)
172-
except Exception as x: # pylint: disable=broad-except
173-
msg = f"Could not instantiate {batchSystemName}()"
174-
self.log.warn(x)
175-
self.log.warn(msg)
176-
return S_ERROR(msg)
141+
return S_ERROR(DErrno.ERESUNK, "Current batch system is not supported")
142+
143+
self.log.debug(f"Creating plugin for {type} batch system")
144+
145+
result = ObjectLoader().loadObject(f"DIRAC.Resources.Computing.BatchSystems.TimeLeft.{type}ResourceUsage")
146+
if not result["OK"]:
147+
return result
148+
batchClass = result["Value"]
149+
batchInstance = batchClass(jobID, parameters)
177150

178151
return S_OK(batchInstance)
179152

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
""" Test class for HTCondorResourceUsage utility
2+
"""
3+
4+
import pytest
5+
6+
from DIRAC import S_OK
7+
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage import HTCondorResourceUsage
8+
9+
10+
HTCONDOR_OUT_0 = "86400 3600"
11+
HTCONDOR_OUT_1 = "undefined 3600"
12+
HTCONDOR_OUT_2 = ""
13+
14+
15+
def test_getResourceUsage(mocker):
16+
mocker.patch(
17+
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.HTCondorResourceUsage.runCommand",
18+
side_effect=[S_OK(HTCONDOR_OUT_0), S_OK(HTCONDOR_OUT_1), S_OK(HTCONDOR_OUT_2)],
19+
)
20+
21+
# First test: everything is fine
22+
htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
23+
res = htcondorResourceUsage.getResourceUsage()
24+
assert res["OK"], res["Message"]
25+
assert res["Value"]["WallClock"] == 3600
26+
assert res["Value"]["WallClockLimit"] == 86400
27+
28+
# Second test: MaxRuntime is undefined
29+
htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
30+
res = htcondorResourceUsage.getResourceUsage()
31+
assert not res["OK"]
32+
assert res["Message"] == "Current batch system is not supported"
33+
34+
# Third test: empty output
35+
htcondorResourceUsage = HTCondorResourceUsage("1234", {"Queue": "Test", "InfoPath": "/path/to/condor_ad"})
36+
res = htcondorResourceUsage.getResourceUsage()
37+
assert not res["OK"]
38+
assert res["Message"] == "Current batch system is not supported"

0 commit comments

Comments
 (0)