diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py
index c400815fa..41887be39 100644
--- a/sonic-xcvrd/xcvrd/xcvrd.py
+++ b/sonic-xcvrd/xcvrd/xcvrd.py
@@ -477,7 +477,7 @@ class CmisManagerTask(threading.Thread):
     CMIS_MODULE_TYPES = ['QSFP-DD', 'QSFP_DD', 'OSFP', 'OSFP-8X', 'QSFP+C']
     CMIS_MAX_HOST_LANES = 8
     CMIS_EXPIRATION_BUFFER_MS = 2
-    ALL_LANES_MASK = 0xff
+    ACTIVEAPPSEL_LANE_KEY_PREFIX = 'ActiveAppSelLane{}'

     def __init__(self, namespaces, port_mapping, main_thread_stop_event, skip_cmis_mgr=False):
         threading.Thread.__init__(self)
@@ -562,6 +562,12 @@ def on_port_update_event(self, port_change_event):
             if 'subport' in port_change_event.port_dict:
                 self.port_dict[lport]['subport'] = int(port_change_event.port_dict['subport'])

+            # Decommissioning doesn't care about host_tx_ready change events, so there is no need to force_cmis_reinit
+            if (self.is_decomm_pending(lport) and
+                    port_change_event.db_name == 'STATE_DB' and
+                    port_change_event.table_name == swsscommon.STATE_PORT_TABLE_NAME):
+                return
+
             self.force_cmis_reinit(lport, 0)

         elif port_change_event.event_type == port_change_event.PORT_DEL:
@@ -698,34 +704,91 @@ def get_cmis_media_lanes_mask(self, api, appl, lport, subport):

     def clear_decomm_pending(self, lport):
         """
-        Clear the decommission pending status for the entire physical port this logical port belongs to.
+        Clear the decommission pending status for this logical port.

         Args:
             lport:
                 String, logical port name
         """
-        self.decomm_pending_dict.pop(self.port_dict.get(lport, {}).get('index'), None)
+        physical_port_idx = self.port_dict.get(lport, {}).get('index')
+        if physical_port_idx not in self.decomm_pending_dict:
+            return
+        self.decomm_pending_dict[physical_port_idx].pop(lport, None)
+        # If there are no more logical ports pending decommission on this physical port,
+        # remove the physical port entry from the decomm_pending_dict
+        if not self.decomm_pending_dict[physical_port_idx]:
+            self.decomm_pending_dict.pop(physical_port_idx)
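# --- Editor's sketch (illustration, not part of the diff) ---
# The two-level decomm_pending_dict layout that the hunk above maintains, and the
# cleanup it performs, shown standalone. Port names and masks here are hypothetical.
decomm_pending_dict = {
    1: {                          # physical port index
        'Ethernet0': 0b00001111,  # lport -> host lanes pending decommission
        'Ethernet4': 0b11110000,
    },
}

def clear_decomm_pending_sketch(pending, physical_port_idx, lport):
    """Drop one lport's pending mask; drop the physical port once it is empty."""
    if physical_port_idx not in pending:
        return
    pending[physical_port_idx].pop(lport, None)
    if not pending[physical_port_idx]:
        pending.pop(physical_port_idx)

clear_decomm_pending_sketch(decomm_pending_dict, 1, 'Ethernet0')
assert decomm_pending_dict == {1: {'Ethernet4': 0b11110000}}
# --- end sketch ---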

-    def set_decomm_pending(self, lport):
+    def set_decomm_pending(self, lport, api):
         """
-        Set the decommission pending status.
+        Set the decommission pending status for this logical port to start decommissioning.
+
+        Decommissioning can be done on a per-logical-port basis: for each logical port,
+        only decommission the minimal set of host lanes needed to allow the logical port
+        to apply the new appl code without config errors.
+
+        decomm_pending_dict stores the host lanes pending decommission for each logical port,
+        i.e. self.decomm_pending_dict[physical_port_idx][lport] = host_lanes_mask_requiring_decomm
+        The host_lanes_mask_requiring_decomm of a logical port can be wider than the
+        logical port's own host lanes mask.

         Args:
             lport:
                 String, logical port name
+            api:
+                XcvrApi object
+        Returns:
+            Boolean, True if the rest of the processing for the current CMIS state should be skipped
         """
-        physical_port_idx = self.port_dict[lport]['index']
-        if physical_port_idx in self.decomm_pending_dict:
-            # only one logical port can be the lead logical port doing the
-            # decommission state machine.
-            return
-        self.decomm_pending_dict[physical_port_idx] = lport
-        self.log_notice("{}: DECOMMISSION: setting decomm_pending for physical port "
-                        "{}".format(lport, physical_port_idx))
+        skip_rest_processing = False
+
+        lport_host_lanes_mask = self.port_dict[lport]['host_lanes_mask']
+        host_lanes_mask_requiring_decomm, corresponding_media_lanes_mask = self.get_host_lanes_mask_requiring_decomm(lport, api)
+
+        # Check if other lports are decommissioning lanes that overlap with this lport
+        total_affected_host_lanes_mask = lport_host_lanes_mask | host_lanes_mask_requiring_decomm
+        if total_affected_host_lanes_mask & self.get_decomm_pending_host_lanes_mask(lport, exclude_lports=[lport]):
+            self.clear_decomm_pending(lport)

-    def is_decomm_lead_lport(self, lport):
+            if total_affected_host_lanes_mask & self.get_decomm_failed_host_lanes_mask(lport):
+                # Fail this lport if any of its host lanes are in decommissioning failed state
+                self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED)
+                decomm_status_str = "failed"
+            else:
+                decomm_status_str = "waiting for completion"
+
+            self.log_notice("{}: DECOMM: decommission initiated by other lports is still in progress on "
+                            "host lanes {:#010b}, {}".format(lport, total_affected_host_lanes_mask, decomm_status_str))
+
+            skip_rest_processing = True
+        elif host_lanes_mask_requiring_decomm:
+            self.decomm_pending_dict.setdefault(self.port_dict[lport]['index'], {})[lport] = host_lanes_mask_requiring_decomm
+            self.log_notice("{}: DECOMM: setting decomm_pending for host lanes {:#010b}".format(
+                lport, host_lanes_mask_requiring_decomm))
+
+            self.port_dict[lport]['appl'] = 0
+            self.port_dict[lport]['host_lanes_mask'] = host_lanes_mask_requiring_decomm
+            self.port_dict[lport]['media_lanes_mask'] = corresponding_media_lanes_mask
+            self.log_notice("{}: DECOMM: setting appl={}, host_lanes_mask={:#010b}, media_lanes_mask={:#010b}".format(
+                lport, self.port_dict[lport]['appl'], host_lanes_mask_requiring_decomm, corresponding_media_lanes_mask))
+
+            # Skip the rest of the deinit/pre-init when this is the logical port doing decommission
+            self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT)
+            skip_rest_processing = True
+        # For the case of host_lanes_mask_requiring_decomm == 0x0 where this lport was previously marked as decomm pending:
+        elif self.is_decomm_pending(lport):
+            # No need to re-do decommissioning from scratch, just check the config status to confirm completion
+            self.port_dict[lport]['appl'] = 0
+            self.port_dict[lport]['host_lanes_mask'] = self.decomm_pending_dict[self.port_dict[lport]['index']][lport]
+            self.log_notice("{}: DECOMM: {}(decommission) was already done, confirming completion in {}(decommission)".format(
+                lport, CMIS_STATE_AP_CONF, CMIS_STATE_DP_INIT))
+            self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_INIT)
+            skip_rest_processing = True
+
+        return skip_rest_processing
+
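# --- Editor's sketch (illustration, not part of the diff) ---
# The three-way decision above, reduced to plain bitmask arithmetic.
# All mask values here are hypothetical examples, not values from the PR.
lport_mask = 0b00000011            # host lanes owned by this lport
requires_decomm = 0b00001100       # conflicting lanes found on the module
other_lports_pending = 0b00001000  # lanes some other lport is already decommissioning

total_affected = lport_mask | requires_decomm
if total_affected & other_lports_pending:
    action = 'wait (or fail) until the other lport finishes'
elif requires_decomm:
    action = 'record pending lanes and enter DP_DEINIT with appl=0'
else:
    action = 'nothing left to decommission'
assert action == 'wait (or fail) until the other lport finishes'
# --- end sketch ---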
+    def is_decomm_pending(self, lport):
         """
-        Check if this is the lead logical port doing the decommission state machine.
+        Check if this logical port is in the middle of decommissioning.

         Args:
             lport:
@@ -733,63 +796,153 @@ def is_decomm_lead_lport(self, lport):
         Returns:
             Boolean, True if decommission pending, False otherwise
         """
-        return self.decomm_pending_dict.get(self.port_dict[lport]['index']) == lport
+        return lport in self.decomm_pending_dict.get(self.port_dict.get(lport, {}).get('index'), {})

-    def is_decomm_pending(self, lport):
+    def get_decomm_pending_host_lanes_mask(self, lport, exclude_lports=[]):
         """
-        Get the decommission pending status for the physical port the given logical port belongs to.
+        Get the host lanes in decommission pending status for the entire
+        physical port the given logical port belongs to.

         Args:
             lport:
                 String, logical port name
+            exclude_lports:
+                List of logical ports to exclude from the mask
         Returns:
-            Boolean, True if decommission pending, False otherwise
+            Integer, bitmask of host lanes that are decommission pending
         """
-        return self.port_dict[lport]['index'] in self.decomm_pending_dict
+        physical_port_idx = self.port_dict[lport]['index']
+        decomm_ports = self.decomm_pending_dict.get(physical_port_idx, {})
+        if not decomm_ports:
+            return 0

-    def is_decomm_failed(self, lport):
+        decomm_pending_mask = 0
+        for logical_port, mask in decomm_ports.items():
+            if logical_port in exclude_lports:
+                continue
+            decomm_pending_mask |= mask
+
+        return decomm_pending_mask
+
+    def get_decomm_failed_host_lanes_mask(self, lport):
         """
-        Get the decommission failed status for the physical port the given logical port belongs to.
+        Get the host lanes in decommissioning failed state for the entire
+        physical port the given logical port belongs to.

         Args:
             lport:
                 String, logical port name
         Returns:
-            Boolean, True if decommission failed, False otherwise
+            Integer, bitmask of host lanes in decommissioning failed state
         """
+        failed_mask = 0
+
         physical_port_idx = self.port_dict[lport]['index']
-        lead_logical_port = self.decomm_pending_dict.get(physical_port_idx)
-        if lead_logical_port is None:
-            return False
-        return (
-            get_cmis_state_from_state_db(
-                lead_logical_port,
-                self.xcvr_table_helper.get_status_sw_tbl(
-                    self.get_asic_id(lead_logical_port)
-                )
-            )
-            == CMIS_STATE_FAILED
-        )
+        if physical_port_idx not in self.decomm_pending_dict:
+            return failed_mask
+
+        for logical_port, mask in self.decomm_pending_dict[physical_port_idx].items():
+            if get_cmis_state_from_state_db(
+                logical_port, self.xcvr_table_helper.get_status_sw_tbl(self.get_asic_id(logical_port))
+            ) != CMIS_STATE_FAILED:
+                continue
+            failed_mask |= mask
+
+        return failed_mask
+
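# --- Editor's sketch (illustration, not part of the diff) ---
# Both helpers above reduce the per-lport masks of one physical port with a
# bitwise OR. A standalone equivalent, with hypothetical data:
pending = {'Ethernet0': 0b00000011, 'Ethernet4': 0b00001100}

def or_masks(per_lport_masks, exclude=()):
    mask = 0
    for lport, lanes in per_lport_masks.items():
        if lport not in exclude:
            mask |= lanes
    return mask

assert or_masks(pending) == 0b00001111
assert or_masks(pending, exclude=('Ethernet0',)) == 0b00001100
# --- end sketch ---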
-    def is_decommission_required(self, api, app_new):
+    def get_data_path_mask(self, app_advt, app, lane_idx):
         """
-        Check if the CMIS decommission (i.e. reset appl code to 0 for all lanes
-        of the entire physical port) is required
+        Get the lanes mask for the entire data path based on the appl code on one of its host lanes.

         Args:
+            app_advt: The application advertisement dictionary
+            app: The application code
+            lane_idx: The index of the lane this appl code is assigned to
+        Returns:
+            Tuple of two integers:
+            - host lanes mask for the data path
+            - media lanes mask for the data path
+        """
+        if app not in app_advt:
+            return 0, 0
+
+        host_lane_assignment_options = app_advt[app].get('host_lane_assignment_options')
+        host_lane_count = app_advt[app].get('host_lane_count')
+        media_lane_assignment_options = app_advt[app].get('media_lane_assignment_options')
+        media_lane_count = app_advt[app].get('media_lane_count')
+
+        if not host_lane_assignment_options or not host_lane_count or \
+           not media_lane_assignment_options or not media_lane_count:
+            return 0, 0
+
+        host_lane_mask = 1 << lane_idx
+        data_path_idx = 0
+
+        for start_lane_idx in range(self.CMIS_MAX_HOST_LANES):
+            if not (host_lane_assignment_options & (1 << start_lane_idx)):
+                continue
+            host_lanes_mask = ((1 << host_lane_count) - 1) << start_lane_idx
+            if host_lanes_mask & host_lane_mask:
+                media_start_lane_idx = data_path_idx * media_lane_count
+                return host_lanes_mask, ((1 << media_lane_count) - 1) << media_start_lane_idx
+            data_path_idx += 1
+
+        return 0, 0
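# --- Editor's sketch (illustration, not part of the diff) ---
# Worked example of the data-path walk above, using a hypothetical app that
# advertises 4 host lanes and 1 media lane per data path, with start lanes 0 and 4
# (host_lane_assignment_options = 0b00010001).
host_lane_count, media_lane_count = 4, 1
host_lane_assignment_options = 0b00010001
lane_idx = 5                       # ask: which data path owns host lane 5?

data_path_idx = 0
for start_lane_idx in range(8):
    if not (host_lane_assignment_options & (1 << start_lane_idx)):
        continue
    host_lanes_mask = ((1 << host_lane_count) - 1) << start_lane_idx
    if host_lanes_mask & (1 << lane_idx):
        media_lanes_mask = ((1 << media_lane_count) - 1) << (data_path_idx * media_lane_count)
        break
    data_path_idx += 1

assert host_lanes_mask == 0b11110000   # second data path: host lanes 4-7
assert media_lanes_mask == 0b00000010  # its media lane is lane 1
# --- end sketch ---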
+
+    def get_host_lanes_mask_requiring_decomm(self, lport, api):
+        """
+        Get the minimal set of host lanes that require decommissioning to allow
+        the given logical port to apply its new appl code successfully.
+
+        Args:
+            lport:
+                String, logical port name
         api:
             XcvrApi object
-        app_new:
-            Integer, the new desired appl code
         Returns:
-            True, if decommission is required
-            False, if decommission is not required
+            Tuple of two integers:
+            - host lanes mask requiring decommissioning
+            - media lanes mask corresponding to the host lanes
         """
-        for lane in range(self.CMIS_MAX_HOST_LANES):
-            app_cur = api.get_application(lane)
-            if app_cur != 0 and app_cur != app_new:
-                return True
-        return False
+        lport_host_lanes_mask = self.port_dict[lport]['host_lanes_mask']
+        app_advt = api.get_application_advertisement()
+        active_app_dict = api.get_active_apsel_hostlane()
+        app_new = self.port_dict[lport]['appl']
+
+        # Look through the Active Control Set to identify configured data paths that
+        # share host lanes with this logical port and have conflicting appl codes
+        conflicting_data_paths_host_lanes_mask = 0
+        conflicting_data_paths_media_lanes_mask = 0
+        for lane_idx in range(self.CMIS_MAX_HOST_LANES):
+            lane_mask = 1 << lane_idx
+            if not (lane_mask & lport_host_lanes_mask):
+                continue  # only look at the lanes that belong to this logical port
+            if lane_mask & conflicting_data_paths_host_lanes_mask:
+                continue  # skip lanes that are already in the conflicting_data_paths mask
+            app_cur = self.get_active_application(active_app_dict, lane_idx)
+            if app_cur == 0 or app_cur == app_new:
+                continue
+            dp_host_lanes_mask, dp_media_lanes_mask = self.get_data_path_mask(app_advt, app_cur, lane_idx)
+            conflicting_data_paths_host_lanes_mask |= dp_host_lanes_mask
+            conflicting_data_paths_media_lanes_mask |= dp_media_lanes_mask
+
+        # If conflicting_data_paths_host_lanes_mask is covered by the current lport's mask,
+        # then the new appl code can be applied directly without decommissioning
+        if not (conflicting_data_paths_host_lanes_mask & ~lport_host_lanes_mask):
+            host_lanes_mask_requiring_decomm, corresponding_media_lanes_mask = (0, 0)
+        else:
+            host_lanes_mask_requiring_decomm, corresponding_media_lanes_mask = (
+                conflicting_data_paths_host_lanes_mask, conflicting_data_paths_media_lanes_mask
+            )
+
+        log_func = self.log_debug if not host_lanes_mask_requiring_decomm else self.log_notice
+        log_func("{}: DECOMM: based on ActiveAppSel(lane 8->1) {}, to apply appl {} on host lanes {:#010b}, "
+                 "host lanes requiring decomm is {:#010b}, with media lanes {:#010b}".format(
+                     lport, self.get_compact_reversed_active_apps_str(active_app_dict), app_new, lport_host_lanes_mask,
+                     host_lanes_mask_requiring_decomm, corresponding_media_lanes_mask))
+
+        return host_lanes_mask_requiring_decomm, corresponding_media_lanes_mask
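# --- Editor's sketch (illustration, not part of the diff) ---
# The covering test above in miniature: a conflict only forces decommissioning
# when the conflicting data path spills outside the lport's own lanes.
# Hypothetical masks:
lport_mask = 0b00000011        # lport owns host lanes 0-1
conflict_dp_mask = 0b00001111  # lanes 0-3 run a different appl as one data path

if not (conflict_dp_mask & ~lport_mask):
    requires_decomm = 0        # conflict fully inside the lport: re-apply appl directly
else:
    requires_decomm = conflict_dp_mask  # the wider data path must be torn down first

assert requires_decomm == 0b00001111
# --- end sketch ---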

     def is_cmis_application_update_required(self, api, app_new, host_lanes_mask):
         """
@@ -895,6 +1048,80 @@ def check_config_error(self, api, host_lanes_mask, states):

         return done

+    def get_active_application(self, active_app_dict, lane_idx):
+        """
+        Get the application code in the Active Control Set for a specific lane
+
+        Args:
+            active_app_dict:
+                Dictionary containing active application information
+            lane_idx:
+                Index of the lane, 0-based
+
+        Returns:
+            Integer, the active application code for the specified lane, or None if not found
+        """
+        return active_app_dict.get(self.ACTIVEAPPSEL_LANE_KEY_PREFIX.format(lane_idx + 1))
+
+    def get_active_application_list(self, active_app_dict):
+        """
+        Get the application codes in the Active Control Set for all lanes on this physical port
+
+        Args:
+            active_app_dict:
+                Dictionary containing active application information
+
+        Returns:
+            List, the active application codes for all lanes
+        """
+        return [
+            active_app_dict.get(self.ACTIVEAPPSEL_LANE_KEY_PREFIX.format(lane_idx + 1))
+            for lane_idx in range(self.CMIS_MAX_HOST_LANES)
+        ]
+
+    def get_compact_reversed_active_apps_str(self, active_app_dict):
+        """
+        Get a compact string representation of the reversed active application list,
+        e.g. "[3,3,3,3,3,3,3,3]"
+
+        Args:
+            active_app_dict:
+                Dictionary containing active application information
+
+        Returns:
+            String, the compact reversed active application list
+        """
+        active_apps = self.get_active_application_list(active_app_dict)
+        reversed_apps = list(reversed(active_apps))
+        return str(reversed_apps).replace(', ', ',')
+
+    def check_active_application(self, lport, api):
+        """
+        Check if the application code of this logical port is correctly applied in the Active Control Set
+
+        Args:
+            lport:
+                Logical port name
+            api:
+                XcvrApi object
+
+        Returns:
+            Boolean, True if the application code is correctly set, otherwise False
+        """
+        active_app_dict = api.get_active_apsel_hostlane()
+        expected_app = self.port_dict[lport]['appl']
+        host_lanes_mask = self.port_dict[lport]['host_lanes_mask']
+
+        for lane in range(self.CMIS_MAX_HOST_LANES):
+            if not ((1 << lane) & host_lanes_mask):
+                continue
+            if self.get_active_application(active_app_dict, lane) != expected_app:
+                self.log_notice("{}: expecting appl {} on lanes {:#010b}, but ActiveAppSel(lane 8->1) is {}".format(
+                    lport, expected_app, host_lanes_mask, self.get_compact_reversed_active_apps_str(active_app_dict)))
+                return False
+        return True
+
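# --- Editor's sketch (illustration, not part of the diff) ---
# What the ActiveAppSel helpers above produce for a hypothetical module whose
# Active Control Set reports appl 3 on lanes 1-4 and appl 0 on lanes 5-8.
active_app_dict = {'ActiveAppSelLane{}'.format(n): (3 if n <= 4 else 0) for n in range(1, 9)}

apps = [active_app_dict.get('ActiveAppSelLane{}'.format(i + 1)) for i in range(8)]
compact = str(list(reversed(apps))).replace(', ', ',')

assert compact == '[0,0,0,0,3,3,3,3]'  # printed lane 8 -> lane 1, matching the log format
# --- end sketch ---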
     def check_datapath_init_pending(self, api, host_lanes_mask):
         """
         Check if the CMIS datapath init is pending
@@ -1211,7 +1438,7 @@ def task_worker(self):
                     host_lanes_mask = self.port_dict[lport].get('host_lanes_mask', 0)
                     appl = self.port_dict[lport].get('appl', 0)
                     # appl can be 0 if this lport is in the decommission state machine, which should not be considered a failed case.
-                    if state != CMIS_STATE_INSERTED and not self.is_decomm_lead_lport(lport) and (host_lanes_mask <= 0 or appl < 1):
+                    if state != CMIS_STATE_INSERTED and not self.is_decomm_pending(lport) and (host_lanes_mask <= 0 or appl < 1):
                         self.log_error("{}: Unexpected value for host_lanes_mask {} or appl {} in "
                                        "{} state".format(lport, host_lanes_mask, appl, state))
                         self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED)
@@ -1219,8 +1446,7 @@
                     self.log_notice("{}: {}G, lanemask=0x{:x}, CMIS state={}{}, Module state={}, DP state={}, appl {} host_lane_count {} "
                                     "retries={}".format(lport, int(speed/1000), host_lanes_mask, state,
-                                                        "(decommission" + ("*" if self.is_decomm_lead_lport(lport) else "") + ")"
-                                                        if self.is_decomm_pending(lport) else "",
+                                                        "(decommission)" if self.is_decomm_pending(lport) else "",
                                                         api.get_module_state(), api.get_datapath_state(), appl, host_lane_count, retries))
                     if retries > self.CMIS_MAX_RETRIES:
                         self.log_error("{}: FAILED".format(lport))
@@ -1264,27 +1490,7 @@
                             media_lanes_mask = self.port_dict[lport]['media_lanes_mask']
                             self.log_notice("{}: Setting media_lanemask=0x{:x}".format(lport, media_lanes_mask))

-                            if self.is_decommission_required(api, appl):
-                                self.set_decomm_pending(lport)
-
-                            if self.is_decomm_lead_lport(lport):
-                                # Set all the DP lanes AppSel to unused(0) when non default app code needs to be configured
-                                self.port_dict[lport]['appl'] = appl = 0
-                                self.port_dict[lport]['host_lanes_mask'] = host_lanes_mask = self.ALL_LANES_MASK
-                                self.port_dict[lport]['media_lanes_mask'] = self.ALL_LANES_MASK
-                                self.log_notice("{}: DECOMMISSION: setting appl={} and "
-                                                "host_lanes_mask/media_lanes_mask={:#x}".format(lport, appl, self.ALL_LANES_MASK))
-                                # Skip rest of the deinit/pre-init when this is the lead logical port for decommission
-                                self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT)
-                                continue
-                            elif self.is_decomm_pending(lport):
-                                if self.is_decomm_failed(lport):
-                                    self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED)
-                                    decomm_status_str = "failed"
-                                else:
-                                    decomm_status_str = "waiting for completion"
-                                self.log_notice("{}: DECOMMISSION: decommission has already started for this physical port, "
-                                                "{}".format(lport, decomm_status_str))
+                            if self.set_decomm_pending(lport, api):
                                 continue

                             if self.port_dict[lport]['host_tx_ready'] != 'true' or \
@@ -1442,11 +1648,16 @@
                                 self.force_cmis_reinit(lport, retries + 1)
                                 continue

+                            if not self.check_active_application(lport, api):
+                                self.log_notice("{}: module failed to correctly update appl code in Active Control Set".format(lport))
+                                self.force_cmis_reinit(lport, retries + 1)
+                                continue
+
                             # Clear decommission status and invoke CMIS reinit so that normal CMIS initialization can begin
                             if self.is_decomm_pending(lport):
-                                self.log_notice("{}: DECOMMISSION: done for physical port {}".format(lport, self.port_dict[lport]['index']))
+                                self.log_notice("{}: DECOMM: decommission done for host lanes {:#010b}".format(lport, self.port_dict[lport]['host_lanes_mask']))
                                 self.clear_decomm_pending(lport)
-                                self.force_cmis_reinit(lport)
+                                self.force_cmis_reinit(lport, retries)
                                 continue

                             if hasattr(api, 'get_cmis_rev'):
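# --- Editor's sketch (illustration, not part of the diff) ---
# The '{:#010b}' format used by the DECOMM log lines above: '#' adds the '0b'
# prefix and '010' zero-pads to 10 characters, so all 8 lane bits stay visible.
assert '{:#010b}'.format(0b00001111) == '0b00001111'
assert '{:#010b}'.format(0xff) == '0b11111111'
# --- end sketch ---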
diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py
index c2002bc4d..dc241bda5 100644
--- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py
+++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_event_helper.py
@@ -1,4 +1,5 @@
 from natsort import natsorted
+from collections import deque
 from sonic_py_common import daemon_base
 from sonic_py_common import multi_asic
 from swsscommon import swsscommon
@@ -159,52 +160,64 @@ def handle_port_update_event(self):
                     fvp['asic_id'] = self.asic_context[port_tbl]
                     fvp['op'] = op
                     fvp['FILTER'] = port_tbl.filter
-                    # Soak duplicate events and consider only the last event
-                    port_event_cache[(port_name, port_tbl.db_name, port_tbl.table_name)] = fvp
+                    # Soak duplicate events and consider only the latest event(s).
+                    # Cache up to the 2 most recent events to avoid dropping a DEL event followed by a SET event,
+                    # since applications may have cleanup actions associated with DEL events.
+                    port_event_cache.setdefault((port_name, port_tbl.db_name, port_tbl.table_name), deque(maxlen=2)).append(fvp)

             # Now apply filter over soaked events
-            for key, fvp in port_event_cache.items():
+            for key, fvps in port_event_cache.items():
                 db_name = key[1]
                 table_name = key[2]
-                port_index = int(fvp['index'])
-                port_change_event = None
-                filter = fvp['FILTER']
-                del fvp['FILTER']
-                self.apply_filter_to_fvp(filter, fvp)
-
-                if key in self.port_event_cache:
-                    # Compare current event with last event on this key, to see if
-                    # there's really a need to update.
-                    diff = set(fvp.items()) - set(self.port_event_cache[key].items())
-                    # Ignore duplicate events
-                    if not diff:
-                        self.port_event_cache[key] = fvp
-                        continue
-                # Update the latest event to the cache
-                self.port_event_cache[key] = fvp
-
-                if fvp['op'] == swsscommon.SET_COMMAND:
-                    port_change_event = PortChangeEvent(fvp['port_name'],
-                                                        port_index,
-                                                        fvp['asic_id'],
-                                                        PortChangeEvent.PORT_SET,
-                                                        fvp,
-                                                        db_name,
-                                                        table_name)
-                elif fvp['op'] == swsscommon.DEL_COMMAND:
-                    port_change_event = PortChangeEvent(fvp['port_name'],
-                                                        port_index,
-                                                        fvp['asic_id'],
-                                                        PortChangeEvent.PORT_DEL,
-                                                        fvp,
-                                                        db_name,
-                                                        table_name)
-                # This is the final event considered for processing
-                self.logger.log_notice("*** {} handle_port_update_event() fvp {}".format(
-                    key, fvp))
-                if port_change_event is not None:
-                    has_event = True
-                    self.port_change_event_handler(port_change_event)
+
+                # If the last two events are:
+                # 1) DEL then SET: both need to be processed so the DEL event is not missed
+                # 2) SET then DEL: only the last event (i.e. DEL) needs to be processed
+                # 3) SET then SET: only the last SET event needs to be processed
+                # 4) DEL then DEL: only the last DEL event needs to be processed
+                if len(fvps) == 2 and not (fvps[0]['op'] == swsscommon.DEL_COMMAND and fvps[1]['op'] == swsscommon.SET_COMMAND):
+                    fvps.popleft()
+
+                for fvp in fvps:
+                    port_index = int(fvp['index'])
+                    port_change_event = None
+                    filter = fvp['FILTER']
+                    del fvp['FILTER']
+                    self.apply_filter_to_fvp(filter, fvp)
+
+                    if key in self.port_event_cache:
+                        # Compare the current event with the last event on this key, to see if
+                        # there's really a need to update.
+                        diff = set(fvp.items()) - set(self.port_event_cache[key].items())
+                        # Ignore duplicate events
+                        if not diff:
+                            self.port_event_cache[key] = fvp
+                            continue
+                    # Update the latest event to the cache
+                    self.port_event_cache[key] = fvp
+
+                    if fvp['op'] == swsscommon.SET_COMMAND:
+                        port_change_event = PortChangeEvent(fvp['port_name'],
+                                                            port_index,
+                                                            fvp['asic_id'],
+                                                            PortChangeEvent.PORT_SET,
+                                                            fvp,
+                                                            db_name,
+                                                            table_name)
+                    elif fvp['op'] == swsscommon.DEL_COMMAND:
+                        port_change_event = PortChangeEvent(fvp['port_name'],
+                                                            port_index,
+                                                            fvp['asic_id'],
+                                                            PortChangeEvent.PORT_DEL,
+                                                            fvp,
+                                                            db_name,
+                                                            table_name)
+                    # This is the final event considered for processing
+                    self.logger.log_notice("*** {} handle_port_update_event() fvp {}".format(
+                        key, fvp))
+                    if port_change_event is not None:
+                        has_event = True
+                        self.port_change_event_handler(port_change_event)

         return has_event
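# --- Editor's sketch (illustration, not part of the diff) ---
# Why deque(maxlen=2) preserves the one case that matters: with DEL followed by
# SET both events survive; any other pair collapses to the latest event.
# The op sequences below are hypothetical.
from collections import deque

def soak(ops):
    cache = deque(maxlen=2)
    for op in ops:
        cache.append(op)
    if len(cache) == 2 and not (cache[0] == 'DEL' and cache[1] == 'SET'):
        cache.popleft()
    return list(cache)

assert soak(['SET', 'DEL', 'SET']) == ['DEL', 'SET']  # cleanup then re-create: keep both
assert soak(['SET', 'SET', 'DEL']) == ['DEL']         # only the final DEL matters
assert soak(['SET', 'SET']) == ['SET']
# --- end sketch ---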