@@ -46,8 +46,6 @@ def __init__(self, ceUniqueID):
46
46
self .restVersion = "1.0"
47
47
# Time left before proxy renewal: 3 hours is a good default
48
48
self .proxyTimeLeftBeforeRenewal = 10800
49
- # Current delegation ID, generated/fetched in submitJob(), renewed in getJobStatus()
50
- self ._delegationID = None
51
49
# Timeout
52
50
self .timeout = 5.0
53
51
# Request session
@@ -179,6 +177,10 @@ def _checkSession(self):
179
177
self .headers .pop ("Authorization" , None )
180
178
181
179
# Get a proxy: still mandatory, even if tokens are used to authenticate
180
+ if not self .proxy :
181
+ self .log .error ("Proxy not set" )
182
+ return S_ERROR ("Proxy not set" )
183
+
182
184
result = self ._prepareProxy ()
183
185
if not result ["OK" ]:
184
186
self .log .error ("Failed to set up proxy" , result ["Message" ])
@@ -190,7 +192,7 @@ def _checkSession(self):
190
192
return S_OK ()
191
193
192
194
# Attach the proxy to the session, only if the token is unavailable
193
- self .session .cert = Locations . getProxyLocation ()
195
+ self .session .cert = os . environ [ "X509_USER_PROXY" ]
194
196
return S_OK ()
195
197
196
198
#############################################################################
@@ -233,15 +235,8 @@ def __uploadCertificate(self, delegationID, csrContent):
233
235
headers = {"Content-Type" : "x-pem-file" }
234
236
query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
235
237
236
- # Get a proxy and sign the CSR
237
- proxy = X509Chain ()
238
- proxyFile = Locations .getProxyLocation ()
239
- if not proxyFile :
240
- return S_ERROR (f"No proxy available" )
241
- result = proxy .loadProxyFromFile (proxyFile )
242
- if not result ["OK" ]:
243
- return S_ERROR (f"Can't load { proxyFile } : { result ['Message' ]} " )
244
- result = proxy .generateChainFromRequestString (csrContent )
238
+ # Sign the CSR
239
+ result = self .proxy .generateChainFromRequestString (csrContent )
245
240
if not result ["OK" ]:
246
241
self .log .error ("Problem with the Certificate Signing Request:" , result ["Message" ])
247
242
return S_ERROR ("Problem with the Certificate Signing Request" )
@@ -314,6 +309,32 @@ def _getDelegationIDs(self):
314
309
delegationIDs = [delegationContent ["id" ] for delegationContent in delegations ]
315
310
return S_OK (delegationIDs )
316
311
312
+ def _getProxyFromDelegationID (self , delegationID ):
313
+ """Get proxy stored within the delegation
314
+
315
+ :param str delegationID: delegation ID
316
+ """
317
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
318
+ params = {"action" : "get" }
319
+
320
+ # Submit the POST request to get the delegation
321
+ result = self ._request ("post" , query , params = params )
322
+ if not result ["OK" ]:
323
+ self .log .error ("Issue while interacting with the delegations." , result ["Message" ])
324
+ return S_ERROR ("Issue while interacting with the delegations" )
325
+ response = result ["Value" ]
326
+
327
+ proxyContent = response .text
328
+ proxy = X509Chain ()
329
+ result = proxy .loadChainFromString (proxyContent )
330
+ if not result ["OK" ]:
331
+ self .log .error (
332
+ "Issue while trying to load proxy content from delegation" , f"{ delegationID } : { result ['Message' ]} "
333
+ )
334
+ return S_ERROR ("Issue while trying to load proxy content from delegation" )
335
+
336
+ return S_OK (proxy )
337
+
317
338
#############################################################################
318
339
319
340
def _getArcJobID (self , executableFile , inputs , outputs , delegation ):
@@ -398,18 +419,33 @@ def submitJob(self, executableFile, proxy, numberOfJobs=1, inputs=None, outputs=
398
419
if not result ["OK" ]:
399
420
self .log .error ("Could not get delegation IDs." , result ["Message" ])
400
421
return S_ERROR ("Could not get delegation IDs" )
401
-
402
422
delegationIDs = result ["Value" ]
403
- if not delegationIDs :
423
+
424
+ # Get the delegationID which corresponds to the DIRAC group of the proxy if it exists
425
+ currentDelegationID = None
426
+ proxyGroup = self .proxy .getDIRACGroup ()
427
+ for delegationID in delegationIDs :
428
+ # Get the proxy attached to the delegationID
429
+ result = self ._getProxyFromDelegationID (delegationID )
430
+ if not result ["OK" ]:
431
+ return result
432
+ proxy = result ["Value" ]
433
+
434
+ if proxy .getDIRACGroup () != proxyGroup :
435
+ continue
436
+
437
+ # If we are here, we have found the right delegationID to use
438
+ currentDelegationID = delegationID
439
+
440
+ if not currentDelegationID :
404
441
# No existing delegation, we need to prepare one
405
442
result = self ._prepareDelegation ()
406
443
if not result ["OK" ]:
407
444
self .log .warn ("Could not get a new delegation" , f"for CE { self .ceHost } " )
408
445
return S_ERROR ("Could not get a new delegation" )
409
- self ._delegationID = result ["Value" ]
410
- else :
411
- self ._delegationID = delegationIDs [0 ]
412
- delegation = f"\n (delegationid={ self ._delegationID } )"
446
+ currentDelegationID = result ["Value" ]
447
+
448
+ delegation = f"\n (delegationid={ currentDelegationID } )"
413
449
414
450
if not inputs :
415
451
inputs = []
@@ -554,10 +590,14 @@ def getCEStatus(self):
554
590
self .log .error ("Cannot get CE Status" , result ["Message" ])
555
591
return result
556
592
557
- # Try to find out which VO we are running for.
593
+ # Find out which VO we are running for.
558
594
# Essential now for REST interface.
559
- res = getVOfromProxyGroup ()
560
- vo = res ["Value" ] if res ["OK" ] else ""
595
+ result = getVOfromProxyGroup ()
596
+ if not result ["OK" ]:
597
+ return result
598
+ if not result ["Value" ]:
599
+ return S_ERROR ("Could not get VO value from the proxy group" )
600
+ vo = result ["Value" ]
561
601
562
602
# Prepare the command
563
603
params = {"schema" : "glue2" }
@@ -591,33 +631,36 @@ def getCEStatus(self):
591
631
592
632
#############################################################################
593
633
594
- def _renewDelegation (self ):
595
- """Renew the delegations"""
634
+ def _renewDelegation (self , delegationID ):
635
+ """Renew the delegation
636
+
637
+ :params delegationID: delegation ID to renew
638
+ """
596
639
# Prepare the command
597
640
params = {"action" : "get" }
598
- query = self ._urlJoin (os .path .join ("delegations" , self . _delegationID ))
641
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
599
642
600
643
# Submit the POST request to get the proxy
601
644
result = self ._request ("post" , query , params = params )
602
645
if not result ["OK" ]:
603
- self .log .error ("Could not get a proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
604
- return S_ERROR (f"Could not get a proxy for delegation { self . _delegationID } " )
646
+ self .log .error ("Could not get a proxy for" , f"delegation { delegationID } : { result ['Message' ]} " )
647
+ return S_ERROR (f"Could not get a proxy for delegation { delegationID } " )
605
648
response = result ["Value" ]
606
649
607
650
proxy = X509Chain ()
608
651
result = proxy .loadChainFromString (response .text )
609
652
if not result ["OK" ]:
610
- self .log .error ("Could not load proxy for" , f"delegation { self . _delegationID } : { result ['Message' ]} " )
611
- return S_ERROR (f"Could not load proxy for delegation { self . _delegationID } " )
653
+ self .log .error ("Could not load proxy for" , f"delegation { delegationID } : { result ['Message' ]} " )
654
+ return S_ERROR (f"Could not load proxy for delegation { delegationID } " )
612
655
613
656
# Now test and renew the proxy
614
657
result = proxy .getRemainingSecs ()
615
658
if not result ["OK" ]:
616
659
self .log .error (
617
660
"Could not get remaining time from the proxy for" ,
618
- f"delegation { self . _delegationID } : { result ['Message' ]} " ,
661
+ f"delegation { delegationID } : { result ['Message' ]} " ,
619
662
)
620
- return S_ERROR (f"Could not get remaining time from the proxy for delegation { self . _delegationID } " )
663
+ return S_ERROR (f"Could not get remaining time from the proxy for delegation { delegationID } " )
621
664
timeLeft = result ["Value" ]
622
665
623
666
if timeLeft >= self .proxyTimeLeftBeforeRenewal :
@@ -626,31 +669,31 @@ def _renewDelegation(self):
626
669
627
670
self .log .verbose (
628
671
"Renewing delegation" ,
629
- f"{ self . _delegationID } whose proxy expires at { timeLeft } " ,
672
+ f"{ delegationID } whose proxy expires at { timeLeft } " ,
630
673
)
631
674
# Proxy needs to be renewed - try to renew it
632
675
# First, get a new CSR from the delegation
633
676
params = {"action" : "renew" }
634
- query = self ._urlJoin (os .path .join ("delegations" , self . _delegationID ))
677
+ query = self ._urlJoin (os .path .join ("delegations" , delegationID ))
635
678
result = self ._request ("post" , query , params = params )
636
679
if not result ["OK" ]:
637
680
self .log .error (
638
681
"Proxy not renewed, failed to get CSR" ,
639
- f"for delegation { self . _delegationID } " ,
682
+ f"for delegation { delegationID } " ,
640
683
)
641
- return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { self . _delegationID } " )
684
+ return S_ERROR (f"Proxy not renewed, failed to get CSR for delegation { delegationID } " )
642
685
response = result ["Value" ]
643
686
644
687
# Then, sign and upload the certificate
645
- result = self .__uploadCertificate (self . _delegationID , response .text )
688
+ result = self .__uploadCertificate (delegationID , response .text )
646
689
if not result ["OK" ]:
647
690
self .log .error (
648
691
"Proxy not renewed, failed to send renewed proxy" ,
649
- f"delegation { self . _delegationID } : { result ['Message' ]} " ,
692
+ f"delegation { delegationID } : { result ['Message' ]} " ,
650
693
)
651
- return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { self . _delegationID } " )
694
+ return S_ERROR (f"Proxy not renewed, failed to send renewed proxy for delegation { delegationID } " )
652
695
653
- self .log .verbose ("Proxy successfully renewed" , f"for delegation { self . _delegationID } " )
696
+ self .log .verbose ("Proxy successfully renewed" , f"for delegation { delegationID } " )
654
697
655
698
return S_OK ()
656
699
@@ -706,12 +749,16 @@ def getJobStatus(self, jobIDList):
706
749
jobsToCancel .append (arcJob ["id" ])
707
750
self .log .debug (f"Killing held job { jobID } " )
708
751
709
- # Renew delegation to renew the proxies of the jobs
710
- if self ._delegationID :
711
- result = self ._renewDelegation ()
752
+ # Renew delegations to renew the proxies of the jobs
753
+ result = self ._getDelegationIDs ()
754
+ if not result ["OK" ]:
755
+ return result
756
+ delegationIDs = result ["Value" ]
757
+ for delegationID in delegationIDs :
758
+ result = self ._renewDelegation (delegationID )
712
759
if not result ["OK" ]:
713
760
# Only log here as we still want to return statuses
714
- self .log .warn ("Failed to renew delegation" , f"{ self . _delegationID } : { result ['Message' ]} " )
761
+ self .log .warn ("Failed to renew delegation" , f"{ delegationID } : { result ['Message' ]} " )
715
762
716
763
# Kill held jobs
717
764
if jobsToCancel :
0 commit comments