Skip to content

Commit b605a2d

Browse files
committed
fix: ComputingElement.initializeParameters() overriding tags instead of concatenating them
1 parent 7c2134c commit b605a2d

File tree

3 files changed

+142
-46
lines changed

3 files changed

+142
-46
lines changed

src/DIRAC/Resources/Computing/ComputingElement.py

Lines changed: 59 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -137,42 +137,41 @@ def initializeParameters(self):
137137

138138
self.log.debug("Initializing the CE parameters")
139139

140-
# Collect global defaults first
141-
for section in ["/Resources/Computing/CEDefaults", f"/Resources/Computing/{self.ceType}"]:
142-
result = gConfig.getOptionsDict(section)
143-
144-
self.log.debug(result)
145-
146-
if result["OK"]:
147-
ceOptions = result["Value"]
148-
for key in ceOptions:
149-
if key in INTEGER_PARAMETERS:
150-
ceOptions[key] = int(ceOptions[key])
151-
if key in FLOAT_PARAMETERS:
152-
ceOptions[key] = float(ceOptions[key])
153-
if key in LIST_PARAMETERS:
154-
ceOptions[key] = gConfig.getValue(os.path.join(section, key), [])
155-
self.ceParameters.update(ceOptions)
156-
157-
# Get local CE configuration
158-
localConfigDict = getCEConfigDict(self.ceName)
159-
self.ceParameters.update(localConfigDict)
160-
161-
# Adds site level parameters
162-
section = "/LocalSite"
163-
result = gConfig.getOptionsDict(section)
164-
if result["OK"] and result["Value"]:
165-
localSiteParameters = result["Value"]
166-
self.log.debug(f"Local site parameters are: {localSiteParameters}")
167-
for option, value in localSiteParameters.items():
168-
if option == "Architecture":
169-
self.ceParameters["Platform"] = value
170-
self.ceParameters["Architecture"] = value
171-
elif option == "LocalSE":
172-
self.ceParameters["LocalSE"] = value.split(", ")
173-
else:
174-
self.ceParameters[option] = value
175-
140+
# Collect global defaults first:
141+
# - /Resources/Computing/CEDefaults and /Resources/Computing/<CEType>
142+
# Then the local CE configuration:
143+
# - /LocalSite/<CEName>
144+
# Finally the site level parameters
145+
# - /LocalSite
146+
for section in [
147+
"/Resources/Computing/CEDefaults",
148+
f"/Resources/Computing/{self.ceType}",
149+
f"/LocalSite/{self.ceName}",
150+
"/LocalSite",
151+
]:
152+
ceParameters = getCEConfigDict(section)
153+
154+
# List parameters cannot be updated as any other fields, they should be concatenated in a set(), not overriden
155+
for listParam in LIST_PARAMETERS:
156+
# If listParam is not present or null, we remove it from ceParameters and continue
157+
if not listParam in ceParameters or not ceParameters[listParam]:
158+
ceParameters.pop(listParam, [])
159+
continue
160+
# Initialize self.ceParameters[listParam] is not done and update the set
161+
if not listParam in self.ceParameters:
162+
self.ceParameters[listParam] = set()
163+
self.ceParameters[listParam].update(set(ceParameters.pop(listParam)))
164+
165+
self.log.debug(f"CE Parameters from {section}:", ceParameters)
166+
self.ceParameters.update(ceParameters)
167+
168+
# Site level adjustments
169+
if "Architecture" in self.ceParameters:
170+
self.ceParameters["Platform"] = self.ceParameters["Architecture"]
171+
if "LocalSE" in self.ceParameters:
172+
self.ceParameters["LocalSE"] = self.ceParameters["LocalSE"].split(", ")
173+
174+
# Add default values if required
176175
self._addCEConfigDefaults()
177176

178177
def isValid(self):
@@ -466,12 +465,14 @@ def getDescription(self):
466465
for option, value in self.ceParameters.items():
467466
if isinstance(value, list):
468467
ceDict[option] = value
468+
elif isinstance(value, set):
469+
ceDict[option] = list(value)
469470
elif isinstance(value, str):
470471
try:
471472
ceDict[option] = int(value)
472473
except ValueError:
473474
ceDict[option] = value
474-
elif isinstance(value, (int,) + (float,)):
475+
elif isinstance(value, (int, float)):
475476
ceDict[option] = value
476477
else:
477478
self.log.warn(f"Type of option {option} = {value} not determined")
@@ -519,11 +520,24 @@ def shutdown(self):
519520
return S_OK(self.taskResults)
520521

521522

522-
def getCEConfigDict(ceName):
523-
"""Look into LocalSite for configuration Parameters for this CE"""
524-
ceConfigDict = {}
525-
if ceName:
526-
result = gConfig.getOptionsDict(f"/LocalSite/{ceName}")
527-
if result["OK"]:
528-
ceConfigDict = result["Value"]
529-
return ceConfigDict
523+
def getCEConfigDict(section: str) -> dict:
524+
"""Look into section for configuration Parameters for this CE
525+
526+
:param section: name of the CFG section to exploit
527+
"""
528+
529+
result = gConfig.getOptionsDict(section)
530+
531+
if not result["OK"]:
532+
return {}
533+
534+
ceOptions = result["Value"]
535+
for key in ceOptions:
536+
if key in INTEGER_PARAMETERS:
537+
ceOptions[key] = int(ceOptions[key])
538+
if key in FLOAT_PARAMETERS:
539+
ceOptions[key] = float(ceOptions[key])
540+
if key in LIST_PARAMETERS:
541+
ceOptions[key] = gConfig.getValue(os.path.join(section, key), [])
542+
543+
return ceOptions

src/DIRAC/Resources/Computing/ComputingElementFactory.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ def getCE(self, ceType="", ceName="", ceParametersDict={}):
2424
self.log.verbose(f"Creating CE for name {ceName}")
2525
ceTypeLocal = ceType if ceType else self.ceType
2626
ceNameLocal = ceName if ceName else ceType
27-
ceConfigDict = getCEConfigDict(ceNameLocal)
27+
ceConfigDict = getCEConfigDict(f"/LocalSite/{ceNameLocal}")
2828
self.log.verbose("CEConfigDict", ceConfigDict)
2929
if "CEType" in ceConfigDict:
3030
ceTypeLocal = ceConfigDict["CEType"]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
from diraccfg import CFG
2+
import pytest
3+
4+
from DIRAC import gConfig
5+
from DIRAC.ConfigurationSystem.Client.ConfigurationData import gConfigurationData
6+
from DIRAC.Resources.Computing.ComputingElement import ComputingElement
7+
8+
9+
pilotCfg1 = """
10+
LocalSite
11+
{
12+
Tag = Token
13+
CPUTime = 500
14+
Test
15+
{
16+
MaxTotalJobs = 20
17+
Tag = WholeNode
18+
}
19+
}
20+
Resources
21+
{
22+
Computing
23+
{
24+
CEDefaults
25+
{
26+
CPUTime = 50
27+
Tag = Token
28+
Tag += /cvmfs/dirac/
29+
}
30+
ComputingElement
31+
{
32+
CPUTime = 5000
33+
Tag = Test
34+
Tag += Token
35+
}
36+
}
37+
}
38+
"""
39+
40+
41+
pilotCfg2 = """
42+
LocalSite
43+
{
44+
Tag =
45+
}
46+
"""
47+
48+
49+
def setupConfig(config):
50+
"""Set up the configuration file
51+
52+
:param str config: configuration content to load
53+
"""
54+
gConfigurationData.localCFG = CFG()
55+
cfg = CFG()
56+
cfg.loadFromBuffer(config)
57+
gConfig.loadCFG(cfg)
58+
59+
60+
@pytest.mark.parametrize(
61+
"config, expectedValue",
62+
[
63+
(
64+
pilotCfg1,
65+
{
66+
"Tag": {"/cvmfs/dirac/", "Token", "WholeNode"},
67+
"CPUTime": 500,
68+
"MaxTotalJobs": 20,
69+
"WaitingToRunningRatio": 0.5,
70+
"MaxWaitingJobs": 1,
71+
},
72+
),
73+
(pilotCfg2, {"MaxTotalJobs": 1, "WaitingToRunningRatio": 0.5, "MaxWaitingJobs": 1}),
74+
],
75+
)
76+
def test_initializeParameters(config, expectedValue):
77+
"""Test the initialization of the CE parameters"""
78+
setupConfig(config)
79+
80+
ce = ComputingElement("Test")
81+
ce.initializeParameters()
82+
assert ce.ceParameters == expectedValue

0 commit comments

Comments
 (0)