diff --git a/doc/develop/test/twister.rst b/doc/develop/test/twister.rst index b8a81eb0ffb0d..d76955f289d96 100644 --- a/doc/develop/test/twister.rst +++ b/doc/develop/test/twister.rst @@ -746,6 +746,43 @@ required_snippets: - cdc-acm-console - user-snippet-example +required_applications: (default empty) + Specify a list of test applications that must be built before current test can run. + It enables sharing of built applications between test scenarios, allowing tests + to access build artifacts from other applications. + + Each required application entry supports: + - ``name``: Test scenario identifier (required) + - ``platform``: Target platform (optional, defaults to current test's platform) + + Required applications must be available in the source tree (specified with ``-T`` + and/or ``-s`` options). When reusing build directories (e.g., with ``--no-clean``), + Twister can find required applications in the current build directory. + + How it works: + + - Twister builds the required applications first + - The main test application waits for required applications to complete + - Build directories of required applications are made available to the test harness + - For pytest harness, build directories are passed via ``--required-build`` arguments + and accessible through the ``required_build_dirs`` fixture + + Example configuration: + + .. code-block:: yaml + + tests: + sample.required_app_demo: + harness: pytest + required_applications: + - name: sample.shared_app + - name: sample.basic.helloworld + platform: native_sim + sample.shared_app: + build_only: true + + Limitations: Not supported with ``--subset`` or ``--runtime-artifact-cleanup`` options. + expect_reboot: (default False) Notify twister that the test scenario is expected to reboot while executing. When enabled, twister will suppress warnings about unexpected multiple runs diff --git a/samples/subsys/testsuite/pytest/shell/pytest_shared_app/test_shared_app.py b/samples/subsys/testsuite/pytest/shell/pytest_shared_app/test_shared_app.py new file mode 100755 index 0000000000000..48fa4bfc1e771 --- /dev/null +++ b/samples/subsys/testsuite/pytest/shell/pytest_shared_app/test_shared_app.py @@ -0,0 +1,18 @@ +# Copyright (c) 2025 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: Apache-2.0 +from __future__ import annotations + +import logging +from pathlib import Path + +from twister_harness import DeviceAdapter + +logger = logging.getLogger(__name__) + + +def test_required_build_dirs_found(dut: DeviceAdapter, required_build_dirs): + logger.info(f"Required build directories: {required_build_dirs}") + assert len(required_build_dirs) == 2, "Expected two required build directories" + assert Path(required_build_dirs[0]).is_dir() + assert Path(Path(required_build_dirs[0]) / 'build.log').exists() diff --git a/samples/subsys/testsuite/pytest/shell/testcase.yaml b/samples/subsys/testsuite/pytest/shell/testcase.yaml index a5f18fc7a2979..60b80817620d8 100644 --- a/samples/subsys/testsuite/pytest/shell/testcase.yaml +++ b/samples/subsys/testsuite/pytest/shell/testcase.yaml @@ -33,3 +33,11 @@ tests: - CONFIG_SHELL_VT100_COLORS=n harness_config: shell_commands: *kernel_commands + sample.pytest.required_app_demo: + required_applications: + - name: sample.pytest.shell + - name: sample.harness.shell + harness: pytest + harness_config: + pytest_root: + - "pytest_shared_app/test_shared_app.py" diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures.py index 621b0defd71a1..765da163312ad 100644 --- a/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures.py +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/fixtures.py @@ -87,6 +87,11 @@ def shell(dut: DeviceAdapter) -> Shell: return shell +@pytest.fixture(scope='session') +def required_build_dirs(request: pytest.FixtureRequest) -> list[str]: + return request.config.getoption('--required-build') + + @pytest.fixture() def mcumgr(device_object: DeviceAdapter) -> Generator[MCUmgr, None, None]: """Fixture to create an MCUmgr instance for serial connection.""" diff --git a/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py index ec212db98bde6..6aacb18f643a4 100644 --- a/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py +++ b/scripts/pylib/pytest-twister-harness/src/twister_harness/plugin.py @@ -122,6 +122,11 @@ def pytest_addoption(parser: pytest.Parser): choices=('function', 'class', 'module', 'package', 'session'), help='The scope for which `dut` and `shell` fixtures are shared.' ) + twister_harness_group.addoption( + '--required-build', action='append', default=[], metavar='PATH', + help='Required build directory / shared applications for the test. ' + 'May be given multiple times.' + ) twister_harness_group.addoption( '--twister-fixture', action='append', dest='fixtures', metavar='FIXTURE', default=[], help='Twister fixture supported by this platform. May be given multiple times.' diff --git a/scripts/pylib/twister/twisterlib/config_parser.py b/scripts/pylib/twister/twisterlib/config_parser.py index 738fa8285b406..5ffe989f2f5cd 100644 --- a/scripts/pylib/twister/twisterlib/config_parser.py +++ b/scripts/pylib/twister/twisterlib/config_parser.py @@ -55,6 +55,7 @@ class TwisterConfigParser: "extra_conf_files": {"type": "list", "default": []}, "extra_overlay_confs": {"type": "list", "default": []}, "extra_dtc_overlay_files": {"type": "list", "default": []}, + "required_applications": {"type": "list"}, "required_snippets": {"type": "list"}, "build_only": {"type": "bool", "default": False}, "build_on_all": {"type": "bool", "default": False}, diff --git a/scripts/pylib/twister/twisterlib/harness.py b/scripts/pylib/twister/twisterlib/harness.py index 0e9d27cf0ddd4..e69a4c1d2864e 100644 --- a/scripts/pylib/twister/twisterlib/harness.py +++ b/scripts/pylib/twister/twisterlib/harness.py @@ -434,6 +434,9 @@ def generate_command(self): f'Support for handler {handler.type_str} not implemented yet' ) + for req_build in self.instance.required_build_dirs: + command.append(f'--required-build={req_build}') + if handler.type_str != 'device': for fixture in handler.options.fixture: command.append(f'--twister-fixture={fixture}') diff --git a/scripts/pylib/twister/twisterlib/runner.py b/scripts/pylib/twister/twisterlib/runner.py index 0c3b506a27d98..a32883fbc6392 100644 --- a/scripts/pylib/twister/twisterlib/runner.py +++ b/scripts/pylib/twister/twisterlib/runner.py @@ -9,13 +9,13 @@ import os import pathlib import pickle -import queue import re import shutil import subprocess import sys import time import traceback +from collections import deque from math import log10 from multiprocessing import Lock, Process, Value from multiprocessing.managers import BaseManager @@ -874,7 +874,7 @@ def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs) ) self.log = "build.log" - self.instance = instance + self.instance: TestInstance = instance self.filtered_tests = 0 self.options = env.options self.env = env @@ -941,22 +941,21 @@ def log_info_file(self, inline_logs): else: self.log_info(f"{b_log}", inline_logs) - - def _add_to_pipeline(self, pipeline, op: str, additionals: dict=None): + def _add_to_processing_queue(self, processing_queue: deque, op: str, additionals: dict=None): if additionals is None: additionals = {} try: if op: task = dict({'op': op, 'test': self.instance}, **additionals) - pipeline.put(task) - # Only possible RuntimeError source here is a mutation of the pipeline during iteration. - # If that happens, we ought to consider the whole pipeline corrupted. + processing_queue.append(task) + # Only possible RuntimeError source here is a mutation of the processing_queue during + # iteration. If that happens, we ought to consider the whole processing_queue corrupted. except RuntimeError as e: logger.error(f"RuntimeError: {e}") traceback.print_exc() - - def process(self, pipeline, done, message, lock, results): + def process(self, processing_queue: deque, processing_ready: dict[str, TestInstance], + message, lock, results: ExecutionCounter): next_op = None additionals = {} @@ -990,7 +989,7 @@ def process(self, pipeline, done, message, lock, results): self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) next_op = 'report' finally: - self._add_to_pipeline(pipeline, next_op) + self._add_to_processing_queue(processing_queue, next_op) # The build process, call cmake and build with configured generator elif op == "cmake": @@ -1023,7 +1022,7 @@ def process(self, pipeline, done, message, lock, results): self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) next_op = 'report' finally: - self._add_to_pipeline(pipeline, next_op) + self._add_to_processing_queue(processing_queue, next_op) elif op == "build": try: @@ -1072,7 +1071,7 @@ def process(self, pipeline, done, message, lock, results): self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) next_op = 'report' finally: - self._add_to_pipeline(pipeline, next_op) + self._add_to_processing_queue(processing_queue, next_op) elif op == "gather_metrics": try: @@ -1103,7 +1102,7 @@ def process(self, pipeline, done, message, lock, results): self.instance.add_missing_case_status(TwisterStatus.BLOCK, reason) next_op = 'report' finally: - self._add_to_pipeline(pipeline, next_op) + self._add_to_processing_queue(processing_queue, next_op) # Run the generated binary using one of the supported handlers elif op == "run": @@ -1130,7 +1129,7 @@ def process(self, pipeline, done, message, lock, results): next_op = 'report' additionals = {} finally: - self._add_to_pipeline(pipeline, next_op, additionals) + self._add_to_processing_queue(processing_queue, next_op, additionals) # Run per-instance code coverage elif op == "coverage": @@ -1152,13 +1151,13 @@ def process(self, pipeline, done, message, lock, results): next_op = 'report' additionals = {} finally: - self._add_to_pipeline(pipeline, next_op, additionals) + self._add_to_processing_queue(processing_queue, next_op, additionals) # Report results and output progress to screen elif op == "report": try: with lock: - done.put(self.instance) + processing_ready.update({self.instance.name: self.instance}) self.report_out(results) if not self.options.coverage: @@ -1181,7 +1180,7 @@ def process(self, pipeline, done, message, lock, results): next_op = None additionals = {} finally: - self._add_to_pipeline(pipeline, next_op, additionals) + self._add_to_processing_queue(processing_queue, next_op, additionals) elif op == "cleanup": try: @@ -1810,11 +1809,10 @@ def calc_size(instance: TestInstance, from_buildlog: bool): class TwisterRunner: def __init__(self, instances, suites, env=None) -> None: - self.pipeline = None self.options = env.options self.env = env - self.instances = instances - self.suites = suites + self.instances: dict[str, TestInstance] = instances + self.suites: dict[str, TestSuite] = suites self.duts = None self.jobs = 1 self.results = None @@ -1824,14 +1822,15 @@ def run(self): retries = self.options.retry_failed + 1 - BaseManager.register('LifoQueue', queue.LifoQueue) + BaseManager.register('deque', deque, exposed=['append', 'appendleft', 'pop']) + BaseManager.register('get_dict', dict) manager = BaseManager() manager.start() self.results = ExecutionCounter(total=len(self.instances)) self.iteration = 0 - pipeline = manager.LifoQueue() - done_queue = manager.LifoQueue() + processing_queue: deque = manager.deque() + processing_ready: dict[str, TestInstance] = manager.get_dict() # Set number of jobs if self.options.jobs: @@ -1870,16 +1869,11 @@ def run(self): else: self.results.done = self.results.filtered_static + self.results.skipped - self.execute(pipeline, done_queue) + self.execute(processing_queue, processing_ready) - while True: - try: - inst = done_queue.get_nowait() - except queue.Empty: - break - else: - inst.metrics["handler_time"] = inst.execution_time - self.instances[inst.name] = inst + for inst in processing_ready.values(): + inst.metrics["handler_time"] = inst.execution_time + self.instances[inst.name] = inst print("") @@ -1922,7 +1916,7 @@ def show_brief(self): def add_tasks_to_queue( self, - pipeline, + processing_queue: deque, build_only=False, test_only=False, retry_build_errors=False @@ -1962,74 +1956,126 @@ def add_tasks_to_queue( ) if test_only and instance.run: - pipeline.put({"op": "run", "test": instance}) + processing_queue.append({"op": "run", "test": instance}) elif instance.filter_stages and "full" not in instance.filter_stages: - pipeline.put({"op": "filter", "test": instance}) + processing_queue.append({"op": "filter", "test": instance}) else: cache_file = os.path.join(instance.build_dir, "CMakeCache.txt") if os.path.exists(cache_file) and self.env.options.aggressive_no_clean: - pipeline.put({"op": "build", "test": instance}) + processing_queue.append({"op": "build", "test": instance}) else: - pipeline.put({"op": "cmake", "test": instance}) - + processing_queue.append({"op": "cmake", "test": instance}) + + def _are_required_apps_ready( + self, instance: TestInstance, processing_ready: dict[str, TestInstance] + ) -> bool: + """Verify that all required applications are ready to be used.""" + for required_app in instance.required_applications: + if processing_ready.get(required_app) is None: + return False + return True + + def _are_all_required_apps_success( + self, instance: TestInstance, processing_ready: dict[str, TestInstance] + ) -> bool: + """Verify that all required applications were successfully built.""" + found_failed_app = False + for required_app in instance.required_applications: + inst = processing_ready.get(required_app) + if inst.status not in (TwisterStatus.PASS, TwisterStatus.NOTRUN): + logger.debug(f"{required_app}: Required application failed: {inst.reason}") + found_failed_app = True + return not found_failed_app + + def are_required_apps_processed( + self, instance: TestInstance, processing_queue: deque, + processing_ready: dict[str, TestInstance], task + ) -> bool: + if not instance.required_applications: + return True + + if not self._are_required_apps_ready(instance, processing_ready): + # required app not ready yet, + # add the task back to the pipeline to process it later + if self.jobs > 1: + # to avoid busy waiting + time.sleep(1) + processing_queue.appendleft(task) + return False + + if not self._are_all_required_apps_success(instance, processing_ready): + instance.status = TwisterStatus.SKIP + for tc in instance.testcases: + tc.status = TwisterStatus.SKIP + instance.reason = "Required application failed" + instance.required_applications = [] + processing_queue.append({"op": "report", "test": instance}) + return False + + # keep paths to required applications build directories to use it in harness module + for required_image in instance.required_applications: + instance.required_build_dirs.append(self.instances[required_image].build_dir) + + # required applications are ready, clear to not process them later + instance.required_applications = [] + return True + + def process_tasks( + self, processing_queue: deque, processing_ready: dict[str, TestInstance], + lock, results: ExecutionCounter + ) -> bool: + while True: + try: + task = processing_queue.pop() + except IndexError: + break + else: + instance: TestInstance = task['test'] - def pipeline_mgr(self, pipeline, done_queue, lock, results): + if not self.are_required_apps_processed( + instance, processing_queue, processing_ready, task + ): + # postpone processing task if required applications are not ready + continue + + pb = ProjectBuilder(instance, self.env, self.jobserver) + pb.duts = self.duts + pb.process(processing_queue, processing_ready, task, lock, results) + if ( + self.env.options.quit_on_failure + and pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR] + ): + try: + while True: + processing_queue.pop() + except IndexError: + pass + return True + + def pipeline_mgr(self, processing_queue: deque, processing_ready: dict[str, TestInstance], + lock, results: ExecutionCounter): try: if sys.platform == 'linux': with self.jobserver.get_job(): - while True: - try: - task = pipeline.get_nowait() - except queue.Empty: - break - else: - instance = task['test'] - pb = ProjectBuilder(instance, self.env, self.jobserver) - pb.duts = self.duts - pb.process(pipeline, done_queue, task, lock, results) - if self.env.options.quit_on_failure and \ - pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]: - try: - while True: - pipeline.get_nowait() - except queue.Empty: - pass - - return True + return self.process_tasks(processing_queue, processing_ready, lock, results) else: - while True: - try: - task = pipeline.get_nowait() - except queue.Empty: - break - else: - instance = task['test'] - pb = ProjectBuilder(instance, self.env, self.jobserver) - pb.duts = self.duts - pb.process(pipeline, done_queue, task, lock, results) - if self.env.options.quit_on_failure and \ - pb.instance.status in [TwisterStatus.FAIL, TwisterStatus.ERROR]: - try: - while True: - pipeline.get_nowait() - except queue.Empty: - pass - return True + return self.process_tasks(processing_queue, processing_ready, lock, results) except Exception as e: logger.error(f"General exception: {e}\n{traceback.format_exc()}") sys.exit(1) - def execute(self, pipeline, done): + def execute(self, processing_queue: deque, processing_ready: dict[str, TestInstance]): lock = Lock() logger.info("Adding tasks to the queue...") - self.add_tasks_to_queue(pipeline, self.options.build_only, self.options.test_only, + self.add_tasks_to_queue(processing_queue, self.options.build_only, self.options.test_only, retry_build_errors=self.options.retry_build_errors) logger.info("Added initial list of jobs to queue") processes = [] for _ in range(self.jobs): - p = Process(target=self.pipeline_mgr, args=(pipeline, done, lock, self.results, )) + p = Process(target=self.pipeline_mgr, + args=(processing_queue, processing_ready, lock, self.results, )) processes.append(p) p.start() logger.debug(f"Launched {self.jobs} jobs") diff --git a/scripts/pylib/twister/twisterlib/testinstance.py b/scripts/pylib/twister/twisterlib/testinstance.py index 8b36c8e7f9686..19cdb4526eac3 100644 --- a/scripts/pylib/twister/twisterlib/testinstance.py +++ b/scripts/pylib/twister/twisterlib/testinstance.py @@ -97,6 +97,8 @@ def __init__(self, testsuite, platform, toolchain, outdir): self.init_cases() self.filters = [] self.filter_type = None + self.required_applications = [] + self.required_build_dirs = [] def setup_run_id(self): self.run_id = self._get_run_id() diff --git a/scripts/pylib/twister/twisterlib/testplan.py b/scripts/pylib/twister/twisterlib/testplan.py index 32dc7d8cef6db..699104a20e304 100755 --- a/scripts/pylib/twister/twisterlib/testplan.py +++ b/scripts/pylib/twister/twisterlib/testplan.py @@ -7,6 +7,7 @@ # SPDX-License-Identifier: Apache-2.0 import collections import copy +import glob import itertools import json import logging @@ -175,7 +176,7 @@ def __init__(self, env: Namespace): self.selected_platforms = [] self.default_platforms = [] self.load_errors = 0 - self.instances = dict() + self.instances: dict[str, TestInstance] = {} self.instance_fail_count = 0 self.warnings = 0 @@ -744,6 +745,7 @@ def load_from_file(self, file, filter_platform=None): except FileNotFoundError as e: logger.error(f"{e}") return 1 + self.apply_changes_for_required_applications(loaded_from_file=True) def check_platform(self, platform, platform_list): return any(p in platform.aliases for p in platform_list) @@ -1192,6 +1194,8 @@ def apply_filters(self, **kwargs): else: self.add_instances(instance_list) + self.apply_changes_for_required_applications() + for _, case in self.instances.items(): # Do not create files for filtered instances if case.status == TwisterStatus.FILTER: @@ -1216,6 +1220,116 @@ def apply_filters(self, **kwargs): change_skip_to_error_if_integration(self.options, inst) inst.add_missing_case_status(inst.status) + def _find_required_instance(self, required_app, instance: TestInstance) -> TestInstance | None: + if req_platform := required_app.get("platform", None): + platform = self.get_platform(req_platform) + if not platform: + raise TwisterRuntimeError( + f"Unknown platform {req_platform} in required application" + ) + req_platform = platform.name + else: + req_platform = instance.platform.name + + for inst in self.instances.values(): + if required_app["name"] == inst.testsuite.id and req_platform == inst.platform.name: + return inst + return None + + def _find_required_application_in_outdir(self, required_app, + instance: TestInstance) -> str | None: + """Check if required application exists in build directory.""" + if not ( + self.options.no_clean + or self.options.only_failed + or self.options.test_only + or self.options.report_summary + ): + return None + + if platform := required_app.get("platform", None): + platform = self.get_platform(platform) + else: + platform = instance.platform + name = required_app["name"] + glob_pattern = f"{self.options.outdir}/{platform.normalized_name}/**/{name}" + build_dirs = glob.glob(glob_pattern, recursive=True) + if not build_dirs: + return None + if not os.path.exists(os.path.join(build_dirs[0], "zephyr")): + # application was only pre-built + return None + logger.debug(f"Found existing build directory for required app: {build_dirs[0]}") + return build_dirs[0] + + def apply_changes_for_required_applications(self, loaded_from_file=False) -> None: + # check if required applications are in scope + for instance in self.instances.values(): + if not instance.testsuite.required_applications: + continue + if instance.status == TwisterStatus.FILTER: + # do not proceed if the test is already filtered + continue + + if self.options.subset: + instance.add_filter("Required applications are not supported with --subsets", + Filters.CMD_LINE) + continue + + if self.options.runtime_artifact_cleanup: + instance.add_filter( + "Required applications are not supported with --runtime-artifact-cleanup", + Filters.CMD_LINE + ) + continue + + for required_app in instance.testsuite.required_applications: + req_instance = self._find_required_instance(required_app, instance) + if not req_instance: + # check if required application exists in build directory + if req_build_dir := self._find_required_application_in_outdir( + required_app, + instance + ): + # keep path to required build directory to use it in harness module + instance.required_build_dirs.append(req_build_dir) + continue + + instance.add_filter(f"Missing required application {required_app['name']}", + Filters.TESTSUITE) + logger.debug( + f"{instance.name}: Required application '{required_app['name']}' was not" + " found. Please verify if required test is provided with --testsuite-root" + " or build all required applications and rerun twister with --no-cleanup" + " option." + ) + break + + if req_instance.status == TwisterStatus.FILTER: + # check if required application is filtered because is not runnable + if loaded_from_file or ( + self.options.device_testing and not req_instance.run + and len(req_instance.filters) == 1 + and req_instance.reason == "Not runnable on device"): + # clear status flag to build required application + self.instances[req_instance.name].status = TwisterStatus.NONE + else: + instance.add_filter(f"Required app {req_instance.name} is filtered", + Filters.TESTSUITE) + logger.debug(f"{instance.name}: Required application '{req_instance.name}'" + " is filtered") + break + + if instance.testsuite.id in req_instance.testsuite.required_applications: + instance.add_filter("Circular dependency in required applications", + Filters.TESTSUITE) + logger.warning(f"{instance.name}: Circular dependency, current app also" + f" required by {req_instance.name}") + break + # keep dependencies to use it in the runner module to synchronize + # building of applications + instance.required_applications.append(req_instance.name) + def add_instances(self, instance_list): for instance in instance_list: self.instances[instance.name] = instance diff --git a/scripts/schemas/twister/testsuite-schema.yaml b/scripts/schemas/twister/testsuite-schema.yaml index ad6691c1327b3..1d00098e318c3 100644 --- a/scripts/schemas/twister/testsuite-schema.yaml +++ b/scripts/schemas/twister/testsuite-schema.yaml @@ -73,6 +73,17 @@ schema;scenario-schema: "expect_reboot": type: bool required: false + "required_applications": + type: seq + required: false + sequence: + - type: map + mapping: + "name": + type: str + required: true + "platform": + type: str "required_snippets": type: seq required: false diff --git a/scripts/tests/twister/test_runner.py b/scripts/tests/twister/test_runner.py index 5907c0a5aadb2..f452f03ab3fd6 100644 --- a/scripts/tests/twister/test_runner.py +++ b/scripts/tests/twister/test_runner.py @@ -11,12 +11,12 @@ import os import pathlib import pytest -import queue import re import subprocess import sys import yaml +from collections import deque from contextlib import nullcontext from elftools.elf.sections import SymbolTableSection from typing import List @@ -1490,7 +1490,7 @@ def test_projectbuilder_process( expected_skipped, expected_missing ): - def mock_pipeline_put(msg): + def mock_processing_queue_append(msg): if isinstance(pipeline_runtime_error, type) and \ issubclass(pipeline_runtime_error, Exception): raise RuntimeError('Pipeline Error!') @@ -1530,8 +1530,8 @@ def mock_determine_testcases(res): pb.run = mock.Mock() pb.gather_metrics = mock.Mock(return_value=metrics_res) - pipeline_mock = mock.Mock(put=mock.Mock(side_effect=mock_pipeline_put)) - done_mock = mock.Mock() + processing_queue_mock = mock.Mock(append=mock.Mock(side_effect=mock_processing_queue_append)) + processing_ready_mock = mock.Mock() lock_mock = mock.Mock( __enter__=mock.Mock(return_value=(mock.Mock(), mock.Mock())), __exit__=mock.Mock(return_value=None) @@ -1539,12 +1539,12 @@ def mock_determine_testcases(res): results_mock = mock.Mock() results_mock.filtered_runtime = 0 - pb.process(pipeline_mock, done_mock, message, lock_mock, results_mock) + pb.process(processing_queue_mock, processing_ready_mock, message, lock_mock, results_mock) assert all([log in caplog.text for log in expected_logs]) if resulting_message: - pipeline_mock.put.assert_called_with(resulting_message) + processing_queue_mock.append.assert_called_with(resulting_message) assert pb.instance.status == expected_status assert pb.instance.reason == expected_reason @@ -2513,18 +2513,17 @@ def mock_client_from_environ(jobs): jobclient_mock = mock.Mock() jobclient_mock().name='JobClient' - pipeline_q = queue.LifoQueue() - done_q = queue.LifoQueue() - done_instance = mock.Mock( + processing_queue = deque() + processing_ready = {} + processing_instance = mock.Mock( metrics={'k': 'v2'}, execution_time=30 ) - done_instance.name='dummy instance' - done_q.put(done_instance) + processing_instance.name='dummy instance' + processing_ready[processing_instance.name] = processing_instance manager_mock = mock.Mock() - manager_mock().LifoQueue = mock.Mock( - side_effect=iter([pipeline_q, done_q]) - ) + manager_mock().deque = mock.Mock(return_value=processing_queue) + manager_mock().get_dict = mock.Mock(return_value=processing_ready) results_mock = mock.Mock() results_mock().error = 1 @@ -2734,10 +2733,10 @@ def mock_get_cmake_filter_stages(filter, keys): ) tr.results = mock.Mock(iteration=0) - pipeline_mock = mock.Mock() + processing_queue_mock = mock.Mock() tr.add_tasks_to_queue( - pipeline_mock, + processing_queue_mock, build_only, test_only, retry_build_errors @@ -2751,10 +2750,10 @@ def mock_get_cmake_filter_stages(filter, keys): if retry_build_errors: tr.get_cmake_filter_stages.assert_any_call('some', mock.ANY) - print(pipeline_mock.put.call_args_list) + print(processing_queue_mock.append.call_args_list) print([mock.call(el) for el in expected_pipeline_elements]) - assert pipeline_mock.put.call_args_list == \ + assert processing_queue_mock.append.call_args_list == \ [mock.call(el) for el in expected_pipeline_elements] @@ -2769,12 +2768,12 @@ def mock_get_cmake_filter_stages(filter, keys): ) def test_twisterrunner_pipeline_mgr(mocked_jobserver, platform): counter = 0 - def mock_get_nowait(): + def mock_pop(): nonlocal counter counter += 1 if counter > 5: - raise queue.Empty() - return {'test': 'dummy'} + raise IndexError + return {'test': mock.Mock(required_applications=[])} instances = {} suites = [] @@ -2787,16 +2786,16 @@ def mock_get_nowait(): ) ) - pipeline_mock = mock.Mock() - pipeline_mock.get_nowait = mock.Mock(side_effect=mock_get_nowait) - done_queue_mock = mock.Mock() + processing_queue_mock = mock.Mock() + processing_queue_mock.pop = mock.Mock(side_effect=mock_pop) + processing_ready_mock = mock.Mock() lock_mock = mock.Mock() results_mock = mock.Mock() with mock.patch('sys.platform', platform), \ mock.patch('twisterlib.runner.ProjectBuilder',\ return_value=mock.Mock()) as pb: - tr.pipeline_mgr(pipeline_mock, done_queue_mock, lock_mock, results_mock) + tr.pipeline_mgr(processing_queue_mock, processing_ready_mock, lock_mock, results_mock) assert len(pb().process.call_args_list) == 5