Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
import logging

from fastapi import FastAPI
from models_library.api_schemas_dynamic_sidecar.telemetry import DiskUsage
from pydantic import validate_call
from servicelib.rabbitmq import RPCRouter

from ...modules.system_monitor import get_disk_usage_monitor

_logger = logging.getLogger(__name__)

router = RPCRouter()


@router.expose()
@validate_call(config={"arbitrary_types_allowed": True})
async def update_disk_usage(app: FastAPI, *, usage: dict[str, DiskUsage]) -> None:
    """Pass ``usage`` on to the disk usage monitor registered on ``app``.

    The monitor is looked up exactly once and checked before it is used.
    When no monitor is available the update is dropped and a warning is
    logged, so the call returns normally instead of raising.
    """
    disk_usage_monitor = get_disk_usage_monitor(app)

    # get_disk_usage_monitor may return None; never dereference it unguarded
    if disk_usage_monitor is None:
        _logger.warning(
            "Disk usage monitor not initialized, could not update disk usage"
        )
        return

    disk_usage_monitor.set_disk_usage_for_path(usage)
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ def create_app():
setup_background_log_fetcher(app)
setup_resource_tracking(app)
setup_notifications(app)
setup_system_monitor(app)

setup_mounted_fs(app)
setup_system_monitor(app)
setup_inputs(app)
setup_outputs(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class DiskUsageMonitor:

@cached_property
def _monitored_paths_set(self) -> set[Path]:
    """Return a new set holding every path found in ``self.monitored_paths``.

    ``monitored_paths`` maps a key to a set of paths; the values are merged
    into a single set. A falsy ``monitored_paths`` yields an empty set.
    """
    if not self.monitored_paths:
        return set()

    path_groups = list(self.monitored_paths.values())
    first_group, remaining_groups = path_groups[0], path_groups[1:]
    return set.union(first_group, *remaining_groups)

@cached_property
Expand Down Expand Up @@ -137,7 +139,7 @@ async def _publish_disk_usage(self, usage: dict[MountPathCategory, DiskUsage]):
self.app, user_id=self.user_id, node_id=self.node_id, usage=usage
)

async def _monitor(self) -> None:
async def get_disk_usage(self) -> dict[MountPathCategory, DiskUsage]:
measured_disk_usage = await self._get_measured_disk_usage()

local_disk_usage = self._get_local_disk_usage(measured_disk_usage)
Expand Down Expand Up @@ -167,12 +169,14 @@ async def _monitor(self) -> None:
msg = f"Could not assign {disk_usage=} for {folder_names=}"
raise RuntimeError(msg)

supported_usage = {k: v for k, v in usage.items() if k in _SUPPORTED_ITEMS}
return {k: v for k, v in usage.items() if k in _SUPPORTED_ITEMS}

async def _monitor(self) -> None:
    """Fetch the current disk usage and publish it when it has changed.

    The value returned by ``get_disk_usage`` is compared with
    ``self._last_usage``; only when they differ is it published and
    remembered as the new last value.
    """
    disk_usage = await self.get_disk_usage()
    # notify only when usage changes
    if self._last_usage != disk_usage:
        await self._publish_disk_usage(disk_usage)
        self._last_usage = disk_usage

async def setup(self) -> None:
self._monitor_task = create_periodic_task(
Expand Down Expand Up @@ -202,25 +206,30 @@ def _get_monitored_paths(app: FastAPI) -> dict[MountPathCategory, set[Path]]:
}


def get_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor | None:
    """Return the disk usage monitor stored on ``app.state``, if any.

    Returns ``None`` instead of raising when ``app.state`` has no
    ``disk_usage_monitor`` attribute, so callers can test the result.
    """
    disk_usage_monitor: DiskUsageMonitor | None = getattr(
        app.state, "disk_usage_monitor", None
    )
    return disk_usage_monitor
def create_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor:
    """Build a ``DiskUsageMonitor`` from the settings stored on ``app.state``.

    The instance is only constructed and returned here; this function does
    not store it on ``app.state`` and does not call its ``setup``.
    """
    app_settings: ApplicationSettings = app.state.settings
    monitor_options = {
        "user_id": app_settings.DY_SIDECAR_USER_ID,
        "node_id": app_settings.DY_SIDECAR_NODE_ID,
        "interval": app_settings.DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL,
        "monitored_paths": _get_monitored_paths(app),
        "dy_volumes_mount_dir": app_settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR,
    }
    return DiskUsageMonitor(app, **monitor_options)


def get_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor | None:
    """Return ``app.state.disk_usage_monitor``, or ``None`` when it is not set."""
    try:
        monitor: DiskUsageMonitor = app.state.disk_usage_monitor
    except AttributeError:
        # nothing has been stored under that name on app.state
        return None
    return monitor


def setup_disk_usage(app: FastAPI) -> None:
async def on_startup() -> None:
with log_context(_logger, logging.INFO, "setup disk monitor"):
settings: ApplicationSettings = app.state.settings

app.state.disk_usage_monitor = disk_usage_monitor = DiskUsageMonitor(
app,
user_id=settings.DY_SIDECAR_USER_ID,
node_id=settings.DY_SIDECAR_NODE_ID,
interval=settings.DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL,
monitored_paths=_get_monitored_paths(app),
dy_volumes_mount_dir=settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR,
)
await disk_usage_monitor.setup()
app.state.disk_usage_monitor = create_disk_usage_monitor(app)
await app.state.disk_usage_monitor.setup()

async def on_shutdown() -> None:
with log_context(_logger, logging.INFO, "shutdown disk monitor"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,42 @@
from servicelib.logging_utils import log_context

from ...core.settings import SystemMonitorSettings
from ._disk_usage import setup_disk_usage
from ._disk_usage import (
create_disk_usage_monitor,
get_disk_usage_monitor,
setup_disk_usage,
)

_logger = logging.getLogger(__name__)


async def _display_current_disk_usage(app: FastAPI) -> None:
    """Log one INFO line for each entry of the current disk usage.

    The monitor returned by ``get_disk_usage_monitor`` is used when there is
    one; otherwise a monitor is created with ``create_disk_usage_monitor``
    just to take this measurement.
    """
    registered_monitor = get_disk_usage_monitor(app)
    monitor = (
        registered_monitor
        if registered_monitor is not None
        else create_disk_usage_monitor(app)
    )

    current_usage = await monitor.get_disk_usage()
    for name, entry in current_usage.items():
        total = entry.total.human_readable()
        free = entry.free.human_readable()
        used = entry.used.human_readable()
        _logger.info(
            "Disk usage for '%s': total=%s, free=%s, used=%s, used_percent=%s",
            name,
            total,
            free,
            used,
            entry.used_percent,
        )


def setup_system_monitor(app: FastAPI) -> None:
    """Configure system monitoring on ``app``.

    Disk usage monitoring is set up once, and only when
    ``DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE`` is set; otherwise a
    warning is logged. In both cases a startup handler is registered that
    logs the current disk usage.
    """
    with log_context(_logger, logging.INFO, "setup system monitor"):
        settings: SystemMonitorSettings = app.state.settings.SYSTEM_MONITOR_SETTINGS

        if settings.DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE:
            setup_disk_usage(app)
        else:
            _logger.warning("system monitor disabled")

        # no early return above: the handler is registered even when telemetry
        # is disabled, so the disk usage is still logged at startup
        async def on_startup() -> None:
            await _display_current_disk_usage(app)

        app.add_event_handler("startup", on_startup)
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def app(
) -> AsyncIterable[FastAPI]:
mocker.patch(
"simcore_service_dynamic_sidecar.modules.system_monitor._disk_usage._get_monitored_paths",
return_value=[],
return_value={},
)

async with LifespanManager(app):
Expand Down
Loading