1- """DOCSIS channel health analysis with configurable thresholds."""
1+ """DOCSIS channel health analysis with configurable thresholds.
22
3+ Thresholds are loaded from thresholds.json (Vodafone VFKD guidelines).
4+ The file supports per-modulation thresholds for DS power, US power, and SNR.
5+ """
6+
7+ import json
38import logging
9+ import os
410
511log = logging .getLogger ("docsis.analyzer" )
612
7- # --- Reference thresholds ---
8- # Downstream Power (dBmV): ideal 0, good -7..+7, marginal -10..+10
9- DS_POWER_WARN = 7.0
10- DS_POWER_CRIT = 10.0
13+ # --- Load thresholds from JSON ---
14+ _THRESHOLDS_PATH = os .path .join (os .path .dirname (__file__ ), "thresholds.json" )
15+ _thresholds = {}
16+
17+
18+ def _load_thresholds ():
19+ """Load thresholds from JSON file. Falls back to hardcoded defaults."""
20+ global _thresholds
21+ try :
22+ with open (_THRESHOLDS_PATH , "r" ) as f :
23+ _thresholds = json .load (f )
24+ log .info ("Loaded thresholds from %s" , _THRESHOLDS_PATH )
25+ except (FileNotFoundError , json .JSONDecodeError ) as e :
26+ log .warning ("Could not load thresholds.json (%s), using defaults" , e )
27+ _thresholds = {}
28+
29+
30+ def _get_ds_power_thresholds (modulation = None ):
31+ """Get DS power thresholds for a given modulation."""
32+ ds = _thresholds .get ("downstream_power" , {})
33+ default_mod = ds .get ("_default" , "256QAM" )
34+ mod = modulation if modulation in ds else default_mod
35+ t = ds .get (mod , {})
36+ return {
37+ "good_min" : t .get ("good_min" , - 4.0 ),
38+ "good_max" : t .get ("good_max" , 13.0 ),
39+ "crit_min" : t .get ("immediate_min" , - 8.0 ),
40+ "crit_max" : t .get ("immediate_max" , 20.0 ),
41+ }
1142
12- # Upstream Power (dBmV): good 35-49, marginal 50-54, bad >54
13- US_POWER_WARN = 50.0
14- US_POWER_CRIT = 54.0
1543
16- # SNR / MER (dB): good >30, marginal 25-30, bad <25
17- SNR_WARN = 30.0
18- SNR_CRIT = 25.0
44+ def _get_us_power_thresholds (docsis_version = None ):
45+ """Get US power thresholds for a given DOCSIS version."""
46+ us = _thresholds .get ("upstream_power" , {})
47+ default_ver = us .get ("_default" , "EuroDOCSIS 3.0" )
48+ # Map version strings
49+ ver = default_ver
50+ if docsis_version in ("3.1" , "DOCSIS 3.1" ):
51+ ver = "DOCSIS 3.1"
52+ elif docsis_version in ("3.0" , "EuroDOCSIS 3.0" ):
53+ ver = "EuroDOCSIS 3.0"
54+ t = us .get (ver , us .get (default_ver , {}))
55+ return {
56+ "good_min" : t .get ("good_min" , 41.0 ),
57+ "good_max" : t .get ("good_max" , 47.0 ),
58+ "crit_min" : t .get ("immediate_min" , 35.0 ),
59+ "crit_max" : t .get ("immediate_max" , 53.0 ),
60+ }
1961
20- # Uncorrectable errors threshold
21- UNCORR_ERRORS_CRIT = 10000
62+
63+ def _get_snr_thresholds (modulation = None ):
64+ """Get SNR thresholds for a given modulation."""
65+ snr = _thresholds .get ("snr" , {})
66+ default_mod = snr .get ("_default" , "256QAM" )
67+ mod = modulation if modulation in snr else default_mod
68+ t = snr .get (mod , {})
69+ return {
70+ "good_min" : t .get ("good_min" , 33.0 ),
71+ "crit_min" : t .get ("immediate_min" , 29.0 ),
72+ }
73+
74+
75+ def _get_uncorr_threshold ():
76+ return _thresholds .get ("errors" , {}).get ("uncorrectable_threshold" , 10000 )
77+
78+
79+ # Load on module import
80+ _load_thresholds ()
2281
2382
2483def _parse_float (val , default = 0.0 ):
@@ -48,36 +107,39 @@ def _assess_ds_channel(ch, docsis_ver):
48107 """Assess a single downstream channel. Returns (health, health_detail)."""
49108 issues = []
50109 power = _parse_float (ch .get ("powerLevel" ))
110+ modulation = (ch .get ("modulation" ) or ch .get ("type" ) or "" ).upper ().replace ("-" , "" )
51111
52- if abs (power ) > DS_POWER_CRIT :
112+ pt = _get_ds_power_thresholds (modulation )
113+ if power < pt ["crit_min" ] or power > pt ["crit_max" ]:
53114 issues .append ("power critical" )
54- elif abs ( power ) > DS_POWER_WARN :
115+ elif power < pt [ "good_min" ] or power > pt [ "good_max" ] :
55116 issues .append ("power warning" )
56117
118+ snr_val = None
57119 if docsis_ver == "3.0" and ch .get ("mse" ):
58- snr = abs (_parse_float (ch ["mse" ]))
59- if snr < SNR_CRIT :
60- issues .append ("snr critical" )
61- elif snr < SNR_WARN :
62- issues .append ("snr warning" )
120+ snr_val = abs (_parse_float (ch ["mse" ]))
63121 elif docsis_ver == "3.1" and ch .get ("mer" ):
64- snr = _parse_float (ch ["mer" ])
65- if snr < SNR_CRIT :
122+ snr_val = _parse_float (ch ["mer" ])
123+
124+ if snr_val is not None :
125+ st = _get_snr_thresholds (modulation )
126+ if snr_val < st ["crit_min" ]:
66127 issues .append ("snr critical" )
67- elif snr < SNR_WARN :
128+ elif snr_val < st [ "good_min" ] :
68129 issues .append ("snr warning" )
69130
70131 return _channel_health (issues ), _health_detail (issues )
71132
72133
73- def _assess_us_channel (ch ):
134+ def _assess_us_channel (ch , docsis_ver = "3.0" ):
74135 """Assess a single upstream channel. Returns (health, health_detail)."""
75136 issues = []
76137 power = _parse_float (ch .get ("powerLevel" ))
77138
78- if power > US_POWER_CRIT :
139+ pt = _get_us_power_thresholds (docsis_ver )
140+ if power < pt ["crit_min" ] or power > pt ["crit_max" ]:
79141 issues .append ("power critical" )
80- elif power > US_POWER_WARN :
142+ elif power < pt [ "good_min" ] or power > pt [ "good_max" ] :
81143 issues .append ("power warning" )
82144
83145 return _channel_health (issues ), _health_detail (issues )
@@ -139,7 +201,7 @@ def analyze(data: dict) -> dict:
139201 # --- Parse upstream channels ---
140202 us_channels = []
141203 for ch in us30 :
142- health , health_detail = _assess_us_channel (ch )
204+ health , health_detail = _assess_us_channel (ch , "3.0" )
143205 us_channels .append ({
144206 "channel_id" : ch .get ("channelID" , 0 ),
145207 "frequency" : ch .get ("frequency" , "" ),
@@ -151,7 +213,7 @@ def analyze(data: dict) -> dict:
151213 "health_detail" : health_detail ,
152214 })
153215 for ch in us31 :
154- health , health_detail = _assess_us_channel (ch )
216+ health , health_detail = _assess_us_channel (ch , "3.1" )
155217 us_channels .append ({
156218 "channel_id" : ch .get ("channelID" , 0 ),
157219 "frequency" : ch .get ("frequency" , "" ),
@@ -190,17 +252,24 @@ def analyze(data: dict) -> dict:
190252
191253 # --- Overall health ---
192254 issues = []
193- if ds_powers and (min (ds_powers ) < - DS_POWER_CRIT or max (ds_powers ) > DS_POWER_CRIT ):
255+ # Summary uses default (256QAM) thresholds for overall health
256+ ds_pt = _get_ds_power_thresholds ()
257+ us_pt = _get_us_power_thresholds ()
258+ snr_t = _get_snr_thresholds ()
259+
260+ if ds_powers and (min (ds_powers ) < ds_pt ["crit_min" ] or max (ds_powers ) > ds_pt ["crit_max" ]):
194261 issues .append ("ds_power_critical" )
195- if us_powers and max (us_powers ) > US_POWER_CRIT :
262+ elif ds_powers and (min (ds_powers ) < ds_pt ["good_min" ] or max (ds_powers ) > ds_pt ["good_max" ]):
263+ issues .append ("ds_power_warn" )
264+ if us_powers and (min (us_powers ) < us_pt ["crit_min" ] or max (us_powers ) > us_pt ["crit_max" ]):
196265 issues .append ("us_power_critical" )
197- elif us_powers and max (us_powers ) > US_POWER_WARN :
266+ elif us_powers and ( min ( us_powers ) < us_pt [ "good_min" ] or max (us_powers ) > us_pt [ "good_max" ]) :
198267 issues .append ("us_power_warn" )
199- if ds_snrs and min (ds_snrs ) < SNR_CRIT :
268+ if ds_snrs and min (ds_snrs ) < snr_t [ "crit_min" ] :
200269 issues .append ("snr_critical" )
201- elif ds_snrs and min (ds_snrs ) < SNR_WARN :
270+ elif ds_snrs and min (ds_snrs ) < snr_t [ "good_min" ] :
202271 issues .append ("snr_warn" )
203- if total_uncorr > UNCORR_ERRORS_CRIT :
272+ if total_uncorr > _get_uncorr_threshold () :
204273 issues .append ("uncorr_errors_high" )
205274
206275 if not issues :
0 commit comments