Skip to content
85 changes: 83 additions & 2 deletions platform/mellanox/mlnx-platform-api/sonic_platform/sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@
except ImportError as e:
raise ImportError (str(e) + "- required module not found")

try:
import sys
sys.path.append('/run/hw-management/bin')
import hw_management_independent_mode_update
except ImportError:
# Only mock if running under pytest (check if pytest is imported)
if 'pytest' in sys.modules:
from unittest import mock
hw_management_independent_mode_update = mock.MagicMock()
hw_management_independent_mode_update.vendor_data_set_module = mock.MagicMock()
else:
raise

# Define the sdk constants
SX_PORT_MODULE_STATUS_INITIALIZING = 0
SX_PORT_MODULE_STATUS_PLUGGED = 1
Expand Down Expand Up @@ -426,6 +439,9 @@ def __init__(self, sfp_index, sfp_type=None, slot_id=0, linecard_port_count=0, l
self.sn = None
self.temp_high_threshold = None
self.temp_critical_threshold = None
self.retry_read_vendor = 5
self.manufacturer = None
self.part_number = None

def __str__(self):
return f'SFP {self.sdk_index}'
Expand Down Expand Up @@ -867,19 +883,84 @@ def reinit_if_sn_changed(self):
sn = self._get_serial()
if sn != self.sn:
self.reinit()
self.sn = self._get_serial()
# Clear cached vendor info so a new module will be re-read
self.manufacturer = None
self.part_number = None
self.temp_high_threshold = None
self.temp_critical_threshold = None
self.sn = self._get_serial()
if self.sn is not None:
self.retry_read_vendor = 5
else:
self.retry_read_vendor = 0
return True
return False


def get_vendor_info(self):
"""Get SFP vendor info (manufacturer and part number).
Reads fields via xcvr_eeprom to avoid manual offset logic.
Uses cache to avoid redundant reads.
Returns:
tuple: (manufacturer, part_number) or (None, None) if read fails
"""
try:
display_idx = self.sdk_index + 1
if self.manufacturer is not None and self.part_number is not None:
return self.manufacturer, self.part_number

api = self.get_xcvr_api()
if not api or api.xcvr_eeprom is None:
return None, None

try:
manufacturer = api.xcvr_eeprom.read(consts.VENDOR_NAME_FIELD)
part_number = api.xcvr_eeprom.read(consts.VENDOR_PART_NO_FIELD)
logger.log_info(f"SFP {display_idx} vendor info read: manufacturer='{manufacturer}', part_number='{part_number}'")
except Exception as e:
logger.log_error(f"SFP {display_idx} vendor info read failed: {e}")
manufacturer = None
part_number = None

if manufacturer and part_number:
self.manufacturer = manufacturer
self.part_number = part_number
return manufacturer, part_number

return None, None
except Exception:
return None, None

def get_temperature_info(self):
"""Get SFP temperature info in a fast way. This function is faster than calling following functions one by one: get_temperature, get_temperature_warning_threshold, get_temperature_critical_threshold.

Returns:
tuple: (temperature, warning_threshold, critical_threshold)
"""
try:
sn_changed = self.reinit_if_sn_changed()
if self.retry_read_vendor > 0:
try:
manufacturer, part_number = self.get_vendor_info()
if manufacturer and part_number:
vendor_info = {'manufacturer': manufacturer, 'part_number': part_number}
hw_management_independent_mode_update.vendor_data_set_module(
0, # ASIC index always 0 for now
self.sdk_index + 1,
vendor_info
)
logger.log_notice(f'Module {self.sdk_index + 1} vendor info updated - '
f'manufacturer: {manufacturer} part_number: {part_number}')
self.retry_read_vendor = 0
else:
self.retry_read_vendor -= 1
if self.retry_read_vendor == 0:
logger.log_notice(f"SFP {self.sdk_index + 1}: vendor info unavailable after retries")
except Exception as e:
logger.log_warning(f'Failed to publish vendor info for SFP {self.sdk_index + 1} - {e}')
self.retry_read_vendor -= 1
if self.retry_read_vendor == 0:
logger.log_notice(f"SFP {self.sdk_index + 1}: vendor info unavailable after retries")

sw_control = self.is_sw_control()
if not sw_control:
return sw_control, None, None, None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@
try:
import hw_management_independent_mode_update
except ImportError:
# For unit test only
from unittest import mock
hw_management_independent_mode_update = mock.MagicMock()
hw_management_independent_mode_update.module_data_set_module_counter = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_set_asic = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_set_module = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_clean_asic = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_clean_module = mock.MagicMock()
# Only mock if running under pytest (check if pytest is imported)
if 'pytest' in sys.modules:
from unittest import mock
hw_management_independent_mode_update = mock.MagicMock()
hw_management_independent_mode_update.module_data_set_module_counter = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_set_asic = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_set_module = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_clean_asic = mock.MagicMock()
hw_management_independent_mode_update.thermal_data_clean_module = mock.MagicMock()
else:
raise


SFP_TEMPERATURE_SCALE = 1000
Expand Down
124 changes: 122 additions & 2 deletions platform/mellanox/mlnx-platform-api/tests/test_sfp.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,14 +608,134 @@ def mock_read(field):
sfp.is_sw_control.side_effect = Exception('')
assert sfp.get_temperature_info() == (False, None, None, None)

@mock.patch('time.sleep', mock.MagicMock())
def test_get_temperature_info_vendor_retry_loop(self):
sfp = SFP(0)
sfp.reinit_if_sn_changed = mock.MagicMock(side_effect=[True, False, False])
sfp.is_sw_control = mock.MagicMock(return_value=False)
sfp.retry_read_vendor = 5
# First two attempts fail, third succeeds
sfp.get_vendor_info = mock.MagicMock(side_effect=[(None, None), (None, None), ('Mellanox', 'PN-9999')])

# Attempt 1: reinit sets retry counter, first vendor read fails (counter -> 4)
sfp.get_temperature_info()
# Attempt 2: retry counter >0, second vendor read fails (counter -> 3)
sfp.get_temperature_info()
# Attempt 3: retry counter >0, vendor read succeeds, counter cleared
sfp.get_temperature_info()

assert sfp.get_vendor_info.call_count == 3
assert sfp.retry_read_vendor == 0

def test_reinit_if_sn_changed(self):
sfp = SFP(0)
sfp.get_xcvr_api = mock.MagicMock(return_value=None)
assert not sfp.reinit_if_sn_changed()

sfp.get_xcvr_api.return_value = mock.MagicMock()
sfp.get_xcvr_api.return_value.xcvr_eeprom.read = mock.MagicMock(return_value='1234567890')
assert sfp.reinit_if_sn_changed()

assert sfp.retry_read_vendor == 5

sfp.get_xcvr_api.return_value.xcvr_eeprom.read.return_value = '1234567891'
assert sfp.reinit_if_sn_changed()
assert sfp.retry_read_vendor == 5

# Vendor cache should reset on reinit to allow new modules to be read
sfp.sn = 'old_sn'
sfp.manufacturer = 'OldVendor'
sfp.part_number = 'OldPart'
sfp._get_serial = mock.MagicMock(return_value='new_sn')
assert sfp.reinit_if_sn_changed()
assert sfp.manufacturer is None
assert sfp.part_number is None
assert sfp.retry_read_vendor == 5

@mock.patch('time.sleep', mock.MagicMock())
def test_get_vendor_info_success_and_cache(self):
sfp = SFP(0)
mock_api = mock.MagicMock()
mock_eeprom = mock.MagicMock()
mock_api.xcvr_eeprom = mock_eeprom
sfp.get_xcvr_api = mock.MagicMock(return_value=mock_api)

from sonic_platform_base.sonic_xcvr.fields import consts
def mock_read(field):
if field == consts.VENDOR_NAME_FIELD:
return 'Mellanox'
if field == consts.VENDOR_PART_NO_FIELD:
return 'PN-1234'
return None
mock_eeprom.read.side_effect = mock_read

# First call reads from eeprom
manufacturer, part_number = sfp.get_vendor_info()
assert manufacturer == 'Mellanox'
assert part_number == 'PN-1234'
assert mock_eeprom.read.call_count == 2

# Second call should return cached values without additional reads
manufacturer, part_number = sfp.get_vendor_info()
assert manufacturer == 'Mellanox'
assert part_number == 'PN-1234'
assert mock_eeprom.read.call_count == 2

@mock.patch('time.sleep', mock.MagicMock())
def test_get_vendor_info_retry_then_success(self):
sfp = SFP(0)
mock_api = mock.MagicMock()
mock_eeprom = mock.MagicMock()
mock_api.xcvr_eeprom = mock_eeprom
sfp.get_xcvr_api = mock.MagicMock(return_value=mock_api)

from sonic_platform_base.sonic_xcvr.fields import consts
state = {'fail_reads': 2}
def flaky_read(field):
if state['fail_reads'] > 0:
state['fail_reads'] -= 1
raise Exception('EEPROM not ready')
if field == consts.VENDOR_NAME_FIELD:
return 'Mellanox'
if field == consts.VENDOR_PART_NO_FIELD:
return 'PN-5678'
return None
mock_eeprom.read.side_effect = flaky_read

# First call fails (counter decremented)
manufacturer, part_number = sfp.get_vendor_info()
assert (manufacturer, part_number) == (None, None)
# Second call fails (counter decremented)
manufacturer, part_number = sfp.get_vendor_info()
assert (manufacturer, part_number) == (None, None)
# Third call succeeds (reads both fields)
manufacturer, part_number = sfp.get_vendor_info()
assert (manufacturer, part_number) == ('Mellanox', 'PN-5678')
# Total read invocations: first two calls each raise on first field (2),
# third call reads both fields (2) → 4 total
assert mock_eeprom.read.call_count == 4

@mock.patch('time.sleep', mock.MagicMock())
def test_get_vendor_info_all_fail(self):
sfp = SFP(0)
mock_api = mock.MagicMock()
mock_eeprom = mock.MagicMock()
mock_api.xcvr_eeprom = mock_eeprom
sfp.get_xcvr_api = mock.MagicMock(return_value=mock_api)
mock_eeprom.read.side_effect = Exception('EEPROM error')

manufacturer, part_number = sfp.get_vendor_info()
assert manufacturer is None
assert part_number is None

def test_get_vendor_info_no_api_or_missing_attr(self):
sfp = SFP(0)
# No API
sfp.get_xcvr_api = mock.MagicMock(return_value=None)
assert sfp.get_vendor_info() == (None, None)

# API without xcvr_eeprom attribute
class DummyApi(object):
pass
sfp.get_xcvr_api.return_value = DummyApi()
assert sfp.get_vendor_info() == (None, None)

Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,48 @@ def test_update_module(self):
hw_management_independent_mode_update.reset_mock()
updater.update_module()
hw_management_independent_mode_update.thermal_data_set_module.assert_called_once_with(0, 11, 0, 0, 0, 0)

# ---- SFP.get_temperature_info publishes vendor info on module change ----
def _make_sfp_for_publish(self, sn_changed=True, vendor=('Innolight', 'TR-iQ13L-NVS')):
# Import locally to avoid any potential name resolution issues in test scope
from sonic_platform.sfp import SFP as _SFP
sfp = object.__new__(_SFP)
sfp.sdk_index = 10
sfp.retry_read_vendor = 5 if sn_changed else 0
sfp.is_sw_control = mock.MagicMock(return_value=True)
sfp.reinit_if_sn_changed = mock.MagicMock(return_value=sn_changed)
if vendor is None:
sfp.get_vendor_info = mock.MagicMock(return_value=(None, None))
else:
sfp.get_vendor_info = mock.MagicMock(return_value=vendor)
api = mock.MagicMock()
api.get_transceiver_thresholds_support = mock.MagicMock(return_value=False)
sfp.get_xcvr_api = mock.MagicMock(return_value=api)
return sfp

def test_sfp_get_temperature_info_publishes_vendor_on_sn_change(self):
from sonic_platform.sfp import hw_management_independent_mode_update as sfp_hw_management_independent_mode_update
sfp = self._make_sfp_for_publish(sn_changed=True, vendor=('Innolight', 'TR-iQ13L-NVS'))
with mock.patch('sonic_platform.sfp.SfpOptoeBase.get_temperature', return_value=55.0):
sfp_hw_management_independent_mode_update.reset_mock()
sfp.get_temperature_info()
sfp_hw_management_independent_mode_update.vendor_data_set_module.assert_called_once_with(
0, 11, {'manufacturer': 'Innolight', 'part_number': 'TR-iQ13L-NVS'}
)

def test_sfp_get_temperature_info_no_publish_when_no_change(self):
from sonic_platform.sfp import hw_management_independent_mode_update as sfp_hw_management_independent_mode_update
sfp = self._make_sfp_for_publish(sn_changed=False, vendor=('Innolight', 'TR-iQ13L-NVS'))
with mock.patch('sonic_platform.sfp.SfpOptoeBase.get_temperature', return_value=55.0):
sfp_hw_management_independent_mode_update.reset_mock()
sfp.get_temperature_info()
sfp_hw_management_independent_mode_update.vendor_data_set_module.assert_not_called()

def test_sfp_get_temperature_info_no_publish_when_vendor_missing(self):
from sonic_platform.sfp import hw_management_independent_mode_update as sfp_hw_management_independent_mode_update
sfp = self._make_sfp_for_publish(sn_changed=True, vendor=None)
with mock.patch('sonic_platform.sfp.SfpOptoeBase.get_temperature', return_value=55.0):
sfp_hw_management_independent_mode_update.reset_mock()
sfp.get_temperature_info()
sfp_hw_management_independent_mode_update.vendor_data_set_module.assert_not_called()

Loading