@@ -46,8 +46,6 @@ def __init__(self, ceUniqueID):
46
46
self .restVersion = "1.0"
47
47
# Time left before proxy renewal: 3 hours is a good default
48
48
self .proxyTimeLeftBeforeRenewal = 10800
49
- # Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50
- self ._delegationID = None
51
49
# Timeout
52
50
self .timeout = 5.0
53
51
# Request session
@@ -187,6 +185,10 @@ def _checkSession(self):
187
185
self .headers .pop ("Authorization" , None )
188
186
189
187
# Get a proxy: still mandatory, even if tokens are used to authenticate
188
+ if not self .proxy :
189
+ self .log .error ("Proxy not set" )
190
+ return S_ERROR ("Proxy not set" )
191
+
190
192
result = self ._prepareProxy ()
191
193
if not result ["OK" ]:
192
194
self .log .error ("Failed to set up proxy" , result ["Message" ])
@@ -198,7 +200,7 @@ def _checkSession(self):
198
200
return S_OK ()
199
201
200
202
# Attach the proxy to the session, only if the token is unavailable
201
- self .session .cert = Locations . getProxyLocation ()
203
+ self .session .cert = os . environ [ "X509_USER_PROXY" ]
202
204
return S_OK ()
203
205
204
206
#############################################################################
@@ -241,15 +243,8 @@ def __uploadCertificate(self, delegationID, csrContent):
241
243
headers = {"Content-Type" : "x-pem-file" }
242
244
query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
243
245
244
- # Get a proxy and sign the CSR
245
- proxy = X509Chain ()
246
- proxyFile = Locations .getProxyLocation ()
247
- if not proxyFile :
248
- return S_ERROR (f"No proxy available" )
249
- result = proxy .loadProxyFromFile (proxyFile )
250
- if not result ["OK" ]:
251
- return S_ERROR (f"Can't load { proxyFile } : { result ['Message' ]} " )
252
- result = proxy .generateChainFromRequestString (csrContent )
246
+ # Sign the CSR
247
+ result = self .proxy .generateChainFromRequestString (csrContent )
253
248
if not result ["OK" ]:
254
249
self .log .error ("Problem with the Certificate Signing Request:" , result ["Message" ])
255
250
return S_ERROR ("Problem with the Certificate Signing Request" )
@@ -322,6 +317,29 @@ def _getDelegationIDs(self):
322
317
delegationIDs = [delegationContent ["id" ] for delegationContent in delegations ]
323
318
return S_OK (delegationIDs )
324
319
320
+ def _getProxyFromDelegationID (self , delegationID ):
321
+ """Get proxy stored within the"""
322
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
323
+ params = {"action" : "get" }
324
+
325
+ # Submit the POST request to get the delegation
326
+ result = self ._request ("post" , query , params = params )
327
+ if not result ["OK" ]:
328
+ self .log .error ("Issue while interacting with the delegations." , result ["Message" ])
329
+ return S_ERROR ("Issue while interacting with the delegations" )
330
+ response = result ["Value" ]
331
+
332
+ proxyContent = response .text
333
+ proxy = X509Chain ()
334
+ result = proxy .loadChainFromString (proxyContent )
335
+ if not result ["OK" ]:
336
+ self .log .error (
337
+ "Issue while trying to load proxy content from delegation" , f"{ delegationID } : { result ['Message' ]} "
338
+ )
339
+ return S_ERROR ("Issue while trying to load proxy content from delegation" )
340
+
341
+ return S_OK (proxy )
342
+
325
343
#############################################################################
326
344
327
345
def _getArcJobID (self , executableFile , inputs , outputs , delegation ):
@@ -406,18 +424,33 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
406
424
if not result ["OK" ]:
407
425
self .log .error ("Could not get delegation IDs." , result ["Message" ])
408
426
return S_ERROR ("Could not get delegation IDs" )
409
-
410
427
delegationIDs = result ["Value" ]
411
- if not delegationIDs :
428
+
429
+ # Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
430
+ currentDelegationID = None
431
+ proxyGroup = self .proxy .getDIRACGroup ()
432
+ for delegationID in delegationIDs :
433
+ # Get the proxy attached to the delegationID
434
+ result = self ._getProxyFromDelegationID (delegationID )
435
+ if not result ["OK" ]:
436
+ return result
437
+ proxy = result ["Value" ]
438
+
439
+ if proxy .getDIRACGroup () != proxyGroup :
440
+ continue
441
+
442
+ # If we are here, we have found the right delegationID to use
443
+ currentDelegationID = delegationID
444
+
445
+ if not currentDelegationID :
412
446
# No existing delegation, we need to prepare one
413
447
result = self ._prepareDelegation ()
414
448
if not result ["OK" ]:
415
449
self .log .warn ("Could not get a new delegation" , f"for CE { self .ceHost } " )
416
450
return S_ERROR ("Could not get a new delegation" )
417
- self ._delegationID = result ["Value" ]
418
- else :
419
- self ._delegationID = delegationIDs [0 ]
420
- delegation = f"\n (delegationid={ self ._delegationID } )"
451
+ currentDelegationID = result ["Value" ]
452
+
453
+ delegation = f"\n (delegationid={ currentDelegationID } )"
421
454
422
455
if not inputs :
423
456
inputs = []
@@ -599,33 +632,36 @@ def getCEStatus(self):
599
632
600
633
#############################################################################
601
634
602
- def _renewDelegation (self ):
603
- """Renew the delegations"""
635
+ def _renewDelegation (self , delegationID ):
636
+ """Renew the delegation
637
+
638
+ :params delegationID: delegation ID to renew
639
+ """
604
640
# Prepare the command
605
641
params = {"action" : "get" }
606
- query = self ._urlJoin (os .path .join ("delegations" , self . _delegationID ))
642
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
607
643
608
644
# Submit the POST request to get the proxy
609
645
result = self ._request ("post" , query , params = params )
610
646
if not result ["OK" ]:
611
- self .log .error ("Could not get a proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
612
- return S_ERROR (f"Could not get a proxy for delegation { self . _delegationID } " )
647
+ self .log .error ("Could not get a proxy for" , f"delegation { delegationID } : { result ['Message' ]} " )
648
+ return S_ERROR (f"Could not get a proxy for delegation { delegationID } " )
613
649
response = result ["Value" ]
614
650
615
651
proxy = X509Chain ()
616
652
result = proxy .loadChainFromString (response .text )
617
653
if not result ["OK" ]:
618
- self .log .error ("Could not load proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
619
- return S_ERROR (f"Could not load proxy for delegation { self . _delegationID } " )
654
+ self .log .error ("Could not load proxy for" , f"delegation { delegationID } : { result ['Message' ]} " )
655
+ return S_ERROR (f"Could not load proxy for delegation { delegationID } " )
620
656
621
657
# Now test and renew the proxy
622
658
result = proxy .getRemainingSecs ()
623
659
if not result ["OK" ]:
624
660
self .log .error (
625
661
"Could not get remaining time from the proxy for" ,
626
- f"delegation { self . _delegationID } : { result ['Message' ]} " ,
662
+ f"delegation { delegationID } : { result ['Message' ]} " ,
627
663
)
628
- return S_ERROR (f"Could not get remaining time from the proxy for delegation { self . _delegationID } " )
664
+ return S_ERROR (f"Could not get remaining time from the proxy for delegation { delegationID } " )
629
665
timeLeft = result ["Value" ]
630
666
631
667
if timeLeft >= self .proxyTimeLeftBeforeRenewal :
@@ -634,31 +670,31 @@ def _renewDelegation(self):
634
670
635
671
self .log .verbose (
636
672
"Renewing delegation" ,
637
- f"{ self . _delegationID } whose proxy expires at { timeLeft } " ,
673
+ f"{ delegationID } whose proxy expires at { timeLeft } " ,
638
674
)
639
675
# Proxy needs to be renewed - try to renew it
640
676
# First, get a new CSR from the delegation
641
677
params = {"action" : "renew" }
642
- query = self ._urlJoin (os .path .join ("delegations" , self . _delegationID ))
678
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
643
679
result = self ._request ("post" , query , params = params )
644
680
if not result ["OK" ]:
645
681
self .log .error (
646
682
"Proxy not renewed, failed to get CSR" ,
647
- f"for delegation { self . _delegationID } " ,
683
+ f"for delegation { delegationID } " ,
648
684
)
649
- return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { self . _delegationID } " )
685
+ return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { delegationID } " )
650
686
response = result ["Value" ]
651
687
652
688
# Then, sign and upload the certificate
653
- result = self .__uploadCertificate (self . _delegationID , response .text )
689
+ result = self .__uploadCertificate (delegationID , response .text )
654
690
if not result ["OK" ]:
655
691
self .log .error (
656
692
"Proxy not renewed, failed to send renewed proxy" ,
657
- f"delegation { self . _delegationID } : { result ['Message' ]} " ,
693
+ f"delegation { delegationID } : { result ['Message' ]} " ,
658
694
)
659
- return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { self . _delegationID } " )
695
+ return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { delegationID } " )
660
696
661
- self .log .verbose ("Proxy successfully renewed" , f"for delegation { self . _delegationID } " )
697
+ self .log .verbose ("Proxy successfully renewed" , f"for delegation { delegationID } " )
662
698
663
699
return S_OK ()
664
700
@@ -714,12 +750,16 @@ def getJobStatus(self, jobIDList):
714
750
jobsToCancel .append (arcJob ["id" ])
715
751
self .log .debug (f"Killing held job { jobID } " )
716
752
717
- # Renew delegation to renew the proxies of the jobs
718
- if self ._delegationID :
719
- result = self ._renewDelegation ()
753
+ # Renew delegations to renew the proxies of the jobs
754
+ result = self ._getDelegationIDs ()
755
+ if not result ["OK" ]:
756
+ return result
757
+ delegationIDs = result ["Value" ]
758
+ for delegationID in delegationIDs :
759
+ result = self ._renewDelegation (delegationID )
720
760
if not result ["OK" ]:
721
761
# Only log here as we still want to return statuses
722
- self .log .warn ("Failed to renew delegation" , f"{ self . _delegationID } : { result ['Message' ]} " )
762
+ self .log .warn ("Failed to renew delegation" , f"{ delegationID } : { result ['Message' ]} " )
723
763
724
764
# Kill held jobs
725
765
if jobsToCancel :
0 commit comments