Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ stages:
DIFF_COVER_CHECK_THRESHOLD: 80
DIFF_COVER_ENABLE: 'true'
pool:
vmImage: ubuntu-20.04
vmImage: ubuntu-latest

container:
image: sonicdev-microsoft.azurecr.io:443/sonic-slave-bookworm:latest
Expand Down
109 changes: 109 additions & 0 deletions src/sonic_ax_impl/mibs/ietf/rfc1213.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import os
import re
import ipaddress
import python_arptable
import socket
import subprocess
from enum import unique, Enum
from bisect import bisect_right

Expand Down Expand Up @@ -179,16 +182,119 @@ def get_next(self, sub_id):

return self.route_list[right]

class NetmaskUpdater(MIBUpdater):
def __init__(self):
super().__init__()
self.db_conn = Namespace.init_namespace_dbs()
self.netmask_map = {}
self.netmask_list = []

def _update_netmask_info(self, dev, ip):
if ip is None: return

if_index = mibs.get_index_from_str(dev)
if if_index is None: return

try:
netip = ipaddress.ip_network(ip, False)
except (ValueError, ipaddress.AddressValueError):
return

netip = str(netip)

if '/' in ip:
netmask = ip.split('/')[1]
else:
mibs.logger.warning("IP '%s' missing prefix length for dev '%s'", ip, dev)
return

netmask = int(netmask)
ip = ip.split('/')[0]
netip = netip.split('/')[0]

netiptuple = ip2byte_tuple(netip)
iptuple = ip2byte_tuple(ip)
subid = (4,) + iptuple

# Create map between subid and OID
oid_tuple = (1, 3, 6, 1, 2, 1, 4, 32, 1, 5)
self.netmask_map[subid] = oid_tuple + (if_index,) + (1, 4) + netiptuple + (netmask,)
self.netmask_list.append(subid)

def update_data(self):
self.netmask_map = {}
self.netmask_list = []

interfaces = Namespace.dbs_keys(self.db_conn, mibs.APPL_DB, "INTF_TABLE:*")
for interface in interfaces:
eth_table_prefix = re.search(
r"INTF_TABLE:([A-Za-z][A-Za-z0-9]+):([0-9./]+)",
interface
)
if eth_table_prefix is None:
continue

dev = eth_table_prefix.group().split(':')[1]
ip = eth_table_prefix.group().split(':')[2]

if "." in ip:
self._update_netmask_info(dev, ip)

result = subprocess.run(
["ip", "addr", "show", "eth0"],
capture_output=True,
text=True,
check=False
)

match = re.search(r"inet (\d+\.\d+\.\d+\.\d+)", result.stdout)
mgmt_ip = match.group(1) if match else ""
if mgmt_ip:
self._update_netmask_info("eth0", mgmt_ip)

result = subprocess.run(
["ip", "addr", "show", "docker0"],
capture_output=True,
text=True,
check=False
)

match = re.search(r"inet (\d+\.\d+\.\d+\.\d+)", result.stdout)
docker_inet = match.group(1) if match else ""
if docker_inet:
self._update_netmask_info("docker0", docker_inet)

match = re.search(r"brd (\d+\.\d+\.\d+\.\d+)", result.stdout)
docker_brd = match.group(1) if match else ""
if docker_brd:
self._update_netmask_info("docker0", docker_brd)

self.netmask_list.sort()

def get_netmask_oid(self, sub_id):
return self.netmask_map.get(sub_id, None)

def get_next(self, sub_id):
right = bisect_right(self.netmask_list, sub_id)
if right >= len(self.netmask_list):
return None

return self.netmask_list[right]

class IpMib(metaclass=MIBMeta, prefix='.1.3.6.1.2.1.4'):
arp_updater = ArpUpdater()
nexthop_updater = NextHopUpdater()
netmask_updater = NetmaskUpdater()

ipRouteNextHop = \
SubtreeMIBEntry('21.1.7', nexthop_updater, ValueType.IP_ADDRESS, nexthop_updater.nexthop)

ipNetToMediaPhysAddress = \
SubtreeMIBEntry('22.1.2', arp_updater, ValueType.OCTET_STRING, arp_updater.arp_dest)

ipNetToNetMask = \
SubtreeMIBEntry('34.1.5.1', netmask_updater, ValueType.OBJECT_IDENTIFIER, netmask_updater.get_netmask_oid)

class InterfacesUpdater(MIBUpdater):

RFC1213_MAX_SPEED = 4294967295
Expand Down Expand Up @@ -398,6 +504,9 @@ def get_counter(self, sub_id, table_name):
# TODO: mgmt counters not available through SNMP right now
# COUNTERS DB does not have support for generic linux (mgmt) interface counters
return 0
elif oid in self.vlan_oid_name_map:
return 0

elif oid in self.oid_lag_name_map:
counter_value = 0
# Sum the values of this counter for all ports in the LAG.
Expand Down
2 changes: 2 additions & 0 deletions src/sonic_ax_impl/mibs/ietf/rfc2863.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,8 @@ def _get_counter(self, oid, table_name, mask):
# TODO: mgmt counters not available through SNMP right now
# COUNTERS DB does not have support for generic linux (mgmt) interface counters
return 0
if oid in self.vlan_oid_name_map:
return 0

if oid in self.oid_lag_name_map:
counter_value = 0
Expand Down
77 changes: 76 additions & 1 deletion tests/test_rfc1213.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import sonic_ax_impl
import sys
from unittest import TestCase
from unittest.mock import MagicMock

if sys.version_info.major == 3:
from unittest import mock
Expand All @@ -12,7 +13,7 @@
modules_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(modules_path, 'src'))

from sonic_ax_impl.mibs.ietf.rfc1213 import NextHopUpdater, InterfacesUpdater, DbTables
from sonic_ax_impl.mibs.ietf.rfc1213 import NextHopUpdater, InterfacesUpdater, DbTables, NetmaskUpdater


class TestNextHopUpdater(TestCase):
Expand Down Expand Up @@ -163,3 +164,77 @@ def mock_dbs_get_all(dbs, db_name, hash, *args, **kwargs):
except TypeError:
self.fail("Caught Type error")
self.assertTrue(counter == None)

class TestNetmaskUpdater(TestCase):
@mock.patch("sonic_ax_impl.mibs.Namespace.init_namespace_dbs", return_value="mock_db_conn")
@mock.patch('sonic_ax_impl.mibs.get_index_from_str', return_value=1)
@mock.patch('ax_interface.util.ip2byte_tuple', side_effect=lambda ip: tuple(map(int, ip.split('.'))))
def test_update_netmask_info(self, mock_ip2byte, mock_get_index, mock_namespace):
updater = NetmaskUpdater()
updater._update_netmask_info("eth0", "192.168.1.1/24")

expected_key = (4, 192, 168, 1, 1)
self.assertIn(expected_key, updater.netmask_map)
self.assertIn(expected_key, updater.netmask_list)

@mock.patch("ax_interface.util.ip2byte_tuple", side_effect=lambda ip: tuple(map(int, ip.split('.'))))
@mock.patch("sonic_ax_impl.mibs.get_index_from_str", return_value=2)
@mock.patch("sonic_ax_impl.mibs.ietf.rfc1213.Namespace.init_namespace_dbs", return_value="mock_db_conn")
@mock.patch("sonic_ax_impl.mibs.ietf.rfc1213.os.popen")
def test_update_data(self, mock_popen, mock_init_ns, mock_get_index, mock_ip2byte):
mock_dbs_keys = mock.MagicMock(return_value=[
"INTF_TABLE:Eth0:192.168.1.1/24",
"INTF_TABLE:Docker0:10.0.0.1/8"
Comment on lines +186 to +187
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test mocks 'Eth0' and 'Docker0' in the interface names, but the actual implementation regex pattern (line 220 in rfc1213.py) expects interface names starting with uppercase followed by lowercase (like "Eth0"). However, these don't match typical SONiC interface naming conventions like "Ethernet8" or "eth0". The test should use realistic interface names that would actually appear in production to properly validate the implementation.

Copilot uses AI. Check for mistakes.
])
mock_init_ns.return_value = "mock_db_conn"

with mock.patch("sonic_ax_impl.mibs.ietf.rfc1213.Namespace.dbs_keys", mock_dbs_keys):
def popen_side_effect(cmd):
mock_proc = MagicMock()
if "Eth0" in cmd and "print $2" in cmd:
Copy link

Copilot AI Dec 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test's mock for os.popen is checking for 'Eth0' in the command, but the actual command in the implementation uses 'eth0' (lowercase). This case sensitivity mismatch means the test won't properly validate the actual code path. The test should match the exact interface names used in the implementation.

Copilot uses AI. Check for mistakes.
mock_proc.read.return_value = "192.168.1.1/24\n"
elif "docker0" in cmd and "print $2" in cmd:
mock_proc.read.return_value = "172.17.0.1/24\n"
elif "docker0" in cmd and "print $4" in cmd:
mock_proc.read.return_value = "172.17.255.255/24\n"
else:
mock_proc.read.return_value = "192.168.1.1/24\n"
return mock_proc

mock_popen.side_effect = popen_side_effect

updater = NetmaskUpdater()
updater.update_data()

self.assertGreater(len(updater.netmask_map), 0)
self.assertTrue(all(isinstance(k, tuple) for k in updater.netmask_list))

def test_get_next(self):
updater = NetmaskUpdater()
updater.netmask_list = [(4, 10, 0, 0, 1), (4, 192, 168, 1, 1), (4, 192, 168, 1, 2)]
self.assertEqual(updater.get_next((4, 10, 0, 0, 1)), (4, 192, 168, 1, 1))
self.assertEqual(updater.get_next((4, 192, 168, 1, 2)), None)

@mock.patch('sonic_ax_impl.mibs.get_index_from_str', return_value=1)
@mock.patch('ax_interface.util.ip2byte_tuple', side_effect=lambda ip: tuple(map(int, ip.split('.'))))
def test_update_netmask_info_invalid_ip(self, mock_ip2byte, mock_get_index):
updater = NetmaskUpdater()

# IP missing prefix length
updater._update_netmask_info("eth0", "192.168.1.1")
self.assertEqual(len(updater.netmask_map), 0)
self.assertEqual(len(updater.netmask_list), 0)

# Malformed IP
updater._update_netmask_info("eth0", "300.168.1.1/24")
self.assertEqual(len(updater.netmask_map), 0)
self.assertEqual(len(updater.netmask_list), 0)

@mock.patch("sonic_ax_impl.mibs.ietf.rfc1213.os.popen", side_effect=OSError("popen failed"))
def test_update_data_popen_fail(self, mock_popen):
updater = NetmaskUpdater()
# Should handle popen failure gracefully without exception
updater.update_data()
self.assertEqual(len(updater.netmask_map), 0)
self.assertEqual(len(updater.netmask_list), 0)

Loading