Skip to content

Commit 85ef7dc

Browse files
nicoschmdtpatrickelectric
authored andcommitted
kraken: get historical extension logs through zenoh queryable
1 parent a39882e commit 85ef7dc

6 files changed

Lines changed: 81 additions & 0 deletions

File tree

core/services/kraken/api/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ async def lifespan(fastapi_app: FastAPI) -> AsyncGenerator[None, None]: # pylin
4949
zenoh_router = ZenohRouter(SERVICE_NAME)
5050
zenoh_router.add_routes_to_zenoh(application)
5151

52+
5253
application = VersionedFastAPI(application, prefix_format="/v{major}.{minor}", enable_latest=True, lifespan=lifespan)
5354

5455

core/services/kraken/harbor/container.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ async def get_container_log_by_name(cls, container_name: str) -> AsyncGenerator[
123123
yield log_line
124124
logger.info(f"Finished streaming logs for {container_name}")
125125

126+
@classmethod
127+
async def get_container_historical_logs(cls, container_name: str) -> List[str]:
128+
async with DockerCtx() as client:
129+
try:
130+
container = await cls.get_raw_container_by_name(client, container_name)
131+
except ContainerNotFound as error:
132+
raise StackedHTTPException(status_code=status.HTTP_404_NOT_FOUND, error=error) from error
133+
134+
return await container.log(stdout=True, stderr=True, follow=False, stream=False) # type: ignore
135+
126136
@classmethod
127137
async def get_containers_stats(cls) -> Dict[str, ContainerUsageModel]:
128138
async with DockerCtx() as client:

core/services/kraken/main.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
from api import application
1616
from jobs import JobsManager
1717
from kraken import Kraken
18+
from zenoh_handlers import ( # noqa: F401 # pylint: disable=unused-import
19+
app as zenoh_app,
20+
)
1821

1922
kraken = Kraken()
2023
jobs = JobsManager()
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from . import app
2+
3+
__all__ = ["app"]
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from commonwealth.utils.zenoh_helper import ZenohRouter, ZenohSession
2+
from config import SERVICE_NAME
3+
from zenoh_handlers.extension_handler import ExtensionHandlers
4+
5+
session = ZenohSession(SERVICE_NAME)
6+
router = ZenohRouter(SERVICE_NAME)
7+
8+
# Extension
9+
extension_handlers = ExtensionHandlers(router)
10+
extension_handlers.register_queryables()
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
from typing import Any, List, cast
2+
3+
from commonwealth.utils.zenoh_helper import ZenohRouter
4+
from extension.extension import Extension
5+
from extension_logs import ExtensionLogPublisher
6+
from harbor import ContainerManager
7+
from loguru import logger
8+
from settings import ExtensionSettings
9+
10+
11+
class ExtensionHandlers:
12+
def __init__(self, router: ZenohRouter) -> None:
13+
self.router = router
14+
15+
async def logs_request_handler(self, extension_name: str) -> dict[str, Any]:
16+
if not extension_name:
17+
return {"error": "extension_name parameter is required"}
18+
19+
try:
20+
extensions = cast(List[ExtensionSettings], Extension._fetch_settings())
21+
extension = next((ext for ext in extensions if extension_name in (ext.identifier, ext.name)), None)
22+
23+
if not extension:
24+
return {"error": f"Extension {extension_name} not found"}
25+
26+
if not extension.enabled:
27+
return {"error": f"Extension {extension_name} is not enabled"}
28+
29+
topic = ExtensionLogPublisher._topic_for(extension)
30+
31+
container_name = extension.container_name()
32+
33+
raw_logs = await ContainerManager.get_container_historical_logs(container_name)
34+
formatted_messages = []
35+
for raw_line in raw_logs:
36+
level, _ = ExtensionLogPublisher._extract_level(raw_line)
37+
formatted_messages.append(
38+
{
39+
"level": level,
40+
"message": raw_line,
41+
}
42+
)
43+
return {
44+
"status": "success",
45+
"messages": formatted_messages,
46+
"total_lines": len(formatted_messages),
47+
"topic": topic,
48+
}
49+
except Exception as e:
50+
logger.exception(f"Error handling logs request for {extension_name}")
51+
return {"error": str(e), "error_type": type(e).__name__}
52+
53+
def register_queryables(self) -> None:
54+
self.router.add_queryable("extension/logs/request", self.logs_request_handler)

0 commit comments

Comments
 (0)