2424#
2525###############################################################################
2626import re
27- from typing import Optional
27+ from typing import List , Optional , Tuple
2828
2929from nodescraper .base import InBandDataCollector
3030from nodescraper .enums import EventCategory , EventPriority , ExecutionStatus
@@ -49,7 +49,7 @@ class NetworkCollector(InBandDataCollector[NetworkDataModel, None]):
4949 CMD_RULE = "ip rule show"
5050 CMD_NEIGHBOR = "ip neighbor show"
5151
52- def _parse_ip_addr (self , output : str ) -> list [NetworkInterface ]:
52+ def _parse_ip_addr (self , output : str ) -> List [NetworkInterface ]:
5353 """Parse 'ip addr show' output into NetworkInterface objects.
5454
5555 Args:
@@ -78,7 +78,7 @@ def _parse_ip_addr(self, output: str) -> list[NetworkInterface]:
7878 current_interface = ifname
7979
8080 # Extract flags
81- flags : list [str ] = []
81+ flags : List [str ] = []
8282 if "<" in line :
8383 flag_match = re .search (r"<([^>]+)>" , line )
8484 if flag_match :
@@ -89,16 +89,20 @@ def _parse_ip_addr(self, output: str) -> list[NetworkInterface]:
8989 qdisc = None
9090 state = None
9191
92+ # Known keyword-value pairs
93+ keyword_value_pairs = ["mtu" , "qdisc" , "state" ]
94+
9295 for i , part in enumerate (parts ):
93- if part == "mtu" and i + 1 < len (parts ):
94- try :
95- mtu = int (parts [i + 1 ])
96- except ValueError :
97- pass
98- elif part == "qdisc" and i + 1 < len (parts ):
99- qdisc = parts [i + 1 ]
100- elif part == "state" and i + 1 < len (parts ):
101- state = parts [i + 1 ]
96+ if part in keyword_value_pairs and i + 1 < len (parts ):
97+ if part == "mtu" :
98+ try :
99+ mtu = int (parts [i + 1 ])
100+ except ValueError :
101+ pass
102+ elif part == "qdisc" :
103+ qdisc = parts [i + 1 ]
104+ elif part == "state" :
105+ state = parts [i + 1 ]
102106
103107 interfaces [ifname ] = NetworkInterface (
104108 name = ifname ,
@@ -165,7 +169,7 @@ def _parse_ip_addr(self, output: str) -> list[NetworkInterface]:
165169
166170 return list (interfaces .values ())
167171
168- def _parse_ip_route (self , output : str ) -> list [Route ]:
172+ def _parse_ip_route (self , output : str ) -> List [Route ]:
169173 """Parse 'ip route show' output into Route objects.
170174
171175 Args:
@@ -190,32 +194,33 @@ def _parse_ip_route(self, output: str) -> list[Route]:
190194
191195 route = Route (destination = destination )
192196
197+ # Known keyword-value pairs
198+ keyword_value_pairs = ["via" , "dev" , "proto" , "scope" , "metric" , "src" , "table" ]
199+
193200 # Parse route attributes
194201 i = 1
195202 while i < len (parts ):
196- if parts [i ] == "via" and i + 1 < len (parts ):
197- route .gateway = parts [i + 1 ]
198- i += 2
199- elif parts [i ] == "dev" and i + 1 < len (parts ):
200- route .device = parts [i + 1 ]
201- i += 2
202- elif parts [i ] == "proto" and i + 1 < len (parts ):
203- route .protocol = parts [i + 1 ]
204- i += 2
205- elif parts [i ] == "scope" and i + 1 < len (parts ):
206- route .scope = parts [i + 1 ]
207- i += 2
208- elif parts [i ] == "metric" and i + 1 < len (parts ):
209- try :
210- route .metric = int (parts [i + 1 ])
211- except ValueError :
212- pass
213- i += 2
214- elif parts [i ] == "src" and i + 1 < len (parts ):
215- route .source = parts [i + 1 ]
216- i += 2
217- elif parts [i ] == "table" and i + 1 < len (parts ):
218- route .table = parts [i + 1 ]
203+ if parts [i ] in keyword_value_pairs and i + 1 < len (parts ):
204+ keyword = parts [i ]
205+ value = parts [i + 1 ]
206+
207+ if keyword == "via" :
208+ route .gateway = value
209+ elif keyword == "dev" :
210+ route .device = value
211+ elif keyword == "proto" :
212+ route .protocol = value
213+ elif keyword == "scope" :
214+ route .scope = value
215+ elif keyword == "metric" :
216+ try :
217+ route .metric = int (value )
218+ except ValueError :
219+ pass
220+ elif keyword == "src" :
221+ route .source = value
222+ elif keyword == "table" :
223+ route .table = value
219224 i += 2
220225 else :
221226 i += 1
@@ -224,7 +229,7 @@ def _parse_ip_route(self, output: str) -> list[Route]:
224229
225230 return routes
226231
227- def _parse_ip_rule (self , output : str ) -> list [RoutingRule ]:
232+ def _parse_ip_rule (self , output : str ) -> List [RoutingRule ]:
228233 """Parse 'ip rule show' output into RoutingRule objects.
229234 Example ip rule: 200: from 172.16.0.0/12 to 8.8.8.8 iif wlan0 oif eth0 fwmark 0x20 table vpn_table
230235
@@ -289,7 +294,7 @@ def _parse_ip_rule(self, output: str) -> list[RoutingRule]:
289294
290295 return rules
291296
292- def _parse_ip_neighbor (self , output : str ) -> list [Neighbor ]:
297+ def _parse_ip_neighbor (self , output : str ) -> List [Neighbor ]:
293298 """Parse 'ip neighbor show' output into Neighbor objects.
294299
295300 Args:
@@ -368,11 +373,11 @@ def _parse_ip_neighbor(self, output: str) -> list[Neighbor]:
368373 def collect_data (
369374 self ,
370375 args = None ,
371- ) -> tuple [TaskResult , Optional [NetworkDataModel ]]:
376+ ) -> Tuple [TaskResult , Optional [NetworkDataModel ]]:
372377 """Collect network configuration from the system.
373378
374379 Returns:
375- tuple [TaskResult, Optional[NetworkDataModel]]: tuple containing the task result
380+ Tuple [TaskResult, Optional[NetworkDataModel]]: tuple containing the task result
376381 and an instance of NetworkDataModel or None if collection failed.
377382 """
378383 interfaces = []
0 commit comments