Skip to content

Commit 8cfee9c

Browse files
authored
Merge pull request #241 from itsDNNS/feat/235-smart-capture-engine
feat: Smart Capture execution engine, storage, and guardrails
2 parents 90603d9 + afd164b commit 8cfee9c

21 files changed

+1213
-12
lines changed

app/collectors/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ def data_dir(self):
5555
}
5656

5757

58-
def discover_collectors(config_mgr, storage, event_detector, mqtt_pub, web, analyzer, notifier=None):
58+
def discover_collectors(config_mgr, storage, event_detector, mqtt_pub, web, analyzer, notifier=None, smart_capture=None):
5959
"""Discover and instantiate all available collectors based on config.
6060
6161
Args:
@@ -84,6 +84,7 @@ def discover_collectors(config_mgr, storage, event_detector, mqtt_pub, web, anal
8484
web=web,
8585
poll_interval=config["poll_interval"],
8686
notifier=notifier,
87+
smart_capture=smart_capture,
8788
))
8889
# Modem collector (available if modem configured)
8990
elif config_mgr.is_configured():
@@ -107,6 +108,7 @@ def discover_collectors(config_mgr, storage, event_detector, mqtt_pub, web, anal
107108
web=web,
108109
poll_interval=config["poll_interval"],
109110
notifier=notifier,
111+
smart_capture=smart_capture,
110112
))
111113

112114
# Segment utilization collector (FritzBox only)

app/collectors/demo.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,15 @@ class DemoCollector(Collector):
4545

4646
name = "demo"
4747

48-
def __init__(self, analyzer_fn, event_detector, storage, mqtt_pub, web, poll_interval, notifier=None):
48+
def __init__(self, analyzer_fn, event_detector, storage, mqtt_pub, web, poll_interval, notifier=None, smart_capture=None):
4949
super().__init__(poll_interval)
5050
self._analyzer = analyzer_fn
5151
self._event_detector = event_detector
5252
self._storage = storage
5353
self._mqtt_pub = mqtt_pub
5454
self._web = web
5555
self._notifier = notifier
56+
self._smart_capture = smart_capture
5657
self._discovery_published = False
5758
self._poll_count = 0
5859
self._device_info = {
@@ -184,10 +185,12 @@ def collect(self) -> CollectorResult:
184185
# Event detection
185186
events = self._event_detector.check(analysis)
186187
if events:
187-
self._storage.save_events(events)
188+
self._storage.save_events_with_ids(events)
188189
log.info("Demo: detected %d event(s)", len(events))
189190
if self._notifier:
190191
self._notifier.dispatch(events)
192+
if self._smart_capture:
193+
self._smart_capture.evaluate(events)
191194

192195
return CollectorResult(source=self.name, data=analysis)
193196

app/collectors/modem.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class ModemCollector(Collector):
1616

1717
name = "modem"
1818

19-
def __init__(self, driver, analyzer_fn, event_detector, storage, mqtt_pub, web, poll_interval, notifier=None):
19+
def __init__(self, driver, analyzer_fn, event_detector, storage, mqtt_pub, web, poll_interval, notifier=None, smart_capture=None):
2020
super().__init__(poll_interval)
2121
self._driver = driver
2222
self._analyzer = analyzer_fn
@@ -25,6 +25,7 @@ def __init__(self, driver, analyzer_fn, event_detector, storage, mqtt_pub, web,
2525
self._mqtt_pub = mqtt_pub
2626
self._web = web
2727
self._notifier = notifier
28+
self._smart_capture = smart_capture
2829
self._device_info = None
2930
self._connection_info = None
3031
self._discovery_published = False
@@ -74,9 +75,11 @@ def collect(self) -> CollectorResult:
7475
# Event detection
7576
events = self._event_detector.check(analysis)
7677
if events:
77-
self._storage.save_events(events)
78+
self._storage.save_events_with_ids(events)
7879
log.info("Detected %d event(s)", len(events))
7980
if self._notifier:
8081
self._notifier.dispatch(events)
82+
if self._smart_capture:
83+
self._smart_capture.evaluate(events)
8184

8285
return CollectorResult(source=self.name, data=analysis)

app/config.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@
4848
"active_theme": "", # Module ID of active theme (empty = first available)
4949
"theme_registry_url": "https://raw.githubusercontent.com/itsDNNS/docsight-themes/main/registry.json",
5050
"health_hysteresis": 0,
51+
"sc_enabled": False,
52+
"sc_global_cooldown": 300,
53+
"sc_trigger_cooldown": 900,
54+
"sc_max_actions_per_hour": 4,
55+
"sc_flapping_window": 3600,
56+
"sc_flapping_threshold": 3,
5157
}
5258

5359
ENV_MAP = {
@@ -88,6 +94,12 @@
8894
"weather_latitude": "WEATHER_LATITUDE",
8995
"weather_longitude": "WEATHER_LONGITUDE",
9096
"health_hysteresis": "HEALTH_HYSTERESIS",
97+
"sc_enabled": "SC_ENABLED",
98+
"sc_global_cooldown": "SC_GLOBAL_COOLDOWN",
99+
"sc_trigger_cooldown": "SC_TRIGGER_COOLDOWN",
100+
"sc_max_actions_per_hour": "SC_MAX_ACTIONS_PER_HOUR",
101+
"sc_flapping_window": "SC_FLAPPING_WINDOW",
102+
"sc_flapping_threshold": "SC_FLAPPING_THRESHOLD",
91103
}
92104

93105
# Deprecated env vars (FRITZ_* -> MODEM_*) - checked as fallback
@@ -104,8 +116,10 @@
104116
"fritz_password": "modem_password",
105117
}
106118

107-
INT_KEYS = {"poll_interval", "web_port", "history_days", "notify_cooldown", "health_hysteresis"}
108-
BOOL_KEYS = {"demo_mode", "gaming_quality_enabled", "segment_utilization_enabled"}
119+
INT_KEYS = {"poll_interval", "web_port", "history_days", "notify_cooldown", "health_hysteresis",
120+
"sc_global_cooldown", "sc_trigger_cooldown", "sc_max_actions_per_hour",
121+
"sc_flapping_window", "sc_flapping_threshold"}
122+
BOOL_KEYS = {"demo_mode", "gaming_quality_enabled", "segment_utilization_enabled", "sc_enabled"}
109123

110124
URL_KEYS = {"modem_url", "bqm_url", "speedtest_tracker_url", "notify_webhook_url"}
111125
_ALLOWED_URL_SCHEMES = {"http", "https"}

app/main.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,26 @@ def polling_loop(config_mgr, storage, stop_event):
106106
notifier = NotificationDispatcher(config_mgr)
107107
log.info("Notifications: webhook configured")
108108

109+
# Smart Capture (optional)
110+
smart_capture = None
111+
if config_mgr.get("sc_enabled", False):
112+
from .smart_capture import SmartCaptureEngine, Trigger
113+
smart_capture = SmartCaptureEngine(storage, config_mgr)
114+
# Default trigger: modulation downgrades (v1 scope)
115+
smart_capture.register_trigger(Trigger(
116+
event_type="modulation_change",
117+
action_type="capture",
118+
min_severity="warning",
119+
require_details={"direction": "downgrade"},
120+
))
121+
log.info("Smart Capture: enabled with %d trigger(s)", len(smart_capture.triggers))
122+
109123
web.update_state(poll_interval=config["poll_interval"])
110124

111125
event_detector = EventDetector(hysteresis=config_mgr.get("health_hysteresis", 0))
112126
collectors = discover_collectors(
113127
config_mgr, storage, event_detector, mqtt_pub, web, analyzer,
114-
notifier=notifier,
128+
notifier=notifier, smart_capture=smart_capture,
115129
)
116130

117131
# Inject collectors into web layer for manual polling and status endpoint
@@ -171,6 +185,7 @@ def _run_collector(collector):
171185
web=web,
172186
poll_interval=config_mgr.get("poll_interval", 900),
173187
notifier=notifier,
188+
smart_capture=smart_capture,
174189
)
175190
collectors = [
176191
new_modem if c is modem_collector else c

app/smart_capture/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Smart Capture — event-driven execution engine with guardrails."""
2+
3+
from .engine import SmartCaptureEngine
4+
from .types import ExecutionStatus, Trigger
5+
6+
__all__ = ["SmartCaptureEngine", "ExecutionStatus", "Trigger"]

app/smart_capture/engine.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
"""Smart Capture execution engine — evaluates events, applies guardrails, records executions."""
2+
3+
import logging
4+
5+
from .guardrails import GuardrailChain
6+
from .types import ExecutionStatus, Trigger
7+
8+
log = logging.getLogger("docsis.smart_capture.engine")
9+
10+
11+
class SmartCaptureEngine:
12+
"""Core Smart Capture engine.
13+
14+
Sits alongside the NotificationDispatcher as an event consumer.
15+
Modules register triggers; the engine evaluates incoming events against
16+
them, applies guardrails, and writes execution records to storage.
17+
"""
18+
19+
def __init__(self, storage, config_mgr):
20+
self._storage = storage
21+
self._config = config_mgr
22+
self._guardrails = GuardrailChain(config_mgr)
23+
self._triggers: list[Trigger] = []
24+
25+
@property
26+
def triggers(self) -> list[Trigger]:
27+
return list(self._triggers)
28+
29+
def register_trigger(self, trigger: Trigger):
30+
"""Register a trigger. Duplicates (same event_type + action_type) are ignored."""
31+
if trigger not in self._triggers:
32+
self._triggers.append(trigger)
33+
log.info("Registered trigger: %s -> %s", trigger.event_type, trigger.action_type)
34+
35+
def evaluate(self, events: list[dict]):
36+
"""Evaluate a batch of events against registered triggers.
37+
38+
For each event, collect matching triggers, then pass them through
39+
check_batch() for batch-aware guardrail evaluation.
40+
Called from the modem collector after event detection.
41+
"""
42+
if not self._is_enabled():
43+
return
44+
if not events or not self._triggers:
45+
return
46+
47+
for event in events:
48+
self._evaluate_event(event)
49+
50+
def _is_enabled(self) -> bool:
51+
val = self._config.get("sc_enabled", False)
52+
if isinstance(val, str):
53+
return val.lower() in ("true", "1", "yes", "on")
54+
return bool(val)
55+
56+
def _evaluate_event(self, event: dict):
57+
# Collect all matching triggers for this event
58+
matches = [(t, event) for t in self._triggers if t.matches(event)]
59+
if not matches:
60+
return
61+
62+
# Evaluate as a batch — global cooldown applies once per event
63+
results = self._guardrails.check_batch(matches)
64+
65+
for trigger, ev, allowed, reason in results:
66+
if allowed:
67+
self._storage.save_execution(
68+
trigger_type=ev["event_type"],
69+
action_type=trigger.action_type,
70+
status=ExecutionStatus.PENDING,
71+
trigger_event_id=ev.get("_id"),
72+
trigger_timestamp=ev.get("timestamp"),
73+
details=ev.get("details"),
74+
)
75+
log.info("Smart Capture: pending execution for %s -> %s",
76+
ev["event_type"], trigger.action_type)
77+
else:
78+
self._storage.save_execution(
79+
trigger_type=ev["event_type"],
80+
action_type=trigger.action_type,
81+
status=ExecutionStatus.SUPPRESSED,
82+
trigger_event_id=ev.get("_id"),
83+
trigger_timestamp=ev.get("timestamp"),
84+
suppression_reason=reason,
85+
details=ev.get("details"),
86+
)
87+
log.info("Smart Capture: suppressed %s -> %s (%s)",
88+
ev["event_type"], trigger.action_type, reason)

app/smart_capture/guardrails.py

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""Smart Capture guardrails — rate limiting and flapping protection."""
2+
3+
import logging
4+
import threading
5+
import time
6+
7+
log = logging.getLogger("docsis.smart_capture.guardrails")
8+
9+
10+
class GuardrailChain:
11+
"""Applies guardrails in order: flapping, global cooldown, per-trigger
12+
cooldown, max actions per hour.
13+
14+
Thread-safe — all state access is protected by a lock.
15+
16+
Cooldown semantics: 0 = disabled (no cooldown, always allow).
17+
This differs from NotificationDispatcher where 0 = never send.
18+
19+
Global cooldown is batch-aware: it is checked once per source event
20+
(not per trigger), and updated only after all triggers for that event
21+
have been evaluated. This prevents a second trigger for the same event
22+
from being suppressed by the first trigger's fire.
23+
"""
24+
25+
def __init__(self, config_mgr):
26+
self._config = config_mgr
27+
self._lock = threading.Lock()
28+
self._last_global_fire: float = 0.0
29+
self._last_trigger_fire: dict[str, float] = {}
30+
self._hourly_fires: list[float] = []
31+
self._trigger_match_history: dict[str, list[float]] = {}
32+
33+
def check_batch(self, trigger_results):
34+
"""Evaluate guardrails for a batch of (trigger, event) pairs from one source event.
35+
36+
Args:
37+
trigger_results: list of (trigger, event) tuples that matched.
38+
39+
Returns:
40+
list of (trigger, event, allowed, reason) tuples.
41+
42+
Global cooldown is checked once against the batch. Per-trigger
43+
cooldown and hourly limit are checked per trigger. Flapping counts
44+
all matches (not just allowed ones) to detect chattering input.
45+
"""
46+
if not trigger_results:
47+
return []
48+
49+
results = []
50+
with self._lock:
51+
now = time.monotonic()
52+
53+
# 1. Global cooldown — checked once for the whole batch
54+
global_cd = int(self._config.get("sc_global_cooldown", 300))
55+
global_blocked = (
56+
global_cd > 0
57+
and (now - self._last_global_fire) < global_cd
58+
)
59+
global_reason = None
60+
if global_blocked:
61+
remaining = int(global_cd - (now - self._last_global_fire))
62+
global_reason = f"global_cooldown: {remaining}s remaining"
63+
64+
any_allowed = False
65+
for trigger, event in trigger_results:
66+
trigger_key = f"{trigger.event_type}:{trigger.action_type}"
67+
68+
# Record match for flapping (counts ALL matches, not just fires)
69+
history = self._trigger_match_history.get(trigger_key, [])
70+
window = int(self._config.get("sc_flapping_window", 3600))
71+
history = [t for t in history if (now - t) < window]
72+
history.append(now)
73+
self._trigger_match_history[trigger_key] = history
74+
75+
# 2. Flapping — checked before cooldowns
76+
threshold = int(self._config.get("sc_flapping_threshold", 3))
77+
if threshold > 0 and len(history) > threshold:
78+
results.append((trigger, event, False,
79+
f"flapping: {len(history)} matches in {window}s for {trigger_key}"))
80+
continue
81+
82+
# 3. Global cooldown
83+
if global_blocked:
84+
results.append((trigger, event, False, global_reason))
85+
continue
86+
87+
# 4. Per-trigger cooldown
88+
trigger_cd = int(self._config.get("sc_trigger_cooldown", 900))
89+
last_fire = self._last_trigger_fire.get(trigger_key, 0.0)
90+
if trigger_cd > 0 and (now - last_fire) < trigger_cd:
91+
remaining = int(trigger_cd - (now - last_fire))
92+
results.append((trigger, event, False,
93+
f"trigger_cooldown: {remaining}s remaining for {trigger_key}"))
94+
continue
95+
96+
# 5. Max actions per hour
97+
max_per_hour = int(self._config.get("sc_max_actions_per_hour", 4))
98+
cutoff = now - 3600
99+
self._hourly_fires = [t for t in self._hourly_fires if t > cutoff]
100+
if max_per_hour > 0 and len(self._hourly_fires) >= max_per_hour:
101+
results.append((trigger, event, False,
102+
f"max_actions_per_hour: {len(self._hourly_fires)}/{max_per_hour} used"))
103+
continue
104+
105+
# All passed
106+
self._last_trigger_fire[trigger_key] = now
107+
self._hourly_fires.append(now)
108+
any_allowed = True
109+
results.append((trigger, event, True, None))
110+
111+
# Update global cooldown only if at least one execution was allowed
112+
if any_allowed:
113+
self._last_global_fire = now
114+
115+
return results

0 commit comments

Comments
 (0)