Skip to content

Commit 9d437d8

Browse files
committed
feat: Enabling remote pilot logging to Tornado.
1 parent 64981e4 commit 9d437d8

15 files changed

+1001
-120
lines changed
Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
""" :mod: PilotLoggingAgent
2+
3+
PilotLoggingAgent sends Pilot log files to an SE.
4+
5+
.. literalinclude:: ../ConfigTemplate.cfg
6+
:start-after: ##BEGIN PilotLoggingAgent
7+
:end-before: ##END
8+
:dedent: 2
9+
:caption: PilotLoggingAgent options
10+
"""
11+
12+
# # imports
13+
import os
14+
import time
15+
from DIRAC import S_OK, S_ERROR, gConfig
16+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
17+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs
18+
from DIRAC.Core.Base.AgentModule import AgentModule
19+
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
20+
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
21+
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient
22+
23+
24+
class PilotLoggingAgent(AgentModule):
25+
"""
26+
.. class:: PilotLoggingAgent
27+
28+
The agent sends completed pilot log files to permanent storage for analysis.
29+
"""
30+
31+
def __init__(self, *args, **kwargs):
32+
"""c'tor"""
33+
super().__init__(*args, **kwargs)
34+
35+
def initialize(self):
36+
"""
37+
agent's initialisation. Use this agent's CS information to:
38+
Determine what Defaults/Shifter shifter proxy to use.,
39+
get the target SE name from the CS.
40+
Obtain log file location from Tornado.
41+
42+
:param self: self reference
43+
"""
44+
# pilot logs lifetime in days
45+
self.clearPilotsDelay = self.am_getOption("ClearPilotsDelay", 30)
46+
# configured VOs and setup
47+
res = getVOs()
48+
if not res["OK"]:
49+
return res
50+
self.voList = res.get("Value", [])
51+
52+
if isinstance(self.voList, str):
53+
self.voList = [self.voList]
54+
55+
self.setup = gConfig.getValue("/DIRAC/Setup", None)
56+
57+
return S_OK()
58+
59+
def execute(self):
60+
"""
61+
Execute one agent cycle. Upload log files to the SE and register them in the DFC.
62+
Use a shifter proxy dynamically loaded for every VO
63+
64+
:param self: self reference
65+
"""
66+
voRes = {}
67+
for vo in self.voList:
68+
self.opsHelper = Operations(vo=vo, setup=self.setup)
69+
# is remote pilot logging enabled for the VO ?
70+
pilotLogging = self.opsHelper.getValue("/Pilot/RemoteLogging", False)
71+
if pilotLogging:
72+
res = self.opsHelper.getOptionsDict("Shifter/DataManager")
73+
if not res["OK"]:
74+
voRes[vo] = "No shifter defined - skipped"
75+
self.log.error(f"No shifter defined for VO: {vo} - skipping ...")
76+
continue
77+
78+
proxyUser = res["Value"].get("User")
79+
proxyGroup = res["Value"].get("Group")
80+
if proxyGroup is None or proxyUser is None:
81+
self.log.error(
82+
f"No proxy user or group defined for pilot: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}"
83+
)
84+
voRes[vo] = "No proxy user or group defined - skipped"
85+
continue
86+
87+
self.log.info(f"Proxy used for pilot logging: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
88+
res = self.executeForVO( # pylint: disable=unexpected-keyword-arg
89+
vo, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
90+
)
91+
if not res["OK"]:
92+
voRes[vo] = res["Message"]
93+
if voRes:
94+
for key, value in voRes.items():
95+
self.log.error(f"Error for {key} vo; message: {value}")
96+
voRes.update(S_ERROR("Agent cycle for some VO finished with errors"))
97+
return voRes
98+
return S_OK()
99+
100+
@executeWithUserProxy
101+
def executeForVO(self, vo):
102+
"""
103+
Execute one agent cycle for a VO. It obtains VO-specific configuration pilot options from the CS:
104+
UploadPath - the path where the VO wants to upload pilot logs. It has to start with a VO name (/vo/path).
105+
UploadSE - Storage element where the logs will be kept.
106+
107+
:param str vo: vo enabled for remote pilot logging
108+
:return: S_OK or S_ERROR
109+
:rtype: dict
110+
"""
111+
112+
self.log.info(f"Pilot files upload cycle started for VO: {vo}")
113+
res = self.opsHelper.getOptionsDict("Pilot")
114+
if not res["OK"]:
115+
return S_ERROR(f"No pilot section for {vo} vo")
116+
pilotOptions = res["Value"]
117+
uploadSE = pilotOptions.get("UploadSE")
118+
if uploadSE is None:
119+
return S_ERROR("Upload SE not defined")
120+
self.log.info(f"Pilot upload SE: {uploadSE}")
121+
122+
uploadPath = pilotOptions.get("UploadPath")
123+
if uploadPath is None:
124+
return S_ERROR(f"Upload path on SE {uploadSE} not defined")
125+
self.log.info(f"Pilot upload path: {uploadPath}")
126+
127+
client = TornadoPilotLoggingClient(useCertificates=True)
128+
resDict = client.getMetadata()
129+
130+
if not resDict["OK"]:
131+
return resDict
132+
133+
# vo-specific source log path:
134+
pilotLogPath = os.path.join(resDict["Value"]["LogPath"], vo)
135+
# check for new files and upload them
136+
if not os.path.exists(pilotLogPath):
137+
# not a disaster, the VO is enabled, but no logfiles were ever stored.
138+
return S_OK()
139+
# delete old pilot log files for the vo VO
140+
self.clearOldPilotLogs(pilotLogPath)
141+
142+
self.log.info(f"Pilot log files location = {pilotLogPath} for VO: {vo}")
143+
144+
# get finalised (.log) files from Tornado and upload them to the selected SE
145+
146+
files = [
147+
f for f in os.listdir(pilotLogPath) if os.path.isfile(os.path.join(pilotLogPath, f)) and f.endswith("log")
148+
]
149+
150+
if not files:
151+
self.log.info("No files to upload for this cycle")
152+
for elem in files:
153+
lfn = os.path.join(uploadPath, elem)
154+
name = os.path.join(pilotLogPath, elem)
155+
res = DataManager().putAndRegister(lfn=lfn, fileName=name, diracSE=uploadSE, overwrite=True)
156+
if not res["OK"]:
157+
self.log.error("Could not upload", f"to {uploadSE}: {res['Message']}")
158+
else:
159+
self.log.verbose("File uploaded: ", f"LFN = {res['Value']}")
160+
try:
161+
os.remove(name)
162+
except Exception as excp:
163+
self.log.exception("Cannot remove a local file after uploading", lException=excp)
164+
return S_OK()
165+
166+
def clearOldPilotLogs(self, pilotLogPath):
167+
"""
168+
Delete old pilot log files unconditionally. Assumes that pilotLogPath exists.
169+
170+
:param str pilotLogPath: log files directory
171+
:return: None
172+
:rtype: None
173+
"""
174+
175+
files = os.listdir(pilotLogPath)
176+
seconds = int(self.clearPilotsDelay) * 86400
177+
currentTime = time.time()
178+
179+
for file in files:
180+
fullpath = os.path.join(pilotLogPath, file)
181+
modifTime = os.stat(fullpath).st_mtime
182+
if modifTime < currentTime - seconds:
183+
self.log.debug(f" Deleting old log : {fullpath}")
184+
try:
185+
os.remove(fullpath)
186+
except Exception as excp:
187+
self.log.exception(f"Cannot remove an old log file after {fullpath}", lException=excp)

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,10 +1017,6 @@ def _getPilotOptions(self, queue, **kwargs):
10171017

10181018
pilotOptions.append("--pythonVersion=3")
10191019

1020-
# Debug
1021-
if self.pilotLogLevel.lower() == "debug":
1022-
pilotOptions.append("-ddd")
1023-
10241020
# DIRAC Extensions to be used in pilots
10251021
pilotExtensionsList = opsHelper.getValue("Pilot/Extensions", [])
10261022
extensionsList = []
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
""" Test class for PilotLoggingAgent Agent
2+
"""
3+
import os
4+
import time
5+
import tempfile
6+
7+
import pytest
8+
from unittest.mock import MagicMock, patch
9+
10+
# DIRAC Components
11+
import DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent as plaModule
12+
from DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent import PilotLoggingAgent
13+
from DIRAC import gLogger, gConfig, S_OK, S_ERROR
14+
15+
gLogger.setLevel("DEBUG")
16+
17+
# Mock Objects
18+
mockReply = MagicMock()
19+
mockReply1 = MagicMock()
20+
mockOperations = MagicMock()
21+
mockTornadoClient = MagicMock()
22+
mockDataManager = MagicMock()
23+
mockAM = MagicMock()
24+
mockNone = MagicMock()
25+
mockNone.return_value = None
26+
27+
upDict = {
28+
"OK": True,
29+
"Value": {"User": "proxyUser", "Group": "proxyGroup"},
30+
}
31+
32+
33+
@pytest.fixture
34+
def plaBase(mocker):
35+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.__init__")
36+
mocker.patch(
37+
"DIRAC.WorkloadManagementSystem.Agent.JobCleaningAgent.AgentModule._AgentModule__moduleProperties",
38+
side_effect=lambda x, y=None: y,
39+
create=True,
40+
)
41+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.AgentModule.am_getOption", return_value=mockAM)
42+
mocker.patch(
43+
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.getVOs",
44+
return_value={"OK": True, "Value": ["gridpp", "lz"]},
45+
)
46+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.gConfig.getValue", return_value="GridPP")
47+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getValue", side_effect=mockReply)
48+
mocker.patch(
49+
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations.getOptionsDict", side_effect=mockReply1
50+
)
51+
pla = PilotLoggingAgent()
52+
pla.log = gLogger
53+
pla._AgentModule__configDefaults = mockAM
54+
return pla
55+
56+
57+
@pytest.fixture
58+
def pla_initialised(mocker, plaBase):
59+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.PilotLoggingAgent.executeForVO")
60+
plaBase.initialize()
61+
return plaBase
62+
63+
64+
@pytest.fixture
65+
def pla(mocker, plaBase):
66+
mocker.patch(
67+
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.TornadoPilotLoggingClient",
68+
side_effect=mockTornadoClient,
69+
)
70+
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.Operations", side_effect=mockOperations)
71+
mocker.patch(
72+
"DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.DataManager",
73+
side_effect=mockDataManager,
74+
)
75+
plaBase.initialize()
76+
return plaBase
77+
78+
79+
def test_initialize(plaBase):
80+
res = plaBase.initialize()
81+
assert plaBase.voList == plaModule.getVOs()["Value"]
82+
assert plaBase.setup is not None
83+
assert res == S_OK()
84+
85+
86+
@pytest.mark.parametrize(
87+
"mockReplyInput, expected, expectedExecOut, expected2",
88+
[
89+
("/Pilot/RemoteLogging", [True, False], S_OK(), upDict),
90+
("/Pilot/RemoteLogging", [False, False], S_OK(), upDict),
91+
("/Pilot/RemoteLogging", [True, False], S_ERROR("Execute for VO failed"), upDict),
92+
],
93+
)
94+
def test_execute(pla_initialised, mockReplyInput, expected, expectedExecOut, expected2):
95+
"""Testing a thin version of execute (executeForVO is mocked)"""
96+
assert pla_initialised.voList == plaModule.getVOs()["Value"]
97+
mockReply.side_effect = expected
98+
mockReply1.return_value = expected2
99+
# remote pilot logging on (gridpp only) and off.
100+
pla_initialised.executeForVO.return_value = expectedExecOut
101+
res = pla_initialised.execute()
102+
if not any(expected):
103+
pla_initialised.executeForVO.assert_not_called()
104+
else:
105+
assert pla_initialised.executeForVO.called
106+
pla_initialised.executeForVO.assert_called_with(
107+
"gridpp",
108+
proxyUserName=upDict["Value"]["User"],
109+
proxyUserGroup=upDict["Value"]["Group"],
110+
)
111+
assert res["OK"] == expectedExecOut["OK"]
112+
113+
114+
@pytest.mark.parametrize(
115+
"ppath, files, result",
116+
[
117+
("pilot/log/path/", ["file1.log", "file2.log", "file3.log"], S_OK()),
118+
("pilot/log/path/", [], S_OK()),
119+
],
120+
)
121+
def test_executeForVO(pla, ppath, files, result):
122+
opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}}
123+
# full local temporary path:
124+
filepath = os.path.join(tempfile.TemporaryDirectory().name, ppath)
125+
# this is what getMetadata returns:
126+
resDict = {"OK": True, "Value": {"LogPath": filepath}}
127+
mockTornadoClient.return_value.getMetadata.return_value = resDict
128+
mockDataManager.return_value.putAndRegister.return_value = result
129+
if files:
130+
os.makedirs(os.path.join(filepath, "gridpp"), exist_ok=True)
131+
for elem in files:
132+
open(os.path.join(filepath, "gridpp", elem), "w")
133+
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
134+
pla.opsHelper = mockOperations.return_value
135+
# success route
136+
res = pla.executeForVO(vo="gridpp")
137+
mockTornadoClient.assert_called_with(useCertificates=True)
138+
assert mockTornadoClient.return_value.getMetadata.called
139+
# only called with a non-empty file list:
140+
if files:
141+
assert mockDataManager.return_value.putAndRegister.called
142+
assert res == S_OK()
143+
144+
145+
def test_executeForVOMetaFails(pla):
146+
opsHelperValues = {"OK": True, "Value": {"UploadSE": "testUploadSE", "UploadPath": "/gridpp/uploadPath"}}
147+
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
148+
pla.opsHelper = mockOperations.return_value
149+
# getMetadata call fails.
150+
mockTornadoClient.return_value.getMetadata.return_value = {"OK": False, "Message": "Failed, sorry.."}
151+
res = pla.executeForVO(vo="anything")
152+
assert res["OK"] is False
153+
154+
155+
@pytest.mark.parametrize(
156+
"opsHelperValues, expectedRes",
157+
[
158+
({"OK": True, "Value": {"UploadPath": "/gridpp/uploadPath"}}, S_ERROR("Upload SE not defined")),
159+
({"OK": True, "Value": {"UploadSE": "testUploadSE"}}, S_ERROR("Upload path on SE testUploadSE not defined")),
160+
({"OK": False}, S_ERROR(f"No pilot section for gridpp vo")),
161+
],
162+
)
163+
def test_executeForVOBadConfig(pla, opsHelperValues, expectedRes):
164+
"""Testing an incomplete configuration"""
165+
mockOperations.return_value.getOptionsDict.return_value = opsHelperValues
166+
pla.opsHelper = mockOperations.return_value
167+
res = pla.executeForVO(vo="gridpp")
168+
assert res["OK"] is False
169+
assert res["Message"] == expectedRes["Message"]
170+
mockTornadoClient.return_value.getMetadata.reset_mock()
171+
mockTornadoClient.return_value.getMetadata.assert_not_called()
172+
173+
174+
@pytest.mark.parametrize(
175+
"filename, fileAge, ageLimit, expectedResult", [("survives.log", 10, 20, True), ("getsdeleted.log", 21, 20, False)]
176+
)
177+
def test_oldLogsCleaner(plaBase, filename, fileAge, ageLimit, expectedResult):
178+
"""Testing old files removal"""
179+
plaBase.clearPilotsDelay = ageLimit
180+
filepath = tempfile.TemporaryDirectory().name
181+
os.makedirs(filepath, exist_ok=True)
182+
testfile = os.path.join(filepath, filename)
183+
fd = open(testfile, "w")
184+
fd.close()
185+
assert os.path.exists(testfile) is True
186+
# cannot patch os.stat globally because os.path.exists uses it !
187+
with patch("DIRAC.WorkloadManagementSystem.Agent.PilotLoggingAgent.os.stat") as mockOSStat:
188+
mockOSStat.return_value.st_mtime = time.time() - fileAge * 86400 # file older that fileAge in seconds
189+
plaBase.clearOldPilotLogs(filepath)
190+
assert os.path.exists(testfile) is expectedResult

0 commit comments

Comments
 (0)