Skip to content

Commit ed7ac28

Browse files
author
Andrei Tsaregorodtsev
authored
Merge pull request #5682 from DIRACGridBot/cherry-pick-2-3394339f4-integration
[sweep:integration] sweep Bdii2CSAgent features: bannedCEs, SingleCoreQueues
2 parents e83f20b + 61c0a63 commit ed7ac28

File tree

5 files changed

+93
-12
lines changed

5 files changed

+93
-12
lines changed

docs/source/AdministratorGuide/Resources/computingelements.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,10 @@ of CEs are describe in the subsections below
112112

113113
Note that there's no absolute need to define a 1-to-1 relation between CEs and Queues in DIRAC and "in real".
114114
If for example you want to send, to the same queue, a mix of single processor and multiprocessor Pilots,
115-
you can define two queues identical but for the NumberOfProcessors parameter.
115+
you can define two queues identical but for the NumberOfProcessors parameter. To avoid sending single
116+
processor jobs to multiprocessor queues, add the ``RequiredTag=MultiProcessor`` option to a multiprocessor queue. To
117+
automatically create the equivalent single core queues, see the :mod:`~DIRAC.ConfigurationSystem.Agent.Bdii2CSAgent`
118+
configuration.
116119

117120

118121
CREAM Computing Element

src/DIRAC/ConfigurationSystem/Agent/Bdii2CSAgent.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def __init__(self, *args, **kwargs):
4343
self.voBdiiCEDict = {}
4444
self.voBdiiSEDict = {}
4545
self.host = "cclcgtopbdii01.in2p3.fr:2170"
46-
46+
self.injectSingleCoreQueues = False
4747
self.csAPI = None
4848

4949
# What to get
@@ -61,6 +61,7 @@ def initialize(self):
6161
# Create a list of alternative bdii urls
6262
self.alternativeBDIIs = self.am_getOption("AlternativeBDIIs", self.alternativeBDIIs)
6363
self.host = self.am_getOption("Host", self.host)
64+
self.injectSingleCoreQueues = self.am_getOption("InjectSingleCoreQueues", self.injectSingleCoreQueues)
6465

6566
# Check if the bdii url is appended by a port number, if not append the default 2170
6667
for index, url in enumerate(self.alternativeBDIIs):
@@ -232,13 +233,20 @@ def __updateCEs(self):
232233
"""Update the Site/CE/queue settings in the CS if they were changed in the BDII"""
233234

234235
bdiiChangeSet = set()
236+
bannedCEs = self.am_getOption("BannedCEs", [])
235237

236238
for vo in self.voName:
237239
result = self.__getGlue2CEInfo(vo)
238240
if not result["OK"]:
239241
continue
240242
ceBdiiDict = result["Value"]
241-
result = getSiteUpdates(vo, bdiiInfo=ceBdiiDict, log=self.log)
243+
244+
for _siteName, ceDict in ceBdiiDict.items():
245+
for bannedCE in bannedCEs:
246+
ceDict["CEs"].pop(bannedCE, None)
247+
248+
result = getSiteUpdates(vo, bdiiInfo=ceBdiiDict, log=self.log, onecore=self.injectSingleCoreQueues)
249+
242250
if not result["OK"]:
243251
continue
244252
bdiiChangeSet = bdiiChangeSet.union(result["Value"])

src/DIRAC/ConfigurationSystem/Client/Utilities.py

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
__RCSID__ = "$Id$"
1212

13+
from copy import deepcopy
1314
import socket
1415

1516
from DIRAC import gConfig, gLogger, S_OK, S_ERROR
@@ -103,12 +104,14 @@ def getGridCEs(vo, bdiiInfo=None, ceBlackList=None, hostURL=None):
103104
return result
104105

105106

106-
def getSiteUpdates(vo, bdiiInfo=None, log=None):
107+
def getSiteUpdates(vo, bdiiInfo=None, log=None, onecore=False):
107108
"""Get all the necessary updates for the already defined sites and CEs
108109
109110
:param str vo: VO name
110111
:param dict bdiiInfo: information from DBII
111112
:param object log: logger
113+
:param bool onecore: whether to add single core copies of multicore queues, see the documentation about :ref:`CE`
114+
and the :mod:`~DIRAC.ConfigurationSystem.Agent.Bdii2CSAgent` configuration for details
112115
113116
:result: S_OK(set)/S_ERROR()
114117
"""
@@ -123,6 +126,24 @@ def addToChangeSet(entry, changeSet):
123126
if new_value and new_value != value:
124127
changeSet.add(entry)
125128

129+
def dropTag(tags, tagToDrop):
130+
"""Remove tag from a comma-separated string of tags.
131+
132+
:param str tags: the string of current tags
133+
:param str tagToDrop: the tag to potentially remove
134+
:return: string of comma separated tags
135+
"""
136+
return ",".join(sorted(set(tags.split(",")).difference({tagToDrop}))).strip(",")
137+
138+
def addTag(tags, tagToAdd):
139+
"""Add tag to a comma-separated string of tags.
140+
141+
:param str tags: the string of current tags
142+
:param str tagToAdd: the tag to potentially add
143+
:return: string of comma separated tags
144+
"""
145+
return ",".join(sorted(set(tags.split(",")).union({tagToAdd}))).strip(",")
146+
126147
if log is None:
127148
log = gLogger
128149

@@ -133,6 +154,35 @@ def addToChangeSet(entry, changeSet):
133154
return result
134155
ceBdiiDict = result["Value"]
135156

157+
if onecore:
158+
# If enabled this creates a copy of the queue with multiple processors and sets NumberOfProcessors to 1 for ARC
159+
# and HTCondorCE entries
160+
161+
def makeNewQueueName(queueName, ceType):
162+
"""Create a new queueName for single core queues."""
163+
if ceType == "HTCondorCE":
164+
return queueName + "1core"
165+
# we should have only ARC left, we add 1core to the middle part
166+
queueNameSplit = queueName.split("-", 2)
167+
queueNameSplit[1] = queueNameSplit[1] + "1core"
168+
return "-".join(queueNameSplit)
169+
170+
for siteName, ceDict in ceBdiiDict.items():
171+
for _ceName, ceInfo in ceDict["CEs"].items():
172+
newQueues = dict()
173+
for queueName, queueDict in ceInfo["Queues"].items():
174+
if (
175+
queueDict["GlueCEImplementationName"] not in ("ARC", "HTCondorCE")
176+
or int(queueDict.get("NumberOfProcessors", 1)) == 1
177+
):
178+
continue
179+
newQueueName = makeNewQueueName(queueName, queueDict["GlueCEImplementationName"])
180+
newQueueDict = deepcopy(queueDict)
181+
newQueueDict["NumberOfProcessors"] = 1
182+
newQueues[newQueueName] = newQueueDict
183+
184+
ceInfo["Queues"].update(newQueues)
185+
136186
changeSet = set()
137187
for site in ceBdiiDict:
138188
result = getDIRACSiteName(site)
@@ -266,6 +316,7 @@ def addToChangeSet(entry, changeSet):
266316

267317
# tags, processors, localCEType
268318
tag = queueDict.get("Tag", "")
319+
reqTag = queueDict.get("RequiredTag", "")
269320
# LocalCEType can be empty (equivalent to "InProcess")
270321
# or "Pool", "Singularity", but also "Pool/Singularity"
271322
localCEType = queueDict.get("LocalCEType", "")
@@ -280,12 +331,19 @@ def addToChangeSet(entry, changeSet):
280331
# Adding queue info to the CS
281332
addToChangeSet((queueSection, "maxCPUTime", maxCPUTime, newMaxCPUTime), changeSet)
282333
addToChangeSet((queueSection, "SI00", si00, newSI00), changeSet)
334+
335+
# add RequiredTag if onecore is enabled, do this here for previously created MultiCore queues
336+
if newNOP > 1 and onecore:
337+
addToChangeSet(
338+
(queueSection, "RequiredTag", reqTag, addTag(reqTag, "MultiProcessor")), changeSet
339+
)
340+
283341
if newNOP != numberOfProcessors:
284342
addToChangeSet((queueSection, "NumberOfProcessors", numberOfProcessors, newNOP), changeSet)
285343
if newNOP > 1:
286344
# if larger than one, add MultiProcessor to site tags, and LocalCEType=Pool
287-
newTag = ",".join(sorted(set(tag.split(",")).union({"MultiProcessor"}))).strip(",")
288-
addToChangeSet((queueSection, "Tag", tag, newTag), changeSet)
345+
addToChangeSet((queueSection, "Tag", tag, addTag(tag, "MultiProcessor")), changeSet)
346+
289347
if localCEType_inner:
290348
newLocalCEType = "Pool/" + localCEType_inner
291349
else:
@@ -294,8 +352,10 @@ def addToChangeSet(entry, changeSet):
294352
else:
295353
# if not larger than one, drop MultiProcessor Tag.
296354
# Here we do not change the LocalCEType as Pool CE would still be perfectly valid.
297-
newTag = ",".join(sorted(set(tag.split(",")).difference({"MultiProcessor"}))).strip(",")
298-
changeSet.add((queueSection, "Tag", tag, newTag))
355+
changeSet.add((queueSection, "Tag", tag, dropTag(tag, "MultiProcessor")))
356+
if onecore:
357+
changeSet.add((queueSection, "RequiredTag", reqTag, dropTag(reqTag, "MultiProcessor")))
358+
299359
if maxTotalJobs == "Unknown":
300360
newTotalJobs = min(1000, int(int(queueInfo.get("GlueCEInfoTotalCPUs", 0)) / 2))
301361
newWaitingJobs = max(2, int(newTotalJobs * 0.1))

src/DIRAC/ConfigurationSystem/ConfigTemplate.cfg

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ Agents
4242
DryRun = True
4343
# Host to query, must include port
4444
Host = cclcgtopbdii01.in2p3.fr:2170
45+
# If True, add single core queues for each Multi Core Queue and set
46+
# RequiredTag=MultiProcessor for those
47+
InjectSingleCoreQueues = False
4548
}
4649
##END
4750
##BEGIN VOMS2CSAgent

src/DIRAC/ConfigurationSystem/scripts/dirac_admin_add_resources.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,23 @@
2727

2828
def processScriptSwitches():
2929

30-
global vo, dry, doCEs, hostURL
30+
global vo, dry, doCEs, hostURL, onecore
3131

3232
Script.registerSwitch("V:", "vo=", "Virtual Organization")
3333
Script.registerSwitch("D", "dry", "Dry run")
3434
Script.registerSwitch("C", "ce", "Process Computing Elements")
3535
Script.registerSwitch("H:", "host=", "use this url for information querying")
36+
Script.registerSwitch(
37+
"", "onecore", "Add Single Core Queues for each MultiCore Queue, set RequiredTag for those Queues"
38+
)
3639
Script.parseCommandLine(ignoreErrors=True)
3740

3841
vo = ""
3942
dry = False
4043
doCEs = False
4144
hostURL = None
45+
onecore = False
46+
4247
for sw in Script.getUnprocessedSwitches():
4348
if sw[0] in ("V", "vo"):
4449
vo = sw[1]
@@ -48,6 +53,8 @@ def processScriptSwitches():
4853
doCEs = True
4954
if sw[0] in ("H", "host"):
5055
hostURL = sw[1]
56+
if sw[0] in ("onecore",):
57+
onecore = True
5158

5259

5360
ceBdiiDict = None
@@ -100,7 +107,7 @@ def checkUnusedCEs():
100107

101108
inp = input("\nDo you want to add sites ? [default=yes] [yes|no]: ")
102109
inp = inp.strip()
103-
if not inp and inp.lower().startswith("n"):
110+
if inp and inp.lower().startswith("n"):
104111
return
105112

106113
gLogger.notice("\nAdding new sites/CEs interactively\n")
@@ -241,9 +248,9 @@ def updateCS(changeSet):
241248

242249
def updateSites():
243250

244-
global vo, dry, ceBdiiDict
251+
global vo, dry, ceBdiiDict, onecore
245252

246-
result = getSiteUpdates(vo, bdiiInfo=ceBdiiDict)
253+
result = getSiteUpdates(vo, bdiiInfo=ceBdiiDict, onecore=onecore)
247254
if not result["OK"]:
248255
gLogger.error("Failed to get site updates", result["Message"])
249256
DIRACExit(-1)

0 commit comments

Comments
 (0)