Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# CI workflow: runs the test suite against a ClickHouse service container,
# generates a coverage report, and (on master only) commits an updated
# coverage badge back to the repository.
# NOTE(review): indentation below is reconstructed from a whitespace-mangled
# rendering of this file — structure follows standard GitHub Actions layout.
name: CI

on:
  push:
    branches: [ "master" ]
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest
    permissions:
      # Needed so the badge-commit step can push to the repository.
      contents: write
    services:
      clickhouse:
        image: clickhouse/clickhouse-server:23.8
        ports:
          - 9000:9000
        # Gate the job on the server actually answering queries, not just
        # the container being up.
        options: >-
          --health-cmd "clickhouse-client --query 'SELECT 1'"
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5

    steps:
      - uses: actions/checkout@v3

      - name: Set up Python 3.12
        uses: actions/setup-python@v3
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          # Install versions of these packages compatible with Python 3.12
          pip install "numpy>=1.26.0" "pandas>=2.1.0" "scipy>=1.11.0"
          # Drop only requirement lines that pin numpy/pandas/scipy themselves.
          # The pattern is anchored so unrelated packages whose names merely
          # contain these strings (e.g. "pandas-stubs") are kept.
          # (Extras syntax like "numpy[foo]" is not matched — extend the
          # character class if such lines are ever added to requirements.txt.)
          grep -vE "^(numpy|pandas|scipy)([<>=!~ ]|$)" requirements.txt > temp_requirements.txt
          # Install the remaining requirements
          pip install -r temp_requirements.txt
          pip install -r requirements-dev.txt

      - name: Run tests and generate coverage report
        env:
          # Point tests at the ClickHouse service container published above.
          CLICKHOUSE_HOST: 127.0.0.1
        run: |
          pytest --cov=analyzer_lib --cov-report=xml

      - name: Generate coverage badge
        run: |
          genbadge coverage -i coverage.xml -o coverage.svg

      - name: Commit coverage badge
        # Only commit the badge from master; PR runs must not push.
        if: github.ref == 'refs/heads/master'
        uses: EndBug/add-and-commit@v9
        with:
          author_name: 'github-actions[bot]'
          author_email: 'github-actions[bot]@users.noreply.github.com'
          message: 'chore: Update coverage badge'
          add: 'coverage.svg'
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Traceroute Data Analyzer and Anomaly Detector

[![CodeFactor](https://www.codefactor.io/repository/github/gozzim/ripe-atlas-traceroute-analysis/badge?s=23796f0031a238400a38e22d0679ee1bc5682d46)](https://www.codefactor.io/repository/github/gozzim/ripe-atlas-traceroute-analysis)
[![CI](https://github.com/Gozzim/RIPE-Atlas-Traceroute-Analysis/actions/workflows/ci.yml/badge.svg)](https://github.com/Gozzim/RIPE-Atlas-Traceroute-Analysis/actions/workflows/ci.yml)
[![Coverage Status](coverage.svg)](https://github.com/Gozzim/RIPE-Atlas-Traceroute-Analysis/actions/workflows/ci.yml)

This project provides a comprehensive framework for analyzing RIPE Atlas traceroute data to detect network performance and routing anomalies. It can process large datasets from local files or a ClickHouse database, establish performance baselines, and compare current data against those baselines to identify significant deviations.

Expand Down
238 changes: 144 additions & 94 deletions analyzer_lib/analysis/traceroute/anomaly_detector.py

Large diffs are not rendered by default.

82 changes: 51 additions & 31 deletions analyzer_lib/analysis/traceroute/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from ...common.constants import PATH_DELIMITER, PATH_TIMEOUT_MARKER, PRIVATE_HOP_MARKER

logger = logging.getLogger('TracerouteAnalyzer')
logger = logging.getLogger("TracerouteAnalyzer")


@lru_cache(maxsize=8192) # Cache results for up to 8192 unique IP strings
Expand All @@ -49,13 +49,20 @@ def _is_ip_private_or_special(ip_str: str) -> bool:
return False # Markers are not private IPs
try:
ip_obj = ipaddress.ip_address(ip_str)
return (ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast)
return ip_obj.is_private or ip_obj.is_loopback or ip_obj.is_link_local or ip_obj.is_multicast
except ValueError:
return False # Not a valid IP, so not considered private/special in this context


def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], source_networks_to_filter: Optional[List[ipaddress.ip_network]], dest_ips_to_filter: Optional[Set[str]],
dest_networks_to_filter: Optional[List[ipaddress.ip_network]], protocol_filters: Optional[Set[str]], include_private_ips_param: bool = False) -> Optional[Dict[str, Any]]:
def parse_traceroute(
line_bytes: bytes,
source_probe_ids: Optional[Set[int]],
source_networks_to_filter: Optional[List[ipaddress.ip_network]],
dest_ips_to_filter: Optional[Set[str]],
dest_networks_to_filter: Optional[List[ipaddress.ip_network]],
protocol_filters: Optional[Set[str]],
include_private_ips_param: bool = False,
) -> Optional[Dict[str, Any]]:
"""Parses a single JSON line representing a RIPE Atlas traceroute measurement.
This function takes a raw byte string from a JSON line file, parses it,
Expand Down Expand Up @@ -90,18 +97,18 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
try:
record = orjson.loads(line_bytes)

prb_id = record.get('prb_id')
src_addr_str = record.get('src_addr')
dst_addr_str = record.get('dst_addr')
timestamp = record.get('timestamp')
endtime = record.get('endtime')
measurement_id = record.get('msm_id')
result_list = record.get('result', [])
protocol = record.get('proto')
destination_responded = record.get('destination_ip_responded', False)
prb_id = record.get("prb_id")
src_addr_str = record.get("src_addr")
dst_addr_str = record.get("dst_addr")
timestamp = record.get("timestamp")
endtime = record.get("endtime")
measurement_id = record.get("msm_id")
result_list = record.get("result", [])
protocol = record.get("proto")
destination_responded = record.get("destination_ip_responded", False)

if not all([isinstance(timestamp, int), isinstance(endtime, int), isinstance(prb_id, int), isinstance(measurement_id, int), isinstance(dst_addr_str, str), isinstance(result_list, list), isinstance(protocol, str)]):
logger.debug(f"Skipping record: Missing or invalid basic fields")
logger.debug("Skipping record: Missing or invalid basic fields")
return None

protocol_upper = protocol.upper()
Expand Down Expand Up @@ -159,12 +166,12 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
for hop_idx, hop_data in enumerate(result_list):
if not isinstance(hop_data, dict):
continue
hop_num = hop_data.get('hop')
hop_num = hop_data.get("hop")
if not isinstance(hop_num, int):
continue
max_hop_num_reached = max(max_hop_num_reached, hop_num)

hop_results_list = hop_data.get('result', [])
hop_results_list = hop_data.get("result", [])
rtt_this_hop_val, ip_this_hop_val = np.nan, None
all_probes_timed_out_for_hop = True
current_hop_ip_for_path = None
Expand All @@ -173,20 +180,20 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
min_rtt_this_hop = np.nan
for res_idx, res in enumerate(hop_results_list):
if isinstance(res, dict):
if 'from' in res:
if "from" in res:
all_probes_timed_out_for_hop = False
responder_ip = res['from']
responder_ip = res["from"]
if current_hop_ip_for_path is None:
current_hop_ip_for_path = responder_ip
if 'rtt' in res:
current_rtt_val = res.get('rtt')
if "rtt" in res:
current_rtt_val = res.get("rtt")
if pd.notna(current_rtt_val):
if ip_this_hop_val is None:
ip_this_hop_val = responder_ip
rtt_this_hop_val = current_rtt_val
if np.isnan(min_rtt_this_hop) or current_rtt_val < min_rtt_this_hop:
min_rtt_this_hop = current_rtt_val
elif res.get('x') == PATH_TIMEOUT_MARKER:
elif res.get("x") == PATH_TIMEOUT_MARKER:
if current_hop_ip_for_path is None:
current_hop_ip_for_path = PATH_TIMEOUT_MARKER
else:
Expand Down Expand Up @@ -215,15 +222,15 @@ def parse_traceroute(line_bytes: bytes, source_probe_ids: Optional[Set[int]], so
if path_len_probed > 0:
last_hop_entry_data = result_list[-1]
if isinstance(last_hop_entry_data, dict):
last_hop_results_list = last_hop_entry_data.get('result', [])
last_hop_results_list = last_hop_entry_data.get("result", [])
rtts_at_dest = []
temp_responding_ip_last_hop = None
if isinstance(last_hop_results_list, list):
for res in last_hop_results_list:
if isinstance(res, dict) and 'from' in res:
temp_responding_ip_last_hop = res['from']
if destination_responded and temp_responding_ip_last_hop == dst_addr_str and 'rtt' in res:
rtt_val = res.get('rtt')
if isinstance(res, dict) and "from" in res:
temp_responding_ip_last_hop = res["from"]
if destination_responded and temp_responding_ip_last_hop == dst_addr_str and "rtt" in res:
rtt_val = res.get("rtt")
if pd.notna(rtt_val):
rtts_at_dest.append(rtt_val)
last_hop_ip_candidate = temp_responding_ip_last_hop
Expand Down Expand Up @@ -257,10 +264,23 @@ def _to_int_or_none(val):
except (ValueError, TypeError):
return None

return {'timestamp': timestamp, 'endtime': endtime, 'prb_id': prb_id, 'msm_id': measurement_id, 'src_addr': str(src_addr_str) if src_addr_str else None, 'dst_addr': str(dst_addr_str),
'protocol': str(protocol).upper(), 'dest_responded': bool(destination_responded), 'final_rtt_ms': _to_float_or_none(final_rtt), 'first_hop_rtt_ms': _to_float_or_none(final_first_hop_rtt),
'path_len': _to_int_or_none(path_len_probed), 'timeouts_count': _to_int_or_none(total_timeouts_count), 'first_hop_ip': str(final_first_hop_ip) if final_first_hop_ip else None,
'last_hop_ip': str(final_last_hop_ip) if final_last_hop_ip else None, 'hop_path_str': hop_path_str}
return {
"timestamp": timestamp,
"endtime": endtime,
"prb_id": prb_id,
"msm_id": measurement_id,
"src_addr": str(src_addr_str) if src_addr_str else None,
"dst_addr": str(dst_addr_str),
"protocol": str(protocol).upper(),
"dest_responded": bool(destination_responded),
"final_rtt_ms": _to_float_or_none(final_rtt),
"first_hop_rtt_ms": _to_float_or_none(final_first_hop_rtt),
"path_len": _to_int_or_none(path_len_probed),
"timeouts_count": _to_int_or_none(total_timeouts_count),
"first_hop_ip": str(final_first_hop_ip) if final_first_hop_ip else None,
"last_hop_ip": str(final_last_hop_ip) if final_last_hop_ip else None,
"hop_path_str": hop_path_str,
}
except (orjson.JSONDecodeError, KeyError, TypeError, ValueError) as e:
logger.debug(f"Error parsing line: {e}")
return None
Expand All @@ -269,7 +289,7 @@ def _to_int_or_none(val):
return None


def reconstruct_path_tuple_from_string(hop_path_str: Optional[str], pbar: Optional['tqdm.tqdm'] = None) -> Tuple:
def reconstruct_path_tuple_from_string(hop_path_str: Optional[str], pbar: Optional["tqdm.tqdm"] = None) -> Tuple:
"""Reconstructs a tuple of hop IPs from a delimited string.
This is a utility function to convert the string representation of a path
Expand Down
Loading