Skip to content

Commit efa94db

Browse files
authored
Merge pull request #7105 from martynia/integration_janusz_pilotlogsWrapper_get_dev
[Integration] Modify dirac-admin-get-pilot-output to get remote pilot log
2 parents c728ad4 + bad7e9e commit efa94db

File tree

7 files changed

+239
-9
lines changed

7 files changed

+239
-9
lines changed

src/DIRAC/Interfaces/API/DiracAdmin.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ def getJobPilotOutput(self, jobID, directory=""):
425425
426426
:param job: JobID
427427
:type job: integer or string
428+
:param str directory: a directory to download logs to.
428429
:return: S_OK,S_ERROR
429430
"""
430431
if not directory:
@@ -468,13 +469,13 @@ def getJobPilotOutput(self, jobID, directory=""):
468469

469470
#############################################################################
470471
def getPilotOutput(self, gridReference, directory=""):
471-
"""Retrieve the pilot output (std.out and std.err) for an existing job in the WMS.
472+
"""Retrieve the pilot output (std.out and std.err) for an existing pilot reference.
472473
473474
>>> gLogger.notice(dirac.getJobPilotOutput(12345))
474475
{'OK': True, 'Value': {}}
475476
476-
:param job: JobID
477-
:type job: integer or string
477+
:param str gridReference: pilot reference
478+
:param str directory: a directory to download logs to.
478479
:return: S_OK,S_ERROR
479480
"""
480481
if not isinstance(gridReference, str):
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""
2+
Pilot logging plugin abstract class.
3+
"""
4+
from abc import ABC, abstractmethod
5+
from DIRAC import S_OK, S_ERROR, gLogger
6+
7+
sLog = gLogger.getSubLogger(__name__)
8+
9+
10+
class DownloadPlugin(ABC):
11+
"""
12+
Remote pilot log retriever base abstract class. It defines abstract methods used to download log files from a remote
13+
storage to the server.
14+
Any pilot logger download plugin should inherit from this class and implement a (sub)set of methods required by
15+
:class:`PilotManagerHandler`.
16+
"""
17+
18+
@abstractmethod
19+
def getRemotePilotLogs(self, pilotStamp, vo):
20+
"""
21+
Pilot log getter method, carrying the unique pilot identity and a VO name.
22+
23+
:param str pilotStamp: pilot stamp.
24+
:param str vo: VO name of a pilot which generated the logs.
25+
:return: S_OK or S_ERROR
26+
:rtype: dict
27+
"""
28+
29+
pass
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
"""
2+
File cache pilot log downloader.
3+
"""
4+
import os
5+
import tempfile
6+
from DIRAC import S_OK, S_ERROR, gLogger, gConfig
7+
from DIRAC.DataManagementSystem.Client.DataManager import DataManager
8+
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
9+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
10+
from DIRAC.Core.Utilities.Proxy import executeWithUserProxy
11+
from DIRAC.WorkloadManagementSystem.Client.PilotLoggingPlugins.DownloadPlugin import DownloadPlugin
12+
from DIRAC.WorkloadManagementSystem.Client.TornadoPilotLoggingClient import TornadoPilotLoggingClient
13+
14+
sLog = gLogger.getSubLogger(__name__)
15+
16+
17+
class FileCacheDownloadPlugin(DownloadPlugin):
18+
"""
19+
Class to handle log file download from an SE
20+
"""
21+
22+
def __init__(self):
23+
"""
24+
Sets the client for downloading incomplete log files from the server cache.
25+
26+
"""
27+
self.tornadoClient = TornadoPilotLoggingClient()
28+
29+
def getRemotePilotLogs(self, pilotStamp, vo=None):
30+
"""
31+
Pilot log getter method, carrying the unique pilot identity and a VO name.
32+
33+
:param str pilotStamp: pilot stamp.
34+
:param str vo: VO name of a user/pilot which generated the logs.
35+
:return: S_OK or S_ERROR
36+
:rtype: dict
37+
"""
38+
39+
opsHelper = Operations(vo=vo)
40+
uploadPath = opsHelper.getValue("Pilot/UploadPath", "")
41+
lfn = os.path.join(uploadPath, pilotStamp + ".log")
42+
sLog.info("LFN to download: ", lfn)
43+
44+
# get pilot credentials which uploaded logs to an external storage:
45+
res = opsHelper.getOptionsDict("Shifter/DataManager")
46+
if not res["OK"]:
47+
message = f"No shifter defined for VO: {vo} - needed to retrieve the logs !"
48+
sLog.error(message)
49+
return S_ERROR(message)
50+
51+
proxyUser = res["Value"].get("User")
52+
proxyGroup = res["Value"].get("Group")
53+
54+
sLog.info(f"Proxy used for retrieving pilot logs: VO: {vo}, User: {proxyUser}, Group: {proxyGroup}")
55+
56+
# attempt to get logs from server first:
57+
res = self._getLogsFromServer(pilotStamp, vo)
58+
if not res["OK"]:
59+
# from SE:
60+
res = self._downloadLogs( # pylint: disable=unexpected-keyword-arg
61+
lfn, pilotStamp, proxyUserName=proxyUser, proxyUserGroup=proxyGroup
62+
)
63+
64+
return res
65+
66+
@executeWithUserProxy
67+
def _downloadLogs(self, lfn, pilotStamp):
68+
filepath = tempfile.TemporaryDirectory().name
69+
os.makedirs(filepath, exist_ok=True)
70+
71+
res = DataManager().getFile(lfn, destinationDir=filepath)
72+
sLog.debug("getFile result:", res)
73+
if not res["OK"]:
74+
sLog.error(f"Failed to contact storage")
75+
return res
76+
if lfn in res["Value"]["Failed"]:
77+
sLog.error("Failed to retrieve a log file:", res["Value"]["Failed"])
78+
return S_ERROR(f"Failed to retrieve a log file: {res['Value']['Failed']}")
79+
try:
80+
filename = os.path.join(filepath, pilotStamp + ".log")
81+
with open(filename) as f:
82+
stdout = f.read()
83+
except FileNotFoundError as err:
84+
sLog.error(f"Error opening a log file:{filename}", err)
85+
return S_ERROR(repr(err))
86+
87+
resultDict = {}
88+
resultDict["StdOut"] = stdout
89+
return S_OK(resultDict)
90+
91+
@executeWithUserProxy
92+
def _getLogsFromServer(self, logfile, vo):
93+
"""
94+
Get a file from the server cache area. The file is most likely not finalised, since finalised files
95+
are copied to an SE and deleted. Both logfile.log and logfile are tried should the finalised file still
96+
be on the server.
97+
98+
:param str logfile: pilot log filename
99+
:param str vo: VO name
100+
:return: S_OK or S_ERROR
101+
:rtype: dict
102+
"""
103+
104+
res = self.tornadoClient.getLogs(logfile, vo)
105+
if not res["OK"]:
106+
res = self.tornadoClient.getLogs(logfile + ".log", vo)
107+
return res

src/DIRAC/WorkloadManagementSystem/Client/PilotLoggingPlugins/FileCacheLoggingPlugin.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,26 @@ def getMeta(self):
104104
return S_OK(self.meta)
105105
return S_ERROR("No Pilot logging directory defined")
106106

107+
def getLogs(self, logfile, vo):
108+
"""
109+
Get the "instant" logs from Tornado log storage area. There are not finalised (incomplete) logs.
110+
111+
:return: Dirac S_OK containing the logs
112+
:rtype: dict
113+
"""
114+
115+
filename = os.path.join(self.meta["LogPath"], vo, logfile)
116+
resultDict = {}
117+
try:
118+
with open(filename) as f:
119+
stdout = f.read()
120+
resultDict["StdOut"] = stdout
121+
except FileNotFoundError as err:
122+
sLog.error(f"Error opening a log file:{filename}", err)
123+
return S_ERROR(repr(err))
124+
125+
return S_OK(resultDict)
126+
107127
def _verifyUUIDPattern(self, logfile):
108128
"""
109129
Verify if the name of the log file matches the required pattern.

src/DIRAC/WorkloadManagementSystem/Service/PilotManagerHandler.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66

77
from DIRAC import S_OK, S_ERROR
88
import DIRAC.Core.Utilities.TimeUtilities as TimeUtilities
9-
10-
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
119
from DIRAC.Core.Utilities.Decorators import deprecated
10+
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
11+
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOForGroup
12+
from DIRAC.Core.DISET.RequestHandler import getServiceOption
1213
from DIRAC.Core.DISET.RequestHandler import RequestHandler
1314
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
14-
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getUsernameForDN, getDNForUsername
1515
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
1616
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import (
1717
getPilotCE,
@@ -35,6 +35,10 @@ def initializeHandler(cls, serviceInfoDict):
3535
except RuntimeError as excp:
3636
return S_ERROR(f"Can't connect to DB: {excp}")
3737

38+
# prepare remote pilot plugin initialization
39+
defaultOption, defaultClass = "DownloadPlugin", "FileCacheDownloadPlugin"
40+
cls.configValue = getServiceOption(serviceInfoDict, defaultOption, defaultClass)
41+
cls.loggingPlugin = None
3842
return S_OK()
3943

4044
##############################################################################
@@ -92,9 +96,16 @@ def export_addPilotTQRef(cls, pilotRef, taskQueueID, ownerGroup, gridType="DIRAC
9296
types_getPilotOutput = [str]
9397

9498
def export_getPilotOutput(self, pilotReference):
95-
"""Get the pilot job standard output and standard error files for the Grid
96-
job reference
9799
"""
100+
Get the pilot job standard output and standard error files for a pilot reference.
101+
Handles both classic, CE-based logs and remote logs. The type of logs returned is determined
102+
by the server.
103+
104+
:param str pilotReference:
105+
:return: S_OK or S_ERROR Dirac object
106+
:rtype: dict
107+
"""
108+
98109
result = self.pilotAgentsDB.getPilotInfo(pilotReference)
99110
if not result["OK"]:
100111
self.log.error("Failed to get info for pilot", result["Message"])
@@ -104,6 +115,26 @@ def export_getPilotOutput(self, pilotReference):
104115
return S_ERROR("Pilot info is empty")
105116

106117
pilotDict = result["Value"][pilotReference]
118+
vo = getVOForGroup(pilotDict["OwnerGroup"])
119+
opsHelper = Operations(vo=vo)
120+
remote = opsHelper.getValue("Pilot/RemoteLogsPriority", False)
121+
# classic logs first, by default
122+
funcs = [self._getPilotOutput, self._getRemotePilotOutput]
123+
if remote:
124+
funcs.reverse()
125+
126+
result = funcs[0](pilotReference, pilotDict)
127+
if not result["OK"]:
128+
self.log.warn("Pilot log retrieval failed (first attempt), remote ?", remote)
129+
result = funcs[1](pilotReference, pilotDict)
130+
return result
131+
else:
132+
return result
133+
134+
def _getPilotOutput(self, pilotReference, pilotDict):
135+
"""Get the pilot job standard output and standard error files for the Grid
136+
job reference
137+
"""
107138

108139
group = pilotDict["OwnerGroup"]
109140

@@ -158,6 +189,39 @@ def export_getPilotOutput(self, pilotReference):
158189
shutil.rmtree(ce.ceParameters["WorkingDirectory"])
159190
return S_OK(resultDict)
160191

192+
def _getRemotePilotOutput(self, pilotReference, pilotDict):
193+
"""
194+
Get remote pilot log files.
195+
196+
:param str pilotReference:
197+
:return: S_OK Dirac object
198+
:rtype: dict
199+
"""
200+
201+
pilotStamp = pilotDict["PilotStamp"]
202+
group = pilotDict["OwnerGroup"]
203+
vo = getVOForGroup(group)
204+
205+
if self.loggingPlugin is None:
206+
result = ObjectLoader().loadObject(
207+
f"WorkloadManagementSystem.Client.PilotLoggingPlugins.{self.configValue}", self.configValue
208+
)
209+
if not result["OK"]:
210+
self.log.error("Failed to load LoggingPlugin", f"{self.configValue}: {result['Message']}")
211+
return result
212+
213+
componentClass = result["Value"]
214+
self.loggingPlugin = componentClass()
215+
self.log.info("Loaded: PilotLoggingPlugin class", self.configValue)
216+
217+
res = self.loggingPlugin.getRemotePilotLogs(pilotStamp, vo)
218+
219+
if res["OK"]:
220+
res["Value"]["OwnerGroup"] = group
221+
res["Value"]["FileList"] = []
222+
# return res, correct or not
223+
return res
224+
161225
##############################################################################
162226
types_getPilotInfo = [[list, str]]
163227

src/DIRAC/WorkloadManagementSystem/Service/TornadoPilotLoggingHandler.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,16 @@ def export_getMetadata(self):
8080
"""
8181
return self.loggingPlugin.getMeta()
8282

83+
def export_getLogs(self, logfile, vo):
84+
"""
85+
Get (not yet finalised) logs from the server.
86+
87+
:return: S_OK containing a metadata dictionary
88+
:rtype: dict
89+
"""
90+
91+
return self.loggingPlugin.getLogs(logfile, vo)
92+
8393
def export_finaliseLogs(self, payload, pilotUUID):
8494
"""
8595
Finalise a log file. Finalised logfile can be copied to a secure location, if a file cache is used.

src/DIRAC/WorkloadManagementSystem/Service/WMSAdministratorHandler.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,6 @@ def export_getJobPilotOutput(self, jobID):
178178
job reference
179179
180180
:param str jobID: job ID
181-
182181
:return: S_OK(dict)/S_ERROR()
183182
"""
184183
pilotReference = ""

0 commit comments

Comments
 (0)