diff --git a/ansible/group_vars/lab/secrets.yml b/ansible/group_vars/lab/secrets.yml index 98b43b5ea0b..5d22c13d82f 100644 --- a/ansible/group_vars/lab/secrets.yml +++ b/ansible/group_vars/lab/secrets.yml @@ -3,6 +3,8 @@ ansible_become_pass: password sonicadmin_user: admin sonicadmin_password: password sonicadmin_initial_password: password +sonic_bmc_root_user: root +sonic_bmc_root_password: 0penBmcTempPass! console_login: console_telnet: diff --git a/tests/common/helpers/firmware_helper.py b/tests/common/helpers/firmware_helper.py new file mode 100644 index 00000000000..2887d2e7769 --- /dev/null +++ b/tests/common/helpers/firmware_helper.py @@ -0,0 +1,28 @@ +import re + + +def show_firmware(duthost): + out = duthost.command("fwutil show status") + num_spaces = 2 + curr_chassis = "" + output_data = {"chassis": {}} + status_output = out['stdout'] + separators = re.split(r'\s{2,}', status_output.splitlines()[1]) # get separators + output_lines = status_output.splitlines()[2:] + + for line in output_lines: + data = [] + start = 0 + + for sep in separators: + curr_len = len(sep) + data.append(line[start:start+curr_len].strip()) + start += curr_len + num_spaces + + if data[0].strip() != "": + curr_chassis = data[0].strip() + output_data["chassis"][curr_chassis] = {"component": {}} + + output_data["chassis"][curr_chassis]["component"][data[2]] = data[3] + + return output_data diff --git a/tests/common/helpers/platform_api/bmc.py b/tests/common/helpers/platform_api/bmc.py new file mode 100644 index 00000000000..21a849f59ca --- /dev/null +++ b/tests/common/helpers/platform_api/bmc.py @@ -0,0 +1,83 @@ +""" This module provides interface to interact with the BMC of the DUT via platform API remotely """ +import ast +import json +import logging + +logger = logging.getLogger(__name__) + + +def bmc_pmon_api(conn, name, args=None): + if args is None: + args = [] + conn.request('POST', '/platform/chassis/bmc/{}'.format(name), json.dumps({'args': args})) + resp = conn.getresponse() + res = json.loads(resp.read())['res'] + logger.info('Executing chassis API: "{}", arguments: "{}", result: "{}"'.format(name, args, res)) + return res + + +def bmc_host_api(duthost, api_name, *args): + bmc_instance = 'sudo python -c "import sonic_platform; \ + bmc = sonic_platform.platform.Platform().get_chassis().get_bmc(); \ + print(bmc.{})"' + res = duthost.shell(bmc_instance.format(api_name + str(args)))['stdout'] + try: + return ast.literal_eval(res) + except (ValueError, SyntaxError): + return res.strip() + + +def get_name(conn): + return bmc_pmon_api(conn, 'get_name') + + +def get_presence(conn): + return bmc_pmon_api(conn, 'get_presence') + + +def get_model(duthost): + return bmc_host_api(duthost, 'get_model') + + +def get_serial(duthost): + return bmc_host_api(duthost, 'get_serial') + + +def get_revision(conn): + return bmc_pmon_api(conn, 'get_revision') + + +def get_status(conn): + return bmc_pmon_api(conn, 'get_status') + + +def is_replaceable(conn): + return bmc_pmon_api(conn, 'is_replaceable') + + +def get_eeprom(duthost): + return bmc_host_api(duthost, 'get_eeprom') + + +def get_version(duthost): + return bmc_host_api(duthost, 'get_version') + + +def reset_root_password(duthost): + return bmc_host_api(duthost, 'reset_root_password') + + +def trigger_bmc_debug_log_dump(duthost): + return bmc_host_api(duthost, 'trigger_bmc_debug_log_dump') + + +def get_bmc_debug_log_dump(duthost, task_id, filename, path): + return bmc_host_api(duthost, 'get_bmc_debug_log_dump', task_id, filename, path) + + +def update_firmware(duthost, fw_image): + return bmc_host_api(duthost, 'update_firmware', fw_image) + + +def request_bmc_reset(duthost): + return bmc_host_api(duthost, '_request_bmc_reset') diff --git a/tests/common/plugins/conditional_mark/tests_mark_conditions.yaml b/tests/common/plugins/conditional_mark/tests_mark_conditions.yaml index c5956e5e911..80a28e0a257 100644 --- a/tests/common/plugins/conditional_mark/tests_mark_conditions.yaml +++ b/tests/common/plugins/conditional_mark/tests_mark_conditions.yaml @@ -3638,6 +3638,12 @@ show_techsupport/test_auto_techsupport.py::TestAutoTechSupport::test_sai_sdk_dum - "asic_type not in ['mellanox']" - "is_multi_asic==True" +show_techsupport/test_techsupport.py::test_techsupport: + xfail: + reason: "ipv6 mgmt ip is not supported" + conditions: + - "is_mgmt_ipv6_only==True" + ####################################### ##### snappi_tests ##### ####################################### diff --git a/tests/platform_tests/api/test_bmc.py b/tests/platform_tests/api/test_bmc.py new file mode 100644 index 00000000000..a64facd0377 --- /dev/null +++ b/tests/platform_tests/api/test_bmc.py @@ -0,0 +1,293 @@ +import os +import logging +import json +import pytest +import random +import secrets +import time +from urllib.parse import urlparse +from datetime import datetime + +from tests.common.helpers.assertions import pytest_assert +from tests.common.helpers.platform_api import bmc +from tests.common.platform.device_utils import platform_api_conn, start_platform_api_service # noqa: F401 +from .platform_api_test_base import PlatformApiTestBase +from tests.common.helpers.firmware_helper import show_firmware + + +logger = logging.getLogger(__name__) + +pytestmark = [ + pytest.mark.disable_loganalyzer, # disable automatic loganalyzer + pytest.mark.topology('any') +] + +BMC_SHORTEST_PASSWD_LEN = 12 +BMC_LONGEST_PASSWD_LEN = 20 +BMC_DUMP_FILENAME = "bmc_dump_{}.tar.xz" +BMC_DUMP_PATH = "/tmp" +LATEST_BMC_VERSION_IDX = 0 +OLD_BMC_VERSION_IDX = 1 +EROT_BUSY_MSG = "ERoT is busy" +WAIT_TIME = 30 + + +@pytest.fixture(scope="function", autouse=True) +def is_bmc_present(platform_api_conn): # noqa: F811 + if not bmc.get_presence(platform_api_conn): # noqa: F811 + pytest.skip("BMC is not present, skipping BMC platform API tests") + + +@pytest.fixture(scope="module") +def bmc_ip(duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + platform = duthost.shell("sudo show platform summary | grep Platform | awk '{print $2}'")["stdout"] + bmc_config_file = f"/usr/share/sonic/device/{platform}/bmc.json" + duthost.fetch(src=bmc_config_file, dest='/tmp') + with open(f'/tmp/{duthost.hostname}/{bmc_config_file}', "r") as f: + bmc_config = json.load(f) + yield bmc_config["bmc_addr"] + + +class TestBMCApi(PlatformApiTestBase): + """Platform and Host API test cases for the BMC class""" + + @pytest.fixture(autouse=True) + def prepare_param(self, creds): + self.bmc_root_user = creds['sonic_bmc_root_user'] + self.bmc_root_password = creds['sonic_bmc_root_password'] + + @pytest.fixture(scope="module") + def _update_bmc_firmware_by_api(self, duthost, fw_image, timeout=600): + start_time = time.time() + + while True: + if time.time() - start_time > timeout: + logger.warning(f"Timeout after {timeout} seconds while updating BMC firmware") + return False + + time.sleep(WAIT_TIME) + ret_code, message = bmc.update_firmware(duthost, fw_image) + if EROT_BUSY_MSG in message: + logger.info(f"{EROT_BUSY_MSG}, waiting for {WAIT_TIME} seconds") + continue + elif ret_code != 0: + logger.warning(f"Failed to update BMC firmware: return code: {ret_code}, message: {message}") + return False + else: + logger.info("BMC firmware updated successfully!") + break + + bmc.request_bmc_reset(duthost) + return True + + def _generate_password(self): + password_length = random.choice(range(BMC_SHORTEST_PASSWD_LEN, BMC_LONGEST_PASSWD_LEN)) + logger.info(f"Generated password length: {password_length}") + raw_password = secrets.token_urlsafe(64) + password = raw_password[:password_length] + logger.info(f"Generated password: {password}") + return password + + def _string_to_dict(self, str): + result = {} + for line in str.strip().split('\n'): + if ':' in line: + key, value = line.split(':', 1) + result[key.strip()] = value.strip() + return result + + def _validate_bmc_login(self, duthost, bmc_ip, password, expected_success=True): + res = duthost.command(f"curl -k -u {self.bmc_root_user}:{password} -X " # noqa: E231 + f"GET https://{bmc_ip}/redfish/v1/AccountService/Accounts")["stdout"] # noqa: E231 + pytest_assert(res is not None, "Failed to login to BMC") + if expected_success: + pytest_assert('error' not in res, f"Failed to login to BMC with password: {password}") + else: + pytest_assert('error' in res, f"Successfully login to BMC with password: {password}") + + def _change_bmc_root_password(self, duthost, bmc_ip, password): + res = duthost.command(f'curl -k -u {self.bmc_root_user}:{self.bmc_root_password} -X PATCH ' # noqa: E231 + f'https://{bmc_ip}/redfish/v1/AccountService/Accounts/root ' # noqa: E231 + f'-H "Content-Type: application/json" ' # noqa: E231 + f'-d \'{{"Password":"{password}"}}\'')["stdout"] # noqa: E231 + pytest_assert(res is not None, f"Failed to change BMC root password to {password}") + pytest_assert('error' not in res, + f"Failed to change BMC root password to {password} with error response: {res}") + + def _validate_bmc_dump_finished(self, duthost, task_id, timestamp): + ret, msg = bmc.get_bmc_debug_log_dump(duthost, task_id, BMC_DUMP_FILENAME.format(timestamp), BMC_DUMP_PATH) + if ret == 0 and msg == '': + logger.info("BMC dump finished!") + return True + logger.info(f"Failed to retrieve BMC dump: {msg}") + return False + + def _get_bmc_version(self, duthost, timeout=120): + start_time = time.time() + + while True: + if time.time() - start_time > timeout: + logger.warning(f"Timeout after {timeout} seconds while getting BMC version") + return + + res = duthost.show_and_parse('sudo show platform firmware status') + for entry in res: + if entry['component'] == 'BMC': + if entry['version'] == 'N/A': + continue + return entry['version'] + + def test_get_name(self, platform_api_conn): # noqa: F811 + name = bmc.get_name(platform_api_conn) + pytest_assert(name is not None, "Unable to retrieve BMC name") + pytest_assert(isinstance(name, str), f"BMC name type appears incorrect: {type(name)}") + pytest_assert(name == 'BMC', f"BMC name appears incorrect: {name}") + + def test_get_presence(self, platform_api_conn): # noqa: F811 + presence = bmc.get_presence(platform_api_conn) + pytest_assert(presence is not None, "Unable to retrieve BMC presence") + pytest_assert(isinstance(presence, bool), f"BMC presence appears incorrect: {type(presence)}") + pytest_assert(presence is True, f"BMC is not present: {presence}") + + def test_get_model(self, duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + model = bmc.get_model(duthost) + bmc_eeprom_info = duthost.command("sudo show platform bmc eeprom")["stdout"] + pytest_assert(model is not None, "Unable to retrieve BMC model") + pytest_assert(model in bmc_eeprom_info, f"BMC model appears incorrect: {model}") + + def test_get_serial(self, duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + serial = bmc.get_serial(duthost) + bmc_eeprom_info = duthost.command("sudo show platform bmc summary")["stdout"] + pytest_assert(serial is not None, "Unable to retrieve BMC serial number") + pytest_assert(str(serial) in bmc_eeprom_info, f"BMC serial number appears incorrect: {serial}") + + def test_get_revision(self, platform_api_conn): # noqa: F811 + revision = bmc.get_revision(platform_api_conn) + pytest_assert(revision is not None, "Unable to retrieve BMC revision") + pytest_assert(revision == 'N/A', f"BMC revision appears incorrect: {revision}") + + def test_get_status(self, platform_api_conn): # noqa: F811 + status = bmc.get_status(platform_api_conn) + pytest_assert(status is not None, "Unable to retrieve BMC status") + pytest_assert(isinstance(status, bool), f"BMC status appears incorrect: {type(status)}") + pytest_assert(status is True, f"BMC status appears incorrect: {status}") + + def test_is_replaceable(self, platform_api_conn): # noqa: F811 + replaceable = bmc.is_replaceable(platform_api_conn) + pytest_assert(replaceable is not None, "Unable to retrieve BMC is_replaceable") + pytest_assert(isinstance(replaceable, bool), f"BMC replaceable value must be a bool value: {type(replaceable)}") + pytest_assert(replaceable is False, f"BMC replaceable value appears incorrect: {replaceable}") + + def test_get_eeprom(self, duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + eeprom = bmc.get_eeprom(duthost) + bmc_eeprom_info = self._string_to_dict(duthost.command("sudo show platform bmc eeprom")["stdout"]) + pytest_assert(eeprom is not None, f"Failed to retrieve system EEPROM: {eeprom}") + pytest_assert(isinstance(eeprom, dict), f"BMC eeprom value must be a dict value: {type(eeprom)}") + + for key, value in bmc_eeprom_info.items(): + pytest_assert(key in eeprom, f"BMC eeprom {key} appears incorrect") + pytest_assert(eeprom[key] == value, f"BMC eeprom {key} appears incorrect") + + def test_get_version(self, duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + version = bmc.get_version(duthost) + bmc_summary = duthost.command("sudo show platform bmc summary")["stdout"] + pytest_assert(version is not None, f"Unable to retrieve BMC version: {version}") + pytest_assert(version in bmc_summary, f"BMC version appears incorrect: {version}") + + def test_reset_root_password(self, duthosts, enum_rand_one_per_hwsku_hostname, bmc_ip): + """ + Test BMC root password reset with platform API + + Steps: + 1. Reset the BMC root password by BMC platform api reset_root_password + 2. Validate the root password had been reset to the default password by login test using Redfish api + 3. Change the root password to a new value by using Redfish api + 4. Validate login password had been changed by login test using Redfish api + 5. Reset the BMC root password by BMC platform api reset_root_password() + 6. Validate the root password had been reset to the default password by login test using Redfish api + """ + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + + bmc.reset_root_password(duthost) + self._validate_bmc_login(duthost, bmc_ip, self.bmc_root_password) + temp_password = self._generate_password() + self._change_bmc_root_password(duthost, bmc_ip, temp_password) + self._validate_bmc_login(duthost, bmc_ip, temp_password) + bmc.reset_root_password(duthost) + self._validate_bmc_login(duthost, bmc_ip, self.bmc_root_password) + + def test_bmc_dump(self, duthosts, enum_rand_one_per_hwsku_hostname): + """ + Test BMC dump with API + + Steps: + 1. Trigger the BMC dump by BMC api trigger_bmc_debug_log_dump() + 2. During waiting, check the dump process by BMC api get_bmc_debug_log_dump(task_id, filename, path) + 3. After BMC dump finished, validate the BMC dump file existence + """ + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + bmc_dump_path = BMC_DUMP_PATH + '/' + BMC_DUMP_FILENAME.format(timestamp) + ret_code, (task_id, err_msg) = bmc.trigger_bmc_debug_log_dump(duthost) + pytest_assert(ret_code == 0, f"Failed to retrieve BMC dump: {err_msg}") + logger.info(f"BMC dump task id: {task_id}") + pytest_assert(self._validate_bmc_dump_finished(duthost, task_id, timestamp), "BMC dump failed") + pytest_assert(duthost.command( + f"ls -l {bmc_dump_path}")["rc"] == 0, f"BMC dump file not found: {bmc_dump_path}") + + def test_bmc_firmware_update(self, duthosts, enum_rand_one_per_hwsku_hostname, fw_pkg): + """ + Test BMC firmware update with platform API and CLI + + Steps: + 1. Check and record the original BMC firmware version + 2. Update the BMC firmware version by command + 'config platform firmware install chassis component BMC fw -y xxx' + 3. Wait after the installation done + 4. Validate the BMC firmware had been updated to the destination version by command + 'show platform firmware status' + 5. Recover the BMC firmware version to the original one by BMC platform api update_firmware(fw_image) + 6. Wait after the installation done + 7. Validate the BMC firmware had been restored to the original version by command + 'show platform firmware status' + """ + duthost = duthosts[enum_rand_one_per_hwsku_hostname] + bmc_version_origin = self._get_bmc_version(duthost) + logger.info(f"BMC version origin: {bmc_version_origin}") + + chassis = list(show_firmware(duthost)["chassis"].keys())[0] + logger.info(f"Chassis: {chassis}") + fw_pkg_path_new = fw_pkg["chassis"][chassis]["component"]["BMC"][LATEST_BMC_VERSION_IDX]["firmware"] + fw_pkg_clean_path_new = urlparse(fw_pkg_path_new).path + fw_pkt_name_new = os.path.basename(fw_pkg_path_new) + logger.info(f"BMC firmware path: {fw_pkg_clean_path_new}") + logger.info(f"Copy BMC firmware to localhost: /tmp/{fw_pkt_name_new}") + duthost.copy(src=fw_pkg_clean_path_new, dest=f"/tmp/{fw_pkt_name_new}") + + logger.info(f"Execute BMC firmware update to {fw_pkt_name_new} and Wait for BMC firmware update to complete") + res = duthost.command( + f"sudo config platform firmware install chassis component BMC fw -y /tmp/{fw_pkt_name_new}") + + bmc_version_latest = self._get_bmc_version(duthost) + logger.info(f"BMC version after update: {bmc_version_latest}") + pytest_assert(bmc_version_latest != bmc_version_origin, "BMC firmware update failed") + + fw_pkg_path_old = fw_pkg["chassis"][chassis]["component"]["BMC"][OLD_BMC_VERSION_IDX]["firmware"] + fw_pkg_clean_path_old = urlparse(fw_pkg_path_old).path + fw_pkt_name_old = os.path.basename(fw_pkg_path_old) + logger.info(f"BMC firmware path: {fw_pkg_clean_path_old}") + logger.info(f"Copy BMC firmware to localhost: /tmp/{fw_pkt_name_old}") + duthost.copy(src=fw_pkg_clean_path_old, dest=f"/tmp/{fw_pkt_name_old}") + + logger.info(f"Execute BMC firmware update to {fw_pkt_name_old} and Wait for BMC firmware update to complete") + res = self._update_bmc_firmware_by_api(duthost, f"/tmp/{fw_pkt_name_old}") + pytest_assert(res, "Failed to execute BMC firmware update by API!") + + bmc_version_current = self._get_bmc_version(duthost) + logger.info(f"BMC version after update: {bmc_version_current}") + pytest_assert(bmc_version_latest != bmc_version_current, "BMC firmware recovery failed") diff --git a/tests/platform_tests/conftest.py b/tests/platform_tests/conftest.py index 0e48806b366..cb0b1101a42 100644 --- a/tests/platform_tests/conftest.py +++ b/tests/platform_tests/conftest.py @@ -1,3 +1,4 @@ +import tarfile import json import pytest import os @@ -148,6 +149,10 @@ def thermal_manager_enabled(duthosts, enum_rand_one_per_hwsku_hostname): def pytest_generate_tests(metafunc): + val = metafunc.config.getoption('--fw-pkg') + if 'fw_pkg_name' in metafunc.fixturenames and val: + metafunc.parametrize('fw_pkg_name', val.split(','), scope="module") + if 'power_off_delay' in metafunc.fixturenames: delays = metafunc.config.getoption('power_off_delay') default_delay_list = [5, 15] @@ -231,3 +236,34 @@ def cmis_cable_ports_and_ver(duthosts): cmis_cable_ports_and_ver.update({dut.hostname: get_cmis_cable_ports_and_ver(dut)}) logging.info(f"cmis_cable_ports_and_ver: {cmis_cable_ports_and_ver}") return cmis_cable_ports_and_ver + + +@pytest.fixture(scope='module') +def fw_pkg(fw_pkg_name): + if fw_pkg_name is None: + pytest.skip("No fw package specified.") + + yield extract_fw_data(fw_pkg_name) + + +def extract_fw_data(fw_pkg_path): + """ + Extract fw data from updated-fw.tar.gz file or firmware.json file + :param fw_pkg_path: the path to tar.gz file or firmware.json file + :return: fw_data in dictionary + """ + if tarfile.is_tarfile(fw_pkg_path): + path = "/tmp/firmware" + isExist = os.path.exists(path) + if not isExist: + os.mkdir(path) + with tarfile.open(fw_pkg_path, "r:gz") as f: + f.extractall(path) + json_file = os.path.join(path, "firmware.json") + with open(json_file, 'r') as fw: + fw_data = json.load(fw) + else: + with open(fw_pkg_path, 'r') as fw: + fw_data = json.load(fw) + + return fw_data diff --git a/tests/platform_tests/fwutil/conftest.py b/tests/platform_tests/fwutil/conftest.py index 16a24321c55..69afa492c62 100644 --- a/tests/platform_tests/fwutil/conftest.py +++ b/tests/platform_tests/fwutil/conftest.py @@ -1,9 +1,7 @@ -import tarfile -import json import pytest import logging import os -from fwutil_common import show_firmware +from tests.common.helpers.firmware_helper import show_firmware logger = logging.getLogger(__name__) @@ -43,48 +41,23 @@ def check_path_exists(duthost, path): return duthost.stat(path=path)["stat"]["exists"] -def pytest_generate_tests(metafunc): - val = metafunc.config.getoption('--fw-pkg') - if 'fw_pkg_name' in metafunc.fixturenames: - metafunc.parametrize('fw_pkg_name', [val], scope="module") - - -@pytest.fixture(scope='module') -def fw_pkg(fw_pkg_name): - if fw_pkg_name is None: - pytest.skip("No fw package specified.") - - yield extract_fw_data(fw_pkg_name) - - -def extract_fw_data(fw_pkg_path): - """ - Extract fw data from updated-fw.tar.gz file or firmware.json file - :param fw_pkg_path: the path to tar.gz file or firmware.json file - :return: fw_data in dictionary - """ - if tarfile.is_tarfile(fw_pkg_path): - path = "/tmp/firmware" - isExist = os.path.exists(path) - if not isExist: - os.mkdir(path) - with tarfile.open(fw_pkg_path, "r:gz") as f: - f.extractall(path) - json_file = os.path.join(path, "firmware.json") - with open(json_file, 'r') as fw: - fw_data = json.load(fw) - else: - with open(fw_pkg_path, 'r') as fw: - fw_data = json.load(fw) - - return fw_data - - @pytest.fixture(scope='function', params=["CPLD", "ONIE", "BIOS", "FPGA"]) def component(request, duthost, fw_pkg): component_type = request.param chassis = list(show_firmware(duthost)["chassis"].keys())[0] available_components = list(fw_pkg["chassis"].get(chassis, {}).get("component", {}).keys()) + cpld_components = [com for com in available_components if "CPLD" in com] + # if in the host section, the CPLD defined is different, then need to use the one defined for this host. + # For example: if the CPLD2 is defined for the SN3700c, and CPLD1 defined for the r-anaconda-15, then when run + # test for the r-anaconda-15, it will take the CPLD1 instead of CPLD2 as on of the component + if "host" in fw_pkg and duthost.hostname in fw_pkg["host"]: + host_components = list(fw_pkg["host"].get(duthost.hostname, {}).get("component", []).keys()) + cpld_host_components = [com for com in host_components if "CPLD" in com] + if cpld_host_components: + available_components = list(set(available_components) ^ set(cpld_components) | set(host_components)) + else: + available_components = list(set(available_components) | set(host_components)) + if len(available_components) > 0: for component in available_components: if component_type in component: diff --git a/tests/platform_tests/fwutil/fwutil_common.py b/tests/platform_tests/fwutil/fwutil_common.py index 33de637fbeb..613063ed48a 100644 --- a/tests/platform_tests/fwutil/fwutil_common.py +++ b/tests/platform_tests/fwutil/fwutil_common.py @@ -4,12 +4,12 @@ import json import logging import allure -import re from copy import deepcopy from tests.common.utilities import wait_until from tests.common.reboot import SONIC_SSH_REGEX +from tests.common.helpers.firmware_helper import show_firmware logger = logging.getLogger(__name__) @@ -128,33 +128,6 @@ def complete_install(duthost, localhost, boot_type, res, pdu_ctrl, component, au time.sleep(60) -def show_firmware(duthost): - out = duthost.command("fwutil show status") - num_spaces = 2 - curr_chassis = "" - output_data = {"chassis": {}} - status_output = out['stdout'] - separators = re.split(r'\s{2,}', status_output.splitlines()[1]) # get separators - output_lines = status_output.splitlines()[2:] - - for line in output_lines: - data = [] - start = 0 - - for sep in separators: - curr_len = len(sep) - data.append(line[start:start+curr_len].strip()) - start += curr_len + num_spaces - - if data[0].strip() != "": - curr_chassis = data[0].strip() - output_data["chassis"][curr_chassis] = {"component": {}} - - output_data["chassis"][curr_chassis]["component"][data[2]] = data[3] - - return output_data - - def get_install_paths(request, duthost, defined_fw, versions, chassis, target_component): component = get_defined_components(duthost, defined_fw, chassis) ver = versions["chassis"].get(chassis, {})["component"] diff --git a/tests/show_techsupport/test_techsupport.py b/tests/show_techsupport/test_techsupport.py index fd5cb6e6e35..fd6082d2690 100644 --- a/tests/show_techsupport/test_techsupport.py +++ b/tests/show_techsupport/test_techsupport.py @@ -9,7 +9,9 @@ from random import randint from collections import defaultdict from tests.common.helpers.assertions import pytest_assert, pytest_require -from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer, LogAnalyzerError +from tests.common.helpers.platform_api import bmc +from tests.common.platform.device_utils import platform_api_conn, start_platform_api_service # noqa: F401 +from tests.common.plugins.loganalyzer.loganalyzer import LogAnalyzer, LogAnalyzerError, get_bughandler_instance from tests.common.utilities import wait_until, check_msg_in_syslog from log_messages import LOG_EXPECT_ACL_RULE_CREATE_RE, LOG_EXPECT_ACL_RULE_REMOVE_RE, LOG_EXCEPT_MIRROR_SESSION_REMOVE from pkg_resources import parse_version @@ -84,23 +86,23 @@ def setup_acl_rules(duthost, acl_setup): @pytest.fixture(scope='module') -def skip_on_dpu(duthosts, enum_rand_one_per_hwsku_frontend_hostname): +def skip_on_dpu(duthosts, enum_rand_one_per_hwsku_hostname): """ When dut is dpu, skip the case """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] if duthost.dut_basic_facts()['ansible_facts']['dut_basic_facts'].get("is_dpu"): pytest.skip("Skip the test, as it is not supported on DPU.") @pytest.fixture(scope='function') -def acl_setup(duthosts, enum_rand_one_per_hwsku_frontend_hostname): +def acl_setup(duthosts, enum_rand_one_per_hwsku_hostname): """ setup fixture gathers all test required information from DUT facts and testbed :param duthost: DUT host object :return: dictionary with all test required information """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] logger.info('Creating temporary folder for test {}'.format(ACL_RUN_DIR)) duthost.command("mkdir -p {}".format(ACL_RUN_DIR)) tmp_path = duthost.tempfile(path=ACL_RUN_DIR, state='directory', prefix='acl', suffix="")['path'] @@ -128,18 +130,19 @@ def teardown_acl(dut, acl_setup): @pytest.fixture(scope='function') -def acl(duthosts, enum_rand_one_per_hwsku_frontend_hostname, acl_setup, request): +def acl(duthosts, enum_rand_one_per_hwsku_hostname, acl_setup, request): """ setup/teardown ACL rules based on test class requirements :param duthost: DUT host object :param acl_setup: setup information :return: """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] acl_facts = duthost.acl_facts()["ansible_facts"]["ansible_acl_facts"] pytest_require(ACL_TABLE_NAME in acl_facts, "{} acl table not exists") - loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='acl', request=request) + loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='acl', request=request, + bughandler=get_bughandler_instance({"type": "default"})) loganalyzer.load_common_config() try: @@ -163,8 +166,8 @@ def acl(duthosts, enum_rand_one_per_hwsku_frontend_hostname, acl_setup, request) # MIRRORING PART # @pytest.fixture(scope='function') -def neighbor_ip(duthosts, enum_rand_one_per_hwsku_frontend_hostname, tbinfo): - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] +def neighbor_ip(duthosts, enum_rand_one_per_hwsku_hostname, tbinfo): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] # ptf-32 topo is not supported in mirroring if tbinfo['topo']['name'] == 'ptf32': pytest.skip('Unsupported Topology') @@ -184,11 +187,11 @@ def neighbor_ip(duthosts, enum_rand_one_per_hwsku_frontend_hostname, tbinfo): @pytest.fixture(scope='function') -def mirror_setup(duthosts, enum_rand_one_per_hwsku_frontend_hostname): +def mirror_setup(duthosts, enum_rand_one_per_hwsku_hostname): """ setup fixture """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] duthost.command('mkdir -p {}'.format(MIRROR_RUN_DIR)) tmp_path = duthost.tempfile(path=MIRROR_RUN_DIR, state='directory', prefix='mirror', suffix="")['path'] @@ -199,8 +202,8 @@ def mirror_setup(duthosts, enum_rand_one_per_hwsku_frontend_hostname): @pytest.fixture(scope='function') -def gre_version(duthosts, enum_rand_one_per_hwsku_frontend_hostname): - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] +def gre_version(duthosts, enum_rand_one_per_hwsku_hostname): + duthost = duthosts[enum_rand_one_per_hwsku_hostname] asic_type = duthost.facts['asic_type'] if asic_type in ["mellanox"]: SESSION_INFO['gre'] = 0x8949 # Mellanox specific @@ -213,14 +216,14 @@ def gre_version(duthosts, enum_rand_one_per_hwsku_frontend_hostname): @pytest.fixture(scope='function') -def mirroring(duthosts, enum_rand_one_per_hwsku_frontend_hostname, neighbor_ip, mirror_setup, gre_version, request): +def mirroring(duthosts, enum_rand_one_per_hwsku_hostname, neighbor_ip, mirror_setup, gre_version, request): """ fixture gathers all configuration fixtures :param duthost: DUT host :param mirror_setup: mirror_setup fixture :param mirror_config: mirror_config fixture """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] logger.info("Adding mirror_session to DUT") acl_rule_file = os.path.join(mirror_setup['dut_tmp_dir'], ACL_RULE_PERSISTENT_FILE) extra_vars = { @@ -243,7 +246,8 @@ def mirroring(duthosts, enum_rand_one_per_hwsku_frontend_hostname, neighbor_ip, try: yield finally: - loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='acl', request=request) + loganalyzer = LogAnalyzer(ansible_host=duthost, marker_prefix='acl', request=request, + bughandler=get_bughandler_instance({"type": "default"})) loganalyzer.load_common_config() try: @@ -355,17 +359,24 @@ def gen_dump_file(duthost, since): return tar_file -def test_techsupport(request, config, duthosts, enum_rand_one_per_hwsku_frontend_hostname, skip_on_dpu): # noqa F811 +def test_techsupport(request, config, duthosts, enum_rand_one_per_hwsku_hostname, skip_on_dpu, # noqa F811 + platform_api_conn): # noqa F811 """ test the "show techsupport" command in a loop :param config: fixture to configure additional setups_list on dut. :param duthost: DUT host """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] loop_range = request.config.getoption("--loop_num") or DEFAULT_LOOP_RANGE loop_delay = request.config.getoption("--loop_delay") or DEFAULT_LOOP_DELAY since = request.config.getoption("--logs_since") or str(randint(1, 5)) + " minute ago" - + is_bmc_present = False + try: + if bmc.get_presence(platform_api_conn): + is_bmc_present = True + except Exception as e: + logger.warning("Failed to get BMC presence: {}".format(e)) + is_bmc_present = False logger.debug("Loop_range is {} and loop_delay is {}".format(loop_range, loop_delay)) for i in range(loop_range): @@ -374,7 +385,7 @@ def test_techsupport(request, config, duthosts, enum_rand_one_per_hwsku_frontend extracted_dump_folder_name = tar_file.lstrip('/var/dump/').split('.')[0] extracted_dump_folder_path = '/tmp/{}'.format(extracted_dump_folder_name) try: - validate_dump_file_content(duthost, extracted_dump_folder_path) + validate_dump_file_content(duthost, extracted_dump_folder_path, is_bmc_present) except AssertionError as err: raise AssertionError(err) finally: @@ -384,7 +395,7 @@ def test_techsupport(request, config, duthosts, enum_rand_one_per_hwsku_frontend time.sleep(loop_delay) -def validate_dump_file_content(duthost, dump_folder_path): +def validate_dump_file_content(duthost, dump_folder_path, is_bmc_present): """ Validate generated dump file content :param duthost: duthost object @@ -403,7 +414,14 @@ def validate_dump_file_content(duthost, dump_folder_path): # sai XML dump is only support on the switch sai_xml_regex = re.compile(r'sai_[\w-]+\.xml(?:\.gz)?') assert any(sai_xml_regex.fullmatch(file_name) for file_name in sai_sdk_dump), \ - "No SAI XML file found in sai_sdk_dump folder" + "No SAI XML file found in sai_sdk_dump folder" + if is_bmc_present: + bmc_dump = duthost.command(f"ls {dump_folder_path}/bmc/")["stdout_lines"] + logger.info("BMC is present, validate BMC dump files existence") + assert len(bmc_dump), "Folder 'bmc_dump' in dump archive is empty. Expected not empty folder" + bmc_regex = re.compile(r'bmc_[\w-]+\.tar.xz') + assert any(bmc_regex.fullmatch(file_name) for file_name in bmc_dump), "No BMC dump file found in bmc folder" + assert len(dump) > MIN_FILES_NUM, "Seems like not all expected files available in 'dump' folder in dump archive. " \ "Test expects not less than 50 files. Available files: {}".format(dump) assert len(etc) > MIN_FILES_NUM, "Seems like not all expected files available in 'etc' folder in dump archive. " \ @@ -447,7 +465,7 @@ def add_asic_arg(format_str, cmds_list, asic_num): @pytest.fixture(scope='function') -def commands_to_check(duthosts, enum_rand_one_per_hwsku_frontend_hostname): +def commands_to_check(duthosts, enum_rand_one_per_hwsku_hostname): """ Prepare a list of commands to be expected in the show techsupport output. All the expected commands are @@ -462,7 +480,7 @@ def commands_to_check(duthosts, enum_rand_one_per_hwsku_frontend_hostname): A dict of command groups with each group containing a list of commands """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] num = duthost.num_asics() cmds_to_check = { @@ -571,7 +589,7 @@ def check_cmds(cmd_group_name, cmd_group_to_check, cmdlist, strbash_in_cmdlist): def test_techsupport_commands( - duthosts, enum_rand_one_per_hwsku_frontend_hostname, commands_to_check, skip_on_dpu): # noqa F811 + duthosts, enum_rand_one_per_hwsku_hostname, commands_to_check, skip_on_dpu): # noqa F811 """ This test checks list of commands that will be run when executing 'show techsupport' CLI against a standard expected list of commands @@ -587,7 +605,7 @@ def test_techsupport_commands( """ cmd_not_found = defaultdict(list) - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] stdout = duthost.shell(r'sudo generate_dump -n | grep -v "^mkdir\|^rm\|^tar\|^gzip"') @@ -613,7 +631,7 @@ def test_techsupport_commands( pytest_assert(len(cmd_not_found) == 0, error_message) -def test_techsupport_on_dpu(duthosts, enum_rand_one_per_hwsku_frontend_hostname): +def test_techsupport_on_dpu(duthosts, enum_rand_one_per_hwsku_hostname): """ This test is to check some files exist or not in the dump file generated by show techsupport on DPU 1. Generate dump file by " show techsupport -r --since 'xx xxx xxx' " ( select 1-5 minutes ago randomly) @@ -624,7 +642,7 @@ def test_techsupport_on_dpu(duthosts, enum_rand_one_per_hwsku_frontend_hostname) 5. Validate that sai_sdk_dump is not empty folder :param duthosts: DUT host """ - duthost = duthosts[enum_rand_one_per_hwsku_frontend_hostname] + duthost = duthosts[enum_rand_one_per_hwsku_hostname] if not duthost.dut_basic_facts()['ansible_facts']['dut_basic_facts'].get("is_dpu"): pytest.skip("Skip the test, as it is supported only on DPU.")