Skip to content

Commit 4230822

Browse files
authored
Merge pull request #27 from Gozzim/cleanup
refactor: Massive Clean Up
2 parents cc002ed + c37fe5a commit 4230822

33 files changed

+3056
-1557
lines changed

.github/workflows/ci.yml

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
name: CI
2+
3+
on:
4+
push:
5+
branches: [ "master" ]
6+
pull_request:
7+
8+
jobs:
9+
test:
10+
runs-on: ubuntu-latest
11+
permissions:
12+
contents: write
13+
services:
14+
clickhouse:
15+
image: clickhouse/clickhouse-server:23.8
16+
ports:
17+
- 9000:9000
18+
options: >-
19+
--health-cmd "clickhouse-client --query 'SELECT 1'" --health-interval 10s --health-timeout 5s --health-retries 5
20+
21+
steps:
22+
- uses: actions/checkout@v3
23+
24+
- name: Set up Python 3.12
25+
uses: actions/setup-python@v3
26+
with:
27+
python-version: "3.12"
28+
29+
- name: Install dependencies
30+
run: |
31+
python -m pip install --upgrade pip
32+
# Install versions of these packages compatible with Python 3.12
33+
pip install "numpy>=1.26.0" "pandas>=2.1.0" "scipy>=1.11.0"
34+
# Create a temporary requirements file without the upgraded packages
35+
grep -vE "numpy|pandas|scipy" requirements.txt > temp_requirements.txt
36+
# Install the remaining requirements
37+
pip install -r temp_requirements.txt
38+
pip install -r requirements-dev.txt
39+
40+
- name: Run tests and generate coverage report
41+
env:
42+
CLICKHOUSE_HOST: 127.0.0.1
43+
run: |
44+
pytest --cov=analyzer_lib --cov-report=xml
45+
46+
- name: Generate coverage badge
47+
run: |
48+
genbadge coverage -i coverage.xml -o coverage.svg
49+
50+
- name: Commit coverage badge
51+
if: github.ref == 'refs/heads/master'
52+
uses: EndBug/add-and-commit@v9
53+
with:
54+
author_name: 'github-actions[bot]'
55+
author_email: 'github-actions[bot]@users.noreply.github.com'
56+
message: 'chore: Update coverage badge'
57+
add: 'coverage.svg'

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
# Traceroute Data Analyzer and Anomaly Detector
22

33
[![CodeFactor](https://www.codefactor.io/repository/github/gozzim/ripe-atlas-traceroute-analysis/badge?s=23796f0031a238400a38e22d0679ee1bc5682d46)](https://www.codefactor.io/repository/github/gozzim/ripe-atlas-traceroute-analysis)
4+
[![CI](https://github.com/Gozzim/RIPE-Atlas-Traceroute-Analysis/actions/workflows/ci.yml/badge.svg)](https://github.com/Gozzim/RIPE-Atlas-Traceroute-Analysis/actions/workflows/ci.yml)
5+
[![Coverage Status](coverage.svg)](https://github.com/Gozzim/RIPE-Atlas-Traceroute-Analysis/actions/workflows/ci.yml)
46

57
This project provides a comprehensive framework for analyzing RIPE Atlas traceroute data to detect network performance and routing anomalies. It can process large datasets from local files or a ClickHouse database, establish performance baselines, and compare current data against those baselines to identify significant deviations.
68

analyzer_lib/analysis/traceroute/anomaly_detector.py

Lines changed: 144 additions & 94 deletions
Large diffs are not rendered by default.

analyzer_lib/analysis/traceroute/parser.py

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
from ...common.constants import PATH_DELIMITER, PATH_TIMEOUT_MARKER, PRIVATE_HOP_MARKER
3030

31-
logger = logging.getLogger('TracerouteAnalyzer')
31+
logger = logging.getLogger("TracerouteAnalyzer")
3232

3333

3434
@lru_cache(maxsize=8192) # Cache results for up to 8192 unique IP strings
@@ -49,13 +49,20 @@ def _is_ip_private_or_special(ip_str: str) -> bool:
4949
return False # Markers are not private IPs
5050
try:
5151
ip_obj = ipaddress.ip_address(ip_str)
52-
return (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast)
52+
return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast
5353
except ValueError:
5454
return False # Not a valid IP, so not considered private/special in this context
5555

5656

57-
def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], source_networks_to_filter: Optional[List[ipaddress.ip_network]], dest_ips_to_filter: Optional[Set[str]],
58-
dest_networks_to_filter: Optional[List[ipaddress.ip_network]], protocol_filters: Optional[Set[str]], include_private_ips_param: bool = False) -> Optional[Dict[str, Any]]:
57+
def parse_traceroute(
58+
line_bytes: bytes,
59+
source_probe_ids: Optional[Set[int]],
60+
source_networks_to_filter: Optional[List[ipaddress.ip_network]],
61+
dest_ips_to_filter: Optional[Set[str]],
62+
dest_networks_to_filter: Optional[List[ipaddress.ip_network]],
63+
protocol_filters: Optional[Set[str]],
64+
include_private_ips_param: bool = False,
65+
) -> Optional[Dict[str, Any]]:
5966
"""Parses a single JSON line representing a RIPE Atlas traceroute measurement.
6067
6168
This function takes a raw byte string from a JSON line file, parses it,
@@ -90,18 +97,18 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
9097
try:
9198
record = orjson.loads(line_bytes)
9299

93-
prb_id = record.get('prb_id')
94-
src_addr_str = record.get('src_addr')
95-
dst_addr_str = record.get('dst_addr')
96-
timestamp = record.get('timestamp')
97-
endtime = record.get('endtime')
98-
measurement_id = record.get('msm_id')
99-
result_list = record.get('result', [])
100-
protocol = record.get('proto')
101-
destination_responded = record.get('destination_ip_responded', False)
100+
prb_id = record.get("prb_id")
101+
src_addr_str = record.get("src_addr")
102+
dst_addr_str = record.get("dst_addr")
103+
timestamp = record.get("timestamp")
104+
endtime = record.get("endtime")
105+
measurement_id = record.get("msm_id")
106+
result_list = record.get("result", [])
107+
protocol = record.get("proto")
108+
destination_responded = record.get("destination_ip_responded", False)
102109

103110
if not all([isinstance(timestamp, int), isinstance(endtime, int), isinstance(prb_id, int), isinstance(measurement_id, int), isinstance(dst_addr_str, str), isinstance(result_list, list), isinstance(protocol, str)]):
104-
logger.debug(f"Skipping record: Missing or invalid basic fields")
111+
logger.debug("Skipping record: Missing or invalid basic fields")
105112
return None
106113

107114
protocol_upper = protocol.upper()
@@ -159,12 +166,12 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
159166
for hop_idx, hop_data in enumerate(result_list):
160167
if not isinstance(hop_data, dict):
161168
continue
162-
hop_num = hop_data.get('hop')
169+
hop_num = hop_data.get("hop")
163170
if not isinstance(hop_num, int):
164171
continue
165172
max_hop_num_reached = max(max_hop_num_reached, hop_num)
166173

167-
hop_results_list = hop_data.get('result', [])
174+
hop_results_list = hop_data.get("result", [])
168175
rtt_this_hop_val, ip_this_hop_val = np.nan, None
169176
all_probes_timed_out_for_hop = True
170177
current_hop_ip_for_path = None
@@ -173,20 +180,20 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
173180
min_rtt_this_hop = np.nan
174181
for res_idx, res in enumerate(hop_results_list):
175182
if isinstance(res, dict):
176-
if 'from' in res:
183+
if "from" in res:
177184
all_probes_timed_out_for_hop = False
178-
responder_ip = res['from']
185+
responder_ip = res["from"]
179186
if current_hop_ip_for_path is None:
180187
current_hop_ip_for_path = responder_ip
181-
if 'rtt' in res:
182-
current_rtt_val = res.get('rtt')
188+
if "rtt" in res:
189+
current_rtt_val = res.get("rtt")
183190
if pd.notna(current_rtt_val):
184191
if ip_this_hop_val is None:
185192
ip_this_hop_val = responder_ip
186193
rtt_this_hop_val = current_rtt_val
187194
if np.isnan(min_rtt_this_hop) or current_rtt_val < min_rtt_this_hop:
188195
min_rtt_this_hop = current_rtt_val
189-
elif res.get('x') == PATH_TIMEOUT_MARKER:
196+
elif res.get("x") == PATH_TIMEOUT_MARKER:
190197
if current_hop_ip_for_path is None:
191198
current_hop_ip_for_path = PATH_TIMEOUT_MARKER
192199
else:
@@ -215,15 +222,15 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
215222
if path_len_probed > 0:
216223
last_hop_entry_data = result_list[-1]
217224
if isinstance(last_hop_entry_data, dict):
218-
last_hop_results_list = last_hop_entry_data.get('result', [])
225+
last_hop_results_list = last_hop_entry_data.get("result", [])
219226
rtts_at_dest = []
220227
temp_responding_ip_last_hop = None
221228
if isinstance(last_hop_results_list, list):
222229
for res in last_hop_results_list:
223-
if isinstance(res, dict) and 'from' in res:
224-
temp_responding_ip_last_hop = res['from']
225-
if destination_responded and temp_responding_ip_last_hop == dst_addr_str and 'rtt' in res:
226-
rtt_val = res.get('rtt')
230+
if isinstance(res, dict) and "from" in res:
231+
temp_responding_ip_last_hop = res["from"]
232+
if destination_responded and temp_responding_ip_last_hop == dst_addr_str and "rtt" in res:
233+
rtt_val = res.get("rtt")
227234
if pd.notna(rtt_val):
228235
rtts_at_dest.append(rtt_val)
229236
last_hop_ip_candidate = temp_responding_ip_last_hop
@@ -257,10 +264,23 @@ def _to_int_or_none(val):
257264
except (ValueError, TypeError):
258265
return None
259266

260-
return {'timestamp': timestamp, 'endtime': endtime, 'prb_id': prb_id, 'msm_id': measurement_id, 'src_addr': str(src_addr_str) if src_addr_str else None, 'dst_addr': str(dst_addr_str),
261-
'protocol': str(protocol).upper(), 'dest_responded': bool(destination_responded), 'final_rtt_ms': _to_float_or_none(final_rtt), 'first_hop_rtt_ms': _to_float_or_none(final_first_hop_rtt),
262-
'path_len': _to_int_or_none(path_len_probed), 'timeouts_count': _to_int_or_none(total_timeouts_count), 'first_hop_ip': str(final_first_hop_ip) if final_first_hop_ip else None,
263-
'last_hop_ip': str(final_last_hop_ip) if final_last_hop_ip else None, 'hop_path_str': hop_path_str}
267+
return {
268+
"timestamp": timestamp,
269+
"endtime": endtime,
270+
"prb_id": prb_id,
271+
"msm_id": measurement_id,
272+
"src_addr": str(src_addr_str) if src_addr_str else None,
273+
"dst_addr": str(dst_addr_str),
274+
"protocol": str(protocol).upper(),
275+
"dest_responded": bool(destination_responded),
276+
"final_rtt_ms": _to_float_or_none(final_rtt),
277+
"first_hop_rtt_ms": _to_float_or_none(final_first_hop_rtt),
278+
"path_len": _to_int_or_none(path_len_probed),
279+
"timeouts_count": _to_int_or_none(total_timeouts_count),
280+
"first_hop_ip": str(final_first_hop_ip) if final_first_hop_ip else None,
281+
"last_hop_ip": str(final_last_hop_ip) if final_last_hop_ip else None,
282+
"hop_path_str": hop_path_str,
283+
}
264284
except (orjson.JSONDecodeError, KeyError, TypeError, ValueError) as e:
265285
logger.debug(f"Error parsing line: {e}")
266286
return None
@@ -269,7 +289,7 @@ def _to_int_or_none(val):
269289
return None
270290

271291

272-
def reconstruct_path_tuple_from_string(hop_path_str: Optional[str], pbar: Optional['tqdm.tqdm'] = None) -> Tuple:
292+
def reconstruct_path_tuple_from_string(hop_path_str: Optional[str], pbar: Optional["tqdm.tqdm"] = None) -> Tuple:
273293
"""Reconstructs a tuple of hop IPs from a delimited string.
274294
275295
This is a utility function to convert the string representation of a path

0 commit comments

Comments
 (0)