Skip to content

Commit 083e1ae

Browse files
authored
Merge pull request #242 from itsDNNS/feat/236-smart-capture-stt-adapter
feat: Smart Capture STT action adapter with async result linking
2 parents fc60ce3 + aa59576 commit 083e1ae

File tree

12 files changed

+861
-10
lines changed

12 files changed

+861
-10
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ QUICK-TEST-*.md
2424
phase*.png
2525
mockup*.png
2626

27+
# Virtual environments
28+
.venv/
29+
venv/
30+
2731
# Development artifacts
2832
*.backup*
2933
*.bak

app/main.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,16 @@ def polling_loop(config_mgr, storage, stop_event):
128128
notifier=notifier, smart_capture=smart_capture,
129129
)
130130

131+
# Wire STT adapter if Smart Capture enabled, STT configured, and not demo mode
132+
if smart_capture and config_mgr.is_speedtest_configured() and not config_mgr.is_demo_mode():
133+
from .smart_capture.adapters.speedtest import SpeedtestAdapter
134+
stt_adapter = SpeedtestAdapter(storage, config_mgr)
135+
smart_capture.register_adapter("capture", stt_adapter)
136+
stt_collector = next((c for c in collectors if c.name == "speedtest"), None)
137+
if stt_collector:
138+
stt_collector.on_import = stt_adapter.on_results_imported
139+
log.info("Smart Capture: STT adapter wired to speedtest collector")
140+
131141
# Inject collectors into web layer for manual polling and status endpoint
132142
modem_collector = next((c for c in collectors if c.name in ("modem", "demo")), None)
133143
if modem_collector:
@@ -234,6 +244,21 @@ def _run_collector(collector):
234244
log.error("%s: timed out after 120s", collector.name)
235245
future.cancel()
236246

247+
# ── Smart Capture expiry check (every 60s) ──
248+
if smart_capture and smart_capture.adapter_action_types:
249+
if not hasattr(polling_loop, '_sc_expiry_counter'):
250+
polling_loop._sc_expiry_counter = 0
251+
polling_loop._sc_expiry_counter += 1
252+
if polling_loop._sc_expiry_counter >= 60:
253+
polling_loop._sc_expiry_counter = 0
254+
from .tz import utc_cutoff
255+
cutoff = utc_cutoff(minutes=10)
256+
for action_type in smart_capture.adapter_action_types:
257+
expired = storage.expire_stale_fired(cutoff, action_type=action_type)
258+
if expired:
259+
log.info("Smart Capture: expired %d stale %s executions",
260+
expired, action_type)
261+
237262
stop_event.wait(1)
238263
finally:
239264
executor.shutdown(wait=False, cancel_futures=True)

app/modules/speedtest/collector.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(self, config_mgr, storage, web, poll_interval=300, **kwargs):
2121
self._web = web
2222
self._client = None
2323
self._last_url = None
24+
self.on_import = None # Optional callback for Smart Capture
2425

2526
def is_enabled(self) -> bool:
2627
return self._config_mgr.is_speedtest_configured()
@@ -49,17 +50,23 @@ def collect(self) -> CollectorResult:
4950
try:
5051
last_id = self._storage.get_latest_speedtest_id()
5152
cached_count = self._storage.get_speedtest_count()
52-
if cached_count < 50:
53+
is_backfill = cached_count < 50
54+
if is_backfill:
5355
new_results = self._client.get_results(per_page=2000)
5456
else:
5557
new_results = self._client.get_newer_than(last_id)
5658
if new_results:
59+
genuinely_new = [r for r in new_results if r.get("id", 0) > last_id]
5760
self._storage.save_speedtest_results(new_results)
5861
log.info(
5962
"Cached %d new speedtest results (total: %d)",
6063
len(new_results),
6164
cached_count + len(new_results),
6265
)
66+
# Skip on_import during initial backfill to avoid matching
67+
# historical results to fresh FIRED executions
68+
if genuinely_new and self.on_import and not is_backfill:
69+
self.on_import(genuinely_new)
6370
except Exception as e:
6471
log.warning("Speedtest delta cache failed: %s", e)
6572

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Smart Capture action adapters."""

app/smart_capture/adapters/base.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""Base class for Smart Capture action adapters."""
2+
3+
4+
class ActionAdapter:
5+
"""Base class for Smart Capture action adapters.
6+
7+
Subclasses implement execute() to fire an action when a pending
8+
execution is created, and optionally on_results_imported() for
9+
async result linking.
10+
"""
11+
12+
def __init__(self, action_type: str):
13+
self.action_type = action_type
14+
15+
def execute(self, execution_id: int, event: dict) -> tuple[bool, str | None]:
16+
"""Fire the action. Returns (success, error_message).
17+
18+
Called by the engine immediately after save_execution(PENDING).
19+
On success: adapter should update execution to FIRED.
20+
On failure: adapter should update execution to EXPIRED with last_error.
21+
"""
22+
raise NotImplementedError
23+
24+
def on_results_imported(self, results: list[dict]):
25+
"""Called when new results are available for matching. Optional."""
26+
pass
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
"""Speedtest Tracker action adapter — triggers STT runs and links results."""
2+
3+
import logging
4+
from datetime import datetime, timezone, timedelta
5+
6+
import requests
7+
8+
from ..types import ExecutionStatus
9+
from .base import ActionAdapter
10+
11+
log = logging.getLogger("docsis.smart_capture.adapters.speedtest")
12+
13+
MATCH_WINDOW_SECONDS = 300 # 5 minutes
14+
15+
16+
class SpeedtestAdapter(ActionAdapter):
17+
"""Triggers a Speedtest Tracker run and matches imported results."""
18+
19+
def __init__(self, storage, config_mgr):
20+
super().__init__(action_type="capture")
21+
self._storage = storage
22+
self._config = config_mgr
23+
url = config_mgr.get("speedtest_tracker_url", "").rstrip("/")
24+
token = config_mgr.get("speedtest_tracker_token", "")
25+
self._run_url = f"{url}/api/v1/speedtests/run"
26+
self._session = requests.Session()
27+
self._session.headers.update({
28+
"Authorization": f"Bearer {token}",
29+
"Accept": "application/json",
30+
})
31+
32+
def execute(self, execution_id: int, event: dict) -> tuple[bool, str | None]:
33+
"""POST to STT run endpoint. Updates execution to FIRED or EXPIRED."""
34+
try:
35+
resp = self._session.post(self._run_url, timeout=15)
36+
if resp.status_code == 201:
37+
from ...tz import utc_now
38+
self._storage.update_execution(
39+
execution_id,
40+
status=ExecutionStatus.FIRED,
41+
fired_at=utc_now(),
42+
)
43+
log.info("Smart Capture: triggered STT run (execution #%d)", execution_id)
44+
return True, None
45+
else:
46+
error = f"STT returned {resp.status_code}: {resp.text[:200]}"
47+
self._storage.update_execution(
48+
execution_id,
49+
status=ExecutionStatus.EXPIRED,
50+
last_error=error,
51+
attempt_count=1,
52+
)
53+
log.warning("Smart Capture: STT trigger failed for #%d: %s",
54+
execution_id, error)
55+
return False, error
56+
except Exception as e:
57+
error = str(e)
58+
self._storage.update_execution(
59+
execution_id,
60+
status=ExecutionStatus.EXPIRED,
61+
last_error=error,
62+
attempt_count=1,
63+
)
64+
log.warning("Smart Capture: STT trigger error for #%d: %s",
65+
execution_id, error)
66+
return False, error
67+
68+
def on_results_imported(self, results: list[dict]):
69+
"""Match newly imported speedtest results to FIRED executions.
70+
71+
For each FIRED execution (FIFO, oldest first), find the closest
72+
result within the match window. This ensures the nearest result
73+
is selected when multiple results fall in the same window.
74+
"""
75+
fired = self._storage.get_fired_unmatched(self.action_type)
76+
if not fired:
77+
return
78+
79+
# Parse all result timestamps upfront
80+
parsed_results = []
81+
for result in results:
82+
result_ts = self._parse_timestamp(result.get("timestamp", ""))
83+
if result_ts is not None:
84+
parsed_results.append((result, result_ts))
85+
86+
if not parsed_results:
87+
return
88+
89+
matched_result_ids = set()
90+
91+
for execution in fired:
92+
fired_ts = self._parse_timestamp(execution.get("fired_at", ""))
93+
if fired_ts is None:
94+
continue
95+
96+
# Find the closest result within [fired_at, fired_at + 5min]
97+
window_end = fired_ts + timedelta(seconds=MATCH_WINDOW_SECONDS)
98+
best_result = None
99+
best_distance = None
100+
101+
for result, result_ts in parsed_results:
102+
if result.get("id") in matched_result_ids:
103+
continue
104+
if fired_ts <= result_ts <= window_end:
105+
distance = abs((result_ts - fired_ts).total_seconds())
106+
if best_distance is None or distance < best_distance:
107+
best_result = result
108+
best_distance = distance
109+
110+
if best_result is not None:
111+
from ...tz import utc_now
112+
ok = self._storage.claim_execution(
113+
execution["id"],
114+
expected_status="fired",
115+
new_status=ExecutionStatus.COMPLETED,
116+
completed_at=utc_now(),
117+
linked_result_id=best_result["id"],
118+
)
119+
if ok:
120+
log.info("Smart Capture: linked execution #%d to speedtest #%d",
121+
execution["id"], best_result["id"])
122+
matched_result_ids.add(best_result["id"])
123+
124+
@staticmethod
125+
def _parse_timestamp(ts: str) -> datetime | None:
126+
"""Parse ISO-8601 timestamp to UTC datetime via fromisoformat().
127+
128+
Handles Z-suffix, offset-bearing timestamps (+00:00, +02:00),
129+
and fractional seconds. All results are converted to UTC.
130+
"""
131+
if not ts:
132+
return None
133+
try:
134+
# fromisoformat handles offsets and fractional seconds natively
135+
# but needs Z replaced with +00:00 on Python < 3.11
136+
normalized = ts.replace("Z", "+00:00") if ts.endswith("Z") else ts
137+
dt = datetime.fromisoformat(normalized)
138+
# Convert to UTC if offset-aware
139+
if dt.tzinfo is not None:
140+
dt = dt.astimezone(timezone.utc)
141+
else:
142+
dt = dt.replace(tzinfo=timezone.utc)
143+
return dt
144+
except (ValueError, TypeError):
145+
return None

app/smart_capture/engine.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,24 +14,36 @@ class SmartCaptureEngine:
1414
Sits alongside the NotificationDispatcher as an event consumer.
1515
Modules register triggers; the engine evaluates incoming events against
1616
them, applies guardrails, and writes execution records to storage.
17+
Registered adapters are called to execute actions for pending executions.
1718
"""
1819

1920
def __init__(self, storage, config_mgr):
2021
self._storage = storage
2122
self._config = config_mgr
2223
self._guardrails = GuardrailChain(config_mgr)
2324
self._triggers: list[Trigger] = []
25+
self._adapters: dict[str, object] = {}
2426

2527
@property
2628
def triggers(self) -> list[Trigger]:
2729
return list(self._triggers)
2830

31+
@property
32+
def adapter_action_types(self) -> list[str]:
33+
"""Return list of action types with registered adapters."""
34+
return list(self._adapters.keys())
35+
2936
def register_trigger(self, trigger: Trigger):
3037
"""Register a trigger. Duplicates (same event_type + action_type) are ignored."""
3138
if trigger not in self._triggers:
3239
self._triggers.append(trigger)
3340
log.info("Registered trigger: %s -> %s", trigger.event_type, trigger.action_type)
3441

42+
def register_adapter(self, action_type: str, adapter):
43+
"""Register an action adapter for an action type."""
44+
self._adapters[action_type] = adapter
45+
log.info("Registered adapter for action_type=%s", action_type)
46+
3547
def evaluate(self, events: list[dict]):
3648
"""Evaluate a batch of events against registered triggers.
3749
@@ -54,26 +66,32 @@ def _is_enabled(self) -> bool:
5466
return bool(val)
5567

5668
def _evaluate_event(self, event: dict):
57-
# Collect all matching triggers for this event
5869
matches = [(t, event) for t in self._triggers if t.matches(event)]
5970
if not matches:
6071
return
6172

62-
# Evaluate as a batch — global cooldown applies once per event
6373
results = self._guardrails.check_batch(matches)
6474

6575
for trigger, ev, allowed, reason in results:
6676
if allowed:
67-
self._storage.save_execution(
77+
execution_id = self._storage.save_execution(
6878
trigger_type=ev["event_type"],
6979
action_type=trigger.action_type,
7080
status=ExecutionStatus.PENDING,
7181
trigger_event_id=ev.get("_id"),
7282
trigger_timestamp=ev.get("timestamp"),
7383
details=ev.get("details"),
7484
)
75-
log.info("Smart Capture: pending execution for %s -> %s",
76-
ev["event_type"], trigger.action_type)
85+
log.info("Smart Capture: pending execution #%d for %s -> %s",
86+
execution_id, ev["event_type"], trigger.action_type)
87+
88+
adapter = self._adapters.get(trigger.action_type)
89+
if adapter:
90+
try:
91+
adapter.execute(execution_id, ev)
92+
except Exception as e:
93+
log.error("Smart Capture: adapter error for #%d: %s",
94+
execution_id, e)
7795
else:
7896
self._storage.save_execution(
7997
trigger_type=ev["event_type"],

0 commit comments

Comments
 (0)