From e63c60881a826e9608f69e1fe32d45042f145bad Mon Sep 17 00:00:00 2001 From: Bobby McGonigle Date: Fri, 31 Oct 2025 18:00:35 +0000 Subject: [PATCH 1/2] Breakup task_worker into separate functions --- sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py | 744 ++++++++++---------- 1 file changed, 374 insertions(+), 370 deletions(-) diff --git a/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py b/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py index 1c90fdec1..4260f6cf0 100644 --- a/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py +++ b/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py @@ -706,401 +706,405 @@ def is_timer_expired(self, expired_time, current_time=None): return expired_time <= current_time - def task_worker(self): + def process_single_lport(self, lport, info): is_fast_reboot = common.is_fast_reboot_enabled() - # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal - port_change_observer = PortChangeObserver(self.namespaces, helper_logger, - self.task_stopping_event, - self.on_port_update_event) + # Cache gearbox line lanes dictionary for this set of iterations over the port_dict + gearbox_lanes_dict = self.xcvr_table_helper.get_gearbox_line_lanes_dict() - while not self.task_stopping_event.is_set(): - # Handle port change event from main thread - port_change_observer.handle_port_update_event() + state = common.get_cmis_state_from_state_db(lport, self.xcvr_table_helper.get_status_sw_tbl(self.get_asic_id(lport))) + if state in CMIS_TERMINAL_STATES or state == CMIS_STATE_UNKNOWN: + if state != CMIS_STATE_READY: + self.port_dict[lport]['appl'] = 0 + self.port_dict[lport]['host_lanes_mask'] = 0 + return - # Cache gearbox line lanes dictionary for this set of iterations over the port_dict - gearbox_lanes_dict = self.xcvr_table_helper.get_gearbox_line_lanes_dict() + # Handle the case when Xcvrd was NOT running when 'host_tx_ready' or 'admin_status' + # was updated or this is the first run so reconcile the above two attributes + if 'host_tx_ready' not in self.port_dict[lport]: + self.port_dict[lport]['host_tx_ready'] = self.get_host_tx_status(lport) - for lport, info in self.port_dict.items(): - if self.task_stopping_event.is_set(): - break + if 'admin_status' not in self.port_dict[lport]: + self.port_dict[lport]['admin_status'] = self.get_port_admin_status(lport) - if lport not in self.port_dict: - continue + pport = int(info.get('index', "-1")) + speed = int(info.get('speed', "0")) + lanes = info.get('lanes', "").strip() + subport = info.get('subport', 0) + if pport < 0 or speed == 0 or len(lanes) < 1 or subport < 0: + return - state = common.get_cmis_state_from_state_db(lport, self.xcvr_table_helper.get_status_sw_tbl(self.get_asic_id(lport))) - if state in CMIS_TERMINAL_STATES or state == CMIS_STATE_UNKNOWN: - if state != CMIS_STATE_READY: - self.port_dict[lport]['appl'] = 0 - self.port_dict[lport]['host_lanes_mask'] = 0 - continue + # Desired port speed on the host side + host_speed = speed + host_lane_count = self.get_host_lane_count(lport, lanes, gearbox_lanes_dict) - # Handle the case when Xcvrd was NOT running when 'host_tx_ready' or 'admin_status' - # was updated or this is the first run so reconcile the above two attributes - if 'host_tx_ready' not in self.port_dict[lport]: - self.port_dict[lport]['host_tx_ready'] = self.get_host_tx_status(lport) + # double-check the HW presence before moving forward + sfp = self.platform_chassis.get_sfp(pport) + if not sfp.get_presence(): + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_REMOVED) + return - if 'admin_status' not in self.port_dict[lport]: - self.port_dict[lport]['admin_status'] = self.get_port_admin_status(lport) + try: + # Skip if XcvrApi is not supported + api = sfp.get_xcvr_api() + if api is None: + self.log_error("{}: skipping CMIS state machine since no xcvr api!!!".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + return - pport = int(info.get('index', "-1")) - speed = int(info.get('speed', "0")) - lanes = info.get('lanes', "").strip() - subport = info.get('subport', 0) - if pport < 0 or speed == 0 or len(lanes) < 1 or subport < 0: - continue + # Skip if it's not a paged memory device + if api.is_flat_memory(): + self.log_notice("{}: skipping CMIS state machine for flat memory xcvr".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + return - # Desired port speed on the host side - host_speed = speed - host_lane_count = self.get_host_lane_count(lport, lanes, gearbox_lanes_dict) + # Skip if it's not a CMIS module + type = api.get_module_type_abbreviation() + if (type is None) or (type not in self.CMIS_MODULE_TYPES): + self.log_notice("{}: skipping CMIS state machine for non-CMIS module with type {}".format(lport, type)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + return - # double-check the HW presence before moving forward - sfp = self.platform_chassis.get_sfp(pport) - if not sfp.get_presence(): - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_REMOVED) - continue + if api.is_coherent_module(): + if 'tx_power' not in self.port_dict[lport]: + self.port_dict[lport]['tx_power'] = self.get_configured_tx_power_from_db(lport) + if 'laser_freq' not in self.port_dict[lport]: + self.port_dict[lport]['laser_freq'] = self.get_configured_laser_freq_from_db(lport) + except AttributeError: + # Skip if these essential routines are not available + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + return + except Exception as e: + self.log_error("{}: Exception in xcvr api: {}".format(lport, e)) + common.log_exception_traceback() + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + return - try: - # Skip if XcvrApi is not supported - api = sfp.get_xcvr_api() - if api is None: - self.log_error("{}: skipping CMIS state machine since no xcvr api!!!".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - - # Skip if it's not a paged memory device - if api.is_flat_memory(): - self.log_notice("{}: skipping CMIS state machine for flat memory xcvr".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - - # Skip if it's not a CMIS module - type = api.get_module_type_abbreviation() - if (type is None) or (type not in self.CMIS_MODULE_TYPES): - self.log_notice("{}: skipping CMIS state machine for non-CMIS module with type {}".format(lport, type)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue + # CMIS expiration and retries + # + # A retry should always start over at INSETRTED state, while the + # expiration will reset the state to INSETRTED and advance the + # retry counter + expired = self.port_dict[lport].get('cmis_expired') + retries = self.port_dict[lport].get('cmis_retries', 0) + host_lanes_mask = self.port_dict[lport].get('host_lanes_mask', 0) + appl = self.port_dict[lport].get('appl', 0) + # appl can be 0 if this lport is in decommission state machine, which should not be considered as failed case. + if state != CMIS_STATE_INSERTED and not self.is_decomm_lead_lport(lport) and (host_lanes_mask <= 0 or appl < 1): + self.log_error("{}: Unexpected value for host_lanes_mask {} or appl {} in " + "{} state".format(lport, host_lanes_mask, appl, state)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + return - if api.is_coherent_module(): - if 'tx_power' not in self.port_dict[lport]: - self.port_dict[lport]['tx_power'] = self.get_configured_tx_power_from_db(lport) - if 'laser_freq' not in self.port_dict[lport]: - self.port_dict[lport]['laser_freq'] = self.get_configured_laser_freq_from_db(lport) - except AttributeError: - # Skip if these essential routines are not available - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - except Exception as e: - self.log_error("{}: Exception in xcvr api: {}".format(lport, e)) - common.log_exception_traceback() - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue + self.log_notice("{}: {}G, lanemask=0x{:x}, CMIS state={}{}, Module state={}, DP state={}, appl {} host_lane_count {} " + "retries={}".format(lport, int(speed/1000), host_lanes_mask, state, + "(decommission" + ("*" if self.is_decomm_lead_lport(lport) else "") + ")" + if self.is_decomm_pending(lport) else "", + api.get_module_state(), api.get_datapath_state(), appl, host_lane_count, retries)) + if retries > self.CMIS_MAX_RETRIES: + self.log_error("{}: FAILED".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + return - # CMIS expiration and retries - # - # A retry should always start over at INSETRTED state, while the - # expiration will reset the state to INSETRTED and advance the - # retry counter - expired = self.port_dict[lport].get('cmis_expired') - retries = self.port_dict[lport].get('cmis_retries', 0) - host_lanes_mask = self.port_dict[lport].get('host_lanes_mask', 0) - appl = self.port_dict[lport].get('appl', 0) - # appl can be 0 if this lport is in decommission state machine, which should not be considered as failed case. - if state != CMIS_STATE_INSERTED and not self.is_decomm_lead_lport(lport) and (host_lanes_mask <= 0 or appl < 1): - self.log_error("{}: Unexpected value for host_lanes_mask {} or appl {} in " - "{} state".format(lport, host_lanes_mask, appl, state)) + try: + # CMIS state transitions + if state == CMIS_STATE_INSERTED: + self.port_dict[lport]['appl'] = common.get_cmis_application_desired(api, host_lane_count, host_speed) + if self.port_dict[lport]['appl'] is None: + self.log_error("{}: no suitable app for the port appl {} host_lane_count {} " + "host_speed {}".format(lport, appl, host_lane_count, host_speed)) self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - - self.log_notice("{}: {}G, lanemask=0x{:x}, CMIS state={}{}, Module state={}, DP state={}, appl {} host_lane_count {} " - "retries={}".format(lport, int(speed/1000), host_lanes_mask, state, - "(decommission" + ("*" if self.is_decomm_lead_lport(lport) else "") + ")" - if self.is_decomm_pending(lport) else "", - api.get_module_state(), api.get_datapath_state(), appl, host_lane_count, retries)) - if retries > self.CMIS_MAX_RETRIES: - self.log_error("{}: FAILED".format(lport)) + return + appl = self.port_dict[lport]['appl'] + self.log_notice("{}: Setting appl={}".format(lport, appl)) + + self.port_dict[lport]['host_lanes_mask'] = self.get_cmis_host_lanes_mask(api, + appl, host_lane_count, subport) + if self.port_dict[lport]['host_lanes_mask'] <= 0: + self.log_error("{}: Invalid lane mask received - host_lane_count {} subport {} " + "appl {}!".format(lport, host_lane_count, subport, appl)) self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - - try: - # CMIS state transitions - if state == CMIS_STATE_INSERTED: - self.port_dict[lport]['appl'] = common.get_cmis_application_desired(api, host_lane_count, host_speed) - if self.port_dict[lport]['appl'] is None: - self.log_error("{}: no suitable app for the port appl {} host_lane_count {} " - "host_speed {}".format(lport, appl, host_lane_count, host_speed)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - appl = self.port_dict[lport]['appl'] - self.log_notice("{}: Setting appl={}".format(lport, appl)) - - self.port_dict[lport]['host_lanes_mask'] = self.get_cmis_host_lanes_mask(api, - appl, host_lane_count, subport) - if self.port_dict[lport]['host_lanes_mask'] <= 0: - self.log_error("{}: Invalid lane mask received - host_lane_count {} subport {} " - "appl {}!".format(lport, host_lane_count, subport, appl)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - host_lanes_mask = self.port_dict[lport]['host_lanes_mask'] - self.log_notice("{}: Setting host_lanemask=0x{:x}".format(lport, host_lanes_mask)) - - self.port_dict[lport]['media_lane_count'] = int(api.get_media_lane_count(appl)) - self.port_dict[lport]['media_lane_assignment_options'] = int(api.get_media_lane_assignment_option(appl)) - media_lane_count = self.port_dict[lport]['media_lane_count'] - media_lane_assignment_options = self.port_dict[lport]['media_lane_assignment_options'] - self.port_dict[lport]['media_lanes_mask'] = self.get_cmis_media_lanes_mask(api, - appl, lport, subport) - if self.port_dict[lport]['media_lanes_mask'] <= 0: - self.log_error("{}: Invalid media lane mask received - media_lane_count {} " - "media_lane_assignment_options {} subport {}" - " appl {}!".format(lport, media_lane_count, media_lane_assignment_options, subport, appl)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] - self.log_notice("{}: Setting media_lanemask=0x{:x}".format(lport, media_lanes_mask)) - - if self.is_decommission_required(api, appl): - self.set_decomm_pending(lport) - - if self.is_decomm_lead_lport(lport): - # Set all the DP lanes AppSel to unused(0) when non default app code needs to be configured - self.port_dict[lport]['appl'] = appl = 0 - self.port_dict[lport]['host_lanes_mask'] = host_lanes_mask = self.ALL_LANES_MASK - self.port_dict[lport]['media_lanes_mask'] = self.ALL_LANES_MASK - self.log_notice("{}: DECOMMISSION: setting appl={} and " - "host_lanes_mask/media_lanes_mask={:#x}".format(lport, appl, self.ALL_LANES_MASK)) - # Skip rest of the deinit/pre-init when this is the lead logical port for decommission - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) - continue - elif self.is_decomm_pending(lport): - if self.is_decomm_failed(lport): - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - decomm_status_str = "failed" + return + host_lanes_mask = self.port_dict[lport]['host_lanes_mask'] + self.log_notice("{}: Setting host_lanemask=0x{:x}".format(lport, host_lanes_mask)) + + self.port_dict[lport]['media_lane_count'] = int(api.get_media_lane_count(appl)) + self.port_dict[lport]['media_lane_assignment_options'] = int(api.get_media_lane_assignment_option(appl)) + media_lane_count = self.port_dict[lport]['media_lane_count'] + media_lane_assignment_options = self.port_dict[lport]['media_lane_assignment_options'] + self.port_dict[lport]['media_lanes_mask'] = self.get_cmis_media_lanes_mask(api, + appl, lport, subport) + if self.port_dict[lport]['media_lanes_mask'] <= 0: + self.log_error("{}: Invalid media lane mask received - media_lane_count {} " + "media_lane_assignment_options {} subport {}" + " appl {}!".format(lport, media_lane_count, media_lane_assignment_options, subport, appl)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + return + media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] + self.log_notice("{}: Setting media_lanemask=0x{:x}".format(lport, media_lanes_mask)) + + if self.is_decommission_required(api, appl): + self.set_decomm_pending(lport) + + if self.is_decomm_lead_lport(lport): + # Set all the DP lanes AppSel to unused(0) when non default app code needs to be configured + self.port_dict[lport]['appl'] = appl = 0 + self.port_dict[lport]['host_lanes_mask'] = host_lanes_mask = self.ALL_LANES_MASK + self.port_dict[lport]['media_lanes_mask'] = self.ALL_LANES_MASK + self.log_notice("{}: DECOMMISSION: setting appl={} and " + "host_lanes_mask/media_lanes_mask={:#x}".format(lport, appl, self.ALL_LANES_MASK)) + # Skip rest of the deinit/pre-init when this is the lead logical port for decommission + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) + return + elif self.is_decomm_pending(lport): + if self.is_decomm_failed(lport): + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + decomm_status_str = "failed" + else: + decomm_status_str = "waiting for completion" + self.log_notice("{}: DECOMMISSION: decommission has already started for this physical port, " + "{}".format(lport, decomm_status_str)) + return + + if self.port_dict[lport]['host_tx_ready'] != 'true' or \ + self.port_dict[lport]['admin_status'] != 'up': + if is_fast_reboot and self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): + self.log_notice("{} Skip datapath re-init in fast-reboot".format(lport)) + else: + self.log_notice("{} Forcing Tx laser OFF".format(lport)) + # Force DataPath re-init + api.tx_disable_channel(media_lanes_mask, True) + self.port_dict[lport]['forced_tx_disabled'] = True + txoff_duration = self.get_cmis_dp_tx_turnoff_duration_secs(api) + self.log_notice("{}: Tx turn off duration {} secs".format(lport, txoff_duration)) + self.update_cmis_state_expiration_time(lport, txoff_duration) + self.post_port_active_apsel_to_db(api, lport, host_lanes_mask, reset_apsel=True) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + return + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_PRE_INIT_CHECK) + if state == CMIS_STATE_DP_PRE_INIT_CHECK: + if self.port_dict[lport].get('forced_tx_disabled', False): + # Ensure that Tx is OFF + # Transceiver will remain in DataPathDeactivated state while it is in Low Power Mode (even if Tx is disabled) + # Transceiver will enter DataPathInitialized state if Tx was disabled after CMIS initialization was completed + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated', 'DataPathInitialized']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathDeactivated/DataPathInitialized'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + self.port_dict[lport]['forced_tx_disabled'] = False + self.log_notice("{}: Tx laser is successfully turned OFF".format(lport)) + + # Configure the target output power if ZR module + if api.is_coherent_module(): + tx_power = self.port_dict[lport]['tx_power'] + # Prevent configuring same tx power multiple times + if 0 != tx_power and tx_power != api.get_tx_config_power(): + if 1 != self.configure_tx_output_power(api, lport, tx_power): + self.log_error("{} failed to configure Tx power = {}".format(lport, tx_power)) + else: + self.log_notice("{} Successfully configured Tx power = {}".format(lport, tx_power)) + + need_update = self.is_cmis_application_update_required(api, appl, host_lanes_mask) + + # For ZR module, Datapath needs to be re-initialized on new channel selection + if api.is_coherent_module(): + freq = self.port_dict[lport]['laser_freq'] + # If user requested frequency is NOT the same as configured on the module + # force datapath re-initialization + if 0 != freq and freq != api.get_laser_config_freq(): + if self.validate_frequency_and_grid(api, lport, freq) == True: + need_update = True + else: + # clear setting of invalid frequency config + self.port_dict[lport]['laser_freq'] = 0 + + if not need_update: + # No application updates + # As part of xcvrd restart, the TRANSCEIVER_INFO table is deleted and + # created with default value of 'N/A' for all the active apsel fields. + # The below (post_port_active_apsel_to_db) will ensure that the + # active apsel fields are updated correctly in the DB since + # the CMIS state remains unchanged during xcvrd restart + self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) + self.log_notice("{}: no CMIS application update required...READY".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + return + self.log_notice("{}: force Datapath reinit".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) + elif state == CMIS_STATE_DP_DEINIT: + # D.2.2 Software Deinitialization + api.set_datapath_deinit(host_lanes_mask) + + # D.1.3 Software Configuration and Initialization + media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] + if not api.tx_disable_channel(media_lanes_mask, True): + self.log_notice("{}: unable to turn off tx power with host_lanes_mask {}".format(lport, host_lanes_mask)) + self.port_dict[lport]['cmis_retries'] = retries + 1 + return + + #Sets module to high power mode and doesn't impact datapath if module is already in high power mode + api.set_lpmode(False, wait_state_change = False) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_AP_CONF) + dpDeinitDuration = self.get_cmis_dp_deinit_duration_secs(api) + modulePwrUpDuration = self.get_cmis_module_power_up_duration_secs(api) + self.log_notice("{}: DpDeinit duration {} secs, modulePwrUp duration {} secs".format(lport, dpDeinitDuration, modulePwrUpDuration)) + self.update_cmis_state_expiration_time(lport, max(modulePwrUpDuration, dpDeinitDuration)) + + elif state == CMIS_STATE_AP_CONF: + # Explicit control bit to apply custom Host SI settings. + # It will be set to 1 and applied via set_application if + # custom SI settings is applicable + ec = 0 + + # TODO: Use fine grained time when the CMIS memory map is available + if not self.check_module_state(api, ['ModuleReady']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathDeactivated state'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + + # Skip rest if it's in decommission state machine + if not self.is_decomm_pending(lport): + if api.is_coherent_module(): + # For ZR module, configure the laser frequency when Datapath is in Deactivated state + freq = self.port_dict[lport]['laser_freq'] + if 0 != freq: + if 1 != self.configure_laser_frequency(api, lport, freq): + self.log_error("{} failed to configure laser frequency {} GHz".format(lport, freq)) else: - decomm_status_str = "waiting for completion" - self.log_notice("{}: DECOMMISSION: decommission has already started for this physical port, " - "{}".format(lport, decomm_status_str)) - continue - - if self.port_dict[lport]['host_tx_ready'] != 'true' or \ - self.port_dict[lport]['admin_status'] != 'up': - if is_fast_reboot and self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): - self.log_notice("{} Skip datapath re-init in fast-reboot".format(lport)) - else: - self.log_notice("{} Forcing Tx laser OFF".format(lport)) - # Force DataPath re-init - api.tx_disable_channel(media_lanes_mask, True) - self.port_dict[lport]['forced_tx_disabled'] = True - txoff_duration = self.get_cmis_dp_tx_turnoff_duration_secs(api) - self.log_notice("{}: Tx turn off duration {} secs".format(lport, txoff_duration)) - self.update_cmis_state_expiration_time(lport, txoff_duration) - self.post_port_active_apsel_to_db(api, lport, host_lanes_mask, reset_apsel=True) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_PRE_INIT_CHECK) - if state == CMIS_STATE_DP_PRE_INIT_CHECK: - if self.port_dict[lport].get('forced_tx_disabled', False): - # Ensure that Tx is OFF - # Transceiver will remain in DataPathDeactivated state while it is in Low Power Mode (even if Tx is disabled) - # Transceiver will enter DataPathInitialized state if Tx was disabled after CMIS initialization was completed - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated', 'DataPathInitialized']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathDeactivated/DataPathInitialized'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - self.port_dict[lport]['forced_tx_disabled'] = False - self.log_notice("{}: Tx laser is successfully turned OFF".format(lport)) - - # Configure the target output power if ZR module - if api.is_coherent_module(): - tx_power = self.port_dict[lport]['tx_power'] - # Prevent configuring same tx power multiple times - if 0 != tx_power and tx_power != api.get_tx_config_power(): - if 1 != self.configure_tx_output_power(api, lport, tx_power): - self.log_error("{} failed to configure Tx power = {}".format(lport, tx_power)) - else: - self.log_notice("{} Successfully configured Tx power = {}".format(lport, tx_power)) - - need_update = self.is_cmis_application_update_required(api, appl, host_lanes_mask) - - # For ZR module, Datapath needs to be re-initialized on new channel selection - if api.is_coherent_module(): - freq = self.port_dict[lport]['laser_freq'] - # If user requested frequency is NOT the same as configured on the module - # force datapath re-initialization - if 0 != freq and freq != api.get_laser_config_freq(): - if self.validate_frequency_and_grid(api, lport, freq) == True: - need_update = True - else: - # clear setting of invalid frequency config - self.port_dict[lport]['laser_freq'] = 0 - - if not need_update: - # No application updates - # As part of xcvrd restart, the TRANSCEIVER_INFO table is deleted and - # created with default value of 'N/A' for all the active apsel fields. - # The below (post_port_active_apsel_to_db) will ensure that the - # active apsel fields are updated correctly in the DB since - # the CMIS state remains unchanged during xcvrd restart - self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) - self.log_notice("{}: no CMIS application update required...READY".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - self.log_notice("{}: force Datapath reinit".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) - elif state == CMIS_STATE_DP_DEINIT: - # D.2.2 Software Deinitialization - api.set_datapath_deinit(host_lanes_mask) - - # D.1.3 Software Configuration and Initialization - media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] - if not api.tx_disable_channel(media_lanes_mask, True): - self.log_notice("{}: unable to turn off tx power with host_lanes_mask {}".format(lport, host_lanes_mask)) - self.port_dict[lport]['cmis_retries'] = retries + 1 - continue - - #Sets module to high power mode and doesn't impact datapath if module is already in high power mode - api.set_lpmode(False, wait_state_change = False) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_AP_CONF) - dpDeinitDuration = self.get_cmis_dp_deinit_duration_secs(api) - modulePwrUpDuration = self.get_cmis_module_power_up_duration_secs(api) - self.log_notice("{}: DpDeinit duration {} secs, modulePwrUp duration {} secs".format(lport, dpDeinitDuration, modulePwrUpDuration)) - self.update_cmis_state_expiration_time(lport, max(modulePwrUpDuration, dpDeinitDuration)) - - elif state == CMIS_STATE_AP_CONF: - # Explicit control bit to apply custom Host SI settings. - # It will be set to 1 and applied via set_application if - # custom SI settings is applicable - ec = 0 - - # TODO: Use fine grained time when the CMIS memory map is available - if not self.check_module_state(api, ['ModuleReady']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) + self.log_notice("{} configured laser frequency {} GHz".format(lport, freq)) + + # Stage custom SI settings + if optics_si_parser.optics_si_present(): + optics_si_dict = {} + # Apply module SI settings if applicable + lane_speed = int(speed/1000)//host_lane_count + optics_si_dict = optics_si_parser.fetch_optics_si_setting(pport, lane_speed, sfp) + + self.log_debug("Read SI parameters for port {} from optics_si_settings.json vendor file:".format(lport)) + for key, sub_dict in optics_si_dict.items(): + self.log_debug("{}".format(key)) + for sub_key, value in sub_dict.items(): + self.log_debug("{}: {}".format(sub_key, str(value))) + + if optics_si_dict: + self.log_notice("{}: Apply Optics SI found for Vendor: {} PN: {} lane speed: {}G". + format(lport, api.get_manufacturer(), api.get_model(), lane_speed)) + if not api.stage_custom_si_settings(host_lanes_mask, optics_si_dict): + self.log_notice("{}: unable to stage custom SI settings ".format(lport)) self.force_cmis_reinit(lport, retries + 1) - continue + return + + # Set Explicit control bit to apply Custom Host SI settings + ec = 1 + + # D.1.3 Software Configuration and Initialization + api.set_application(host_lanes_mask, appl, ec) + if not api.scs_apply_datapath_init(host_lanes_mask): + self.log_notice("{}: unable to set application and stage DP init".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_INIT) + elif state == CMIS_STATE_DP_INIT: + if not self.check_config_error(api, host_lanes_mask, ['ConfigSuccess']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'ConfigSuccess', current ConfigStatus: " + "{}".format(lport, list(api.get_config_datapath_hostlane_status().values()))) + self.force_cmis_reinit(lport, retries + 1) + return + + # Clear decommission status and invoke CMIS reinit so that normal CMIS initialization can begin + if self.is_decomm_pending(lport): + self.log_notice("{}: DECOMMISSION: done for physical port {}".format(lport, self.port_dict[lport]['index'])) + self.clear_decomm_pending(lport) + self.force_cmis_reinit(lport) + return + + if hasattr(api, 'get_cmis_rev'): + # Check datapath init pending on module that supports CMIS 5.x + majorRev = int(api.get_cmis_rev().split('.')[0]) + if majorRev >= 5 and not self.check_datapath_init_pending(api, host_lanes_mask): + self.log_notice("{}: datapath init not pending".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + + # Ensure the Datapath is NOT Activated unless the host Tx siganl is good. + # NOTE: Some CMIS compliant modules may have 'auto-squelch' feature where + # the module won't take datapaths to Activated state if host tries to enable + # the datapaths while there is no good Tx signal from the host-side. + if self.port_dict[lport]['admin_status'] != 'up' or \ + self.port_dict[lport]['host_tx_ready'] != 'true': + self.log_notice("{} waiting for host tx ready...".format(lport)) + return + + # D.1.3 Software Configuration and Initialization + api.set_datapath_init(host_lanes_mask) + dpInitDuration = self.get_cmis_dp_init_duration_secs(api) + self.log_notice("{}: DpInit duration {} secs".format(lport, dpInitDuration)) + self.update_cmis_state_expiration_time(lport, dpInitDuration) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_TXON) + elif state == CMIS_STATE_DP_TXON: + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathInitialized']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + + # Turn ON the laser + media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] + api.tx_disable_channel(media_lanes_mask, False) + self.log_notice("{}: Turning ON tx power".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_ACTIVATE) + elif state == CMIS_STATE_DP_ACTIVATE: + # Use dpInitDuration instead of MaxDurationDPTxTurnOn because + # some modules rely on dpInitDuration to turn on the Tx signal. + # This behavior deviates from the CMIS spec but is honored + # to prevent old modules from breaking with new sonic + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathActivated'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + return + + self.log_notice("{}: READY".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathDeactivated state'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Skip rest if it's in decommission state machine - if not self.is_decomm_pending(lport): - if api.is_coherent_module(): - # For ZR module, configure the laser frequency when Datapath is in Deactivated state - freq = self.port_dict[lport]['laser_freq'] - if 0 != freq: - if 1 != self.configure_laser_frequency(api, lport, freq): - self.log_error("{} failed to configure laser frequency {} GHz".format(lport, freq)) - else: - self.log_notice("{} configured laser frequency {} GHz".format(lport, freq)) - - # Stage custom SI settings - if optics_si_parser.optics_si_present(): - optics_si_dict = {} - # Apply module SI settings if applicable - lane_speed = int(speed/1000)//host_lane_count - optics_si_dict = optics_si_parser.fetch_optics_si_setting(pport, lane_speed, sfp) - - self.log_debug("Read SI parameters for port {} from optics_si_settings.json vendor file:".format(lport)) - for key, sub_dict in optics_si_dict.items(): - self.log_debug("{}".format(key)) - for sub_key, value in sub_dict.items(): - self.log_debug("{}: {}".format(sub_key, str(value))) - - if optics_si_dict: - self.log_notice("{}: Apply Optics SI found for Vendor: {} PN: {} lane speed: {}G". - format(lport, api.get_manufacturer(), api.get_model(), lane_speed)) - if not api.stage_custom_si_settings(host_lanes_mask, optics_si_dict): - self.log_notice("{}: unable to stage custom SI settings ".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Set Explicit control bit to apply Custom Host SI settings - ec = 1 - - # D.1.3 Software Configuration and Initialization - api.set_application(host_lanes_mask, appl, ec) - if not api.scs_apply_datapath_init(host_lanes_mask): - self.log_notice("{}: unable to set application and stage DP init".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_INIT) - elif state == CMIS_STATE_DP_INIT: - if not self.check_config_error(api, host_lanes_mask, ['ConfigSuccess']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'ConfigSuccess', current ConfigStatus: " - "{}".format(lport, list(api.get_config_datapath_hostlane_status().values()))) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Clear decommission status and invoke CMIS reinit so that normal CMIS initialization can begin - if self.is_decomm_pending(lport): - self.log_notice("{}: DECOMMISSION: done for physical port {}".format(lport, self.port_dict[lport]['index'])) - self.clear_decomm_pending(lport) - self.force_cmis_reinit(lport) - continue - - if hasattr(api, 'get_cmis_rev'): - # Check datapath init pending on module that supports CMIS 5.x - majorRev = int(api.get_cmis_rev().split('.')[0]) - if majorRev >= 5 and not self.check_datapath_init_pending(api, host_lanes_mask): - self.log_notice("{}: datapath init not pending".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Ensure the Datapath is NOT Activated unless the host Tx siganl is good. - # NOTE: Some CMIS compliant modules may have 'auto-squelch' feature where - # the module won't take datapaths to Activated state if host tries to enable - # the datapaths while there is no good Tx signal from the host-side. - if self.port_dict[lport]['admin_status'] != 'up' or \ - self.port_dict[lport]['host_tx_ready'] != 'true': - self.log_notice("{} waiting for host tx ready...".format(lport)) - continue - - # D.1.3 Software Configuration and Initialization - api.set_datapath_init(host_lanes_mask) - dpInitDuration = self.get_cmis_dp_init_duration_secs(api) - self.log_notice("{}: DpInit duration {} secs".format(lport, dpInitDuration)) - self.update_cmis_state_expiration_time(lport, dpInitDuration) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_TXON) - elif state == CMIS_STATE_DP_TXON: - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathInitialized']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Turn ON the laser - media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] - api.tx_disable_channel(media_lanes_mask, False) - self.log_notice("{}: Turning ON tx power".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_ACTIVATE) - elif state == CMIS_STATE_DP_ACTIVATE: - # Use dpInitDuration instead of MaxDurationDPTxTurnOn because - # some modules rely on dpInitDuration to turn on the Tx signal. - # This behavior deviates from the CMIS spec but is honored - # to prevent old modules from breaking with new sonic - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathActivated'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue + except Exception as e: + self.log_error("{}: internal errors due to {}".format(lport, e)) + common.log_exception_traceback() + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - self.log_notice("{}: READY".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) + def task_worker(self): + # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal + port_change_observer = PortChangeObserver(self.namespaces, helper_logger, + self.task_stopping_event, + self.on_port_update_event) - except Exception as e: - self.log_error("{}: internal errors due to {}".format(lport, e)) - common.log_exception_traceback() - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + while not self.task_stopping_event.is_set(): + # Handle port change event from main thread + port_change_observer.handle_port_update_event() + + for lport, info in self.port_dict.items(): + if self.task_stopping_event.is_set(): + break + + if lport not in self.port_dict: + continue + + self.process_single_lport(lport, info) self.log_notice("Stopped") + def run(self): if self.platform_chassis is None: self.log_notice("Platform chassis is not available, stopping...") From 78b3c81725dd2f84239ba48ed5c2a416e9b3e3c6 Mon Sep 17 00:00:00 2001 From: Bobby McGonigle Date: Mon, 3 Nov 2025 23:01:18 +0000 Subject: [PATCH 2/2] Fix error where gearbox_lanes_dict is fetched once per lport instead of once per iteration over all ports. --- sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py b/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py index 4260f6cf0..bcb259081 100644 --- a/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py +++ b/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py @@ -706,12 +706,9 @@ def is_timer_expired(self, expired_time, current_time=None): return expired_time <= current_time - def process_single_lport(self, lport, info): + def process_single_lport(self, lport, info, gearbox_lanes_dict): is_fast_reboot = common.is_fast_reboot_enabled() - # Cache gearbox line lanes dictionary for this set of iterations over the port_dict - gearbox_lanes_dict = self.xcvr_table_helper.get_gearbox_line_lanes_dict() - state = common.get_cmis_state_from_state_db(lport, self.xcvr_table_helper.get_status_sw_tbl(self.get_asic_id(lport))) if state in CMIS_TERMINAL_STATES or state == CMIS_STATE_UNKNOWN: if state != CMIS_STATE_READY: @@ -1093,6 +1090,9 @@ def task_worker(self): # Handle port change event from main thread port_change_observer.handle_port_update_event() + # Cache gearbox line lanes dictionary once per iteration over all ports + gearbox_lanes_dict = self.xcvr_table_helper.get_gearbox_line_lanes_dict() + for lport, info in self.port_dict.items(): if self.task_stopping_event.is_set(): break @@ -1100,7 +1100,7 @@ def task_worker(self): if lport not in self.port_dict: continue - self.process_single_lport(lport, info) + self.process_single_lport(lport, info, gearbox_lanes_dict) self.log_notice("Stopped")