Skip to content

Commit 1e23cae

Browse files
committed
feat: integrate spike suppression into collector pipeline (#141)
1 parent a31d992 commit 1e23cae

File tree

3 files changed

+43
-1
lines changed

3 files changed

+43
-1
lines changed

app/collectors/demo.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from datetime import datetime, timedelta, timezone
1414

1515
from .base import Collector, CollectorResult
16-
from ..analyzer import _channel_bitrate_mbps
16+
from ..analyzer import _channel_bitrate_mbps, apply_spike_suppression
1717
from ..gaming_index import compute_gaming_index
1818

1919
log = logging.getLogger("docsis.collector.demo")
@@ -105,6 +105,7 @@ def collect(self) -> CollectorResult:
105105

106106
data = self._generate_data()
107107
analysis = self._analyzer(data)
108+
apply_spike_suppression(analysis, self._storage.get_latest_spike_timestamp())
108109

109110
# MQTT publishing
110111
if self._mqtt_pub:

app/collectors/modem.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import time
55

66
from .base import Collector, CollectorResult
7+
from ..analyzer import apply_spike_suppression
78
from ..gaming_index import compute_gaming_index
89

910
log = logging.getLogger("docsis.collector.modem")
@@ -51,6 +52,7 @@ def collect(self) -> CollectorResult:
5152

5253
data = self._driver.get_docsis_data()
5354
analysis = self._analyzer(data) # Call injected analyzer function
55+
apply_spike_suppression(analysis, self._storage.get_latest_spike_timestamp())
5456

5557
# MQTT publishing
5658
if self._mqtt_pub:

tests/test_collectors.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ def _make_collector(self, mqtt_pub=None):
227227
event_detector.check.return_value = []
228228

229229
storage = MagicMock()
230+
storage.get_latest_spike_timestamp.return_value = None
230231
web = MagicMock()
231232

232233
c = ModemCollector(
@@ -295,6 +296,43 @@ def test_name(self):
295296
assert c.name == "modem"
296297

297298

299+
class TestModemCollectorSpikeSuppression:
300+
"""Verify spike suppression is called in the collector pipeline."""
301+
302+
def test_modem_collector_calls_spike_suppression(self):
303+
"""ModemCollector calls apply_spike_suppression after analyze."""
304+
mock_driver = MagicMock(spec=ModemDriver)
305+
mock_driver.get_docsis_data.return_value = {"channelDs": {"docsis30": []}, "channelUs": {"docsis30": []}}
306+
mock_driver.get_device_info.return_value = {"model": "Test", "sw_version": "1.0"}
307+
mock_driver.get_connection_info.return_value = None
308+
309+
mock_storage = MagicMock()
310+
mock_storage.get_latest_spike_timestamp.return_value = None
311+
mock_web = MagicMock()
312+
mock_web._state = {}
313+
314+
fake_analysis = {
315+
"summary": {"health": "good", "health_issues": [], "ds_total": 0, "us_total": 0},
316+
"ds_channels": [],
317+
"us_channels": [],
318+
}
319+
mock_analyzer = MagicMock(return_value=fake_analysis)
320+
321+
collector = ModemCollector(
322+
driver=mock_driver,
323+
analyzer_fn=mock_analyzer,
324+
event_detector=MagicMock(),
325+
storage=mock_storage,
326+
mqtt_pub=None,
327+
web=mock_web,
328+
poll_interval=60,
329+
)
330+
331+
with patch("app.collectors.modem.apply_spike_suppression") as mock_suppress:
332+
collector.collect()
333+
mock_suppress.assert_called_once_with(fake_analysis, None)
334+
335+
298336
# ── SpeedtestCollector Tests ──
299337

300338

@@ -704,6 +742,7 @@ def _make_collector(self):
704742
analyzer_fn = MagicMock()
705743
event_detector = MagicMock()
706744
storage = MagicMock()
745+
storage.get_latest_spike_timestamp.return_value = None
707746
web = MagicMock()
708747
c = ModemCollector(
709748
driver=driver, analyzer_fn=analyzer_fn, event_detector=event_detector,

0 commit comments

Comments
 (0)