Skip to content

Commit 520ff81

Browse files
core: Add health_monitor service
Signed-off-by: Patrick José Pereira <patrickelectric@gmail.com>
1 parent 4320b3b commit 520ff81

File tree

7 files changed

+1201
-0
lines changed

7 files changed

+1201
-0
lines changed

core/pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ dependencies = [
1313
"cable_guy",
1414
"commander",
1515
"disk_usage",
16+
"health_monitor",
1617
"helper",
1718
"kraken",
1819
"nmea_injector",
@@ -58,6 +59,7 @@ bridget = { workspace = true }
5859
cable_guy = { workspace = true }
5960
commander = { workspace = true }
6061
disk_usage = { workspace = true }
62+
health_monitor = { workspace = true }
6163
helper = { workspace = true }
6264
kraken = { workspace = true }
6365
nmea_injector = { workspace = true }
Lines changed: 361 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,361 @@
1+
#! /usr/bin/env python3
2+
3+
import asyncio
4+
import logging
5+
import os
6+
from collections import deque
7+
from typing import Any, Dict, List, Optional
8+
9+
import aiohttp
10+
import zenoh
11+
from commonwealth.mavlink_comm.MavlinkComm import MavlinkMessenger
12+
from commonwealth.utils.apis import GenericErrorHandlingRoute, PrettyJSONResponse
13+
from commonwealth.utils.logs import InterceptHandler, init_logger
14+
from commonwealth.utils.sentry_config import init_sentry_async
15+
from commonwealth.utils.zenoh_helper import ZenohSession
16+
from fastapi import APIRouter, FastAPI, Query, status
17+
from fastapi_versioning import VersionedFastAPI, versioned_api_route
18+
from loguru import logger
19+
from pydantic import BaseModel, Field
20+
from uvicorn import Config, Server
21+
22+
from monitor import (
23+
HealthHistory,
24+
HealthStateTracker,
25+
HealthSummary,
26+
KernelErrorTracker,
27+
UsbTracker,
28+
collect_extension_container_names,
29+
evaluate_disk,
30+
evaluate_extension_resources,
31+
evaluate_factory_mode,
32+
evaluate_memory,
33+
evaluate_packet_loss,
34+
evaluate_sysid_mismatch,
35+
evaluate_update_available,
36+
evaluate_voltage,
37+
merge_results,
38+
now_ms,
39+
)
40+
41+
def _env_float(name: str, default: str) -> float:
    """Read environment variable *name* (or *default* when unset) and parse it as a float."""
    return float(os.getenv(name, default))


def _env_int(name: str, default: str) -> int:
    """Read environment variable *name* (or *default* when unset) and parse it as an int."""
    return int(os.getenv(name, default))


# Service identity and the TCP port the HTTP API listens on.
SERVICE_NAME = "health-monitor"
PORT = 9152

# Base URLs of the local services polled by the monitor.
LINUX2REST_BASE = "http://127.0.0.1:6030"
KRAKEN_BASE = "http://127.0.0.1:9134"
VERSION_CHOOSER_BASE = "http://127.0.0.1:8081/v1.0"

# Topic on which health events are published.
EVENT_TOPIC = f"services/{SERVICE_NAME}/events"

# Seconds to wait between evaluation cycles, and the maximum number of events kept in memory.
DEFAULT_CHECK_INTERVAL = _env_float("HEALTH_MONITOR_INTERVAL_SEC", "10")
DEFAULT_HISTORY_LIMIT = _env_int("HEALTH_MONITOR_HISTORY_LIMIT", "500")

# Thresholds handed to the disk check.
DISK_FREE_BYTES_THRESHOLD = _env_int("HEALTH_MONITOR_DISK_FREE_BYTES", str(2 * 1024**3))
DISK_FREE_PERCENT_THRESHOLD = _env_float("HEALTH_MONITOR_DISK_FREE_PERCENT", "10")

# Thresholds handed to the memory check.
MEMORY_WARN_PERCENT = _env_float("HEALTH_MONITOR_MEMORY_WARN_PERCENT", "90")
MEMORY_ERROR_PERCENT = _env_float("HEALTH_MONITOR_MEMORY_ERROR_PERCENT", "95")

# Window (milliseconds) handed to the kernel error tracker; defaults to ten minutes.
KERNEL_ERROR_WINDOW_MS = _env_int("HEALTH_MONITOR_KERNEL_WINDOW_MS", str(10 * 60 * 1000))

# Thresholds handed to the network packet-loss check.
PACKET_LOSS_ERROR_RATIO = _env_float("HEALTH_MONITOR_PACKET_LOSS_RATIO", "0.02")
PACKET_LOSS_ERROR_COUNT = _env_int("HEALTH_MONITOR_PACKET_LOSS_COUNT", "10")

# Thresholds handed to the per-extension resource check.
EXTENSION_CPU_THRESHOLD = _env_float("HEALTH_MONITOR_EXTENSION_CPU_PERCENT", "80")
EXTENSION_MEMORY_THRESHOLD = _env_float("HEALTH_MONITOR_EXTENSION_MEMORY_PERCENT", "80")
EXTENSION_DISK_THRESHOLD = _env_float("HEALTH_MONITOR_EXTENSION_DISK_PERCENT", "80")

# Bounds handed to the vehicle voltage check.
VEHICLE_VOLTAGE_LOW = _env_float("HEALTH_MONITOR_VOLTAGE_LOW", "10.5")
VEHICLE_VOLTAGE_HIGH = _env_float("HEALTH_MONITOR_VOLTAGE_HIGH", "16.8")

# System ID the vehicle's reported SYSID is compared against.
CONTAINER_SYSID = _env_int("MAV_SYSTEM_ID", "1")
72+
73+
# Install InterceptHandler as the only stdlib logging handler at DEBUG level
# (presumably forwarding records to loguru — confirm in commonwealth.utils.logs),
# then initialise the service logger and announce start-up.
logging.basicConfig(level=logging.DEBUG, handlers=[InterceptHandler()])
init_logger(SERVICE_NAME)
logger.info("Starting Health Monitor service")
76+
77+
78+
class HealthConfig(BaseModel):
    # Snapshot of the thresholds and timing values the monitor is running with.
    # Returned as-is by the `/health/config` endpoint.

    # Scheduling and retention.
    interval_sec: float
    history_limit: int

    # Disk check.
    disk_free_bytes: int
    disk_free_percent: float

    # Memory check.
    memory_warn_percent: float
    memory_error_percent: float

    # Kernel error tracking window.
    kernel_error_window_ms: int

    # Network packet-loss check.
    packet_loss_ratio: float
    packet_loss_count: int

    # Per-extension resource check.
    extension_cpu_percent: float
    extension_memory_percent: float
    extension_disk_percent: float

    # Vehicle voltage check.
    voltage_low: float
    voltage_high: float
93+
94+
95+
class HealthMonitor:
    """Periodically run the health checks and publish the resulting state changes.

    Each cycle polls the local HTTP services and the MAVLink messenger, feeds the
    responses to the evaluators imported from ``monitor``, diffs the merged result
    against the previously known state and, for every resulting event, publishes
    it on ``EVENT_TOPIC`` and appends it to a bounded in-memory history.
    """

    def __init__(self) -> None:
        self._state = HealthStateTracker()
        # Bounded history: the oldest events are dropped once the limit is reached.
        self._history: deque = deque(maxlen=DEFAULT_HISTORY_LIMIT)
        self._kernel_tracker = KernelErrorTracker(KERNEL_ERROR_WINDOW_MS)
        self._usb_tracker = UsbTracker()
        self._zenoh = ZenohSession(SERVICE_NAME)
        self._mavlink = MavlinkMessenger()
        self._stop_event = asyncio.Event()
        # Created lazily on the first request, see _fetch_json().
        self._http_session: Optional[aiohttp.ClientSession] = None
        # Strong reference to the task that closes the HTTP session on stop().
        self._close_task: Optional[asyncio.Task] = None

    def stop(self) -> None:
        """Ask the run loop to exit and release the Zenoh and HTTP sessions.

        The HTTP session is closed from a task because this method is synchronous;
        ``asyncio.create_task`` requires a running event loop, so call this from
        within one.
        """
        self._stop_event.set()
        self._zenoh.close()
        if self._http_session and not self._http_session.closed:
            # The event loop keeps only weak references to tasks, so hold one here
            # to make sure the close actually runs to completion.
            self._close_task = asyncio.create_task(self._http_session.close())

    async def run(self) -> None:
        """Evaluate all checks every ``DEFAULT_CHECK_INTERVAL`` seconds until stopped."""
        while not self._stop_event.is_set():
            try:
                await self.evaluate_once()
            except Exception as error:
                # A failing cycle must not kill the background task, otherwise the
                # service would silently stop monitoring until it is restarted.
                logger.exception(f"Health evaluation cycle failed: {error}")
            try:
                # Sleep for the interval, but wake up immediately when stop() is called.
                await asyncio.wait_for(self._stop_event.wait(), timeout=DEFAULT_CHECK_INTERVAL)
            except asyncio.TimeoutError:
                continue

    async def evaluate_once(self) -> None:
        """Run every check once, then publish and record the resulting events.

        A check whose data source did not answer with the expected JSON type is
        skipped for this cycle.
        """
        now = now_ms()
        checks: List[Any] = []

        disks = await self._fetch_json(f"{LINUX2REST_BASE}/system/disk")
        if isinstance(disks, list):
            checks.append(evaluate_disk(disks, DISK_FREE_BYTES_THRESHOLD, DISK_FREE_PERCENT_THRESHOLD, now))

        memory = await self._fetch_json(f"{LINUX2REST_BASE}/system/memory")
        if isinstance(memory, dict):
            checks.append(evaluate_memory(memory, MEMORY_WARN_PERCENT, MEMORY_ERROR_PERCENT, now))

        kernel_buffer = await self._fetch_json(f"{LINUX2REST_BASE}/kernel_buffer")
        if isinstance(kernel_buffer, list):
            checks.append(self._kernel_tracker.evaluate(kernel_buffer, now))

        serial_ports = await self._fetch_json(f"{LINUX2REST_BASE}/serial?udev=true")
        if isinstance(serial_ports, dict):
            checks.append(self._usb_tracker.evaluate(serial_ports, now))

        networks = await self._fetch_json(f"{LINUX2REST_BASE}/system/network")
        if isinstance(networks, list):
            checks.append(evaluate_packet_loss(networks, PACKET_LOSS_ERROR_RATIO, PACKET_LOSS_ERROR_COUNT, now))

        kraken_extensions = await self._fetch_json(f"{KRAKEN_BASE}/v1.0/installed_extensions")
        kraken_stats = await self._fetch_json(f"{KRAKEN_BASE}/v1.0/stats")
        if isinstance(kraken_extensions, list) and isinstance(kraken_stats, dict):
            extension_names = collect_extension_container_names(kraken_extensions)
            # Extensions without a stats entry are still evaluated, with empty stats.
            extension_stats = {name: kraken_stats.get(name, {}) for name in extension_names}
            checks.append(
                evaluate_extension_resources(
                    extension_stats,
                    EXTENSION_CPU_THRESHOLD,
                    EXTENSION_MEMORY_THRESHOLD,
                    EXTENSION_DISK_THRESHOLD,
                    now,
                )
            )

        vehicle_sysid = await self._get_vehicle_sysid()
        checks.append(evaluate_sysid_mismatch(vehicle_sysid, CONTAINER_SYSID, now))

        # NOTE(review): MAVLink SYS_STATUS.voltage_battery is specified in millivolts
        # while the configured bounds look like volts — confirm evaluate_voltage converts.
        voltage_mv = await self._get_vehicle_voltage(vehicle_sysid)
        checks.append(evaluate_voltage(voltage_mv, VEHICLE_VOLTAGE_LOW, VEHICLE_VOLTAGE_HIGH, now))

        factory_mode = await self._is_factory_mode()
        checks.append(evaluate_factory_mode(factory_mode, now))

        core_current = await self._fetch_json(f"{VERSION_CHOOSER_BASE}/version/current")
        core_available = await self._fetch_json(f"{VERSION_CHOOSER_BASE}/version/available/bluerobotics/blueos-core")
        checks.append(
            evaluate_update_available(
                core_current if isinstance(core_current, dict) else None,
                self._remote_versions(core_available),
                "updates.blueos_core.available",
                "BlueOS update available",
                now,
            )
        )

        bootstrap_current_tag = await self._bootstrap_tag()
        bootstrap_available = await self._fetch_json(
            f"{VERSION_CHOOSER_BASE}/version/available/bluerobotics/blueos-bootstrap"
        )
        bootstrap_versions = self._remote_versions(bootstrap_available)
        # Without a known bootstrap tag there is nothing to compare against.
        if bootstrap_current_tag:
            checks.append(
                evaluate_update_available(
                    {"tag": bootstrap_current_tag},
                    bootstrap_versions,
                    "updates.bootstrap.available",
                    "Bootstrap update available",
                    now,
                )
            )

        # TODO(health_monitor): needs source - corrupt configuration files detection.
        # TODO(health_monitor): needs source - vehicle calibration status.
        # TODO(health_monitor): needs source - extension-published warnings channel.
        # TODO(health_monitor): needs source - output channel routing.

        merged = merge_results(checks)
        events = self._state.diff_and_update(merged.active, merged.resolved)
        for event in events:
            self._publish_event(event)
            self._history.append(event)

    def summary(self) -> HealthSummary:
        """Return the currently active problems, stamped with the current time."""
        return HealthSummary(active=self._state.active_problems(), updated_at=now_ms())

    def history_view(self, limit: int) -> HealthHistory:
        """Return at most the ``limit`` most recent events, oldest first.

        A non-positive ``limit`` yields an empty history.
        """
        if limit <= 0:
            # `[-0:]` would return everything and a negative limit would drop the
            # oldest entries instead of capping the result, so handle it explicitly.
            return HealthHistory(events=[])
        events = list(self._history)[-limit:]
        return HealthHistory(events=events)

    def config(self) -> HealthConfig:
        """Return the module-level thresholds and timing values as a ``HealthConfig``."""
        return HealthConfig(
            interval_sec=DEFAULT_CHECK_INTERVAL,
            history_limit=DEFAULT_HISTORY_LIMIT,
            disk_free_bytes=DISK_FREE_BYTES_THRESHOLD,
            disk_free_percent=DISK_FREE_PERCENT_THRESHOLD,
            memory_warn_percent=MEMORY_WARN_PERCENT,
            memory_error_percent=MEMORY_ERROR_PERCENT,
            kernel_error_window_ms=KERNEL_ERROR_WINDOW_MS,
            packet_loss_ratio=PACKET_LOSS_ERROR_RATIO,
            packet_loss_count=PACKET_LOSS_ERROR_COUNT,
            extension_cpu_percent=EXTENSION_CPU_THRESHOLD,
            extension_memory_percent=EXTENSION_MEMORY_THRESHOLD,
            extension_disk_percent=EXTENSION_DISK_THRESHOLD,
            voltage_low=VEHICLE_VOLTAGE_LOW,
            voltage_high=VEHICLE_VOLTAGE_HIGH,
        )

    @staticmethod
    def _remote_versions(payload: Any) -> Any:
        """Return the ``"remote"`` entry of a version listing, or ``[]`` when unavailable."""
        if isinstance(payload, dict):
            return payload.get("remote", [])
        return []

    async def _fetch_json(self, url: str, timeout: float = 2.0) -> Any:
        """GET ``url`` and return the decoded JSON body, or ``None`` on any failure.

        Failures (connection errors, non-2xx status, undecodable body, timeout) are
        deliberately swallowed and logged at debug level: a data source being down
        only means the corresponding check is skipped.
        """
        if self._http_session is None or self._http_session.closed:
            self._http_session = aiohttp.ClientSession()
        try:
            async with self._http_session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
                response.raise_for_status()
                return await response.json()
        except Exception as error:
            logger.debug(f"Failed to fetch {url}: {error}")
            return None

    async def _get_vehicle_sysid(self) -> Optional[int]:
        """Return the most recent vehicle system ID, or ``None`` when it cannot be obtained."""
        try:
            return await self._mavlink.get_most_recent_vehicle_id()
        except Exception as error:
            logger.debug(f"Failed to fetch vehicle SYSID: {error}")
            return None

    async def _get_vehicle_voltage(self, vehicle_sysid: Optional[int]) -> Optional[float]:
        """Return ``voltage_battery`` from the vehicle's SYS_STATUS message.

        Returns ``None`` when no vehicle is known, the field is absent or the
        request fails.
        """
        if vehicle_sysid is None:
            return None
        try:
            message = await self._mavlink.get_mavlink_message("SYS_STATUS", vehicle_sysid, 1)
            return message.get("message", {}).get("voltage_battery")
        except Exception as error:
            logger.debug(f"Failed to fetch vehicle voltage: {error}")
            return None

    async def _is_factory_mode(self) -> bool:
        """Return True when the current version reported by version chooser is tagged ``factory``."""
        current = await self._fetch_json(f"{VERSION_CHOOSER_BASE}/version/current")
        if isinstance(current, dict):
            return current.get("tag") == "factory"
        return False

    async def _bootstrap_tag(self) -> Optional[str]:
        """Return the tag of the current bootstrap image, or ``None`` when unknown.

        The endpoint is expected to answer with a string; anything after the last
        ``:`` is taken as the tag, and a string without ``:`` is returned unchanged.
        """
        current = await self._fetch_json(f"{VERSION_CHOOSER_BASE}/bootstrap/current")
        if not isinstance(current, str):
            return None
        return current.rsplit(":", 1)[-1]

    def _publish_event(self, event: BaseModel) -> None:
        """Publish ``event`` as JSON on ``EVENT_TOPIC``; failures are logged, not raised."""
        session = self._zenoh.session
        if session is None:
            return
        payload = event.json()
        try:
            session.put(EVENT_TOPIC, payload, encoding=zenoh.Encoding.APPLICATION_JSON)
        except Exception as error:
            logger.debug(f"Failed to publish event: {error}")
290+
291+
292+
# Extra response documentation attached to every route of the router.
_HEALTH_ROUTER_RESPONSES = {status.HTTP_404_NOT_FOUND: {"description": "Not found"}}

# Router for the v1.0 health endpoints, mounted under /health.
health_router = APIRouter(
    prefix="/health",
    tags=["health_monitor_v1"],
    route_class=versioned_api_route(1, 0),
    responses=_HEALTH_ROUTER_RESPONSES,
)

# Single monitor instance shared by the API handlers and the background task.
monitor = HealthMonitor()
300+
301+
302+
@health_router.get("/summary", response_model=HealthSummary, summary="Return active health issues.")
async def get_summary() -> HealthSummary:
    # Delegates to the shared monitor instance.
    current_summary = monitor.summary()
    return current_summary
305+
306+
307+
@health_router.get("/history", response_model=HealthHistory, summary="Return recent health events.")
async def get_history(limit: int = Query(200, ge=1, le=1000)) -> HealthHistory:
    # `limit` is validated by FastAPI to be within 1..1000 before reaching the monitor.
    recent_events = monitor.history_view(limit)
    return recent_events
310+
311+
312+
@health_router.get("/config", response_model=HealthConfig, summary="Return health monitor configuration.")
async def get_config() -> HealthConfig:
    # Delegates to the shared monitor instance.
    active_config = monitor.config()
    return active_config
315+
316+
317+
# Unversioned application that carries the health router.
fast_api_app = FastAPI(
    default_response_class=PrettyJSONResponse,
    title="Health Monitor API",
    description="Monitor system and vehicle health and emit events.",
)
fast_api_app.router.route_class = GenericErrorHandlingRoute
fast_api_app.include_router(health_router)

# Versioned wrapper served by uvicorn; routes are exposed under /v{major}.{minor}.
app = VersionedFastAPI(
    fast_api_app,
    enable_latest=True,
    prefix_format="/v{major}.{minor}",
    version="1.0.0",
)
331+
332+
333+
@app.get("/")
async def root() -> Dict[str, str]:
    # Minimal identification payload for the service root.
    return dict(service=SERVICE_NAME)
336+
337+
338+
# Strong references to running background tasks. The event loop keeps only weak
# references, so a task whose handle is discarded may be garbage-collected mid-run.
_background_tasks: set = set()


@app.on_event("startup")
async def start_background_tasks() -> None:
    # Start the periodic health evaluation loop alongside the HTTP server.
    task = asyncio.create_task(monitor.run())
    _background_tasks.add(task)
    # Drop the reference once the loop exits so the finished task can be collected.
    task.add_done_callback(_background_tasks.discard)
341+
342+
343+
@app.on_event("shutdown")
async def shutdown_background_tasks() -> None:
    # Signal the monitor loop to exit and let it release its sessions.
    monitor.stop()
    return None
346+
347+
348+
async def main() -> None:
    """Initialise Sentry, then serve the API on all interfaces until the server exits."""
    try:
        await init_sentry_async(SERVICE_NAME)

        # log_config=None keeps uvicorn from installing its own logging configuration.
        server = Server(Config(app=app, host="0.0.0.0", port=PORT, log_config=None))
        await server.serve()
    finally:
        logger.info("Health Monitor service stopped")


if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)