Skip to content

Commit f2fc2f1

Browse files
authored
Merge pull request #7257 from martynia/integration_janusz_pilotlogsWrapper_fix7249
[integration] Refactor the PilotLoggingAgent - download proxies at initialisation (fix #7249)
2 parents 90ec210 + 7e094f9 commit f2fc2f1

File tree

2 files changed

+162
-60
lines changed

2 files changed

+162
-60
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/PilotLoggingAgent.py

Lines changed: 103 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@
1212
# # imports
1313
import os
1414
import time
15+
import tempfile
1516
from DIRAC import S_OK, S_ERROR, gConfig
1617
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
1718
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs
1819
from DIRAC.Core.Base.AgentModule import AgentModule
19-
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
20+
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
21+
from DIRAC.Core.Utilities.Proxy import executeWithoutServerCertificate
22+
from DIRAC.Core.Utilities.Proxy import getProxy
2023
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
24+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOMSAttributeForGroup, getDNForUsername
2125
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient
2226

2327

@@ -31,46 +35,39 @@ class PilotLoggingAgent(AgentModule):
3135
def __init__(self, *args, **kwargs):
3236
"""c'tor"""
3337
super().__init__(*args, **kwargs)
34-
self.clearPilotsDelay = 30
38+
self.clearPilotsDelay = 30 # in days
39+
self.proxyTimeleftLimit = 600 # in seconds
3540

3641
def initialize(self):
3742
"""
3843
agent's initialisation. Use this agent's CS information to:
39-
Determine what Defaults/Shifter shifter proxy to use.,
40-
get the target SE name from the CS.
41-
Obtain log file location from Tornado.
44+
Determine VOs with remote logging enabled,
45+
Determine what Defaults/Shifter shifter proxy to use, download the proxies.
4246
4347
:param self: self reference
4448
"""
4549
# pilot logs lifetime in days
4650
self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay)
47-
# configured VOs and setup
51+
# proxy timeleft limit before we get a new one.
52+
self.proxyTimeleftLimit = self.am_getOption("ProxyTimeleftLimit", self.proxyTimeleftLimit)
53+
# configured VOs
4854
res = getVOs()
4955
if not res["OK"]:
5056
return res
5157
self.voList = res.get("Value", [])
5258

5359
if isinstance(self.voList, str):
5460
self.voList = [self.voList]
61+
# download shifter proxies for enabled VOs:
62+
self.proxyDict = {}
5563

56-
return S_OK()
57-
58-
def execute(self):
59-
"""
60-
Execute one agent cycle. Upload log files to the SE and register them in the DFC.
61-
Use a shifter proxy dynamically loaded for every VO
62-
63-
:param self: self reference
64-
"""
65-
voRes = {}
6664
for vo in self.voList:
67-
self.opsHelper = Operations(vo=vo)
65+
opsHelper = Operations(vo=vo)
6866
# is remote pilot logging enabled for the VO ?
69-
pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False)
67+
pilotLogging = opsHelper.getValue("/Pilot/RemoteLogging", False)
7068
if pilotLogging:
71-
res = self.opsHelper.getOptionsDict("Shifter/DataManager")
69+
res = opsHelper.getOptionsDict("Shifter/DataManager")
7270
if not res["OK"]:
73-
voRes[vo] = "No shifter defined - skipped"
7471
self.log.error(f"No shifter defined for VO: {vo} - skipping ...")
7572
continue
7673

@@ -80,36 +77,75 @@ def execute(self):
8077
self.log.error(
8178
f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}"
8279
)
83-
voRes[vo] = "No proxy user or group defined - skipped"
8480
continue
8581

8682
self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
87-
res = self.executeForVO( # pylint: disable=unexpected-keyword-arg
88-
vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
89-
)
90-
if not res["OK"]:
91-
voRes[vo] = res["Message"]
83+
# download a proxy and save a file name, userDN and proxyGroup for future use:
84+
result = getDNForUsername(proxyUser)
85+
if not result["OK"]:
86+
self.log.error(f"Could not obtain a DN of user {proxyUser} for VO {vo}, skipped")
87+
continue
88+
userDNs = result["Value"] # the same user may have more than one DN
89+
90+
with tempfile.NamedTemporaryFile(prefix="gridpp" + "__", delete=False) as ntf:
91+
result = self._downloadProxy(vo, userDNs, proxyGroup, ntf.name)
92+
93+
if not result["OK"]:
94+
# no proxy, we have no other option than to skip the VO
95+
continue
96+
self.proxyDict[vo] = {"proxy": result["Value"], "DN": userDNs, "group": proxyGroup}
97+
98+
return S_OK()
99+
100+
def execute(self):
101+
"""
102+
Execute one agent cycle. Upload log files to the SE and register them in the DFC.
103+
Consider only VOs we have proxies for.
104+
105+
:param self: self reference
106+
"""
107+
voRes = {}
108+
self.log.verbose(f"VOs configured for remote logging: {list(self.proxyDict.keys())}")
109+
originalUserProxy = os.environ.get("X509_USER_PROXY")
110+
for vo, elem in self.proxyDict.items():
111+
if self._isProxyExpired(elem["proxy"], self.proxyTimeleftLimit):
112+
result = self._downloadProxy(vo, elem["DN"], elem["group"], elem["proxy"])
113+
if not result["OK"]:
114+
voRes[vo] = result["Message"]
115+
continue
116+
os.environ["X509_USER_PROXY"] = elem["proxy"]
117+
res = self.executeForVO(vo)
118+
if not res["OK"]:
119+
voRes[vo] = res["Message"]
120+
# restore the original proxy:
121+
if originalUserProxy:
122+
os.environ["X509_USER_PROXY"] = originalUserProxy
123+
else:
124+
os.environ.pop("X509_USER_PROXY", None)
125+
92126
if voRes:
93127
for key, value in voRes.items():
94128
self.log.error(f"Error for {key} vo; message: {value}")
95129
voRes.update(S_ERROR("Agent cycle for some VO finished with errors"))
96130
return voRes
131+
97132
return S_OK()
98133

99-
@executeWithUserProxy
134+
@executeWithoutServerCertificate
100135
def executeForVO(self, vo):
101136
"""
102137
Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS:
103138
UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path).
104139
UploadSE - Storage element where the logs will be kept.
105140
106-
:param str vo: vo enabled for remote pilot logging
141+
:param str vo: vo enabled for remote pilot logging (and a successfully downloaded proxy for the VO)
107142
:return: S_OK or S_ERROR
108143
:rtype: dict
109144
"""
110145

111146
self.log.info(f"Pilot files upload cycle started for VO: {vo}")
112-
res = self.opsHelper.getOptionsDict("Pilot")
147+
opsHelper = Operations(vo=vo)
148+
res = opsHelper.getOptionsDict("Pilot")
113149
if not res["OK"]:
114150
return S_ERROR(f"No pilot section for {vo} vo")
115151
pilotOptions = res["Value"]
@@ -184,3 +220,41 @@ def clearOldPilotLogs(self, pilotLogPath):
184220
os.remove(fullpath)
185221
except Exception as excp:
186222
self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp)
223+
224+
def _downloadProxy(self, vo, userDNs, proxyGroup, filename):
225+
"""
226+
Fetch a new proxy and store it in a file filename.
227+
228+
:param str vo: VO to get a proxy for
229+
:param list userDNs: user DN list
230+
:param str proxyGroup: user group
231+
:param str filename: file name to store a proxy
232+
:return: Dirac S_OK or S_ERROR object
233+
:rtype: dict
234+
"""
235+
vomsAttr = getVOMSAttributeForGroup(proxyGroup)
236+
result = getProxy(userDNs, proxyGroup, vomsAttr=vomsAttr, proxyFilePath=filename)
237+
if not result["OK"]:
238+
self.log.error(f"Could not download a proxy for DN {userDNs}, group {proxyGroup} for VO {vo}, skipped")
239+
return S_ERROR(f"Could not download a proxy, {vo} skipped")
240+
return result
241+
242+
def _isProxyExpired(self, proxyfile, limit):
243+
"""
244+
Check proxy timeleft. If less than a limit, return True.
245+
246+
:param str proxyfile: name of the proxy file to check
247+
:param int limit: timeleft threshold below which a proxy is considered expired.
248+
:return: True or False
249+
:rtype: bool
250+
"""
251+
result = getProxyInfo(proxyfile)
252+
if not result["OK"]:
253+
self.log.error(f"Could not get proxy info {result['Message']}")
254+
return True
255+
timeleft = result["Value"]["secondsLeft"]
256+
self.log.debug(f"Proxy {proxyfile} time left: {timeleft}")
257+
if timeleft < limit:
258+
self.log.info(f"proxy {proxyfile} expired/is about to expire. Will fetch a new one")
259+
return True
260+
return False

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PilotLoggingAgent.py

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
# Mock Objects
1818
mockReply = MagicMock()
1919
mockReply1 = MagicMock()
20+
mockGetDNForUsername = MagicMock()
21+
mockGetVomsAttr = MagicMock()
22+
mockGetProxy = MagicMock()
2023
mockOperations = MagicMock()
2124
mockTornadoClient = MagicMock()
2225
mockDataManager = MagicMock()
@@ -48,19 +51,16 @@ def plaBase(mocker):
4851
mocker.patch(
4952
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1
5053
)
54+
mocker.patch(
55+
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getDNForUsername", side_effect=mockGetDNForUsername
56+
)
57+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getProxy", side_effect=mockGetProxy)
5158
pla = PilotLoggingAgent()
5259
pla.log = gLogger
5360
pla._AgentModule__configDefaults = mockAM
5461
return pla
5562

5663

57-
@pytest.fixture
58-
def pla_initialised(mocker, plaBase):
59-
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO")
60-
plaBase.initialize()
61-
return plaBase
62-
63-
6464
@pytest.fixture
6565
def pla(mocker, plaBase):
6666
mocker.patch(
@@ -72,42 +72,70 @@ def pla(mocker, plaBase):
7272
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager",
7373
side_effect=mockDataManager,
7474
)
75-
plaBase.initialize()
7675
return plaBase
7776

7877

79-
def test_initialize(plaBase):
78+
@pytest.mark.parametrize(
79+
"remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes",
80+
[
81+
(
82+
[True, False],
83+
upDict,
84+
S_OK(["myDN"]),
85+
S_OK(),
86+
S_OK("proxyfilename"),
87+
{"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}},
88+
S_OK(),
89+
),
90+
([False, False], upDict, S_OK(["myDN"]), S_OK(), S_OK(), {}, S_OK()),
91+
([True, False], upDict, S_ERROR("Could not obtain a DN"), S_OK(), S_OK(), {}, S_OK()),
92+
([True, False], upDict, S_ERROR("Could not download proxy"), S_OK(), S_ERROR("Failure"), {}, S_OK()),
93+
],
94+
)
95+
def test_initialize(plaBase, remoteLogging, options, getDN, getVOMS, getProxy, resDict, expectedRes):
96+
"""
97+
After a successful initialisation the proxyDict should contain proxy filenames keyed by a VO name.
98+
test loops: gridpp enabled, lz disabled, proxy obtained, result proxyDict contains a proxy filename.
99+
both VOs disabled => proxyDict empty
100+
gridpp enabled, but getDNForUsername fails => proxyDict empty
101+
gridpp enabled, lz disabled, getProxy fails => proxyDict empty
102+
103+
"""
104+
mockReply.side_effect = remoteLogging # Operations.getValue("/Pilot/RemoteLogging", False)
105+
mockReply1.return_value = options # Operations.getOptionsDict("Shifter/DataManager")
106+
mockGetDNForUsername.return_value = getDN
107+
mockGetVomsAttr.return_value = getVOMS
108+
mockGetProxy.return_value = getProxy
109+
80110
res = plaBase.initialize()
111+
81112
assert plaBase.voList == plaModule.getVOs()["Value"]
82-
assert res == S_OK()
113+
assert resDict == plaBase.proxyDict
114+
assert res == expectedRes
83115

84116

85117
@pytest.mark.parametrize(
86-
"mockReplyInput, expected, expectedExecOut, expected2",
118+
"proxyDict, execVORes, expectedResult",
87119
[
88-
("/Pilot/RemoteLogging", [True, False], S_OK(), upDict),
89-
("/Pilot/RemoteLogging", [False, False], S_OK(), upDict),
90-
("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict),
120+
({}, S_OK(), S_OK()),
121+
({"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}}, S_OK(), S_OK()),
122+
(
123+
{"gridpp": {"DN": ["myDN"], "group": "proxyGroup", "proxy": "proxyfilename"}},
124+
S_ERROR("Execute for VO failed"),
125+
S_ERROR("Agent cycle for some VO finished with errors"),
126+
),
91127
],
92128
)
93-
def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2):
129+
def test_execute(plaBase, proxyDict, execVORes, expectedResult):
94130
"""Testing a thin version of execute (executeForVO is mocked)"""
95-
assert pla_initialised.voList == plaModule.getVOs()["Value"]
96-
mockReply.side_effect = expected
97-
mockReply1.return_value = expected2
98-
# remote pilot logging on (gridpp only) and off.
99-
pla_initialised.executeForVO.return_value = expectedExecOut
100-
res = pla_initialised.execute()
101-
if not any(expected):
102-
pla_initialised.executeForVO.assert_not_called()
103-
else:
104-
assert pla_initialised.executeForVO.called
105-
pla_initialised.executeForVO.assert_called_with(
106-
"gridpp",
107-
proxyUserName=upDict["Value"]["User"],
108-
proxyUserGroup=upDict["Value"]["Group"],
109-
)
110-
assert res["OK"] == expectedExecOut["OK"]
131+
132+
plaBase.executeForVO = MagicMock()
133+
plaBase._isProxyExpired = MagicMock()
134+
plaBase._isProxyExpired.return_value = False
135+
plaBase.proxyDict = proxyDict
136+
plaBase.executeForVO.return_value = execVORes
137+
res = plaBase.execute()
138+
assert res["OK"] == expectedResult["OK"]
111139

112140

113141
@pytest.mark.parametrize(

0 commit comments

Comments
 (0)