@@ -46,11 +46,16 @@ def __init__(self, ceUniqueID):
46
46
self .restVersion = "1.0"
47
47
# Time left before proxy renewal: 3 hours is a good default
48
48
self .proxyTimeLeftBeforeRenewal = 10800
49
+ # Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50
+ self ._delegationID = None
49
51
# Timeout
50
52
self .timeout = 5.0
51
53
# Request session
52
54
self .session = None
53
- self .headers = {}
55
+ self .headers = {
56
+ "Accept" : "application/json" ,
57
+ "Content-Type" : "application/json" ,
58
+ }
54
59
# URL used to communicate with the REST interface
55
60
self .base_url = ""
56
61
@@ -80,13 +85,6 @@ def _reset(self):
80
85
# Set up the request framework
81
86
self .session = requests .Session ()
82
87
self .session .verify = Locations .getCAsLocation ()
83
- self .headers = {
84
- "Accept" : "application/json" ,
85
- "Content-Type" : "application/json" ,
86
- }
87
- # Attach the token to the headers if present
88
- if os .environ .get ("BEARER_TOKEN" ):
89
- self .headers ["Authorization" ] = "Bearer " + os .environ ["BEARER_TOKEN" ]
90
88
91
89
return S_OK ()
92
90
@@ -176,11 +174,22 @@ def _checkSession(self):
176
174
if not self .session :
177
175
return S_ERROR ("REST interface not initialised." )
178
176
179
- # Get a proxy
177
+ # Reinitialize the authentication parameters
178
+ self .session .cert = None
179
+ self .headers .pop ("Authorization" , None )
180
+
181
+ # Get a proxy: still mandatory, even if tokens are used to authenticate
180
182
result = self ._prepareProxy ()
181
183
if not result ["OK" ]:
182
184
self .log .error ("Failed to set up proxy" , result ["Message" ])
183
185
return result
186
+
187
+ if self .token :
188
+ # Attach the token to the headers if present
189
+ self .headers ["Authorization" ] = "Bearer " + self .token ["access_token" ]
190
+ return S_OK ()
191
+
192
+ # Attach the proxy to the session, only if the token is unavailable
184
193
self .session .cert = Locations .getProxyLocation ()
185
194
return S_OK ()
186
195
@@ -226,11 +235,15 @@ def __uploadCertificate(self, delegationID, csrContent):
226
235
227
236
# Get a proxy and sign the CSR
228
237
proxy = X509Chain ()
229
- result = proxy .loadProxyFromFile (self .session .cert )
238
+ proxyFile = Locations .getProxyLocation ()
239
+ if not proxyFile :
240
+ return S_ERROR (f"No proxy available" )
241
+ result = proxy .loadProxyFromFile (proxyFile )
230
242
if not result ["OK" ]:
231
- return S_ERROR (f"Can't load { self . session . cert } : { result ['Message' ]} " )
243
+ return S_ERROR (f"Can't load { proxyFile } : { result ['Message' ]} " )
232
244
result = proxy .generateChainFromRequestString (csrContent )
233
245
if not result ["OK" ]:
246
+ self .log .error ("Problem with the Certificate Signing Request:" , result ["Message" ])
234
247
return S_ERROR ("Problem with the Certificate Signing Request" )
235
248
236
249
# Submit the certificate
@@ -262,38 +275,44 @@ def _prepareDelegation(self):
262
275
return result
263
276
return S_OK (delegationID )
264
277
265
- def _getDelegationID (self , arcJobID ):
266
- """Query and return the delegation ID of the given job .
278
+ def _getDelegationIDs (self ):
279
+ """Query and return the delegation IDs .
267
280
268
- This happens when the call is from self.renewJobs. This function needs to know the
269
- delegation associated to the job
281
+ This happens when the call is from self.renewDelegations.
270
282
More info at
271
283
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#jobs-management
272
284
https://www.nordugrid.org/arc/arc6/tech/rest/rest.html#delegations-management
273
285
274
- :param str jobID: ARC job ID
275
- :return: delegation ID
286
+ :return: list of delegation IDs
276
287
"""
277
- params = {"action" : "delegations" }
278
- query = self ._urlJoin ("jobs" )
288
+ query = self ._urlJoin ("delegations" )
279
289
280
290
# Submit the POST request to get the delegation
281
- jobsJson = {"job" : [{"id" : arcJobID }]}
282
- result = self ._request ("post" , query , params = params , data = json .dumps (jobsJson ))
291
+ result = self ._request ("get" , query )
283
292
if not result ["OK" ]:
284
- self .log .error ("Issue while interacting with the delegation ." , result ["Message" ])
285
- return S_ERROR ("Issue while interacting with the delegation " )
293
+ self .log .error ("Issue while interacting with the delegations ." , result ["Message" ])
294
+ return S_ERROR ("Issue while interacting with the delegations " )
286
295
response = result ["Value" ]
287
296
288
- responseDelegation = response .json ()
289
- if "delegation_id" not in responseDelegation ["job" ]:
290
- return S_ERROR (f"Cannot find the Delegation ID for Job { arcJobID } " )
297
+ # If there is no delegation, response.json is expected to return an exception
298
+ try :
299
+ responseDelegation = response .json ()
300
+ except requests .JSONDecodeError :
301
+ return S_OK ([])
302
+
303
+ # This is not expected
304
+ if "delegation" not in responseDelegation :
305
+ return S_OK ([])
306
+
307
+ # If there is a single delegationID, then we get an str instead of a list
308
+ # Not specified in the documentation
309
+ delegations = responseDelegation ["delegation" ]
310
+ if isinstance (delegations , dict ):
311
+ delegations = [delegations ]
291
312
292
- delegationIDs = responseDelegation ["job" ]["delegation_id" ]
293
- # Documentation says "Array", but a single string is returned if there is only one
294
- if not isinstance (delegationIDs , list ):
295
- delegationIDs = [delegationIDs ]
296
- return S_OK (delegationIDs [0 ])
313
+ # responseDelegation should be {'delegation': [{'id': <delegationID>}, ...]}
314
+ delegationIDs = [delegationContent ["id" ] for delegationContent in delegations ]
315
+ return S_OK (delegationIDs )
297
316
298
317
#############################################################################
299
318
@@ -374,14 +393,23 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
374
393
375
394
self .log .verbose (f"Executable file path: { executableFile } " )
376
395
377
- # Get a "delegation" and use the same delegation for all the jobs
378
- delegation = ""
379
- result = self ._prepareDelegation ()
396
+ # Get a delegation and use the same delegation for all the jobs
397
+ result = self ._getDelegationIDs ()
380
398
if not result ["OK" ]:
381
- self .log .warn ("Could not get a delegation" , f"For CE { self .ceHost } " )
382
- self .log .warn ("Continue without a delegation" )
399
+ self .log .error ("Could not get delegation IDs." , result ["Message" ])
400
+ return S_ERROR ("Could not get delegation IDs" )
401
+
402
+ delegationIDs = result ["Value" ]
403
+ if not delegationIDs :
404
+ # No existing delegation, we need to prepare one
405
+ result = self ._prepareDelegation ()
406
+ if not result ["OK" ]:
407
+ self .log .warn ("Could not get a new delegation" , f"for CE { self .ceHost } " )
408
+ return S_ERROR ("Could not get a new delegation" )
409
+ self ._delegationID = result ["Value" ]
383
410
else :
384
- delegation = f"\n (delegationid={ result ['Value' ]} )"
411
+ self ._delegationID = delegationIDs [0 ]
412
+ delegation = f"\n (delegationid={ self ._delegationID } )"
385
413
386
414
if not inputs :
387
415
inputs = []
@@ -563,73 +591,66 @@ def getCEStatus(self):
563
591
564
592
#############################################################################
565
593
566
- def _renewJobs (self , arcJobList ):
567
- """Written for the REST interface - jobList is already in the ARC format
568
-
569
- :param list arcJobList: list of ARC Job ID
570
- """
571
- # Renew the jobs
572
- for arcJob in arcJobList :
573
- # First get the delegation (proxy)
574
- result = self ._getDelegationID (arcJob )
575
- if not result ["OK" ]:
576
- self .log .warn ("Could not get a delegation from" , f"Job { arcJob } " )
577
- continue
578
- delegationID = result ["Value" ]
579
-
580
- # Prepare the command
581
- params = {"action" : "get" }
582
- query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
594
+ def _renewDelegation (self ):
595
+ """Renew the delegations"""
596
+ # Prepare the command
597
+ params = {"action" : "get" }
598
+ query = self ._urlJoin (os .path .join ("delegations" , self ._delegationID ))
583
599
584
- # Submit the POST request to get the proxy
585
- result = self ._request ("post" , query , params = params )
586
- if not result ["OK" ]:
587
- self .log .debug ("Could not get a proxy for" , f"job { arcJob } : { result ['Message' ]} " )
588
- continue
589
- response = result ["Value" ]
600
+ # Submit the POST request to get the proxy
601
+ result = self ._request ("post" , query , params = params )
602
+ if not result ["OK" ]:
603
+ self .log .error ("Could not get a proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
604
+ return S_ERROR ( f"Could not get a proxy for delegation { self . _delegationID } " )
605
+ response = result ["Value" ]
590
606
591
- proxy = X509Chain ()
592
- result = proxy .loadChainFromString (response .text )
593
- if not result ["OK" ]:
594
- continue
607
+ proxy = X509Chain ()
608
+ result = proxy .loadChainFromString (response .text )
609
+ if not result ["OK" ]:
610
+ self .log .error ("Could not load proxy for" , f"delegation { self ._delegationID } : { result ['Message' ]} " )
611
+ return S_ERROR (f"Could not load proxy for delegation { self ._delegationID } " )
595
612
596
- # Now test and renew the proxy
597
- result = proxy .getRemainingSecs ()
598
- if not result ["OK" ]:
599
- continue
600
- timeLeft = result ["Value" ]
613
+ # Now test and renew the proxy
614
+ result = proxy .getRemainingSecs ()
615
+ if not result ["OK" ]:
616
+ self .log .error (
617
+ "Could not get remaining time from the proxy for" ,
618
+ f"delegation { self ._delegationID } : { result ['Message' ]} " ,
619
+ )
620
+ return S_ERROR (f"Could not get remaining time from the proxy for delegation { self ._delegationID } " )
621
+ timeLeft = result ["Value" ]
601
622
602
- if timeLeft >= self .proxyTimeLeftBeforeRenewal :
603
- # No need to renew. Proxy is long enough
604
- continue
623
+ if timeLeft >= self .proxyTimeLeftBeforeRenewal :
624
+ # No need to renew. Proxy is long enough
625
+ return S_OK ()
605
626
606
- self .log .debug (
607
- "Renewing proxy for job" ,
608
- f"{ arcJob } whose proxy expires at { timeLeft } " ,
627
+ self .log .verbose (
628
+ "Renewing delegation" ,
629
+ f"{ self ._delegationID } whose proxy expires at { timeLeft } " ,
630
+ )
631
+ # Proxy needs to be renewed - try to renew it
632
+ # First, get a new CSR from the delegation
633
+ params = {"action" : "renew" }
634
+ query = self ._urlJoin (os .path .join ("delegations" , self ._delegationID ))
635
+ result = self ._request ("post" , query , params = params )
636
+ if not result ["OK" ]:
637
+ self .log .error (
638
+ "Proxy not renewed, failed to get CSR" ,
639
+ f"for delegation { self ._delegationID } " ,
609
640
)
610
- # Proxy needs to be renewed - try to renew it
611
- # First, get a new CSR from the delegation
612
- params = {"action" : "renew" }
613
- query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
614
- result = self ._request ("post" , query , params = params )
641
+ return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { self ._delegationID } " )
642
+ response = result ["Value" ]
615
643
616
- if not response .ok :
617
- self .log .debug (
618
- "Proxy not renewed, failed to get CSR" ,
619
- f"for job { arcJob } with delegation { delegationID } " ,
620
- )
621
- continue
622
-
623
- # Then, sign and upload the certificate
624
- result = self .__uploadCertificate (delegationID , response .text )
625
- if not result ["OK" ]:
626
- self .log .debug (
627
- "Proxy not renewed, failed to send renewed proxy" ,
628
- f"for job { arcJob } with delegation { delegationID } : { result ['Message' ]} " ,
629
- )
630
- continue
644
+ # Then, sign and upload the certificate
645
+ result = self .__uploadCertificate (self ._delegationID , response .text )
646
+ if not result ["OK" ]:
647
+ self .log .error (
648
+ "Proxy not renewed, failed to send renewed proxy" ,
649
+ f"delegation { self ._delegationID } : { result ['Message' ]} " ,
650
+ )
651
+ return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { self ._delegationID } " )
631
652
632
- self .log .debug ("Proxy successfully renewed" , f"for job { arcJob } " )
653
+ self .log .verbose ("Proxy successfully renewed" , f"for delegation { self . _delegationID } " )
633
654
634
655
return S_OK ()
635
656
@@ -665,7 +686,6 @@ def getJobStatus(self, jobIDList):
665
686
response = result ["Value" ]
666
687
667
688
resultDict = {}
668
- jobsToRenew = []
669
689
jobsToCancel = []
670
690
671
691
# A single job is returned in a dict, while multiple jobs are returned in a list
@@ -681,23 +701,19 @@ def getJobStatus(self, jobIDList):
681
701
self .log .debug ("REST ARC status" , f"for job { jobID } is { arcState } " )
682
702
resultDict [jobID ] = self .mapStates [arcState ]
683
703
684
- # Renew proxy only of jobs which are running or queuing
685
- if arcState in ("Running" , "Queuing" ):
686
- jobsToRenew .append (arcJob ["id" ])
687
704
# Cancel held jobs so they don't sit in the queue forever
688
705
if arcState == "Hold" :
689
706
jobsToCancel .append (arcJob ["id" ])
690
707
self .log .debug (f"Killing held job { jobID } " )
691
708
692
- # Renew jobs to be renewed
693
- # Does not work at present - wait for a new release of ARC CEs for this.
694
- if jobsToRenew :
695
- result = self ._renewJobs (jobsToRenew )
709
+ # Renew delegation to renew the proxies of the jobs
710
+ if self ._delegationID :
711
+ result = self ._renewDelegation ()
696
712
if not result ["OK" ]:
697
713
# Only log here as we still want to return statuses
698
- self .log .warn ("Failed to renew job proxies: " , result [" Message" ] )
714
+ self .log .warn ("Failed to renew delegation " , f" { self . _delegationID } : { result [' Message' ] } " )
699
715
700
- # Kill jobs to be killed
716
+ # Kill held jobs
701
717
if jobsToCancel :
702
718
result = self ._killJob (jobsToCancel )
703
719
if not result ["OK" ]:
0 commit comments