Skip to content

Commit 4320b3b

Browse files
joaomariolagopatrickelectric
authored andcommitted
core:services:kraken: Publish ext logs to zenoh
1 parent f4a3e3b commit 4320b3b

File tree

3 files changed

+164
-0
lines changed

3 files changed

+164
-0
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import asyncio
2+
import json
3+
import time
4+
from typing import Callable, Dict, List, Optional, Tuple, cast
5+
6+
from commonwealth.utils.zenoh_helper import ZenohSession
7+
from config import SERVICE_NAME
8+
from extension.extension import Extension
9+
from harbor import ContainerManager
10+
from loguru import logger
11+
from settings import ExtensionSettings
12+
13+
14+
class ExtensionLogPublisher:
15+
_LEVEL_MAP: Dict[str, int] = {
16+
"FATAL": 5,
17+
"ERROR": 4,
18+
"ERR": 4,
19+
"WARNING": 3,
20+
"WARN": 3,
21+
"INFO": 2,
22+
"DEBUG": 1,
23+
"TRACE": 1,
24+
"UNKNOWN": 0,
25+
}
26+
27+
def __init__(self) -> None:
28+
self._zenoh_session = ZenohSession(SERVICE_NAME)
29+
self._tasks: Dict[str, asyncio.Task[None]] = {}
30+
31+
async def sync_with_running_extensions(self) -> None:
32+
desired_streams = await self._collect_desired_streams()
33+
if desired_streams is None:
34+
return
35+
self._start_missing_streams(desired_streams)
36+
self._stop_removed_streams(desired_streams)
37+
38+
async def shutdown(self) -> None:
39+
if not self._tasks:
40+
return
41+
for task in self._tasks.values():
42+
task.cancel()
43+
await asyncio.gather(*self._tasks.values(), return_exceptions=True)
44+
self._tasks.clear()
45+
46+
async def _collect_desired_streams(self) -> Optional[Dict[str, ExtensionSettings]]:
47+
try:
48+
running_containers = await ContainerManager.get_running_containers()
49+
except Exception as error:
50+
logger.debug(f"Unable to fetch running containers for extension logs: {error}")
51+
return None
52+
53+
running_names = {container.name.lstrip("/") for container in running_containers}
54+
extensions = cast(List[ExtensionSettings], Extension._fetch_settings())
55+
56+
desired: Dict[str, ExtensionSettings] = {}
57+
for extension in extensions:
58+
if not extension.enabled:
59+
continue
60+
container_name = extension.container_name()
61+
if container_name in running_names:
62+
desired[container_name] = extension
63+
return desired
64+
65+
def _start_missing_streams(self, desired_streams: Dict[str, ExtensionSettings]) -> None:
66+
for container_name, extension in desired_streams.items():
67+
if container_name in self._tasks:
68+
continue
69+
task = asyncio.create_task(self._stream_logs(extension))
70+
task.add_done_callback(self._make_cleanup_callback(container_name))
71+
self._tasks[container_name] = task
72+
73+
def _stop_removed_streams(self, desired_streams: Dict[str, ExtensionSettings]) -> None:
74+
for container_name in list(self._tasks.keys()):
75+
if container_name in desired_streams:
76+
continue
77+
task = self._tasks.pop(container_name)
78+
task.cancel()
79+
80+
def _make_cleanup_callback(self, container_name: str) -> Callable[[asyncio.Task[None]], None]:
81+
def _cleanup(task: asyncio.Task[None]) -> None:
82+
saved = self._tasks.get(container_name)
83+
if saved is task:
84+
self._tasks.pop(container_name, None)
85+
if task.cancelled():
86+
return
87+
exception = task.exception()
88+
if exception:
89+
logger.debug(f"Extension log stream for {container_name} ended with error: {exception}")
90+
91+
return _cleanup
92+
93+
async def _stream_logs(self, extension: ExtensionSettings) -> None:
94+
container_name = extension.container_name()
95+
topic = self._topic_for(extension)
96+
logger.debug(f"Starting extension log stream for {container_name} -> {topic}")
97+
98+
try:
99+
async for raw_line in ContainerManager.get_container_log_by_name(container_name):
100+
payload = self._format_log_payload(container_name, raw_line.rstrip("\n"))
101+
self._publish(topic, payload)
102+
except asyncio.CancelledError:
103+
logger.debug(f"Extension log stream for {container_name} cancelled")
104+
raise
105+
except Exception as error:
106+
logger.debug(f"Extension log stream for {container_name} stopped: {error}")
107+
108+
def _publish(self, topic: str, log_line: str) -> None:
109+
session = self._zenoh_session.session
110+
if session is None:
111+
return
112+
try:
113+
session.put(topic, log_line.encode("utf-8"))
114+
except Exception as error:
115+
logger.debug(f"Failed to publish extension log to {topic}: {error}")
116+
117+
@staticmethod
118+
def _topic_for(extension: ExtensionSettings) -> str:
119+
name = extension.identifier or extension.name or extension.container_name()
120+
safe_name = name.replace("/", "_").replace(" ", "_")
121+
return f"extensions/logs/{safe_name}"
122+
123+
@classmethod
124+
def _format_log_payload(cls, container_name: str, message: str) -> str:
125+
level, normalized_message = cls._extract_level(message)
126+
seconds, nanos = divmod(time.time_ns(), 1_000_000_000)
127+
payload = {
128+
"timestamp": {"sec": seconds, "nsec": nanos},
129+
"level": level,
130+
"message": normalized_message,
131+
"name": container_name,
132+
"file": "",
133+
"line": 0,
134+
}
135+
return json.dumps(payload)
136+
137+
@classmethod
138+
def _extract_level(cls, message: str) -> Tuple[int, str]:
139+
stripped = message.lstrip()
140+
upper = stripped.upper()
141+
for name, level in cls._LEVEL_MAP.items():
142+
prefixes = (
143+
f"{name}:",
144+
f"{name} ",
145+
f"[{name}]",
146+
f"{name}|",
147+
)
148+
for prefix in prefixes:
149+
if upper.startswith(prefix):
150+
remainder = stripped[len(prefix) :].lstrip()
151+
return level, remainder or stripped
152+
return 0, stripped

core/services/kraken/kraken.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from extension.exceptions import IncompatibleExtension
1010
from extension.extension import Extension
1111
from extension.models import ExtensionSource
12+
from extension_logs import ExtensionLogPublisher
1213
from harbor import ContainerManager
1314
from jobs import JobsManager
1415
from jobs.models import Job, JobMethod
@@ -24,6 +25,7 @@ def __init__(self) -> None:
2425
self._settings = self._manager.settings
2526
self.is_running = True
2627
self.manifest = ManifestManager.instance()
28+
self.extension_log_publisher = ExtensionLogPublisher()
2729

2830
def _extension_start_try_valid(self, extension: ExtensionSettings) -> bool:
2931
unique_entry = f"{extension.identifier}{extension.tag}"
@@ -178,5 +180,14 @@ async def start_cleaner_task(self) -> None:
178180

179181
await asyncio.sleep(60)
180182

183+
async def start_extension_logs_task(self) -> None:
184+
while self.is_running:
185+
try:
186+
await self.extension_log_publisher.sync_with_running_extensions()
187+
except Exception as error:
188+
logger.debug(f"Failed to sync extension log streams: {error}")
189+
await asyncio.sleep(2)
190+
181191
async def stop(self) -> None:
182192
self.is_running = False
193+
await self.extension_log_publisher.shutdown()

core/services/kraken/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ async def main() -> None:
3838
# Launch background tasks
3939
asyncio.create_task(kraken.start_cleaner_task())
4040
asyncio.create_task(kraken.start_starter_task())
41+
asyncio.create_task(kraken.start_extension_logs_task())
4142
asyncio.create_task(jobs.start())
4243

4344
await server.serve()

0 commit comments

Comments
 (0)