|
| 1 | +"""Segment utilization collector for FritzBox cable modems.""" |
| 2 | + |
| 3 | +import logging |
| 4 | +import time |
| 5 | + |
| 6 | +import requests |
| 7 | + |
| 8 | +from app import fritzbox as fb |
| 9 | +from app.collectors.base import Collector, CollectorResult |
| 10 | +from app.storage.segment_utilization import SegmentUtilizationStorage |
| 11 | + |
| 12 | +log = logging.getLogger("docsis.collector.segment_utilization") |
| 13 | + |
| 14 | +MAINTENANCE_INTERVAL = 86400 # Run downsample + cleanup once per day |
| 15 | + |
| 16 | + |
| 17 | +def _last_non_null(values): |
| 18 | + """Return the last non-None value from a list, or None if all are None/empty.""" |
| 19 | + for v in reversed(values): |
| 20 | + if v is not None: |
| 21 | + return v |
| 22 | + return None |
| 23 | + |
| 24 | + |
| 25 | +class SegmentUtilizationCollector(Collector): |
| 26 | + """Polls FritzBox /api/v0/monitor/segment/0 for cable segment utilization.""" |
| 27 | + |
| 28 | + def __init__(self, config_mgr, storage, web=None, **kwargs): |
| 29 | + super().__init__(poll_interval_seconds=300) |
| 30 | + self._config = config_mgr |
| 31 | + self._storage = SegmentUtilizationStorage(storage.db_path) |
| 32 | + self._web = web |
| 33 | + self._last_maintenance: float = 0.0 |
| 34 | + |
| 35 | + @property |
| 36 | + def name(self): |
| 37 | + return "segment_utilization" |
| 38 | + |
| 39 | + def is_enabled(self): |
| 40 | + return self._config.get("modem_type") == "fritzbox" |
| 41 | + |
| 42 | + def collect(self): |
| 43 | + url = self._config.get("modem_url") |
| 44 | + try: |
| 45 | + sid = fb.login( |
| 46 | + url, |
| 47 | + self._config.get("modem_user"), |
| 48 | + self._config.get("modem_password"), |
| 49 | + ) |
| 50 | + except Exception as e: |
| 51 | + return CollectorResult.failure(self.name, str(e)) |
| 52 | + |
| 53 | + try: |
| 54 | + resp = requests.get( |
| 55 | + f"{url}/api/v0/monitor/segment/0", |
| 56 | + headers={"AUTHORIZATION": f"AVM-SID {sid}"}, |
| 57 | + timeout=15, |
| 58 | + ) |
| 59 | + resp.raise_for_status() |
| 60 | + except Exception as e: |
| 61 | + return CollectorResult.failure(self.name, f"API request failed: {e}") |
| 62 | + |
| 63 | + try: |
| 64 | + from datetime import datetime, timezone |
| 65 | + |
| 66 | + body = resp.json() |
| 67 | + data_items = body["data"] |
| 68 | + own = next(d for d in data_items if d["type"] == "own") |
| 69 | + total = next(d for d in data_items if d["type"] == "total") |
| 70 | + |
| 71 | + last_sample_time = body.get("lastSampleTime", 0) |
| 72 | + sample_interval_ms = body.get("sampleInterval", 60000) |
| 73 | + sample_interval_s = sample_interval_ms / 1000 |
| 74 | + n_samples = len(total["downstream"]) |
| 75 | + |
| 76 | + saved = 0 |
| 77 | + for i in range(n_samples): |
| 78 | + ds_t = total["downstream"][i] |
| 79 | + us_t = total["upstream"][i] |
| 80 | + ds_o = own["downstream"][i] |
| 81 | + us_o = own["upstream"][i] |
| 82 | + if ds_t is None and us_t is None: |
| 83 | + continue |
| 84 | + sample_epoch = last_sample_time - (n_samples - 1 - i) * sample_interval_s |
| 85 | + ts = datetime.fromtimestamp(sample_epoch, tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") |
| 86 | + self._storage.save_at(ts, ds_t, us_t, ds_o, us_o) |
| 87 | + saved += 1 |
| 88 | + |
| 89 | + ds_total = _last_non_null(total["downstream"]) |
| 90 | + us_total = _last_non_null(total["upstream"]) |
| 91 | + ds_own = _last_non_null(own["downstream"]) |
| 92 | + us_own = _last_non_null(own["upstream"]) |
| 93 | + |
| 94 | + log.info( |
| 95 | + "Segment utilization: DS %.1f%% (own %.2f%%), US %.1f%% (own %.2f%%) [%d samples stored]", |
| 96 | + ds_total or 0, ds_own or 0, us_total or 0, us_own or 0, saved, |
| 97 | + ) |
| 98 | + |
| 99 | + self._run_maintenance() |
| 100 | + |
| 101 | + return CollectorResult.ok( |
| 102 | + self.name, |
| 103 | + {"ds_total": ds_total, "us_total": us_total, "ds_own": ds_own, "us_own": us_own}, |
| 104 | + ) |
| 105 | + except Exception as e: |
| 106 | + return CollectorResult.failure(self.name, f"Parse failed: {e}") |
| 107 | + |
| 108 | + def _run_maintenance(self): |
| 109 | + """Run downsample + cleanup once per day.""" |
| 110 | + now = time.time() |
| 111 | + if (now - self._last_maintenance) < MAINTENANCE_INTERVAL: |
| 112 | + return |
| 113 | + self._last_maintenance = now |
| 114 | + try: |
| 115 | + removed = self._storage.downsample() |
| 116 | + if removed: |
| 117 | + log.info("Downsampled segment utilization: %d rows aggregated", removed) |
| 118 | + deleted = self._storage.cleanup() |
| 119 | + if deleted: |
| 120 | + log.info("Cleaned up segment utilization: %d old rows deleted", deleted) |
| 121 | + except Exception as e: |
| 122 | + log.warning("Segment utilization maintenance failed: %s", e) |
0 commit comments