|
| 1 | +import os |
| 2 | +import logging |
| 3 | +import json |
| 4 | +import pytest |
| 5 | +import random |
| 6 | +import secrets |
| 7 | +import time |
| 8 | +from urllib.parse import urlparse |
| 9 | +from datetime import datetime |
| 10 | + |
| 11 | +from tests.common.helpers.assertions import pytest_assert |
| 12 | +from tests.common.helpers.platform_api import bmc |
| 13 | +from tests.common.reboot import SONIC_SSH_PORT, SONIC_SSH_REGEX |
| 14 | +from tests.common.platform.device_utils import platform_api_conn, start_platform_api_service # noqa: F401 |
| 15 | +from .platform_api_test_base import PlatformApiTestBase |
| 16 | +from tests.common.helpers.firmware_helper import show_firmware |
| 17 | + |
| 18 | + |
| 19 | +logger = logging.getLogger(__name__) |
| 20 | + |
| 21 | +pytestmark = [ |
| 22 | + pytest.mark.disable_loganalyzer, # disable automatic loganalyzer |
| 23 | + pytest.mark.topology('any') |
| 24 | +] |
| 25 | + |
| 26 | +BMC_DEFAULT_PASSWORD = "0penBmcTempPass!" |
| 27 | +BMC_ROOT_USERNAME = "root" |
| 28 | +BMC_SHORTEST_PASSWD_LEN = 12 |
| 29 | +BMC_LONGEST_PASSWD_LEN = 20 |
| 30 | +BMC_DUMP_FILENAME = "bmc_dump_{}.tar.xz" |
| 31 | +BMC_DUMP_PATH = "/tmp" |
| 32 | +LATEST_BMC_VERSION_IDX = 0 |
| 33 | +OLD_BMC_VERSION_IDX = 1 |
| 34 | +EROT_BUSY_MSG = "ERoT is busy" |
| 35 | +WAIT_TIME = 30 |
| 36 | + |
| 37 | + |
| 38 | +@pytest.fixture(scope="function", autouse=True) |
| 39 | +def is_bmc_present(platform_api_conn): |
| 40 | + if not bmc.get_presence(platform_api_conn): |
| 41 | + pytest.skip("BMC is not present, skipping BMC platform API tests") |
| 42 | + |
| 43 | + |
| 44 | +@pytest.fixture(scope="module") |
| 45 | +def bmc_ip(duthosts, enum_rand_one_per_hwsku_hostname): |
| 46 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 47 | + platform = duthost.shell("sudo show platform summary | grep Platform | awk '{print $2}'")["stdout"] |
| 48 | + bmc_config_file = f"/usr/share/sonic/device/{platform}/bmc.json" |
| 49 | + duthost.fetch(src=bmc_config_file, dest='/tmp') |
| 50 | + with open(f'/tmp/{duthost.hostname}/{bmc_config_file}', "r") as f: |
| 51 | + bmc_config = json.load(f) |
| 52 | + yield bmc_config["bmc_addr"] |
| 53 | + |
| 54 | + |
| 55 | +class TestBMCApi(PlatformApiTestBase): |
| 56 | + """Platform and Host API test cases for the BMC class""" |
| 57 | + |
| 58 | + def _update_bmc_firmware_by_api(self, duthost, fw_image, timeout=600): |
| 59 | + start_time = time.time() |
| 60 | + |
| 61 | + while True: |
| 62 | + if time.time() - start_time > timeout: |
| 63 | + logger.warning(f"Timeout after {timeout} seconds while updating BMC firmware") |
| 64 | + return False |
| 65 | + |
| 66 | + time.sleep(WAIT_TIME) |
| 67 | + ret_code, message = bmc.update_firmware(duthost, fw_image) |
| 68 | + if EROT_BUSY_MSG in message: |
| 69 | + logger.info(f"{EROT_BUSY_MSG}, waiting for {WAIT_TIME} seconds") |
| 70 | + continue |
| 71 | + elif ret_code != 0: |
| 72 | + logger.warning(f"Failed to update BMC firmware: return code: {ret_code}, message:{message}") |
| 73 | + return False |
| 74 | + else: |
| 75 | + logger.info(f"BMC firmware updated successfully!") |
| 76 | + break |
| 77 | + |
| 78 | + bmc.request_bmc_reset(duthost) |
| 79 | + return True |
| 80 | + |
| 81 | + def _generate_password(self): |
| 82 | + password_length = random.choice(range(BMC_SHORTEST_PASSWD_LEN, BMC_LONGEST_PASSWD_LEN)) |
| 83 | + logger.info(f"Generated password length: {password_length}") |
| 84 | + raw_password = secrets.token_urlsafe(64) |
| 85 | + password = raw_password[:password_length] |
| 86 | + logger.info(f"Generated password: {password}") |
| 87 | + return password |
| 88 | + |
| 89 | + def _string_to_dict(self, str): |
| 90 | + result = {} |
| 91 | + for line in str.strip().split('\n'): |
| 92 | + if ':' in line: |
| 93 | + key, value = line.split(':', 1) |
| 94 | + result[key.strip()] = value.strip() |
| 95 | + return result |
| 96 | + |
| 97 | + def _validate_bmc_login(self, duthost, bmc_ip, password, expected_success=True): |
| 98 | + res = duthost.command(f"curl -k -u {BMC_ROOT_USERNAME}:{password} -X " |
| 99 | + f"GET https://{bmc_ip}/redfish/v1/AccountService/Accounts")["stdout"] |
| 100 | + pytest_assert(res is not None, "Failed to login to BMC") |
| 101 | + if expected_success: |
| 102 | + pytest_assert('error' not in res, f"Failed to login to BMC with password: {password}") |
| 103 | + else: |
| 104 | + pytest_assert('error' in res, f"Successfully login to BMC with password: {password}") |
| 105 | + |
| 106 | + def _change_bmc_root_password(self, duthost, bmc_ip, password): |
| 107 | + res = duthost.command(f'curl -k -u {BMC_ROOT_USERNAME}:{BMC_DEFAULT_PASSWORD} -X PATCH ' |
| 108 | + f'https://{bmc_ip}/redfish/v1/AccountService/Accounts/root -H "Content-Type: application/json"' |
| 109 | + f' -d \'{{"Password":"{password}"}}\'')["stdout"] |
| 110 | + pytest_assert(res is not None, f"Failed to change BMC root password to {password}") |
| 111 | + pytest_assert('error' not in res, f"Failed to change BMC root password to {password} with error response: {res}") |
| 112 | + |
| 113 | + def _validate_bmc_dump_finished(self, duthost, task_id, timestamp): |
| 114 | + ret, msg = bmc.get_bmc_debug_log_dump(duthost, task_id, BMC_DUMP_FILENAME.format(timestamp), BMC_DUMP_PATH) |
| 115 | + if ret == 0: |
| 116 | + if msg == '': |
| 117 | + logger.info(f"BMC dump finished!") |
| 118 | + return True |
| 119 | + logger.info(f"Failed to retrieve BMC dump: {msg}") |
| 120 | + return False |
| 121 | + |
| 122 | + def _get_bmc_version(self, duthost, timeout=120): |
| 123 | + start_time = time.time() |
| 124 | + |
| 125 | + while True: |
| 126 | + if time.time() - start_time > timeout: |
| 127 | + logger.warning(f"Timeout after {timeout} seconds while getting BMC version") |
| 128 | + return |
| 129 | + |
| 130 | + res = duthost.show_and_parse('sudo show platform firmware status') |
| 131 | + for entry in res: |
| 132 | + if entry['component'] == 'BMC': |
| 133 | + if entry['version'] == 'N/A': |
| 134 | + continue |
| 135 | + return entry['version'] |
| 136 | + |
| 137 | + def test_get_name(self, platform_api_conn): |
| 138 | + name = bmc.get_name(platform_api_conn) |
| 139 | + pytest_assert(name is not None, "Unable to retrieve BMC name") |
| 140 | + pytest_assert(isinstance(name, str), f"BMC name type appears incorrect: {type(name)}") |
| 141 | + pytest_assert(name == 'BMC', f"BMC name appears incorrect: {name}") |
| 142 | + |
| 143 | + def test_get_presence(self, platform_api_conn): |
| 144 | + presence = bmc.get_presence(platform_api_conn) |
| 145 | + pytest_assert(presence is not None, "Unable to retrieve BMC presence") |
| 146 | + pytest_assert(isinstance(presence, bool), f"BMC presence appears incorrect: {type(presence)}") |
| 147 | + pytest_assert(presence is True, f"BMC is not present: {presence}") |
| 148 | + |
| 149 | + def test_get_model(self, duthosts, enum_rand_one_per_hwsku_hostname): |
| 150 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 151 | + model = bmc.get_model(duthost) |
| 152 | + bmc_eeprom_info = duthost.command("sudo show platform bmc eeprom")["stdout"] |
| 153 | + pytest_assert(model is not None, "Unable to retrieve BMC model") |
| 154 | + pytest_assert(model in bmc_eeprom_info, f"BMC model appears incorrect: {model}") |
| 155 | + |
| 156 | + def test_get_serial(self, duthosts, enum_rand_one_per_hwsku_hostname): |
| 157 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 158 | + serial = bmc.get_serial(duthost) |
| 159 | + bmc_eeprom_info = duthost.command("sudo show platform bmc summary")["stdout"] |
| 160 | + pytest_assert(serial is not None, "Unable to retrieve BMC serial number") |
| 161 | + pytest_assert(str(serial) in bmc_eeprom_info, f"BMC serial number appears incorrect: {serial}") |
| 162 | + |
| 163 | + def test_get_revision(self, platform_api_conn): |
| 164 | + revision = bmc.get_revision(platform_api_conn) |
| 165 | + pytest_assert(revision is not None, "Unable to retrieve BMC revision") |
| 166 | + pytest_assert(revision == 'N/A', f"BMC revision appears incorrect: {revision}") |
| 167 | + |
| 168 | + def test_get_status(self, platform_api_conn): |
| 169 | + status = bmc.get_status(platform_api_conn) |
| 170 | + pytest_assert(status is not None, "Unable to retrieve BMC status") |
| 171 | + pytest_assert(isinstance(status, bool), f"BMC status appears incorrect: {type(status)}") |
| 172 | + pytest_assert(status is True, f"BMC status appears incorrect: {status}") |
| 173 | + |
| 174 | + def test_is_replaceable(self, platform_api_conn): |
| 175 | + replaceable = bmc.is_replaceable(platform_api_conn) |
| 176 | + pytest_assert(replaceable is not None, "Unable to retrieve BMC is_replaceable") |
| 177 | + pytest_assert(isinstance(replaceable, bool), f"BMC replaceable value must be a bool value: {type(replaceable)}") |
| 178 | + pytest_assert(replaceable is False, f"BMC replaceable value appears incorrect: {replaceable}") |
| 179 | + |
| 180 | + def test_get_eeprom(self, duthosts, enum_rand_one_per_hwsku_hostname): |
| 181 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 182 | + eeprom = bmc.get_eeprom(duthost) |
| 183 | + bmc_eeprom_info = self._string_to_dict(duthost.command("sudo show platform bmc eeprom")["stdout"]) |
| 184 | + pytest_assert(eeprom is not None, f"Failed to retrieve system EEPROM: {eeprom}") |
| 185 | + pytest_assert(isinstance(eeprom, dict), f"BMC eeprom value must be a dict value: {type(eeprom)}") |
| 186 | + |
| 187 | + for key, value in bmc_eeprom_info.items(): |
| 188 | + pytest_assert(key in eeprom, f"BMC eeprom {key} appears incorrect") |
| 189 | + pytest_assert(eeprom[key] == value, f"BMC eeprom {key} appears incorrect") |
| 190 | + |
| 191 | + def test_get_version(self, duthosts, enum_rand_one_per_hwsku_hostname): |
| 192 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 193 | + version = bmc.get_version(duthost) |
| 194 | + bmc_summary = duthost.command("sudo show platform bmc summary")["stdout"] |
| 195 | + pytest_assert(version is not None, f"Unable to retrieve BMC version: {version}") |
| 196 | + pytest_assert(version in bmc_summary, f"BMC version appears incorrect: {version}") |
| 197 | + |
| 198 | + def test_reset_root_password(self, duthosts, enum_rand_one_per_hwsku_hostname, bmc_ip): |
| 199 | + """ |
| 200 | + Test BMC root password reset with platform API |
| 201 | +
|
| 202 | + Steps: |
| 203 | + 1. Reset the BMC root password by BMC platform api reset_root_password |
| 204 | + 2. Validate the root password had been reset to the default password by login test using Redfish api |
| 205 | + 3. Change the root password to a new value by using Redfish api |
| 206 | + 4. Validate login password had been changed by login test using Redfish api |
| 207 | + 5. Reset the BMC root password by BMC platform api reset_root_password() |
| 208 | + 6. Validate the root password had been reset to the default password by login test using Redfish api |
| 209 | + """ |
| 210 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 211 | + bmc.reset_root_password(duthost) |
| 212 | + self._validate_bmc_login(duthost, bmc_ip, BMC_DEFAULT_PASSWORD) |
| 213 | + temp_password = self._generate_password() |
| 214 | + self._change_bmc_root_password(duthost, bmc_ip, temp_password) |
| 215 | + self._validate_bmc_login(duthost, bmc_ip, temp_password) |
| 216 | + bmc.reset_root_password(duthost) |
| 217 | + self._validate_bmc_login(duthost, bmc_ip, BMC_DEFAULT_PASSWORD) |
| 218 | + |
| 219 | + def test_bmc_dump(self, duthosts, enum_rand_one_per_hwsku_hostname, platform_api_conn): |
| 220 | + """ |
| 221 | + Test BMC dump with API |
| 222 | +
|
| 223 | + Steps: |
| 224 | + 1. Trigger the BMC dump by BMC api trigger_bmc_debug_log_dump() |
| 225 | + 2. During waiting, check the dump process by BMC api get_bmc_debug_log_dump(task_id, filename, path) |
| 226 | + 3. After BMC dump finished, validate the BMC dump file existence |
| 227 | + """ |
| 228 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 229 | + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') |
| 230 | + bmc_dump_path = BMC_DUMP_PATH + '/' + BMC_DUMP_FILENAME.format(timestamp) |
| 231 | + ret_code, (task_id, err_msg) = bmc.trigger_bmc_debug_log_dump(duthost) |
| 232 | + pytest_assert(ret_code == 0, f"Failed to retrieve BMC dump: {err_msg}") |
| 233 | + logger.info(f"BMC dump task id: {task_id}") |
| 234 | + pytest_assert(self._validate_bmc_dump_finished(duthost, task_id, timestamp), "BMC dump failed") |
| 235 | + pytest_assert(duthost.command(f"ls -l {bmc_dump_path}")["rc"] == 0, |
| 236 | + f"BMC dump file not found: {bmc_dump_path}") |
| 237 | + |
| 238 | + def test_bmc_firmware_update(self, duthosts, enum_rand_one_per_hwsku_hostname, fw_pkg, localhost): |
| 239 | + """ |
| 240 | + Test BMC firmware update with platform API and CLI |
| 241 | +
|
| 242 | + Steps: |
| 243 | + 1. Check and record the original BMC firmware version |
| 244 | + 2. Update the BMC firmware version by command |
| 245 | + 'config platform firmware install chassis component BMC fw -y xxx' |
| 246 | + 3. Wait after the installation done |
| 247 | + 4. Validate the BMC firmware had been updated to the destination version by command 'show platform firmware status' |
| 248 | + 5. Recover the BMC firmware version to the original one by BMC platform api update_firmware(fw_image) |
| 249 | + 6. Wait after the installation done |
| 250 | + 7. Validate the BMC firmware had been restored to the original version by command 'show platform firmware status' |
| 251 | + """ |
| 252 | + duthost = duthosts[enum_rand_one_per_hwsku_hostname] |
| 253 | + bmc_version_origin = self._get_bmc_version(duthost) |
| 254 | + logger.info(f"BMC version origin: {bmc_version_origin}") |
| 255 | + |
| 256 | + chassis = list(show_firmware(duthost)["chassis"].keys())[0] |
| 257 | + logger.info(f"Chassis: {chassis}") |
| 258 | + fw_pkg_path_new = fw_pkg["chassis"][chassis]["component"]["BMC"][LATEST_BMC_VERSION_IDX]["firmware"] |
| 259 | + fw_pkg_clean_path_new = urlparse(fw_pkg_path_new).path |
| 260 | + fw_pkt_name_new = os.path.basename(fw_pkg_path_new) |
| 261 | + logger.info(f"BMC firmware path: {fw_pkg_clean_path_new}") |
| 262 | + logger.info(f"Copy BMC firmware to localhost: /tmp/{fw_pkt_name_new}") |
| 263 | + duthost.copy(src=fw_pkg_clean_path_new, dest=f"/tmp/{fw_pkt_name_new}") |
| 264 | + |
| 265 | + logger.info(f"Execute BMC firmware update to {fw_pkt_name_new} and Wait for BMC firmware update to complete") |
| 266 | + res = duthost.command(f"sudo config platform firmware install chassis component BMC fw -y /tmp/{fw_pkt_name_new}") |
| 267 | + |
| 268 | + bmc_version_latest = self._get_bmc_version(duthost) |
| 269 | + logger.info(f"BMC version after update: {bmc_version_latest}") |
| 270 | + pytest_assert(bmc_version_latest != bmc_version_origin, "BMC firmware update failed") |
| 271 | + |
| 272 | + fw_pkg_path_old = fw_pkg["chassis"][chassis]["component"]["BMC"][OLD_BMC_VERSION_IDX]["firmware"] |
| 273 | + fw_pkg_clean_path_old = urlparse(fw_pkg_path_old).path |
| 274 | + fw_pkt_name_old = os.path.basename(fw_pkg_path_old) |
| 275 | + logger.info(f"BMC firmware path: {fw_pkg_clean_path_old}") |
| 276 | + logger.info(f"Copy BMC firmware to localhost: /tmp/{fw_pkt_name_old}") |
| 277 | + duthost.copy(src=fw_pkg_clean_path_old, dest=f"/tmp/{fw_pkt_name_old}") |
| 278 | + |
| 279 | + logger.info(f"Execute BMC firmware update to {fw_pkt_name_old} and Wait for BMC firmware update to complete") |
| 280 | + res = self._update_bmc_firmware_by_api(duthost, f"/tmp/{fw_pkt_name_old}") |
| 281 | + pytest_assert(res, "Failed to execute BMC firmware update by API!") |
| 282 | + # logger.info(f"Wait for BMC power cycle finishing") |
| 283 | + # localhost.wait_for(host=duthost.mgmt_ip, |
| 284 | + # port=SONIC_SSH_PORT, |
| 285 | + # state="started", |
| 286 | + # search_regex=SONIC_SSH_REGEX, |
| 287 | + # timeout=180) |
| 288 | + |
| 289 | + bmc_version_current = self._get_bmc_version(duthost) |
| 290 | + logger.info(f"BMC version after update: {bmc_version_current}") |
| 291 | + pytest_assert(bmc_version_latest != bmc_version_current, "BMC firmware recovery failed") |
0 commit comments