Skip to content

Commit 837deb2

Browse files
aldbr authored and web-flow committed
sweep: #6551 fix (resources): AREX CE further fixes
1 parent fbd89cd commit 837deb2

File tree

1 file changed

+60
-59
lines changed

1 file changed

+60
-59
lines changed

src/DIRAC/Resources/Computing/AREXComputingElement.py

Lines changed: 60 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -57,7 +57,7 @@ def _reset(self):
5757
filled with CEDefaults only at the time this class is initialised for the given CE
5858
"""
5959
super()._reset()
60-
self.log.debug("Testing if the REST interface is available", "for %s" % self.ceName)
60+
self.log.debug("Testing if the REST interface is available", f"for {self.ceName}")
6161

6262
# Get options from the ceParameters dictionary
6363
self.port = self.ceParameters.get("Port", self.port)
@@ -119,7 +119,7 @@ def _DiracToArcID(self, diracJobID):
119119
def _UrlJoin(self, words):
120120
# Return a full URL. The base_url is already defined.
121121
if not isinstance(words, list):
122-
return "Unknown input : %s" % words
122+
return f"Unknown input : {words}"
123123
b_url = self.base_url.strip()
124124
q_url = b_url if b_url.endswith("/") else b_url + "/"
125125
for word in words:
@@ -130,7 +130,7 @@ def _UrlJoin(self, words):
130130

131131
#############################################################################
132132

133-
def _getDelegation(self, jobID):
133+
def _getDelegation(self, jobID=None):
134134
"""Here we handle the delegations (Nordugrid language) = Proxy (Dirac language)
135135
136136
If the jobID is empty:
@@ -155,18 +155,18 @@ def _getDelegation(self, jobID):
155155
"""
156156
# Create a delegation
157157
if not jobID:
158-
# Prepare the command
158+
# Prepare the command: starts a new delegation process
159159
command = "delegations"
160160
params = {"action": "new"}
161161
query = self._UrlJoin([command])
162162
if query.startswith("Unknown"):
163-
return S_ERROR("Problem creating REST query %s" % query)
163+
return S_ERROR(f"Problem creating REST query {query}")
164164

165165
# Get a proxy
166166
proxy = X509Chain()
167167
result = proxy.loadProxyFromFile(self.session.cert)
168168
if not result["OK"]:
169-
return S_ERROR("Can't load {}: {} ".format(self.session.cert, result["Message"]))
169+
return S_ERROR(f"Can't load {self.session.cert}: {result['Message']}")
170170

171171
# Submit a POST request
172172
response = self.session.post(
@@ -176,30 +176,33 @@ def _getDelegation(self, jobID):
176176
params=params,
177177
timeout=self.arcRESTTimeout,
178178
)
179-
delegationID = ""
180-
if response.ok:
181-
delegationURL = response.headers.get("location", "")
182-
if delegationURL:
183-
delegationID = delegationURL.split("new/")[-1]
184-
# Prepare the command
185-
command = "delegations/" + delegationID
186-
query = self._UrlJoin([command])
187-
if query.startswith("Unknown"):
188-
return S_ERROR("Problem creating REST query %s" % query)
189-
190-
# Submit the proxy
191-
response = self.session.put(
192-
query,
193-
data=response.text,
194-
headers=self.headers,
195-
timeout=self.arcRESTTimeout,
196-
)
197-
if not response.ok:
198-
self.log.warn(
199-
"Issue while interacting with the delegation",
200-
f"{response.status_code} - {response.reason}",
201-
)
202-
delegationID = ""
179+
if not response.ok:
180+
return S_ERROR(f"Failed to get a delegation ID: {response.status_code} {response.reason}")
181+
182+
# Extract delegationID from response
183+
delegationURL = response.headers.get("location", "")
184+
if not delegationURL:
185+
return S_ERROR(f"Cannot extract delegation ID from the response: {response.headers}")
186+
187+
delegationID = delegationURL.split("new/")[-1]
188+
189+
# Prepare the command:
190+
command = "delegations/" + delegationID
191+
query = self._UrlJoin([command])
192+
if query.startswith("Unknown"):
193+
return S_ERROR(f"Problem creating REST query {query}")
194+
195+
# Submit the proxy
196+
response = self.session.put(
197+
query,
198+
data=response.text,
199+
headers=self.headers,
200+
timeout=self.arcRESTTimeout,
201+
)
202+
if not response.ok:
203+
return S_ERROR(
204+
f"Issue while interacting with the delegation {response.status_code} - {response.reason}"
205+
)
203206

204207
return S_OK(delegationID)
205208

@@ -210,7 +213,7 @@ def _getDelegation(self, jobID):
210213
params = {"action": "delegations"}
211214
query = self._UrlJoin([command])
212215
if query.startswith("Unknown"):
213-
return S_ERROR("Problem creating REST query %s" % query)
216+
return S_ERROR(f"Problem creating REST query {query}")
214217

215218
# Submit the POST request to get the delegation
216219
jobsJson = {"job": [{"id": jobID}]}
@@ -236,7 +239,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
236239
"""
237240
if not self.session:
238241
return S_ERROR("REST interface not initialised. Cannot submit jobs.")
239-
self.log.verbose("Executable file path: %s" % executableFile)
242+
self.log.verbose("Executable file path:", executableFile)
240243

241244
# Get the name of the queue: nordugrid-<batchsystem>-<queue>
242245
self.arcQueue = self.queue.split("-", 2)[2]
@@ -253,16 +256,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
253256
params = {"action": "new"}
254257
query = self._UrlJoin([command])
255258
if query.startswith("Unknown"):
256-
return S_ERROR("Problem creating REST query %s" % query)
259+
return S_ERROR(f"Problem creating REST query {query}")
257260

258261
# Get a "delegation" and use the same delegation for all the jobs
259262
delegation = ""
260-
result = self._getDelegation("")
263+
result = self._getDelegation()
261264
if not result["OK"]:
262-
self.log.warn("Could not get a delegation", "For CE %s" % self.ceHost)
265+
self.log.warn(f"Could not get a delegation", f"For CE {self.ceHost}")
263266
self.log.warn("Continue without a delegation")
264267
else:
265-
delegation = "(delegationid=%s)" % result["Value"]
268+
delegation = f"(delegationid={result['Value']})"
266269

267270
# Submit multiple jobs sequentially.
268271
# Bulk submission would not be significantly faster than multiple single submission.
@@ -274,8 +277,8 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
274277
# Get the job into the ARC way
275278
xrslString, diracStamp = self._writeXRSL(executableFile)
276279
xrslString += delegation
277-
self.log.debug("XRSL string submitted", "is %s" % xrslString)
278-
self.log.debug("DIRAC stamp for job", "is %s" % diracStamp)
280+
self.log.debug("XRSL string submitted", f"is {xrslString}")
281+
self.log.debug("DIRAC stamp for job", f"is {diracStamp}")
279282

280283
# Submit the POST request
281284
response = self.session.post(
@@ -288,17 +291,15 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
288291
if not response.ok:
289292
self.log.warn(
290293
"Failed to submit job",
291-
"to CE %s with error - %s - and messages : %s"
292-
% (self.ceHost, response.status_code, response.reason),
294+
f"to CE {self.ceHost} with error - {response.status_code} - and messages : {response.reason}",
293295
)
294296
break
295297

296298
responseJob = response.json()["job"]
297299
if responseJob["status-code"] > "400":
298300
self.log.warn(
299301
"Failed to submit job",
300-
"to CE %s with error - %s - and messages: %s"
301-
% (self.ceHost, responseJob["status-code"], responseJob["reason"]),
302+
f"to CE {self.ceHost} with error - {response['status-code']} - and messages: {responseJob['reason']}",
302303
)
303304
break
304305

@@ -316,7 +317,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
316317
command = "jobs/" + jobID + "/session/" + os.path.basename(executableFile)
317318
query = self._UrlJoin([command])
318319
if query.startswith("Unknown"):
319-
return S_ERROR("Problem creating REST query %s" % query)
320+
return S_ERROR(f"Problem creating REST query {query}")
320321

321322
# Extract the content of the file
322323
with open(executableFile) as f:
@@ -361,7 +362,7 @@ def killJob(self, jobIDList):
361362
params = {"action": "kill"}
362363
query = self._UrlJoin([command])
363364
if query.startswith("Unknown"):
364-
return S_ERROR("Problem creating REST query %s" % query)
365+
return S_ERROR(f"Problem creating REST query {query}")
365366

366367
# Killing jobs should be fast
367368
response = self.session.post(
@@ -374,7 +375,7 @@ def killJob(self, jobIDList):
374375
if not response.ok:
375376
return S_ERROR(f"Failed to kill all these jobs: {response.status_code} {response.reason}")
376377

377-
self.log.debug("Successfully deleted jobs %s " % (response.json()))
378+
self.log.debug("Successfully deleted jobs", response.json())
378379
return S_OK()
379380

380381
#############################################################################
@@ -409,12 +410,12 @@ def getCEStatus(self):
409410
params = {"schema": "glue2"}
410411
query = self._UrlJoin([command])
411412
if query.startswith("Unknown"):
412-
return S_ERROR("Problem creating REST query %s" % query)
413+
return S_ERROR(f"Problem creating REST query {query}")
413414

414415
# Submit the GET request
415416
response = self.session.get(query, headers=self.headers, params=params, timeout=self.arcRESTTimeout)
416417
if not response.ok:
417-
res = S_ERROR("Unknown failure for CE %s. Is the CE down?" % self.ceHost)
418+
res = S_ERROR(f"Unknown failure for CE {self.ceHost}. Is the CE down?")
418419
return res
419420
ceData = response.json()
420421

@@ -447,7 +448,7 @@ def _renewJobs(self, jobList):
447448
# First get the delegation (proxy)
448449
result = self._getDelegation(job)
449450
if not result["OK"]:
450-
self.log.warn("Could not get a delegation from", "Job %s" % job)
451+
self.log.warn("Could not get a delegation from", f"Job {job}")
451452
continue
452453
delegationID = result["Value"]
453454

@@ -456,7 +457,7 @@ def _renewJobs(self, jobList):
456457
params = {"action": "get"}
457458
query = self._UrlJoin([command])
458459
if query.startswith("Unknown"):
459-
return S_ERROR("Problem creating REST query %s" % query)
460+
return S_ERROR(f"Problem creating REST query {query}")
460461

461462
# Submit the POST request to get the proxy
462463
response = self.session.post(query, headers=self.headers, params=params, timeout=self.arcRESTTimeout)
@@ -478,15 +479,15 @@ def _renewJobs(self, jobList):
478479
params = {"action": "renew"}
479480
query = self._UrlJoin([command])
480481
if query.startswith("Unknown"):
481-
return S_ERROR("Problem creating REST query %s" % query)
482+
return S_ERROR(f"Problem creating REST query {query}")
482483
response = self.session.post(
483484
query,
484485
headers=self.headers,
485486
params=params,
486487
timeout=self.arcRESTTimeout,
487488
)
488489
if response.ok:
489-
self.log.debug("Proxy successfully renewed", "for job %s" % job)
490+
self.log.debug("Proxy successfully renewed", f"for job {job}")
490491
else:
491492
self.log.debug(
492493
"Proxy not renewed",
@@ -519,15 +520,15 @@ def getJobStatus(self, jobIDList):
519520
job = j.split(":::")[0]
520521
jobList.append(job)
521522

522-
self.log.debug("Getting status of jobs : %s" % jobList)
523+
self.log.debug("Getting status of jobs:", jobList)
523524
jobsJson = {"job": [{"id": self._DiracToArcID(job)} for job in jobList]}
524525

525526
# Prepare the command
526527
command = "jobs"
527528
params = {"action": "status"}
528529
query = self._UrlJoin([command])
529530
if query.startswith("Unknown"):
530-
return S_ERROR("Problem creating REST query %s" % query)
531+
return S_ERROR(f"Problem creating REST query {query}")
531532

532533
# Submit the POST request to get status of the pilots
533534
response = self.session.post(
@@ -560,7 +561,7 @@ def getJobStatus(self, jobIDList):
560561
# Cancel held jobs so they don't sit in the queue forever
561562
if arcState == "Hold":
562563
jobsToCancel.append(job["id"])
563-
self.log.debug("Killing held job %s" % jobID)
564+
self.log.debug(f"Killing held job {jobID}")
564565

565566
# Renew jobs to be renewed
566567
# Does not work at present - wait for a new release of ARC CEs for this.
@@ -598,29 +599,29 @@ def getJobOutput(self, jobID, _localDir=None):
598599
pilotRef = jobID
599600
stamp = ""
600601
if not stamp:
601-
return S_ERROR("Pilot stamp not defined for %s" % pilotRef)
602+
return S_ERROR(f"Pilot stamp not defined for {pilotRef}")
602603

603604
# Prepare the command
604605
command = "jobs/"
605606
job = self._DiracToArcID(pilotRef)
606607
query = self._UrlJoin([command, job, "session", stamp, ".out"])
607608
if query.startswith("Unknown"):
608-
return S_ERROR("Problem creating REST query %s" % query)
609+
return S_ERROR(f"Problem creating REST query {query}")
609610

610611
# Submit the GET request to retrieve outputs
611612
response = self.session.get(query, headers=self.headers, timeout=self.arcRESTTimeout)
612613
if not response.ok:
613614
self.log.error("Error downloading stdout", f"for {job}: {response.text}")
614-
return S_ERROR("Failed to retrieve at least some output for %s" % jobID)
615+
return S_ERROR(f"Failed to retrieve at least some output for {jobID}")
615616
output = response.text
616617

617618
query = self._UrlJoin([command, job, "session", stamp, ".err"])
618619
if query.startswith("Unknown"):
619-
return S_ERROR("Problem creating REST query %s" % query)
620+
return S_ERROR(f"Problem creating REST query {query}")
620621
response = self.session.get(query, headers=self.headers, timeout=self.arcRESTTimeout)
621622
if not response.ok:
622623
self.log.error("Error downloading stderr", f"for {job}: {response.text}")
623-
return S_ERROR("Failed to retrieve at least some output for %s" % jobID)
624+
return S_ERROR(f"Failed to retrieve at least some output for {jobID}")
624625
error = response.text
625626

626627
return S_OK((output, error))

0 commit comments

Comments
 (0)