@@ -57,7 +57,7 @@ def _reset(self):
57
57
filled with CEDefaults only at the time this class is initialised for the given CE
58
58
"""
59
59
super ()._reset ()
60
- self .log .debug ("Testing if the REST interface is available" , "for %s" % self .ceName )
60
+ self .log .debug ("Testing if the REST interface is available" , f "for { self .ceName } " )
61
61
62
62
# Get options from the ceParameters dictionary
63
63
self .port = self .ceParameters .get ("Port" , self .port )
@@ -119,7 +119,7 @@ def _DiracToArcID(self, diracJobID):
119
119
def _UrlJoin (self , words ):
120
120
# Return a full URL. The base_url is already defined.
121
121
if not isinstance (words , list ):
122
- return "Unknown input : %s" % words
122
+ return f "Unknown input : { words } "
123
123
b_url = self .base_url .strip ()
124
124
q_url = b_url if b_url .endswith ("/" ) else b_url + "/"
125
125
for word in words :
@@ -130,7 +130,7 @@ def _UrlJoin(self, words):
130
130
131
131
#############################################################################
132
132
133
- def _getDelegation (self , jobID ):
133
+ def _getDelegation (self , jobID = None ):
134
134
"""Here we handle the delegations (Nordugrid language) = Proxy (Dirac language)
135
135
136
136
If the jobID is empty:
@@ -155,18 +155,18 @@ def _getDelegation(self, jobID):
155
155
"""
156
156
# Create a delegation
157
157
if not jobID :
158
- # Prepare the command
158
+ # Prepare the command: starts a new delegation process
159
159
command = "delegations"
160
160
params = {"action" : "new" }
161
161
query = self ._UrlJoin ([command ])
162
162
if query .startswith ("Unknown" ):
163
- return S_ERROR ("Problem creating REST query %s" % query )
163
+ return S_ERROR (f "Problem creating REST query { query } " )
164
164
165
165
# Get a proxy
166
166
proxy = X509Chain ()
167
167
result = proxy .loadProxyFromFile (self .session .cert )
168
168
if not result ["OK" ]:
169
- return S_ERROR ("Can't load {}: {} " . format ( self .session .cert , result [" Message" ]) )
169
+ return S_ERROR (f "Can't load { self .session .cert } : { result [' Message' ] } " )
170
170
171
171
# Submit a POST request
172
172
response = self .session .post (
@@ -176,30 +176,33 @@ def _getDelegation(self, jobID):
176
176
params = params ,
177
177
timeout = self .arcRESTTimeout ,
178
178
)
179
- delegationID = ""
180
- if response .ok :
181
- delegationURL = response .headers .get ("location" , "" )
182
- if delegationURL :
183
- delegationID = delegationURL .split ("new/" )[- 1 ]
184
- # Prepare the command
185
- command = "delegations/" + delegationID
186
- query = self ._UrlJoin ([command ])
187
- if query .startswith ("Unknown" ):
188
- return S_ERROR ("Problem creating REST query %s" % query )
189
-
190
- # Submit the proxy
191
- response = self .session .put (
192
- query ,
193
- data = response .text ,
194
- headers = self .headers ,
195
- timeout = self .arcRESTTimeout ,
196
- )
197
- if not response .ok :
198
- self .log .warn (
199
- "Issue while interacting with the delegation" ,
200
- f"{ response .status_code } - { response .reason } " ,
201
- )
202
- delegationID = ""
179
+ if not response .ok :
180
+ return S_ERROR (f"Failed to get a delegation ID: { response .status_code } { response .reason } " )
181
+
182
+ # Extract delegationID from response
183
+ delegationURL = response .headers .get ("location" , "" )
184
+ if not delegationURL :
185
+ return S_ERROR (f"Cannot extract delegation ID from the response: { response .headers } " )
186
+
187
+ delegationID = delegationURL .split ("new/" )[- 1 ]
188
+
189
+ # Prepare the command:
190
+ command = "delegations/" + delegationID
191
+ query = self ._UrlJoin ([command ])
192
+ if query .startswith ("Unknown" ):
193
+ return S_ERROR (f"Problem creating REST query { query } " )
194
+
195
+ # Submit the proxy
196
+ response = self .session .put (
197
+ query ,
198
+ data = response .text ,
199
+ headers = self .headers ,
200
+ timeout = self .arcRESTTimeout ,
201
+ )
202
+ if not response .ok :
203
+ return S_ERROR (
204
+ f"Issue while interacting with the delegation { response .status_code } - { response .reason } "
205
+ )
203
206
204
207
return S_OK (delegationID )
205
208
@@ -210,7 +213,7 @@ def _getDelegation(self, jobID):
210
213
params = {"action" : "delegations" }
211
214
query = self ._UrlJoin ([command ])
212
215
if query .startswith ("Unknown" ):
213
- return S_ERROR ("Problem creating REST query %s" % query )
216
+ return S_ERROR (f "Problem creating REST query { query } " )
214
217
215
218
# Submit the POST request to get the delegation
216
219
jobsJson = {"job" : [{"id" : jobID }]}
@@ -236,7 +239,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
236
239
"""
237
240
if not self .session :
238
241
return S_ERROR ("REST interface not initialised. Cannot submit jobs." )
239
- self .log .verbose ("Executable file path: %s" % executableFile )
242
+ self .log .verbose ("Executable file path:" , executableFile )
240
243
241
244
# Get the name of the queue: nordugrid-<batchsystem>-<queue>
242
245
self .arcQueue = self .queue .split ("-" , 2 )[2 ]
@@ -253,16 +256,16 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
253
256
params = {"action" : "new" }
254
257
query = self ._UrlJoin ([command ])
255
258
if query .startswith ("Unknown" ):
256
- return S_ERROR ("Problem creating REST query %s" % query )
259
+ return S_ERROR (f "Problem creating REST query { query } " )
257
260
258
261
# Get a "delegation" and use the same delegation for all the jobs
259
262
delegation = ""
260
- result = self ._getDelegation ("" )
263
+ result = self ._getDelegation ()
261
264
if not result ["OK" ]:
262
- self .log .warn ("Could not get a delegation" , "For CE %s" % self .ceHost )
265
+ self .log .warn (f "Could not get a delegation" , f "For CE { self .ceHost } " )
263
266
self .log .warn ("Continue without a delegation" )
264
267
else :
265
- delegation = "(delegationid=%s)" % result [" Value" ]
268
+ delegation = f "(delegationid={ result [' Value' ] } )"
266
269
267
270
# Submit multiple jobs sequentially.
268
271
# Bulk submission would not be significantly faster than multiple single submission.
@@ -274,8 +277,8 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
274
277
# Get the job into the ARC way
275
278
xrslString , diracStamp = self ._writeXRSL (executableFile )
276
279
xrslString += delegation
277
- self .log .debug ("XRSL string submitted" , "is %s" % xrslString )
278
- self .log .debug ("DIRAC stamp for job" , "is %s" % diracStamp )
280
+ self .log .debug ("XRSL string submitted" , f "is { xrslString } " )
281
+ self .log .debug ("DIRAC stamp for job" , f "is { diracStamp } " )
279
282
280
283
# Submit the POST request
281
284
response = self .session .post (
@@ -288,17 +291,15 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
288
291
if not response .ok :
289
292
self .log .warn (
290
293
"Failed to submit job" ,
291
- "to CE %s with error - %s - and messages : %s"
292
- % (self .ceHost , response .status_code , response .reason ),
294
+ f"to CE { self .ceHost } with error - { response .status_code } - and messages : { response .reason } " ,
293
295
)
294
296
break
295
297
296
298
responseJob = response .json ()["job" ]
297
299
if responseJob ["status-code" ] > "400" :
298
300
self .log .warn (
299
301
"Failed to submit job" ,
300
- "to CE %s with error - %s - and messages: %s"
301
- % (self .ceHost , responseJob ["status-code" ], responseJob ["reason" ]),
302
+ f"to CE { self .ceHost } with error - { response ['status-code' ]} - and messages: { responseJob ['reason' ]} " ,
302
303
)
303
304
break
304
305
@@ -316,7 +317,7 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1):
316
317
command = "jobs/" + jobID + "/session/" + os .path .basename (executableFile )
317
318
query = self ._UrlJoin ([command ])
318
319
if query .startswith ("Unknown" ):
319
- return S_ERROR ("Problem creating REST query %s" % query )
320
+ return S_ERROR (f "Problem creating REST query { query } " )
320
321
321
322
# Extract the content of the file
322
323
with open (executableFile ) as f :
@@ -361,7 +362,7 @@ def killJob(self, jobIDList):
361
362
params = {"action" : "kill" }
362
363
query = self ._UrlJoin ([command ])
363
364
if query .startswith ("Unknown" ):
364
- return S_ERROR ("Problem creating REST query %s" % query )
365
+ return S_ERROR (f "Problem creating REST query { query } " )
365
366
366
367
# Killing jobs should be fast
367
368
response = self .session .post (
@@ -374,7 +375,7 @@ def killJob(self, jobIDList):
374
375
if not response .ok :
375
376
return S_ERROR (f"Failed to kill all these jobs: { response .status_code } { response .reason } " )
376
377
377
- self .log .debug ("Successfully deleted jobs %s " % ( response .json () ))
378
+ self .log .debug ("Successfully deleted jobs" , response .json ())
378
379
return S_OK ()
379
380
380
381
#############################################################################
@@ -409,12 +410,12 @@ def getCEStatus(self):
409
410
params = {"schema" : "glue2" }
410
411
query = self ._UrlJoin ([command ])
411
412
if query .startswith ("Unknown" ):
412
- return S_ERROR ("Problem creating REST query %s" % query )
413
+ return S_ERROR (f "Problem creating REST query { query } " )
413
414
414
415
# Submit the GET request
415
416
response = self .session .get (query , headers = self .headers , params = params , timeout = self .arcRESTTimeout )
416
417
if not response .ok :
417
- res = S_ERROR ("Unknown failure for CE %s. Is the CE down?" % self . ceHost )
418
+ res = S_ERROR (f "Unknown failure for CE { self . ceHost } . Is the CE down?" )
418
419
return res
419
420
ceData = response .json ()
420
421
@@ -447,7 +448,7 @@ def _renewJobs(self, jobList):
447
448
# First get the delegation (proxy)
448
449
result = self ._getDelegation (job )
449
450
if not result ["OK" ]:
450
- self .log .warn ("Could not get a delegation from" , "Job %s" % job )
451
+ self .log .warn ("Could not get a delegation from" , f "Job { job } " )
451
452
continue
452
453
delegationID = result ["Value" ]
453
454
@@ -456,7 +457,7 @@ def _renewJobs(self, jobList):
456
457
params = {"action" : "get" }
457
458
query = self ._UrlJoin ([command ])
458
459
if query .startswith ("Unknown" ):
459
- return S_ERROR ("Problem creating REST query %s" % query )
460
+ return S_ERROR (f "Problem creating REST query { query } " )
460
461
461
462
# Submit the POST request to get the proxy
462
463
response = self .session .post (query , headers = self .headers , params = params , timeout = self .arcRESTTimeout )
@@ -478,15 +479,15 @@ def _renewJobs(self, jobList):
478
479
params = {"action" : "renew" }
479
480
query = self ._UrlJoin ([command ])
480
481
if query .startswith ("Unknown" ):
481
- return S_ERROR ("Problem creating REST query %s" % query )
482
+ return S_ERROR (f "Problem creating REST query { query } " )
482
483
response = self .session .post (
483
484
query ,
484
485
headers = self .headers ,
485
486
params = params ,
486
487
timeout = self .arcRESTTimeout ,
487
488
)
488
489
if response .ok :
489
- self .log .debug ("Proxy successfully renewed" , "for job %s" % job )
490
+ self .log .debug ("Proxy successfully renewed" , f "for job { job } " )
490
491
else :
491
492
self .log .debug (
492
493
"Proxy not renewed" ,
@@ -519,15 +520,15 @@ def getJobStatus(self, jobIDList):
519
520
job = j .split (":::" )[0 ]
520
521
jobList .append (job )
521
522
522
- self .log .debug ("Getting status of jobs : %s" % jobList )
523
+ self .log .debug ("Getting status of jobs:" , jobList )
523
524
jobsJson = {"job" : [{"id" : self ._DiracToArcID (job )} for job in jobList ]}
524
525
525
526
# Prepare the command
526
527
command = "jobs"
527
528
params = {"action" : "status" }
528
529
query = self ._UrlJoin ([command ])
529
530
if query .startswith ("Unknown" ):
530
- return S_ERROR ("Problem creating REST query %s" % query )
531
+ return S_ERROR (f "Problem creating REST query { query } " )
531
532
532
533
# Submit the POST request to get status of the pilots
533
534
response = self .session .post (
@@ -560,7 +561,7 @@ def getJobStatus(self, jobIDList):
560
561
# Cancel held jobs so they don't sit in the queue forever
561
562
if arcState == "Hold" :
562
563
jobsToCancel .append (job ["id" ])
563
- self .log .debug ("Killing held job %s" % jobID )
564
+ self .log .debug (f "Killing held job { jobID } " )
564
565
565
566
# Renew jobs to be renewed
566
567
# Does not work at present - wait for a new release of ARC CEs for this.
@@ -598,29 +599,29 @@ def getJobOutput(self, jobID, _localDir=None):
598
599
pilotRef = jobID
599
600
stamp = ""
600
601
if not stamp :
601
- return S_ERROR ("Pilot stamp not defined for %s" % pilotRef )
602
+ return S_ERROR (f "Pilot stamp not defined for { pilotRef } " )
602
603
603
604
# Prepare the command
604
605
command = "jobs/"
605
606
job = self ._DiracToArcID (pilotRef )
606
607
query = self ._UrlJoin ([command , job , "session" , stamp , ".out" ])
607
608
if query .startswith ("Unknown" ):
608
- return S_ERROR ("Problem creating REST query %s" % query )
609
+ return S_ERROR (f "Problem creating REST query { query } " )
609
610
610
611
# Submit the GET request to retrieve outputs
611
612
response = self .session .get (query , headers = self .headers , timeout = self .arcRESTTimeout )
612
613
if not response .ok :
613
614
self .log .error ("Error downloading stdout" , f"for { job } : { response .text } " )
614
- return S_ERROR ("Failed to retrieve at least some output for %s" % jobID )
615
+ return S_ERROR (f "Failed to retrieve at least some output for { jobID } " )
615
616
output = response .text
616
617
617
618
query = self ._UrlJoin ([command , job , "session" , stamp , ".err" ])
618
619
if query .startswith ("Unknown" ):
619
- return S_ERROR ("Problem creating REST query %s" % query )
620
+ return S_ERROR (f "Problem creating REST query { query } " )
620
621
response = self .session .get (query , headers = self .headers , timeout = self .arcRESTTimeout )
621
622
if not response .ok :
622
623
self .log .error ("Error downloading stderr" , f"for { job } : { response .text } " )
623
- return S_ERROR ("Failed to retrieve at least some output for %s" % jobID )
624
+ return S_ERROR (f "Failed to retrieve at least some output for { jobID } " )
624
625
error = response .text
625
626
626
627
return S_OK ((output , error ))
0 commit comments