Skip to content
25 changes: 19 additions & 6 deletions sonic_platform_base/sonic_xcvr/api/public/c_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from ...fields import consts
from .cmis import CmisApi, CMIS_VDM_KEY_TO_DB_PREFIX_KEY_MAP
import time
import copy
BYTELENGTH = 8
SYSLOG_IDENTIFIER = "CCmisApi"

Expand Down Expand Up @@ -46,6 +47,15 @@
helper_logger = logger.Logger(SYSLOG_IDENTIFIER)

class CCmisApi(CmisApi):
c_cmis_xcvr_info_dict = copy.deepcopy(CmisApi.cmis_xcvr_info_dict)
c_cmis_xcvr_info_dict.update({
"supported_max_tx_power": "N/A",
"supported_min_tx_power": "N/A",
"supported_max_laser_freq": "N/A",
"supported_min_laser_freq": "N/A"
})


def __init__(self, xcvr_eeprom):
super(CCmisApi, self).__init__(xcvr_eeprom)

Expand Down Expand Up @@ -354,14 +364,17 @@ def get_transceiver_info(self):
supported_min_laser_freq = FLOAT ; support minimum laser frequency
================================================================================
"""
trans_info = super(CCmisApi,self).get_transceiver_info()
self.xcvr_info = copy.deepcopy(self.c_cmis_xcvr_info_dict)
self.xcvr_info = super(CCmisApi, self).get_transceiver_info()
min_power, max_power = self.get_supported_power_config()
trans_info['supported_max_tx_power'] = max_power
trans_info['supported_min_tx_power'] = min_power
_, _, _, low_freq_supported, high_freq_supported = self.get_supported_freq_config()
trans_info['supported_max_laser_freq'] = high_freq_supported
trans_info['supported_min_laser_freq'] = low_freq_supported
return trans_info
self.xcvr_info.update({
'supported_max_tx_power': max_power,
'supported_min_tx_power': min_power,
'supported_max_laser_freq': high_freq_supported,
'supported_min_laser_freq': low_freq_supported
})
return self.xcvr_info

def get_transceiver_bulk_status(self):
"""
Expand Down
79 changes: 54 additions & 25 deletions sonic_platform_base/sonic_xcvr/api/public/cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .cmisCDB import CmisCdbApi
from .cmisVDM import CmisVdmApi
import time
import copy
from collections import defaultdict

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,6 +83,37 @@ class CmisApi(XcvrApi):
LowPwrRequestSW = 4
LowPwrAllowRequestHW = 6

cmis_xcvr_info_dict = {
"type": "N/A",
"type_abbrv_name": "N/A",
"hardware_rev": "N/A",
"serial": "N/A",
"manufacturer": "N/A",
"model": "N/A",
"connector": "N/A",
"encoding": "N/A",
"ext_identifier": "N/A",
"ext_rateselect_compliance": "N/A",
"cable_length": "N/A",
"nominal_bit_rate": "N/A",
"vendor_date": "N/A",
"vendor_oui": "N/A",
**{f"active_apsel_hostlane{i}": "N/A" for i in range(1, 9)},
"application_advertisement": "N/A",
"host_electrical_interface": "N/A",
"media_interface_code": "N/A",
"host_lane_count": "N/A",
"media_lane_count": "N/A",
"host_lane_assignment_option": "N/A",
"media_lane_assignment_option": "N/A",
"cable_type": "N/A",
"media_interface_technology": "N/A",
"vendor_rev": "N/A",
"cmis_rev": "N/A",
"specification_compliance": "N/A",
"vdm_supported": "N/A"
}

def __init__(self, xcvr_eeprom):
super(CmisApi, self).__init__(xcvr_eeprom)
self.vdm = CmisVdmApi(xcvr_eeprom) if not self.is_flat_memory() else None
Expand Down Expand Up @@ -267,50 +299,47 @@ def get_transceiver_info(self):
ext_id = admin_info[consts.EXT_ID_FIELD]
power_class = ext_id[consts.POWER_CLASS_FIELD]
max_power = ext_id[consts.MAX_POWER_FIELD]

xcvr_info = {
self.xcvr_info = copy.deepcopy(CmisApi.cmis_xcvr_info_dict)
self.xcvr_info.update({
"type": admin_info[consts.ID_FIELD],
"type_abbrv_name": admin_info[consts.ID_ABBRV_FIELD],
"hardware_rev": self.get_module_hardware_revision(),
"serial": admin_info[consts.VENDOR_SERIAL_NO_FIELD],
"manufacturer": admin_info[consts.VENDOR_NAME_FIELD],
"model": admin_info[consts.VENDOR_PART_NO_FIELD],
"connector": admin_info[consts.CONNECTOR_FIELD],
"encoding": "N/A", # Not supported
"ext_identifier": "%s (%sW Max)" % (power_class, max_power),
"ext_rateselect_compliance": "N/A", # Not supported
"cable_length": float(admin_info[consts.LENGTH_ASSEMBLY_FIELD]),
"nominal_bit_rate": 0, # Not supported
"vendor_date": admin_info[consts.VENDOR_DATE_FIELD],
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD]
}
appl_advt = self.get_application_advertisement()
xcvr_info['application_advertisement'] = str(appl_advt) if len(appl_advt) > 0 else 'N/A'
xcvr_info['host_electrical_interface'] = self.get_host_electrical_interface()
xcvr_info['media_interface_code'] = self.get_module_media_interface()
xcvr_info['host_lane_count'] = self.get_host_lane_count()
xcvr_info['media_lane_count'] = self.get_media_lane_count()
xcvr_info['host_lane_assignment_option'] = self.get_host_lane_assignment_option()
xcvr_info['media_lane_assignment_option'] = self.get_media_lane_assignment_option()
xcvr_info['cable_type'] = self.get_cable_length_type()
"vendor_oui": admin_info[consts.VENDOR_OUI_FIELD],
"application_advertisement": str(self.get_application_advertisement()) if len(self.get_application_advertisement()) > 0 else 'N/A',
"host_electrical_interface": self.get_host_electrical_interface(),
"media_interface_code": self.get_module_media_interface(),
"host_lane_count": self.get_host_lane_count(),
"media_lane_count": self.get_media_lane_count(),
"host_lane_assignment_option": self.get_host_lane_assignment_option(),
"media_lane_assignment_option": self.get_media_lane_assignment_option(),
"cable_type": self.get_cable_length_type(),
"media_interface_technology": self.get_media_interface_technology(),
"vendor_rev": self.get_vendor_rev(),
"cmis_rev": self.get_cmis_rev(),
"specification_compliance": self.get_module_media_type(),
"vdm_supported": self.is_transceiver_vdm_supported()
})
apsel_dict = self.get_active_apsel_hostlane()
for lane in range(1, self.NUM_CHANNELS+1):
xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]
xcvr_info['media_interface_technology'] = self.get_media_interface_technology()
xcvr_info['vendor_rev'] = self.get_vendor_rev()
xcvr_info['cmis_rev'] = self.get_cmis_rev()
xcvr_info['specification_compliance'] = self.get_module_media_type()
for lane in range(1, self.NUM_CHANNELS + 1):
self.xcvr_info["%s%d" % ("active_apsel_hostlane", lane)] = \
apsel_dict["%s%d" % (consts.ACTIVE_APSEL_HOSTLANE, lane)]

# In normal case will get a valid value for each of the fields. If get a 'None' value
# means there was a failure while reading the EEPROM, either because the EEPROM was
# not ready yet or experincing some other issues. It shouldn't return a dict with a
# wrong field value, instead should return a 'None' to indicate to XCVRD that retry is
# needed.
if None in xcvr_info.values():
if None in self.xcvr_info.values():
return None
else:
return xcvr_info
return self.xcvr_info

def get_transceiver_info_firmware_versions(self):
return_dict = {"active_firmware" : "N/A", "inactive_firmware" : "N/A"}
Expand Down
17 changes: 10 additions & 7 deletions tests/sonic_xcvr/test_ccmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,29 +160,32 @@ def test_get_pm_all(self, mock_response, expected):

@pytest.mark.parametrize("mock_response, expected",[
(
[ {
(
{
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver'
},
(-20, 0),
(0xff, -72, 120, 191300, 196100)
],
),
{
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'supported_min_laser_freq': 191300,
'supported_max_laser_freq': 196100,
'supported_max_tx_power': 0,
'supported_min_tx_power': -20,

}
)
])
@patch("sonic_platform_base.sonic_xcvr.api.public.cmis.CmisApi.get_transceiver_info")
def test_get_transceiver_info(self, get_transceiver_info_func, mock_response, expected):
# Mock the base class method to return initial transceiver data
get_transceiver_info_func.return_value = mock_response[0]
self.api.get_supported_power_config = MagicMock()
self.api.get_supported_power_config.return_value = mock_response[1]
self.api.get_supported_freq_config = MagicMock()
self.api.get_supported_freq_config.return_value = mock_response[2]

# Mock the power and frequency configurations
self.api.get_supported_power_config = MagicMock(return_value = mock_response[1])
self.api.get_supported_freq_config = MagicMock(return_value = mock_response[2])

# Call function under test
result = self.api.get_transceiver_info()
assert result == expected

Expand Down
85 changes: 36 additions & 49 deletions tests/sonic_xcvr/test_cmis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1433,11 +1433,14 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
result = self.api.module_fw_upgrade(input_param)
assert result == expected

@pytest.mark.parametrize("mock_response, expected",[
([None, None, None, None, None, None, None, None, None, None, None, None, None, None, None], None),
@pytest.mark.parametrize("mock_response, expected", [
# Case: No transceiver data
([None] * 15, None),

# Case: Valid transceiver data
(
[
{
{ # EEPROM mocked Data
'Extended Identifier': {'Power Class': 'Power Class 8', 'MaxPower': 20.0},
'Identifier': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'Identifier Abbreviation': 'QSFP-DD',
Expand All @@ -1450,30 +1453,31 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
'Length Cable Assembly': 0.0,
'ModuleMediaType': 'sm_media_interface',
'VendorDate': '21010100',
'VendorOUI': 'xx-xx-xx'
'VendorOUI': 'xx-xx-xx',
'vdm_supported': True,
},
'400GAUI-8 C2M (Annex 120E)',
'400ZR, DWDM, amplified',
8, 1, 1, 1,
{'ActiveAppSelLane1': 1, 'ActiveAppSelLane2': 1, 'ActiveAppSelLane3': 1, 'ActiveAppSelLane4': 1,
'ActiveAppSelLane5': 1, 'ActiveAppSelLane6': 1, 'ActiveAppSelLane7': 1, 'ActiveAppSelLane8': 1},
{f'ActiveAppSelLane{i}': 1 for i in range(1, 9)},
'1550 nm DFB',
'0.0',
'5.0',
'0.1',
'0.0',
'sm_media_interface',
{'status': True, 'result': ("0.3.0", 1, 1, 0, "0.2.0", 0, 0, 0, "0.3.0", "0.2.0")}
{'status': True, 'result': ("0.3.0", 1, 1, 0, "0.2.0", 0, 0, 0, "0.3.0", "0.2.0")}
],
{ 'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
{ # Expected result
'type': 'QSFP-DD Double Density 8X Pluggable Transceiver',
'type_abbrv_name': 'QSFP-DD',
'model': 'ABCD',
'encoding': 'N/A',
'ext_identifier': 'Power Class 8 (20.0W Max)',
'ext_rateselect_compliance': 'N/A',
'cable_type': 'Length Cable Assembly(m)',
'cable_length': 0.0,
'nominal_bit_rate': 0,
'nominal_bit_rate': 'N/A',
'specification_compliance': 'sm_media_interface',
'application_advertisement': 'N/A',
'media_lane_count': 1,
Expand All @@ -1485,56 +1489,39 @@ def test_module_fw_upgrade(self, input_param, mock_response, expected):
'media_interface_code': '400ZR, DWDM, amplified',
'serial': '00000000',
'host_lane_count': 8,
'active_apsel_hostlane1': 1,
'active_apsel_hostlane3': 1,
'active_apsel_hostlane2': 1,
'active_apsel_hostlane5': 1,
'active_apsel_hostlane4': 1,
'active_apsel_hostlane7': 1,
'active_apsel_hostlane6': 1,
'active_apsel_hostlane8': 1,
**{f'active_apsel_hostlane{i}': 1 for i in range(1, 9)},
'hardware_rev': '0.0',
'cmis_rev': '5.0',
'media_lane_assignment_option': 1,
'connector': 'LC',
'host_lane_assignment_option': 1,
'vendor_date': '21010100'
'vendor_date': '21010100',
'vdm_supported': True,
}
)
])
def test_get_transceiver_info(self, mock_response, expected):
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.return_value = mock_response[0]
self.api.get_host_electrical_interface = MagicMock()
self.api.get_host_electrical_interface.return_value = mock_response[1]
self.api.get_module_media_interface = MagicMock()
self.api.get_module_media_interface.return_value = mock_response[2]
self.api.get_host_lane_count = MagicMock()
self.api.get_host_lane_count.return_value = mock_response[3]
self.api.get_media_lane_count = MagicMock()
self.api.get_media_lane_count.return_value = mock_response[4]
self.api.get_host_lane_assignment_option = MagicMock()
self.api.get_host_lane_assignment_option.return_value = mock_response[5]
self.api.get_media_lane_assignment_option = MagicMock()
self.api.get_media_lane_assignment_option.return_value = mock_response[6]
self.api.get_active_apsel_hostlane = MagicMock()
self.api.get_active_apsel_hostlane.return_value = mock_response[7]
self.api.get_media_interface_technology = MagicMock()
self.api.get_media_interface_technology.return_value = mock_response[8]
self.api.get_vendor_rev = MagicMock()
self.api.get_vendor_rev.return_value = mock_response[9]
self.api.get_cmis_rev = MagicMock()
self.api.get_cmis_rev.return_value = mock_response[10]
self.api.get_module_fw_info = MagicMock()
self.api.get_module_media_type = MagicMock()
self.api.get_module_media_type.return_value = mock_response[13]
self.api.get_module_hardware_revision = MagicMock()
self.api.get_module_hardware_revision.return_value = '0.0'
self.api.get_module_fw_info.return_value = mock_response[14]
self.api.is_flat_memory = MagicMock()
self.api.is_flat_memory.return_value = False
self.api.xcvr_eeprom.read = MagicMock(return_value = mock_response[0])
self.api.get_host_electrical_interface = MagicMock(return_value = mock_response[1])
self.api.get_module_media_interface = MagicMock(return_value = mock_response[2])
self.api.get_host_lane_count = MagicMock(return_value = mock_response[3])
self.api.get_media_lane_count = MagicMock(return_value = mock_response[4])
self.api.get_host_lane_assignment_option = MagicMock(return_value = mock_response[5])
self.api.get_media_lane_assignment_option = MagicMock(return_value = mock_response[6])
self.api.get_active_apsel_hostlane = MagicMock(return_value = mock_response[7])
self.api.get_media_interface_technology = MagicMock(return_value = mock_response[8])
self.api.get_vendor_rev = MagicMock(return_value = mock_response[9])
self.api.get_cmis_rev = MagicMock(return_value = mock_response[10])
self.api.get_module_fw_info = MagicMock(return_value = mock_response[14])
self.api.get_module_media_type = MagicMock(return_value = mock_response[13])
self.api.get_module_hardware_revision = MagicMock(return_value="0.0")
self.api.is_flat_memory = MagicMock(return_value=False)
self.api.is_transceiver_vdm_supported = MagicMock(return_value=True)

# Run test and validate output
result = self.api.get_transceiver_info()
assert result == expected

# Test negative path
self.api.get_cmis_rev.return_value = None
result = self.api.get_transceiver_info()
Expand Down Expand Up @@ -3082,7 +3069,7 @@ def test_get_error_description(self):
}
self.api.xcvr_eeprom.read = MagicMock()
self.api.xcvr_eeprom.read.return_value = 0x10

result = self.api.get_error_description()
assert result is 'OK'

Expand Down
Loading