Skip to content

Commit a18d85f

Browse files
authored
Merge pull request #6208 from martynia/integration_janusz_pilotlogsWrapper_dev
[integration] Remote Pilot Logger to Tornado
2 parents d24f3ae + 9bce963 commit a18d85f

15 files changed

+999
-120
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
""" :mod: PilotLoggingAgent
2+
3+
PilotLoggingAgent sends Pilot log files to an SE.
4+
5+
.. literalinclude:: ../ConfigTemplate.cfg
6+
:start-after: ##BEGIN PilotLoggingAgent
7+
:end-before: ##END
8+
:dedent: 2
9+
:caption: PilotLoggingAgent options
10+
"""
11+
12+
# # imports
13+
import os
14+
import time
15+
from DIRAC import S_OK, S_ERROR, gConfig
16+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
17+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs
18+
from DIRAC.Core.Base.AgentModule import AgentModule
19+
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
20+
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
21+
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient
22+
23+
24+
class PilotLoggingAgent(AgentModule):
    """
    .. class:: PilotLoggingAgent

    The agent sends completed pilot log files to permanent storage for analysis.
    """

    def __init__(self, *args, **kwargs):
        """c'tor"""
        super().__init__(*args, **kwargs)
        # pilot logs lifetime in days (default, overridden by the ClearPilotsDelay option)
        self.clearPilotsDelay = 30
        # VOs to process; filled in by initialize()
        self.voList = []
        # Operations helper of the VO currently being processed; set by execute()
        self.opsHelper = None

    def initialize(self):
        """
        Agent's initialisation. Use this agent's CS information to:
        read the ClearPilotsDelay option (pilot logs lifetime in days),
        get the list of configured VOs.

        The shifter proxy, the target SE and the upload path are resolved per VO
        in :meth:`execute` / :meth:`executeForVO`.

        :param self: self reference
        :return: S_OK, or the S_ERROR returned by getVOs
        :rtype: dict
        """
        # pilot logs lifetime in days
        self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", self.clearPilotsDelay)
        # configured VOs
        res = getVOs()
        if not res["OK"]:
            return res
        self.voList = res.get("Value", [])

        # a single VO may come back as a plain string
        if isinstance(self.voList, str):
            self.voList = [self.voList]

        return S_OK()

    def execute(self):
        """
        Execute one agent cycle. Upload log files to the SE and register them in the DFC.
        Use a shifter proxy dynamically loaded for every VO

        :param self: self reference
        :return: S_OK if every enabled VO was processed without error; otherwise an S_ERROR
                 dictionary which additionally carries one ``vo: message`` entry per failed VO
        :rtype: dict
        """
        voRes = {}
        for vo in self.voList:
            self.opsHelper = Operations(vo=vo)
            # is remote pilot logging enabled for the VO ?
            if not self.opsHelper.getValue("/Pilot/RemoteLogging", False):
                continue

            res = self.opsHelper.getOptionsDict("Shifter/DataManager")
            if not res["OK"]:
                voRes[vo] = "No shifter defined - skipped"
                self.log.error(f"No shifter defined for VO: {vo} - skipping ...")
                continue

            proxyUser = res["Value"].get("User")
            proxyGroup = res["Value"].get("Group")
            if proxyGroup is None or proxyUser is None:
                self.log.error(
                    f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}"
                )
                voRes[vo] = "No proxy user or group defined - skipped"
                continue

            self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
            # proxyUserName / proxyUserGroup are consumed by the executeWithUserProxy decorator
            res = self.executeForVO(  # pylint: disable=unexpected-keyword-arg
                vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
            )
            if not res["OK"]:
                voRes[vo] = res["Message"]

        if voRes:
            for key, value in voRes.items():
                self.log.error(f"Error for {key} vo; message: {value}")
            # keep the per-VO messages in the returned structure, flagged as an error
            voRes.update(S_ERROR("Agent cycle for some VO finished with errors"))
            return voRes
        return S_OK()

    @executeWithUserProxy
    def executeForVO(self, vo):
        """
        Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS:
        UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path).
        UploadSE - Storage element where the logs will be kept.

        Relies on ``self.opsHelper`` having been set for this VO (done by :meth:`execute`).
        A failed upload of an individual file is logged and the file is kept locally;
        it does not make the method return an error.

        :param str vo: vo enabled for remote pilot logging
        :return: S_OK or S_ERROR
        :rtype: dict
        """

        self.log.info(f"Pilot files upload cycle started for VO: {vo}")
        res = self.opsHelper.getOptionsDict("Pilot")
        if not res["OK"]:
            return S_ERROR(f"No pilot section for {vo} vo")
        pilotOptions = res["Value"]
        uploadSE = pilotOptions.get("UploadSE")
        if uploadSE is None:
            return S_ERROR("Upload SE not defined")
        self.log.info(f"Pilot upload SE: {uploadSE}")

        uploadPath = pilotOptions.get("UploadPath")
        if uploadPath is None:
            return S_ERROR(f"Upload path on SE {uploadSE} not defined")
        self.log.info(f"Pilot upload path: {uploadPath}")

        client = TornadoPilotLoggingClient(useCertificates=True)
        resDict = client.getMetadata()

        if not resDict["OK"]:
            return resDict

        # vo-specific source log path:
        pilotLogPath = os.path.join(resDict["Value"]["LogPath"], vo)
        # check for new files and upload them
        if not os.path.exists(pilotLogPath):
            # not a disaster, the VO is enabled, but no logfiles were ever stored.
            return S_OK()
        # delete old pilot log files for the vo VO
        self.clearOldPilotLogs(pilotLogPath)

        self.log.info(f"Pilot log files location = {pilotLogPath} for VO: {vo}")

        # get finalised (.log) files from Tornado and upload them to the selected SE
        files = [
            f for f in os.listdir(pilotLogPath) if f.endswith(".log") and os.path.isfile(os.path.join(pilotLogPath, f))
        ]

        if not files:
            self.log.info("No files to upload for this cycle")
            return S_OK()

        # one DataManager is enough for the whole batch
        dataManager = DataManager()
        for elem in files:
            lfn = os.path.join(uploadPath, elem)
            name = os.path.join(pilotLogPath, elem)
            res = dataManager.putAndRegister(lfn=lfn, fileName=name, diracSE=uploadSE, overwrite=True)
            if not res["OK"]:
                # keep the local file, so the upload is retried in the next cycle
                self.log.error("Could not upload", f"to {uploadSE}: {res['Message']}")
                continue
            self.log.verbose("File uploaded: ", f"LFN = {res['Value']}")
            try:
                os.remove(name)
            except Exception as excp:
                self.log.exception("Cannot remove a local file after uploading", lException=excp)
        return S_OK()

    def clearOldPilotLogs(self, pilotLogPath):
        """
        Delete old pilot log files unconditionally. Assumes that pilotLogPath exists.
        A file is old when its modification time is more than ``clearPilotsDelay`` days in the past.
        Entries which cannot be examined or removed are logged and skipped.

        :param str pilotLogPath: log files directory
        :return: None
        :rtype: None
        """

        seconds = int(self.clearPilotsDelay) * 86400
        oldestAllowed = time.time() - seconds

        for fileName in os.listdir(pilotLogPath):
            fullpath = os.path.join(pilotLogPath, fileName)
            try:
                modifTime = os.stat(fullpath).st_mtime
            except OSError as excp:
                # the entry may have vanished since listdir(); do not abort the whole cycle
                self.log.exception(f"Cannot stat a log file {fullpath}", lException=excp)
                continue
            if modifTime < oldestAllowed:
                self.log.debug(f" Deleting old log : {fullpath}")
                try:
                    os.remove(fullpath)
                except Exception as excp:
                    self.log.exception(f"Cannot remove an old log file {fullpath}", lException=excp)

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,10 +1017,6 @@ def _getPilotOptions(self, queue, **kwargs):
10171017

10181018
pilotOptions.append("--pythonVersion=3")
10191019

1020-
# Debug
1021-
if self.pilotLogLevel.lower() == "debug":
1022-
pilotOptions.append("-ddd")
1023-
10241020
# DIRAC Extensions to be used in pilots
10251021
pilotExtensionsList = opsHelper.getValue("Pilot/Extensions", [])
10261022
extensionsList = []
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
""" Test class for PilotLoggingAgent Agent
2+
"""
3+
import os
4+
import time
5+
import tempfile
6+
7+
import pytest
8+
from unittest.mock import MagicMock, patch
9+
10+
# DIRAC Components
11+
import DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent as plaModule
12+
from DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent import PilotLoggingAgent
13+
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
14+
15+
gLogger.setLevel("DEBUG")
16+
17+
# Mock Objects
# replies of Operations.getValue and Operations.getOptionsDict respectively
mockReply, mockReply1 = MagicMock(), MagicMock()
# stand-ins for the classes instantiated by the agent
mockOperations = MagicMock()
mockTornadoClient = MagicMock()
mockDataManager = MagicMock()
# value handed back by the patched AgentModule.am_getOption
mockAM = MagicMock()
# a callable which always returns None
mockNone = MagicMock(return_value=None)

# a successful "Shifter/DataManager" options lookup
upDict = dict(
    OK=True,
    Value=dict(User="proxyUser", Group="proxyGroup"),
)
31+
32+
33+
@pytest.fixture
def plaBase(mocker):
    """
    Build a PilotLoggingAgent with the AgentModule machinery and all CS lookups patched out.

    The agent is returned without initialize() having been called.
    """
    module = "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent"
    mocker.patch(f"{module}.AgentModule.__init__")
    # patched through the module under test, so no unrelated agent module has to be imported
    mocker.patch(
        f"{module}.AgentModule._AgentModule__moduleProperties",
        side_effect=lambda x, y=None: y,
        create=True,
    )
    mocker.patch(f"{module}.AgentModule.am_getOption", return_value=mockAM)
    mocker.patch(
        f"{module}.getVOs",
        return_value={"OK": True, "Value": ["gridpp", "lz"]},
    )
    mocker.patch(f"{module}.gConfig.getValue", return_value="GridPP")
    mocker.patch(f"{module}.Operations.getValue", side_effect=mockReply)
    mocker.patch(f"{module}.Operations.getOptionsDict", side_effect=mockReply1)
    pla = PilotLoggingAgent()
    pla.log = gLogger
    pla._AgentModule__configDefaults = mockAM
    return pla
55+
56+
57+
@pytest.fixture
def pla_initialised(mocker, plaBase):
    """Initialised agent whose executeForVO is replaced by a mock (for testing execute alone)."""
    target = "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO"
    mocker.patch(target)
    agent = plaBase
    agent.initialize()
    return agent
62+
63+
64+
@pytest.fixture
def pla(mocker, plaBase):
    """Initialised agent with the Tornado client, Operations and DataManager classes mocked."""
    # patch target -> module-level mock it delegates to (applied in this order)
    replacements = {
        "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.TornadoPilotLoggingClient": mockTornadoClient,
        "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations": mockOperations,
        "DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager": mockDataManager,
    }
    for target, stub in replacements.items():
        mocker.patch(target, side_effect=stub)
    agent = plaBase
    agent.initialize()
    return agent
77+
78+
79+
def test_initialize(plaBase):
    """initialize() must succeed and pick up the VO list served by the patched getVOs."""
    outcome = plaBase.initialize()
    expectedVOs = plaModule.getVOs()["Value"]
    assert plaBase.voList == expectedVOs
    assert outcome == S_OK()
83+
84+
85+
@pytest.mark.parametrize(
    "mockReplyInput, expected, expectedExecOut, expected2",
    [
        ("/Pilot/RemoteLogging", [True, False], S_OK(), upDict),
        ("/Pilot/RemoteLogging", [False, False], S_OK(), upDict),
        ("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict),
    ],
)
def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2):
    """Testing a thin version of execute (executeForVO is mocked)"""
    agent = pla_initialised
    assert agent.voList == plaModule.getVOs()["Value"]
    # one RemoteLogging answer per VO: remote pilot logging on (gridpp only) and off.
    mockReply.side_effect = expected
    mockReply1.return_value = expected2
    agent.executeForVO.return_value = expectedExecOut

    outcome = agent.execute()

    if any(expected):
        assert agent.executeForVO.called
        agent.executeForVO.assert_called_with(
            "gridpp",
            proxyUserName=upDict["Value"]["User"],
            proxyUserGroup=upDict["Value"]["Group"],
        )
    else:
        agent.executeForVO.assert_not_called()
    assert outcome["OK"] == expectedExecOut["OK"]
111+
112+
113+
@pytest.mark.parametrize(
    "ppath, files, result",
    [
        ("pilot/log/path/", ["file1.log", "file2.log", "file3.log"], S_OK()),
        ("pilot/log/path/", [], S_OK()),
    ],
)
def test_executeForVO(pla, tmp_path, ppath, files, result):
    """Testing the upload route, with and without log files waiting in the VO directory"""
    # the mocks are module-level objects: forget the calls recorded by earlier test cases
    mockTornadoClient.reset_mock()
    mockDataManager.reset_mock()
    opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}}
    # full local temporary path (tmp_path is created and cleaned up by pytest):
    filepath = os.path.join(str(tmp_path), ppath)
    voPath = os.path.join(filepath, "gridpp")
    # this is what getMetadata returns:
    resDict = {"OK": True, "Value": {"LogPath": filepath}}
    mockTornadoClient.return_value.getMetadata.return_value = resDict
    mockDataManager.return_value.putAndRegister.return_value = result
    if files:
        os.makedirs(voPath, exist_ok=True)
        for elem in files:
            # create an empty log file, closing the handle straight away
            with open(os.path.join(voPath, elem), "w"):
                pass
    mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
    pla.opsHelper = mockOperations.return_value
    # success route
    res = pla.executeForVO(vo="gridpp")
    mockTornadoClient.assert_called_with(useCertificates=True)
    assert mockTornadoClient.return_value.getMetadata.called
    # only called with a non-empty file list:
    if files:
        assert mockDataManager.return_value.putAndRegister.called
        # successfully uploaded files are removed from the local cache
        assert os.listdir(voPath) == []
    else:
        mockDataManager.return_value.putAndRegister.assert_not_called()
    assert res == S_OK()
142+
143+
144+
def test_executeForVOMetaFails(pla):
    """A failing getMetadata call must be propagated as an error by executeForVO"""
    helper = mockOperations.return_value
    helper.getOptionsDict.return_value = {
        "OK": True,
        "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"},
    }
    pla.opsHelper = helper
    # getMetadata call fails.
    failure = {"OK": False, "Message": "Failed, sorry.."}
    mockTornadoClient.return_value.getMetadata.return_value = failure
    outcome = pla.executeForVO(vo="anything")
    assert outcome["OK"] is False
152+
153+
154+
@pytest.mark.parametrize(
    "opsHelperValues, expectedRes",
    [
        ({"OK": True, "Value": {"UploadPath": "/gridpp/uploadPath"}}, S_ERROR("Upload SE not defined")),
        ({"OK": True, "Value": {"UploadSE": "testUploadSE"}}, S_ERROR("Upload path on SE testUploadSE not defined")),
        ({"OK": False}, S_ERROR("No pilot section for gridpp vo")),
    ],
)
def test_executeForVOBadConfig(pla, opsHelperValues, expectedRes):
    """Testing an incomplete configuration"""
    mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
    pla.opsHelper = mockOperations.return_value
    # forget calls recorded by other tests BEFORE the call under test,
    # otherwise the assert_not_called() check below could never fail
    mockTornadoClient.return_value.getMetadata.reset_mock()
    res = pla.executeForVO(vo="gridpp")
    assert res["OK"] is False
    assert res["Message"] == expectedRes["Message"]
    # a broken configuration must be detected before Tornado is contacted
    mockTornadoClient.return_value.getMetadata.assert_not_called()
171+
172+
173+
@pytest.mark.parametrize(
    "filename, fileAge, ageLimit, expectedResult", [("survives.log", 10, 20, True), ("getsdeleted.log", 21, 20, False)]
)
def test_oldLogsCleaner(plaBase, tmp_path, filename, fileAge, ageLimit, expectedResult):
    """Testing old files removal"""
    plaBase.clearPilotsDelay = ageLimit
    # tmp_path is created and cleaned up by pytest
    filepath = str(tmp_path)
    testfile = os.path.join(filepath, filename)
    with open(testfile, "w"):
        pass
    assert os.path.exists(testfile) is True
    # cannot patch os.stat globally because os.path.exists uses it !
    with patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.os.stat") as mockOSStat:
        # pretend the file was last modified fileAge days ago (expressed in seconds)
        mockOSStat.return_value.st_mtime = time.time() - fileAge * 86400
        plaBase.clearOldPilotLogs(filepath)
    assert os.path.exists(testfile) is expectedResult

0 commit comments

Comments
 (0)