Skip to content

Commit 915653a

Browse files
authored
Merge pull request #6530 from DIRACGridBot/cherry-pick-2-79fa433b7-integration
[sweep:integration] fix (resources): a few fixes related to ARC
2 parents 2d9a512 + 6c317ce commit 915653a

File tree

3 files changed

+47
-26
lines changed

3 files changed

+47
-26
lines changed

src/DIRAC/Resources/Computing/ARC6ComputingElement.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,19 @@ def __init__(self, ceUniqueID):
2323
# This should be reconstructed in getARCJob() to retrieve the outputs.
2424
self.restUrlPart = "rest/1.0/jobs/"
2525

26+
# Default ComputingInfo Endpoint, used to get details about the queues
27+
self.computingInfoEndpoint = "org.nordugrid.ldapglue2"
28+
29+
def _reset(self):
30+
super()._reset()
31+
# ComputingInfoEndpoint to get information about queues
32+
# https://www.nordugrid.org/arc/arc6/users/client_tools.html?#arcinfo
33+
# Expected values are: [
34+
# org.nordugrid.ldapng, org.nordugrid.ldapglue2, org.nordugrid.arcrest, org.ogf.glue.emies.resourceinfo
35+
# ]
36+
self.computingInfoEndpoint = self.ceParameters.get("ComputingInfoEndpoint", self.computingInfoEndpoint)
37+
return S_OK()
38+
2639
def _getARCJob(self, jobID):
2740
"""Create an ARC Job with all the needed / possible parameters defined.
2841
By the time we come here, the environment variable X509_USER_PROXY should already be set
@@ -82,7 +95,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
8295
stampDict = {}
8396

8497
# Creating an endpoint
85-
endpoint = arc.Endpoint(self.ceHost, arc.Endpoint.COMPUTINGINFO, "org.nordugrid.ldapglue2")
98+
endpoint = arc.Endpoint(self.ceHost, arc.Endpoint.COMPUTINGINFO, self.computingInfoEndpoint)
8699

87100
# Get the ExecutionTargets of the ComputingElement (Can be REST, EMI-ES or GRIDFTP)
88101
retriever = arc.ComputingServiceRetriever(self.usercfg, [endpoint])
@@ -163,7 +176,7 @@ def getCEStatus(self):
163176
self.usercfg.ProxyPath(os.environ["X509_USER_PROXY"])
164177

165178
# Creating an endpoint
166-
endpoint = arc.Endpoint(self.ceHost, arc.Endpoint.COMPUTINGINFO, "org.nordugrid.ldapglue2")
179+
endpoint = arc.Endpoint(self.ceHost, arc.Endpoint.COMPUTINGINFO, self.computingInfoEndpoint)
167180

168181
# Get the ExecutionTargets of the ComputingElement (Can be REST, EMI-ES or GRIDFTP)
169182
retriever = arc.ComputingServiceRetriever(self.usercfg, [endpoint])

src/DIRAC/Resources/Computing/ARCComputingElement.py

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@
3737
"""
3838
import os
3939
import stat
40+
import sys
4041

4142
import arc # Has to work if this module is called #pylint: disable=import-error
42-
from DIRAC import S_OK, S_ERROR, gConfig
43-
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping
43+
from DIRAC import S_OK, S_ERROR
4444
from DIRAC.Core.Utilities.Subprocess import shellCall
4545
from DIRAC.Core.Utilities.File import makeGuid
4646
from DIRAC.Core.Utilities.List import breakListIntoChunks
@@ -50,13 +50,6 @@
5050
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
5151

5252

53-
# Uncomment the following 5 lines for getting verbose ARC api output (debugging)
54-
# import sys
55-
# logstdout = arc.LogStream(sys.stdout)
56-
# logstdout.setFormat(arc.ShortFormat)
57-
# arc.Logger_getRootLogger().addDestination(logstdout)
58-
# arc.Logger_getRootLogger().setThreshold(arc.VERBOSE)
59-
6053
MANDATORY_PARAMETERS = ["Queue"] # Mandatory for ARC CEs in GLUE2?
6154
STATES_MAP = {
6255
"Accepted": PilotStatus.WAITING,
@@ -77,6 +70,8 @@
7770

7871
class ARCComputingElement(ComputingElement):
7972

73+
_arcLevels = ["DEBUG", "VERBOSE", "INFO", "WARNING", "ERROR", "FATAL"]
74+
8075
#############################################################################
8176
def __init__(self, ceUniqueID):
8277
"""Standard constructor."""
@@ -117,14 +112,14 @@ def _getARCJob(self, jobID):
117112
j.JobStatusURL = arc.URL(str(statURL))
118113
j.JobStatusInterfaceName = "org.nordugrid.ldapng"
119114

120-
mangURL = "gsiftp://%s:2811/jobs/" % (self.ceHost)
115+
mangURL = f"gsiftp://{self.ceHost}:2811/jobs/"
121116
j.JobManagementURL = arc.URL(str(mangURL))
122117
j.JobManagementInterfaceName = "org.nordugrid.gridftpjob"
123118

124119
j.ServiceInformationURL = j.JobManagementURL
125120
j.ServiceInformationInterfaceName = "org.nordugrid.ldapng"
126121
else:
127-
commonURL = "https://%s:8443/arex" % self.ceHost
122+
commonURL = f"https://{self.ceHost}:8443/arex"
128123
j.JobStatusURL = arc.URL(str(commonURL))
129124
j.JobStatusInterfaceName = "org.ogf.glue.emies.activitymanagement"
130125

@@ -195,7 +190,7 @@ def _writeXRSL(self, executableFile, inputs=None, outputs=None, executables=None
195190
if not isinstance(outputs, list):
196191
outputs = [outputs]
197192
for outputFile in outputs:
198-
xrslOutputs += '(%s "")' % (outputFile)
193+
xrslOutputs += f'({outputFile} "")'
199194

200195
xrsl = """
201196
&(executable="{executable}")
@@ -244,6 +239,23 @@ def _reset(self):
244239
self.log.warn("Unknown ARC endpoint, change to default", self.endpointType)
245240
else:
246241
self.endpointType = endpointType
242+
243+
# ARCLogLevel to enable/disable logs coming from the ARC client
244+
# Because the ARC logger works independently from the standard logging library,
245+
# it needs a specific initialization flag
246+
# Expected values are: ["", "DEBUG", "VERBOSE", "INFO", "WARNING", "ERROR" and "FATAL"]
247+
# Modifying the ARCLogLevel of an ARCCE instance would impact all existing instances within a same process.
248+
logLevel = self.ceParameters.get("ARCLogLevel", "")
249+
if logLevel:
250+
arc.Logger_getRootLogger().removeDestinations()
251+
if logLevel not in self._arcLevels:
252+
self.log.warn("ARCLogLevel input is not known:", f"{logLevel} not in {self._arcLevels}")
253+
else:
254+
logstdout = arc.LogStream(sys.stdout)
255+
logstdout.setFormat(arc.ShortFormat)
256+
arc.Logger_getRootLogger().addDestination(logstdout)
257+
arc.Logger_getRootLogger().setThreshold(getattr(arc, logLevel))
258+
247259
return S_OK()
248260

249261
#############################################################################
@@ -259,7 +271,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
259271
return result
260272
self.usercfg.ProxyPath(os.environ["X509_USER_PROXY"])
261273

262-
self.log.verbose("Executable file path: %s" % executableFile)
274+
self.log.verbose(f"Executable file path: {executableFile}")
263275
if not os.access(executableFile, 5):
264276
os.chmod(executableFile, stat.S_IRWXU | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + stat.S_IXOTH)
265277

@@ -286,8 +298,8 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
286298
jobdescs = arc.JobDescriptionList()
287299
# Get the job into the ARC way
288300
xrslString, diracStamp = self._writeXRSL(executableFile, inputs, outputs, executables)
289-
self.log.debug("XRSL string submitted : %s" % xrslString)
290-
self.log.debug("DIRAC stamp for job : %s" % diracStamp)
301+
self.log.debug(f"XRSL string submitted : {xrslString}")
302+
self.log.debug(f"DIRAC stamp for job : {diracStamp}")
291303
# The arc bindings don't accept unicode objects in Python 2 so xrslString must be explicitly cast
292304
result = arc.JobDescription.Parse(str(xrslString), jobdescs)
293305
if not result:

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@
44
language is used in both cases. So, we retain the xrslExtraString and xrslMPExtraString strings.
55
"""
66

7-
8-
__RCSID__ = "$Id$"
9-
107
import os
118
import json
129
import requests
@@ -59,13 +56,12 @@ def _reset(self):
5956
Note : This is not run from __init__ as the design of DIRAC means that ceParameters is
6057
filled with CEDefaults only at the time this class is initialised for the given CE
6158
"""
62-
# super()._reset()
59+
super()._reset()
6360
self.log.debug("Testing if the REST interface is available", "for %s" % self.ceName)
6461

6562
# Get options from the ceParameters dictionary
6663
self.port = self.ceParameters.get("Port", self.port)
6764
self.restVersion = self.ceParameters.get("RESTVersion", self.restVersion)
68-
self.queue = self.ceParameters.get("Queue", self.queue)
6965

7066
self.proxyTimeLeftBeforeRenewal = self.ceParameters.get(
7167
"proxyTimeLeftBeforeRenewal", self.proxyTimeLeftBeforeRenewal
@@ -432,10 +428,10 @@ def getCEStatus(self):
432428
result["SubmittedJobs"] = 0
433429

434430
magic = self.queue + "_" + vo.lower()
435-
for i in range(len(queueInfo)):
436-
if queueInfo[i]["ID"].endswith(magic):
437-
result["RunningJobs"] = queueInfo[i]["RunningJobs"]
438-
result["WaitingJobs"] = queueInfo[i]["WaitingJobs"]
431+
for _, qi in enumerate(queueInfo):
432+
if qi["ID"].endswith(magic):
433+
result["RunningJobs"] = qi["RunningJobs"]
434+
result["WaitingJobs"] = qi["WaitingJobs"]
439435
break # Pick the first (should be only ...) matching queue + VO
440436

441437
return result

0 commit comments

Comments
 (0)