Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ def create_app():
setup_background_log_fetcher(app)
setup_resource_tracking(app)
setup_notifications(app)
setup_system_monitor(app)

setup_mounted_fs(app)
setup_system_monitor(app)
setup_inputs(app)
setup_outputs(app)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def _publish_disk_usage(self, usage: dict[MountPathCategory, DiskUsage]):
self.app, user_id=self.user_id, node_id=self.node_id, usage=usage
)

async def _monitor(self) -> None:
async def get_disk_usage(self) -> dict[MountPathCategory, DiskUsage]:
measured_disk_usage = await self._get_measured_disk_usage()

local_disk_usage = self._get_local_disk_usage(measured_disk_usage)
Expand Down Expand Up @@ -167,12 +167,14 @@ async def _monitor(self) -> None:
msg = f"Could not assign {disk_usage=} for {folder_names=}"
raise RuntimeError(msg)

supported_usage = {k: v for k, v in usage.items() if k in _SUPPORTED_ITEMS}
return {k: v for k, v in usage.items() if k in _SUPPORTED_ITEMS}

async def _monitor(self) -> None:
disk_usage = await self.get_disk_usage()
# notify only when usage changes
if self._last_usage != supported_usage:
await self._publish_disk_usage(supported_usage)
self._last_usage = supported_usage
if self._last_usage != disk_usage:
await self._publish_disk_usage(disk_usage)
self._last_usage = disk_usage

async def setup(self) -> None:
self._monitor_task = create_periodic_task(
Expand Down Expand Up @@ -202,25 +204,30 @@ def _get_monitored_paths(app: FastAPI) -> dict[MountPathCategory, set[Path]]:
}


def get_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor:
disk_usage_monitor: DiskUsageMonitor = app.state.disk_usage_monitor
return disk_usage_monitor
def create_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor:
settings: ApplicationSettings = app.state.settings
return DiskUsageMonitor(
app,
user_id=settings.DY_SIDECAR_USER_ID,
node_id=settings.DY_SIDECAR_NODE_ID,
interval=settings.DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL,
monitored_paths=_get_monitored_paths(app),
dy_volumes_mount_dir=settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR,
)


def get_disk_usage_monitor(app: FastAPI) -> DiskUsageMonitor | None:
if hasattr(app.state, "disk_usage_monitor"):
disk_usage_monitor: DiskUsageMonitor = app.state.disk_usage_monitor
return disk_usage_monitor
return None


def setup_disk_usage(app: FastAPI) -> None:
async def on_startup() -> None:
with log_context(_logger, logging.INFO, "setup disk monitor"):
settings: ApplicationSettings = app.state.settings

app.state.disk_usage_monitor = disk_usage_monitor = DiskUsageMonitor(
app,
user_id=settings.DY_SIDECAR_USER_ID,
node_id=settings.DY_SIDECAR_NODE_ID,
interval=settings.DYNAMIC_SIDECAR_TELEMETRY_DISK_USAGE_MONITOR_INTERVAL,
monitored_paths=_get_monitored_paths(app),
dy_volumes_mount_dir=settings.DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR,
)
await disk_usage_monitor.setup()
app.state.disk_usage_monitor = create_disk_usage_monitor(app)
await app.state.disk_usage_monitor.setup()

async def on_shutdown() -> None:
with log_context(_logger, logging.INFO, "shutdown disk monitor"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,42 @@
from servicelib.logging_utils import log_context

from ...core.settings import SystemMonitorSettings
from ._disk_usage import setup_disk_usage
from ._disk_usage import (
create_disk_usage_monitor,
get_disk_usage_monitor,
setup_disk_usage,
)

_logger = logging.getLogger(__name__)


async def _display_current_disk_usage(app: FastAPI) -> None:
disk_usage_monitor = get_disk_usage_monitor(app)
if disk_usage_monitor is None:
disk_usage_monitor = create_disk_usage_monitor(app)

disk_usage = await disk_usage_monitor.get_disk_usage()
for name, entry in disk_usage.items():
_logger.info(
"Disk usage for '%s': total=%s, free=%s, used=%s, used_percent=%s",
name,
entry.total.human_readable(),
entry.free.human_readable(),
entry.used.human_readable(),
entry.used_percent,
)


def setup_system_monitor(app: FastAPI) -> None:
with log_context(_logger, logging.INFO, "setup system monitor"):
settings: SystemMonitorSettings = app.state.settings.SYSTEM_MONITOR_SETTINGS

if not settings.DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE:
if settings.DY_SIDECAR_SYSTEM_MONITOR_TELEMETRY_ENABLE:
setup_disk_usage(app)
else:
_logger.warning("system monitor disabled")
return

setup_disk_usage(app)
async def on_startup() -> None:
await _display_current_disk_usage(app)

app.add_event_handler("startup", on_startup)
Loading