diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 4e97f4edb..de28b9238 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -4674,6 +4674,57 @@ def test_load_media_settings_file_from_platform_folder(self): def test_load_media_settings_file_from_hwsku_folder(self): assert media_settings_parser.load_media_settings() != {} + @patch('xcvrd.xcvrd.platform_chassis') + def test_get_is_lpo_exception(self, mock_chassis): + mock_sfp = MagicMock() + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp, side_effect=AttributeError) + result = media_settings_parser.get_is_lpo(0) + assert result == False + + def test_media_settings_parser_get_media_type(self): + physical_port = 0 + + # For the case of extended media type matching enabled for LPO: + media_settings_g_dict = { + ENABLE_EXTENDED_MEDIA_TYPE_MATCHING: [MEDIA_LPO] + } + + # Test LPO module + with patch('xcvrd.xcvrd_utilities.media_settings_parser.g_dict', media_settings_g_dict): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_copper', return_value=False): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_lpo', return_value=True): + assert media_settings_parser.get_media_type(physical_port) == MEDIA_LPO + + # Test non-LPO optical module + with patch('xcvrd.xcvrd_utilities.media_settings_parser.g_dict', media_settings_g_dict): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_copper', return_value=False): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_lpo', return_value=False): + assert media_settings_parser.get_media_type(physical_port) == MEDIA_OPTICAL + + # Test copper module + with patch('xcvrd.xcvrd_utilities.media_settings_parser.g_dict', media_settings_g_dict): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_copper', return_value=True): + media_type = media_settings_parser.get_media_type(physical_port) + assert media_type == MEDIA_COPPER + + # For the case of extended media type matching not enabled for LPO: + media_settings_g_dict = {ENABLE_EXTENDED_MEDIA_TYPE_MATCHING: []} + + # Test LPO module in the case of extended media type matching not enabled for LPO + with patch('xcvrd.xcvrd_utilities.media_settings_parser.g_dict', media_settings_g_dict): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_copper', return_value=False): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_lpo', return_value=True): + assert media_settings_parser.get_media_type(physical_port) == MEDIA_OPTICAL + + # For the case of extended media type matching not enabled at all: + media_settings_g_dict = {} + + # Test LPO module in the case of extended media type matching not enabled at all + with patch('xcvrd.xcvrd_utilities.media_settings_parser.g_dict', media_settings_g_dict): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_copper', return_value=False): + with patch('xcvrd.xcvrd_utilities.media_settings_parser.get_is_lpo', return_value=True): + assert media_settings_parser.get_media_type(physical_port) == MEDIA_OPTICAL + @pytest.mark.parametrize("lport, freq, grid, expected", [ (1, 193100, 75, True), (1, 193100, 100, False), diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/media_settings_parser.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/media_settings_parser.py index 2ca2f7443..04bc04e86 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/media_settings_parser.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/media_settings_parser.py @@ -24,6 +24,18 @@ DEFAULT_KEY = 'Default' # This is useful if default value is desired when no match is found for lane speed key LANE_SPEED_DEFAULT_KEY = LANE_SPEED_KEY_PREFIX + DEFAULT_KEY + +# Standard media types: +MEDIA_COPPER = 'COPPER' +MEDIA_OPTICAL = 'OPTICAL' +# Extended media types: +MEDIA_LPO = 'LPO' + +# This is a top-level key in media_settings.json that can be added to enable +# matching extended media types in addition to COPPER and OPTICAL. Its value is +# a list of extended media types to match. +ENABLE_EXTENDED_MEDIA_TYPE_MATCHING = 'ENABLE_EXTENDED_MEDIA_TYPE_MATCHING' + SYSLOG_IDENTIFIER = "xcvrd" helper_logger = syslogger.SysLogger(SYSLOG_IDENTIFIER, enable_runtime_config=True) @@ -62,6 +74,57 @@ def get_is_copper(physical_port): helper_logger.log_debug(f"No is_copper() defined for xcvr api on physical port {physical_port}, assuming Copper") return True + +def get_is_lpo(physical_port): + """ + Check if the transceiver is Linear Pluggable Optics (LPO) + + Args: + physical_port: physical port number + Returns: + True if the transceiver is LPO, False otherwise + """ + if xcvrd.platform_chassis: + try: + return xcvrd.platform_chassis.get_sfp(physical_port).get_xcvr_api().is_lpo() + except (NotImplementedError, AttributeError): + helper_logger.log_debug(f"No is_lpo() defined for xcvr api on physical port {physical_port}, assuming not LPO") + return False + + +def is_extended_media_type_enabled(media_type): + """ + Helper function to check if an extended media type is enabled in media_settings.json + + Args: + media_type: The extended media type string to check + + Returns: + True if the extended media type is enabled, False otherwise + """ + return media_type in g_dict.get(ENABLE_EXTENDED_MEDIA_TYPE_MATCHING, []) + + +def get_media_type(physical_port): + """ + Get the media type of the transceiver + + Args: + physical_port: physical port number + + Returns: + 'COPPER', 'OPTICAL' or extended media types included in + ENABLE_EXTENDED_MEDIA_TYPE_MATCHING of media_settings.json + """ + if get_is_copper(physical_port): + return MEDIA_COPPER + + # For optical transceivers: + if is_extended_media_type_enabled(MEDIA_LPO) and get_is_lpo(physical_port): + return MEDIA_LPO + return MEDIA_OPTICAL + + def get_lane_speed_key(physical_port, port_speed, lane_count): """ Get lane speed key for the given port @@ -148,9 +211,8 @@ def get_media_settings_key(physical_port, transceiver_dict, port_speed, lane_cou media_key += '-' + '*' lane_speed_key = get_lane_speed_key(physical_port, port_speed, lane_count) - medium = "COPPER" if get_is_copper(physical_port) else "OPTICAL" speed = int(int(int(port_speed) /lane_count)/1000) - medium_lane_speed_key = medium + str(speed) + medium_lane_speed_key = get_media_type(physical_port) + str(speed) # return (vendor_key, media_key, lane_speed_key) return { VENDOR_KEY: vendor_key,