|
7 | 7 | import shutil |
8 | 8 | from ipaddress import IPv4Interface |
9 | 9 | from pathlib import Path |
| 10 | +from typing import Dict |
10 | 11 |
|
11 | 12 | from mfd_host import Host |
12 | 13 | from mfd_connect.exceptions import ( |
|
17 | 18 |
|
18 | 19 | from common.nicctl import Nicctl |
19 | 20 | from Engine.const import * |
| 21 | +from Engine.csv_report import ( |
| 22 | + csv_add_test, |
| 23 | + csv_write_report, |
| 24 | + update_compliance_result, |
| 25 | +) |
20 | 26 | from Engine.mcm_apps import MediaProxy, MeshAgent, get_mcm_path, get_mtl_path |
21 | 27 | from datetime import datetime |
| 28 | +from mfd_common_libs.custom_logger import add_logging_level |
| 29 | +from mfd_common_libs.log_levels import TEST_FAIL, TEST_INFO, TEST_PASS |
| 30 | +from pytest_mfd_logging.amber_log_formatter import AmberLogFormatter |
| 31 | + |
22 | 32 |
|
23 | 33 | logger = logging.getLogger(__name__) |
| 34 | +phase_report_key = pytest.StashKey[Dict[str, pytest.CollectReport]]() |
| 35 | + |
| 36 | + |
| 37 | +@pytest.hookimpl(wrapper=True, tryfirst=True) |
| 38 | +def pytest_runtest_makereport(item, call): |
| 39 | + # execute all other hooks to obtain the report object |
| 40 | + rep = yield |
| 41 | + |
| 42 | + # store test results for each phase of a call, which can |
| 43 | + # be "setup", "call", "teardown" |
| 44 | + item.stash.setdefault(phase_report_key, {})[rep.when] = rep |
| 45 | + |
| 46 | + return rep |
24 | 47 |
|
25 | 48 |
|
26 | 49 | @pytest.fixture(scope="function") |
@@ -380,3 +403,89 @@ def log_interface_driver_info(hosts: dict[str, Host]) -> None: |
380 | 403 | logger.info( |
381 | 404 | f"Interface {interface.name} on host {host.name} uses driver: {driver_info.driver_name} ({driver_info.driver_version})" |
382 | 405 | ) |
| 406 | + |
| 407 | + |
| 408 | +@pytest.fixture(scope="session", autouse=True) |
| 409 | +def log_session(): |
| 410 | + add_logging_level("TESTCMD", TESTCMD_LVL) |
| 411 | + |
| 412 | + today = datetime.today() |
| 413 | + folder = today.strftime("%Y-%m-%dT%H:%M:%S") |
| 414 | + path = os.path.join(LOG_FOLDER, folder) |
| 415 | + path_symlink = os.path.join(LOG_FOLDER, "latest") |
| 416 | + try: |
| 417 | + os.remove(path_symlink) |
| 418 | + except FileNotFoundError: |
| 419 | + pass |
| 420 | + os.makedirs(path, exist_ok=True) |
| 421 | + os.symlink(folder, path_symlink) |
| 422 | + yield |
| 423 | + shutil.copy("pytest.log", f"{LOG_FOLDER}/latest/pytest.log") |
| 424 | + csv_write_report(f"{LOG_FOLDER}/latest/report.csv") |
| 425 | + |
| 426 | +@pytest.fixture(scope="function", autouse=True) |
| 427 | +def log_case(request, caplog: pytest.LogCaptureFixture): |
| 428 | + case_id = request.node.nodeid |
| 429 | + case_folder = os.path.dirname(case_id) |
| 430 | + os.makedirs(os.path.join(LOG_FOLDER, "latest", case_folder), exist_ok=True) |
| 431 | + logfile = os.path.join(LOG_FOLDER, "latest", f"{case_id}.log") |
| 432 | + fh = logging.FileHandler(logfile) |
| 433 | + formatter = request.session.config.pluginmanager.get_plugin( |
| 434 | + "logging-plugin" |
| 435 | + ).formatter |
| 436 | + format = AmberLogFormatter(formatter) |
| 437 | + fh.setFormatter(format) |
| 438 | + fh.setLevel(logging.DEBUG) |
| 439 | + logger = logging.getLogger() |
| 440 | + logger.addHandler(fh) |
| 441 | + yield |
| 442 | + report = request.node.stash[phase_report_key] |
| 443 | + if report["setup"].failed: |
| 444 | + logging.log(level=TEST_FAIL, msg=f"Setup failed for {case_id}") |
| 445 | + os.chmod(logfile, 0o4755) |
| 446 | + result = "Fail" |
| 447 | + elif ("call" not in report) or report["call"].failed: |
| 448 | + logging.log(level=TEST_FAIL, msg=f"Test failed for {case_id}") |
| 449 | + os.chmod(logfile, 0o4755) |
| 450 | + result = "Fail" |
| 451 | + elif report["call"].passed: |
| 452 | + logging.log(level=TEST_PASS, msg=f"Test passed for {case_id}") |
| 453 | + os.chmod(logfile, 0o755) |
| 454 | + result = "Pass" |
| 455 | + else: |
| 456 | + logging.log(level=TEST_INFO, msg=f"Test skipped for {case_id}") |
| 457 | + result = "Skip" |
| 458 | + |
| 459 | + logger.removeHandler(fh) |
| 460 | + |
| 461 | + commands = [] |
| 462 | + for record in caplog.get_records("call"): |
| 463 | + if record.levelno == TESTCMD_LVL: |
| 464 | + commands.append(record.message) |
| 465 | + |
| 466 | + csv_add_test( |
| 467 | + test_case=case_id, |
| 468 | + commands="\n".join(commands), |
| 469 | + result=result, |
| 470 | + issue="n/a", |
| 471 | + result_note="n/a", |
| 472 | + ) |
| 473 | + |
| 474 | + |
| 475 | +@pytest.fixture(scope="session") |
| 476 | +def compliance_report(request, log_session, test_config): |
| 477 | + """ |
| 478 | + This function is used for compliance check and report. |
| 479 | + """ |
| 480 | + # TODO: Implement compliance check logic. When tcpdump pcap is enabled, at the end of the test session all pcaps |
| 481 | + # shall be send into EBU list. |
| 482 | + # Pcaps shall be stored in the ramdisk, and then moved to the compliance |
| 483 | + # folder or send into EBU list after each test finished and remove it from the ramdisk. |
| 484 | + # Compliance report generation logic goes here after yield. Or in another class / function but triggered here. |
| 485 | + # AFAIK names of pcaps contains test name so it can be matched with result of each test like in code below. |
| 486 | + yield |
| 487 | + if test_config.get("compliance", False): |
| 488 | + logging.info("Compliance mode enabled, updating compliance results") |
| 489 | + for item in request.session.items: |
| 490 | + test_case = item.nodeid |
| 491 | + update_compliance_result(test_case, "Fail") |
0 commit comments