@@ -46,8 +46,6 @@ def __init__(self, ceUniqueID):
46
46
self .restVersion = "1.0"
47
47
# Time left before proxy renewal: 3 hours is a good default
48
48
self .proxyTimeLeftBeforeRenewal = 10800
49
- # Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50
- self ._delegationID = None
51
49
# Timeout
52
50
self .timeout = 5.0
53
51
# Request session
@@ -187,6 +185,10 @@ def _checkSession(self):
187
185
self .headers .pop ("Authorization" , None )
188
186
189
187
# Get a proxy: still mandatory, even if tokens are used to authenticate
188
+ if not self .proxy :
189
+ self .log .error ("Proxy not set" )
190
+ return S_ERROR ("Proxy not set" )
191
+
190
192
result = self ._prepareProxy ()
191
193
if not result ["OK" ]:
192
194
self .log .error ("Failed to set up proxy" , result ["Message" ])
@@ -198,7 +200,7 @@ def _checkSession(self):
198
200
return S_OK ()
199
201
200
202
# Attach the proxy to the session, only if the token is unavailable
201
- self .session .cert = Locations . getProxyLocation ()
203
+ self .session .cert = os . environ [ "X509_USER_PROXY" ]
202
204
return S_OK ()
203
205
204
206
#############################################################################
@@ -241,15 +243,8 @@ def __uploadCertificate(self, delegationID, csrContent):
241
243
headers = {"Content-Type" : "x-pem-file" }
242
244
query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
243
245
244
- # Get a proxy and sign the CSR
245
- proxy = X509Chain ()
246
- proxyFile = Locations .getProxyLocation ()
247
- if not proxyFile :
248
- return S_ERROR (f"No proxy available" )
249
- result = proxy .loadProxyFromFile (proxyFile )
250
- if not result ["OK" ]:
251
- return S_ERROR (f"Can't load { proxyFile } : { result ['Message' ]} " )
252
- result = proxy .generateChainFromRequestString (csrContent )
246
+ # Sign the CSR
247
+ result = self .proxy .generateChainFromRequestString (csrContent )
253
248
if not result ["OK" ]:
254
249
self .log .error ("Problem with the Certificate Signing Request:" , result ["Message" ])
255
250
return S_ERROR ("Problem with the Certificate Signing Request" )
@@ -322,6 +317,32 @@ def _getDelegationIDs(self):
322
317
delegationIDs = [delegationContent ["id" ] for delegationContent in delegations ]
323
318
return S_OK (delegationIDs )
324
319
320
+ def _getProxyFromDelegationID (self , delegationID ):
321
+ """Get proxy stored within the delegation
322
+
323
+ :param str delegationID: delegation ID
324
+ """
325
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
326
+ params = {"action" : "get" }
327
+
328
+ # Submit the POST request to get the delegation
329
+ result = self ._request ("post" , query , params = params )
330
+ if not result ["OK" ]:
331
+ self .log .error ("Issue while interacting with the delegations." , result ["Message" ])
332
+ return S_ERROR ("Issue while interacting with the delegations" )
333
+ response = result ["Value" ]
334
+
335
+ proxyContent = response .text
336
+ proxy = X509Chain ()
337
+ result = proxy .loadChainFromString (proxyContent )
338
+ if not result ["OK" ]:
339
+ self .log .error (
340
+ "Issue while trying to load proxy content from delegation" , f"{ delegationID } : { result ['Message' ]} "
341
+ )
342
+ return S_ERROR ("Issue while trying to load proxy content from delegation" )
343
+
344
+ return S_OK (proxy )
345
+
325
346
#############################################################################
326
347
327
348
def _getArcJobID (self , executableFile , inputs , outputs , delegation ):
@@ -406,18 +427,33 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
406
427
if not result ["OK" ]:
407
428
self .log .error ("Could not get delegation IDs." , result ["Message" ])
408
429
return S_ERROR ("Could not get delegation IDs" )
409
-
410
430
delegationIDs = result ["Value" ]
411
- if not delegationIDs :
431
+
432
+ # Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
433
+ currentDelegationID = None
434
+ proxyGroup = self .proxy .getDIRACGroup ()
435
+ for delegationID in delegationIDs :
436
+ # Get the proxy attached to the delegationID
437
+ result = self ._getProxyFromDelegationID (delegationID )
438
+ if not result ["OK" ]:
439
+ return result
440
+ proxy = result ["Value" ]
441
+
442
+ if proxy .getDIRACGroup () != proxyGroup :
443
+ continue
444
+
445
+ # If we are here, we have found the right delegationID to use
446
+ currentDelegationID = delegationID
447
+
448
+ if not currentDelegationID :
412
449
# No existing delegation, we need to prepare one
413
450
result = self ._prepareDelegation ()
414
451
if not result ["OK" ]:
415
452
self .log .warn ("Could not get a new delegation" , f"for CE { self .ceHost } " )
416
453
return S_ERROR ("Could not get a new delegation" )
417
- self ._delegationID = result ["Value" ]
418
- else :
419
- self ._delegationID = delegationIDs [0 ]
420
- delegation = f"\n (delegationid={ self ._delegationID } )"
454
+ currentDelegationID = result ["Value" ]
455
+
456
+ delegation = f"\n (delegationid={ currentDelegationID } )"
421
457
422
458
if not inputs :
423
459
inputs = []
@@ -562,10 +598,14 @@ def getCEStatus(self):
562
598
self .log .error ("Cannot get CE Status" , result ["Message" ])
563
599
return result
564
600
565
- # Try to find out which VO we are running for.
601
+ # Find out which VO we are running for.
566
602
# Essential now for REST interface.
567
- res = getVOfromProxyGroup ()
568
- vo = res ["Value" ] if res ["OK" ] else ""
603
+ result = getVOfromProxyGroup ()
604
+ if not result ["OK" ]:
605
+ return result
606
+ if not result ["Value" ]:
607
+ return S_ERROR ("Could not get VO value from the proxy group" )
608
+ vo = result ["Value" ]
569
609
570
610
# Prepare the command
571
611
params = {"schema" : "glue2" }
@@ -599,33 +639,36 @@ def getCEStatus(self):
599
639
600
640
#############################################################################
601
641
602
- def _renewDelegation (self ):
603
- """Renew the delegations"""
642
+ def _renewDelegation (self , delegationID ):
643
+ """Renew the delegation
644
+
645
+ :params delegationID: delegation ID to renew
646
+ """
604
647
# Prepare the command
605
648
params = {"action" : "get" }
606
- query = self ._urlJoin (os .path .join ("delegations" , self . _delegationID ))
649
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
607
650
608
651
# Submit the POST request to get the proxy
609
652
result = self ._request ("post" , query , params = params )
610
653
if not result ["OK" ]:
611
- self .log .error ("Could not get a proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
612
- return S_ERROR (f"Could not get a proxy for delegation { self . _delegationID } " )
654
+ self .log .error ("Could not get a proxy for" , f"delegation { delegationID } : { result ['Message' ]} " )
655
+ return S_ERROR (f"Could not get a proxy for delegation { delegationID } " )
613
656
response = result ["Value" ]
614
657
615
658
proxy = X509Chain ()
616
659
result = proxy .loadChainFromString (response .text )
617
660
if not result ["OK" ]:
618
- self .log .error ("Could not load proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
619
- return S_ERROR (f"Could not load proxy for delegation { self . _delegationID } " )
661
+ self .log .error ("Could not load proxy for" , f"delegation { delegationID } : { result ['Message' ]} " )
662
+ return S_ERROR (f"Could not load proxy for delegation { delegationID } " )
620
663
621
664
# Now test and renew the proxy
622
665
result = proxy .getRemainingSecs ()
623
666
if not result ["OK" ]:
624
667
self .log .error (
625
668
"Could not get remaining time from the proxy for" ,
626
- f"delegation { self . _delegationID } : { result ['Message' ]} " ,
669
+ f"delegation { delegationID } : { result ['Message' ]} " ,
627
670
)
628
- return S_ERROR (f"Could not get remaining time from the proxy for delegation { self . _delegationID } " )
671
+ return S_ERROR (f"Could not get remaining time from the proxy for delegation { delegationID } " )
629
672
timeLeft = result ["Value" ]
630
673
631
674
if timeLeft >= self .proxyTimeLeftBeforeRenewal :
@@ -634,31 +677,31 @@ def _renewDelegation(self):
634
677
635
678
self .log .verbose (
636
679
"Renewing delegation" ,
637
- f"{ self . _delegationID } whose proxy expires at { timeLeft } " ,
680
+ f"{ delegationID } whose proxy expires at { timeLeft } " ,
638
681
)
639
682
# Proxy needs to be renewed - try to renew it
640
683
# First, get a new CSR from the delegation
641
684
params = {"action" : "renew" }
642
- query = self ._urlJoin (os .path .join ("delegations" , self . _delegationID ))
685
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
643
686
result = self ._request ("post" , query , params = params )
644
687
if not result ["OK" ]:
645
688
self .log .error (
646
689
"Proxy not renewed, failed to get CSR" ,
647
- f"for delegation { self . _delegationID } " ,
690
+ f"for delegation { delegationID } " ,
648
691
)
649
- return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { self . _delegationID } " )
692
+ return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { delegationID } " )
650
693
response = result ["Value" ]
651
694
652
695
# Then, sign and upload the certificate
653
- result = self .__uploadCertificate (self . _delegationID , response .text )
696
+ result = self .__uploadCertificate (delegationID , response .text )
654
697
if not result ["OK" ]:
655
698
self .log .error (
656
699
"Proxy not renewed, failed to send renewed proxy" ,
657
- f"delegation { self . _delegationID } : { result ['Message' ]} " ,
700
+ f"delegation { delegationID } : { result ['Message' ]} " ,
658
701
)
659
- return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { self . _delegationID } " )
702
+ return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { delegationID } " )
660
703
661
- self .log .verbose ("Proxy successfully renewed" , f"for delegation { self . _delegationID } " )
704
+ self .log .verbose ("Proxy successfully renewed" , f"for delegation { delegationID } " )
662
705
663
706
return S_OK ()
664
707
@@ -714,12 +757,16 @@ def getJobStatus(self, jobIDList):
714
757
jobsToCancel .append (arcJob ["id" ])
715
758
self .log .debug (f"Killing held job { jobID } " )
716
759
717
- # Renew delegation to renew the proxies of the jobs
718
- if self ._delegationID :
719
- result = self ._renewDelegation ()
760
+ # Renew delegations to renew the proxies of the jobs
761
+ result = self ._getDelegationIDs ()
762
+ if not result ["OK" ]:
763
+ return result
764
+ delegationIDs = result ["Value" ]
765
+ for delegationID in delegationIDs :
766
+ result = self ._renewDelegation (delegationID )
720
767
if not result ["OK" ]:
721
768
# Only log here as we still want to return statuses
722
- self .log .warn ("Failed to renew delegation" , f"{ self . _delegationID } : { result ['Message' ]} " )
769
+ self .log .warn ("Failed to renew delegation" , f"{ delegationID } : { result ['Message' ]} " )
723
770
724
771
# Kill held jobs
725
772
if jobsToCancel :
0 commit comments