12
12
# # imports
13
13
import os
14
14
import time
15
+ import tempfile
15
16
from DIRAC import S_OK , S_ERROR , gConfig
16
17
from DIRAC .ConfigurationSystem .Client .Helpers .Operations import Operations
17
18
from DIRAC .ConfigurationSystem .Client .Helpers .Registry import getVOs
18
19
from DIRAC .Core .Base .AgentModule import AgentModule
19
- from DIRAC .Core .Utilities .Proxy import executeWithUserProxy
20
+ from DIRAC .Core .Utilities .Proxy import executeWithoutServerCertificate
21
+ from DIRAC .Core .Utilities .Proxy import getProxy
20
22
from DIRAC .DataManagementSystem .Client .DataManager import DataManager
23
+ from DIRAC .ConfigurationSystem .Client .Helpers .Registry import getVOMSAttributeForGroup , getDNForUsername
21
24
from DIRAC .WorkloadManagementSystem .Client .TornadoPilotLoggingClient import TornadoPilotLoggingClient
22
25
23
26
@@ -36,9 +39,8 @@ def __init__(self, *args, **kwargs):
36
39
def initialize (self ):
37
40
"""
38
41
agent's initialisation. Use this agent's CS information to:
39
- Determine what Defaults/Shifter shifter proxy to use.,
40
- get the target SE name from the CS.
41
- Obtain log file location from Tornado.
42
+ Determine VOs with remote logging enabled,
43
+ Determine what Defaults/Shifter shifter proxy to use., download the proxies.
42
44
43
45
:param self: self reference
44
46
"""
@@ -52,25 +54,17 @@ def initialize(self):
52
54
53
55
if isinstance (self .voList , str ):
54
56
self .voList = [self .voList ]
57
+ # download shifter proxies for enabled VOs:
58
+ self .proxyDict = {}
55
59
56
- return S_OK ()
57
-
58
- def execute (self ):
59
- """
60
- Execute one agent cycle. Upload log files to the SE and register them in the DFC.
61
- Use a shifter proxy dynamically loaded for every VO
62
-
63
- :param self: self reference
64
- """
65
- voRes = {}
66
60
for vo in self .voList :
67
- self . opsHelper = Operations (vo = vo )
61
+ opsHelper = Operations (vo = vo )
68
62
# is remote pilot logging enabled for the VO ?
69
- pilotLogging = self . opsHelper .getValue ("/Pilot/RemoteLogging" , False )
63
+ pilotLogging = opsHelper .getValue ("/Pilot/RemoteLogging" , False )
70
64
if pilotLogging :
71
- res = self . opsHelper .getOptionsDict ("Shifter/DataManager" )
65
+ res = opsHelper .getOptionsDict ("Shifter/DataManager" )
72
66
if not res ["OK" ]:
73
- voRes [vo ] = "No shifter defined - skipped"
67
+ # voRes[vo] = "No shifter defined - skipped"
74
68
self .log .error (f"No shifter defined for VO: { vo } - skipping ..." )
75
69
continue
76
70
@@ -80,36 +74,73 @@ def execute(self):
80
74
self .log .error (
81
75
f"No proxy user or group defined for pilot: VO: { vo } , User: { proxyUser } , Group: { proxyGroup } "
82
76
)
83
- voRes [vo ] = "No proxy user or group defined - skipped"
84
77
continue
85
78
86
79
self .log .info (f"Proxy used for pilot logging: VO: { vo } , User: { proxyUser } , Group: { proxyGroup } " )
87
- res = self .executeForVO ( # pylint: disable=unexpected-keyword-arg
88
- vo , proxyUserName = proxyUser , proxyUserGroup = proxyGroup
89
- )
90
- if not res ["OK" ]:
91
- voRes [vo ] = res ["Message" ]
80
+ # download a proxy and save a filename for future use:
81
+ result = getDNForUsername (proxyUser )
82
+ if not result ["OK" ]:
83
+ self .log .error (f"Could not obtain a DN of user { proxyUser } for VO { vo } , skipped" )
84
+ continue
85
+ userDNs = result ["Value" ] # a same user may have more than one DN
86
+ fd , filename = tempfile .mkstemp (prefix = vo + "__" )
87
+ os .close (fd )
88
+ vomsAttr = getVOMSAttributeForGroup (proxyGroup )
89
+ result = getProxy (userDNs , proxyGroup , vomsAttr = vomsAttr , proxyFilePath = filename )
90
+
91
+ if not result ["OK" ]:
92
+ self .log .error (
93
+ f"Could not download a proxy for DN { userDNs } , group { proxyGroup } for VO { vo } , skipped"
94
+ )
95
+ return result
96
+ self .proxyDict [vo ] = result ["Value" ]
97
+
98
+ return S_OK ()
99
+
100
+ def execute (self ):
101
+ """
102
+ Execute one agent cycle. Upload log files to the SE and register them in the DFC.
103
+ Consider only VOs we have proxies for.
104
+
105
+ :param self: self reference
106
+ """
107
+ voRes = {}
108
+ self .log .verbose (f"VOs configured for remote logging: { list (self .proxyDict .keys ())} " )
109
+ originalUserProxy = os .environ .get ("X509_USER_PROXY" )
110
+ for vo , proxy in self .proxyDict .items ():
111
+ os .environ ["X509_USER_PROXY" ] = proxy
112
+ res = self .executeForVO (vo )
113
+ if not res ["OK" ]:
114
+ voRes [vo ] = res ["Message" ]
115
+ # restore the original proxy:
116
+ if originalUserProxy :
117
+ os .environ ["X509_USER_PROXY" ] = originalUserProxy
118
+ else :
119
+ os .environ .pop ("X509_USER_PROXY" , None )
120
+
92
121
if voRes :
93
122
for key , value in voRes .items ():
94
123
self .log .error (f"Error for { key } vo; message: { value } " )
95
124
voRes .update (S_ERROR ("Agent cycle for some VO finished with errors" ))
96
125
return voRes
126
+
97
127
return S_OK ()
98
128
99
- @executeWithUserProxy
129
+ @executeWithoutServerCertificate
100
130
def executeForVO (self , vo ):
101
131
"""
102
132
Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS:
103
133
UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path).
104
134
UploadSE - Storage element where the logs will be kept.
105
135
106
- :param str vo: vo enabled for remote pilot logging
136
+ :param str vo: vo enabled for remote pilot logging (and a successfully downloaded proxy for the VO)
107
137
:return: S_OK or S_ERROR
108
138
:rtype: dict
109
139
"""
110
140
111
141
self .log .info (f"Pilot files upload cycle started for VO: { vo } " )
112
- res = self .opsHelper .getOptionsDict ("Pilot" )
142
+ opsHelper = Operations (vo = vo )
143
+ res = opsHelper .getOptionsDict ("Pilot" )
113
144
if not res ["OK" ]:
114
145
return S_ERROR (f"No pilot section for { vo } vo" )
115
146
pilotOptions = res ["Value" ]
0 commit comments