diff --git a/src/dvsim/BUILD b/src/dvsim/BUILD deleted file mode 100644 index 709aefb7..00000000 --- a/src/dvsim/BUILD +++ /dev/null @@ -1,242 +0,0 @@ -# Copyright lowRISC contributors (OpenTitan project). -# Licensed under the Apache License, Version 2.0, see LICENSE for details. -# SPDX-License-Identifier: Apache-2.0 - -load("@rules_python//python:defs.bzl", "py_binary") -load("@ot_python_deps//:requirements.bzl", "requirement") - -package(default_visibility = ["//visibility:public"]) - -filegroup( - name = "doc_files", - srcs = glob([ - "**/*.md", - "**/*.png", - ]), -) - -py_library( - name = "utils", - srcs = ["utils.py"], - deps = [ - requirement("hjson"), - requirement("mistletoe"), - requirement("premailer"), - ], -) - -py_library( - name = "sim_utils", - srcs = ["sim_utils.py"], -) - -py_library( - name = "msg_buckets", - srcs = [ - "MsgBucket.py", - "MsgBuckets.py", - ], - deps = [ - ":utils", - ], -) - -py_library( - name = "modes", - srcs = [ - "Regression.py", - "Test.py", - "modes.py", - ], - deps = [ - ":utils", - ], -) - -py_library( - name = "timer", - srcs = ["Timer.py"], -) - -py_library( - name = "status_printer", - srcs = ["StatusPrinter.py"], - deps = [ - requirement("enlighten"), - ], -) - -py_library( - name = "launcher", - srcs = [ - "Launcher.py", - "LauncherFactory.py", - "LocalLauncher.py", - "LsfLauncher.py", - ], - deps = [ - ":utils", - ], -) - -py_library( - name = "deploy", - srcs = ["Deploy.py"], - deps = [ - ":launcher", - ":sim_utils", - ":utils", - requirement("tabulate"), - ], -) - -py_library( - name = "scheduler", - srcs = ["Scheduler.py"], - deps = [ - ":launcher", - ":status_printer", - ":timer", - ":utils", - ], -) - -py_library( - name = "cfg_json", - srcs = ["CfgJson.py"], - deps = [ - ":utils", - ], -) - -py_library( - name = "flow_cfg", - srcs = ["FlowCfg.py"], - deps = [ - ":cfg_json", - ":launcher", - ":scheduler", - ":utils", - requirement("hjson"), - ], -) - -py_library( - name = "oneshot_cfg", - srcs = 
["OneShotCfg.py"], - deps = [ - ":deploy", - ":flow_cfg", - ":modes", - ":utils", - ], -) - -py_library( - name = "lint_cfg", - srcs = ["LintCfg.py"], - deps = [ - ":msg_buckets", - ":oneshot_cfg", - ":utils", - requirement("tabulate"), - ], -) - -py_library( - name = "formal_cfg", - srcs = ["FormalCfg.py"], - deps = [ - ":oneshot_cfg", - ":utils", - requirement("hjson"), - requirement("tabulate"), - ], -) - -py_library( - name = "cdc_cfg", - srcs = ["CdcCfg.py"], - deps = [ - ":lint_cfg", - ":msg_buckets", - ":utils", - requirement("tabulate"), - ], -) - -py_library( - name = "rdc_cfg", - srcs = ["RdcCfg.py"], - deps = [ - ":cdc_cfg", - ], -) - -py_library( - name = "testplan", - srcs = ["Testplan.py"], - deps = [ - requirement("hjson"), - requirement("mistletoe"), - requirement("tabulate"), - ], -) - -py_library( - name = "sim_results", - srcs = ["SimResults.py"], - deps = [ - ":testplan", - ], -) - -py_library( - name = "sim_cfg", - srcs = ["SimCfg.py"], - deps = [ - ":deploy", - ":flow_cfg", - ":modes", - ":sim_results", - ":testplan", - ":utils", - requirement("tabulate"), - ], -) - -py_library( - name = "syn_cfg", - srcs = ["SynCfg.py"], - deps = [ - ":oneshot_cfg", - ":utils", - requirement("hjson"), - requirement("tabulate"), - ], -) - -py_library( - name = "cfg_factory", - srcs = ["CfgFactory.py"], - deps = [ - ":cdc_cfg", - ":cfg_json", - ":formal_cfg", - ":lint_cfg", - ":sim_cfg", - ":syn_cfg", - ], -) - -py_binary( - name = "dvsim", - srcs = ["dvsim.py"], - deps = [ - ":cfg_factory", - ":deploy", - ":launcher", - ":timer", - ":utils", - ], -) diff --git a/src/dvsim/flow/sim.py b/src/dvsim/flow/sim.py index 21b5796d..0cd26976 100644 --- a/src/dvsim/flow/sim.py +++ b/src/dvsim/flow/sim.py @@ -18,7 +18,15 @@ from tabulate import tabulate from dvsim.flow.base import FlowCfg -from dvsim.job.deploy import CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, Deploy, RunTest +from dvsim.job.deploy import ( + CompileSim, + CovAnalyze, + CovMerge, + CovReport, + 
CovUnr, + Deploy, + RunTest, +) from dvsim.logging import log from dvsim.modes import BuildMode, Mode, RunMode, find_mode from dvsim.regression import Regression @@ -270,7 +278,10 @@ def _create_objects(self) -> None: self.sw_images.extend(build_mode_obj.sw_images) self.sw_build_opts.extend(build_mode_obj.sw_build_opts) else: - log.error('Mode "%s" enabled on the command line is not defined', en_build_mode) + log.error( + 'Mode "%s" enabled on the command line is not defined', + en_build_mode, + ) sys.exit(1) # Walk through run modes enabled on the CLI and append the opts @@ -876,7 +887,7 @@ def create_bucket_report(buckets): # Append coverage results if coverage was enabled. if self.cov_report_deploy is not None: - report_status = results[self.cov_report_deploy] + report_status = results[self.cov_report_deploy.qual_name] if report_status == "P": results_str += "\n## Coverage Results\n" # Link the dashboard page using "cov_report_page" value. diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 1a744494..5e1c509c 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -14,7 +14,7 @@ from tabulate import tabulate from dvsim.job.time import JobTime -from dvsim.launcher.factory import get_launcher +from dvsim.launcher.base import Launcher from dvsim.logging import log from dvsim.sim_utils import get_cov_summary_table, get_job_runtime, get_simulated_time from dvsim.utils import ( @@ -26,7 +26,11 @@ if TYPE_CHECKING: from dvsim.flow.sim import SimCfg - from dvsim.launcher.base import Launcher + +__all__ = ( + "CompileSim", + "Deploy", +) class Deploy: @@ -65,6 +69,7 @@ def __init__(self, sim_cfg: "SimCfg") -> None: # Cross ref the whole cfg object for ease. self.sim_cfg = sim_cfg + self.flow = sim_cfg.name # A list of jobs on which this job depends. self.dependencies = [] @@ -92,9 +97,6 @@ def __init__(self, sim_cfg: "SimCfg") -> None: # Construct the job's command. 
self.cmd = self._construct_cmd() - # Launcher instance created later using create_launcher() method. - self.launcher: Launcher | None = None - # Job's wall clock time (a.k.a CPU time, or runtime). self.job_runtime = JobTime() @@ -284,7 +286,7 @@ def is_equivalent_job(self, item: "Deploy") -> bool: log.verbose('Deploy job "%s" is equivalent to "%s"', item.name, self.name) return True - def pre_launch(self) -> None: + def pre_launch(self, launcher: Launcher) -> None: """Perform additional pre-launch activities (callback). This is invoked by launcher::_pre_launch(). @@ -325,15 +327,6 @@ def extract_info_from_log(self, log_text: list) -> None: log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.") self.job_runtime.set(self.launcher.job_runtime_secs, "s") - def create_launcher(self) -> None: - """Create the launcher instance. - - Note that the launcher instance for ALL jobs in the same job group must - be created before the Scheduler starts to dispatch one by one. - """ - # Retain the handle to self for lookup & callbacks. - self.launcher = get_launcher(self) - def model_dump(self) -> Mapping: """Dump the deployment object to mapping object. @@ -371,7 +364,11 @@ def __init__(self, build_mode, sim_cfg) -> None: # Needs to be after the wildcard expansion to log anything meaningful if self.build_timeout_mins: - log.debug('Timeout for job "%s" is %d minutes.', self.name, self.build_timeout_mins) + log.debug( + 'Timeout for job "%s" is %d minutes.', + self.name, + self.build_timeout_mins, + ) def _define_attrs(self) -> None: """Define attributes.""" @@ -420,7 +417,7 @@ def _set_attrs(self) -> None: if self.sim_cfg.args.build_timeout_mins is not None: self.build_timeout_mins = self.sim_cfg.args.build_timeout_mins - def pre_launch(self) -> None: + def pre_launch(self, launcher: Launcher) -> None: """Perform pre-launch tasks.""" # Delete old coverage database directories before building again. 
We # need to do this because the build directory is not 'renewed'. @@ -446,7 +443,11 @@ def __init__(self, build_mode, sim_cfg) -> None: # Needs to be after the wildcard expansion to log anything meaningful if self.build_timeout_mins: - log.debug('Timeout for job "%s" is %d minutes.', self.name, self.build_timeout_mins) + log.debug( + 'Timeout for job "%s" is %d minutes.', + self.name, + self.build_timeout_mins, + ) def _define_attrs(self) -> None: super()._define_attrs() @@ -518,7 +519,11 @@ def __init__(self, index, test, build_job, sim_cfg: "SimCfg") -> None: # Needs to be after the wildcard expansion to log anything meaningful if self.run_timeout_mins: - log.debug('Timeout for job "%s" is %d minutes.', self.full_name, self.run_timeout_mins) + log.debug( + 'Timeout for job "%s" is %d minutes.', + self.full_name, + self.run_timeout_mins, + ) if build_job is not None: self.dependencies.append(build_job) @@ -595,9 +600,9 @@ def _set_attrs(self) -> None: self.run_timeout_multiplier, ) - def pre_launch(self) -> None: + def pre_launch(self, launcher: Launcher) -> None: """Perform pre-launch tasks.""" - self.launcher.renew_odir = True + launcher.renew_odir = True def post_finish(self, status) -> None: """Perform tidy up tasks.""" @@ -664,7 +669,11 @@ def _define_attrs(self) -> None: ) self.mandatory_misc_attrs.update( - {"cov_unr_dir": False, "cov_merge_db_dir": False, "build_fail_patterns": False}, + { + "cov_unr_dir": False, + "cov_merge_db_dir": False, + "build_fail_patterns": False, + }, ) def _set_attrs(self) -> None: @@ -749,7 +758,11 @@ def _define_attrs(self) -> None: self.mandatory_cmd_attrs.update({"cov_report_cmd": False, "cov_report_opts": False}) self.mandatory_misc_attrs.update( - {"cov_report_dir": False, "cov_merge_db_dir": False, "cov_report_txt": False}, + { + "cov_report_dir": False, + "cov_merge_db_dir": False, + "cov_report_txt": False, + }, ) def _set_attrs(self) -> None: diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py index 
f2249e0e..e19a8225 100644 --- a/src/dvsim/launcher/base.py +++ b/src/dvsim/launcher/base.py @@ -2,33 +2,35 @@ # Licensed under the Apache License, Version 2.0, see LICENSE for details. # SPDX-License-Identifier: Apache-2.0 -import collections +"""Launcher abstract base class.""" + import datetime import os import re import sys +from abc import ABC, abstractmethod +from collections.abc import Mapping, Sequence from pathlib import Path +from typing import TYPE_CHECKING + +from pydantic import BaseModel, ConfigDict from dvsim.logging import log from dvsim.utils import clean_odirs, mk_symlink, rm_path +if TYPE_CHECKING: + from dvsim.job.deploy import Deploy + class LauncherError(Exception): - def __init__(self, msg) -> None: - self.msg = msg + """Error occurred during job launching.""" class LauncherBusyError(Exception): - def __init__(self, msg) -> None: - self.msg = msg + """Launcher is busy and the job was not able to be launched.""" -class ErrorMessage( - collections.namedtuple( - "ErrorMessage", - ["line_number", "message", "context"], - ), -): +class ErrorMessage(BaseModel): """Contains error-related information. This support classification of failures into buckets. The message field @@ -36,8 +38,14 @@ class ErrorMessage( the failing log that can be useful for quick diagnostics. """ + model_config = ConfigDict(frozen=True, extra="forbid") + + line_number: int | None = None + message: str + context: Sequence[str] + -class Launcher: +class Launcher(ABC): """Abstraction for launching and maintaining a job. An abstract class that provides methods to prepare a job's environment, @@ -47,7 +55,7 @@ class Launcher: """ # Type of launcher used as string. - variant = None + variant: str | None = None # Max jobs running at one time max_parallel = sys.maxsize @@ -83,7 +91,7 @@ class Launcher: ) @staticmethod - def set_pyvenv(project) -> None: + def set_pyvenv(project: str) -> None: """Activate a python virtualenv if available. 
The env variable _PYTHON_VENV if set, points to the path @@ -113,17 +121,22 @@ def set_pyvenv(project) -> None: Launcher.pyvenv = os.environ.get(f"{project.upper()}_PYVENV") @staticmethod - def prepare_workspace(project, repo_top, args) -> None: + @abstractmethod + def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: """Prepare the workspace based on the chosen launcher's needs. This is done once for the entire duration for the flow run. - 'project' is the name of the project. - 'repo_top' is the path to the repository. - 'args' are the command line args passed to dvsim. + + Args: + project: the name of the project. + repo_top: the path to the repository. + args: command line args passed to dvsim. + """ @staticmethod - def prepare_workspace_for_cfg(cfg) -> None: + @abstractmethod + def prepare_workspace_for_cfg(cfg: Mapping) -> None: """Prepare the workspace for a cfg. This is invoked once for each cfg. @@ -131,13 +144,21 @@ def prepare_workspace_for_cfg(cfg) -> None: """ def __str__(self) -> str: + """Get a string representation.""" return self.deploy.full_name + ":launcher" - def __init__(self, deploy) -> None: + def __init__(self, deploy: "Deploy") -> None: + """Initialise launcher. + + Args: + deploy: deployment object that will be launched. + + """ cfg = deploy.sim_cfg # One-time preparation of the workspace. if not Launcher.workspace_prepared: + # TODO: CLI args should be processed far earlier than this self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args) Launcher.workspace_prepared = True @@ -173,7 +194,8 @@ def _make_odir(self) -> None: # If renew_odir flag is True - then move it. if self.renew_odir: clean_odirs(odir=self.deploy.odir, max_odirs=self.max_odirs) - os.makedirs(self.deploy.odir, exist_ok=True) + + Path(self.deploy.odir).mkdir(exist_ok=True, parents=True) def _link_odir(self, status) -> None: """Soft-links the job's directory based on job's status. 
@@ -189,7 +211,7 @@ def _link_odir(self, status) -> None: old = Path(self.deploy.sim_cfg.links["D"], self.deploy.qual_name) rm_path(old) - def _dump_env_vars(self, exports) -> None: + def _dump_env_vars(self, exports: Mapping[str, str]) -> None: """Write env vars to a file for ease of debug. Each extended class computes the list of exports and invokes this @@ -210,44 +232,58 @@ def _pre_launch(self) -> None: old runs, creating the output directory, dumping all env variables etc. This method is already invoked by launch() as the first step. """ - self.deploy.pre_launch() + self.deploy.pre_launch(self) self._make_odir() self.start_time = datetime.datetime.now() + @abstractmethod def _do_launch(self) -> None: """Launch the job.""" - raise NotImplementedError def launch(self) -> None: """Launch the job.""" self._pre_launch() self._do_launch() + @abstractmethod def poll(self) -> str | None: """Poll the launched job for completion. Invokes _check_status() and _post_finish() when the job completes. """ - raise NotImplementedError + @abstractmethod def kill(self) -> None: """Terminate the job.""" - raise NotImplementedError - def _check_status(self): + def _check_status(self) -> tuple[str, ErrorMessage | None]: """Determine the outcome of the job (P/F if it ran to completion). - Returns (status, err_msg) extracted from the log, where the status is - "P" if the it passed, "F" otherwise. This is invoked by poll() just - after the job finishes. err_msg is an instance of the named tuple - ErrorMessage. + Returns: + (status, err_msg) extracted from the log, where the status is + "P" if it passed, "F" otherwise. This is invoked by poll() just + after the job finishes. err_msg is an ErrorMessage instance + or None. + """ - def _find_patterns(patterns, line): - """Helper function that returns the pattern if any of the given + def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: + """Get all patterns that match the given line. 
+ + Helper function that returns the pattern if any of the given patterns is found, else None. + + Args: + patterns: sequence of regex patterns to check + line: string to check matches against + + Returns: + All matching patterns or None if there are no matches. + """ - assert patterns + if not patterns: + return None + for pattern in patterns: match = re.search(rf"{pattern}", line) if match: @@ -319,13 +355,15 @@ def _find_patterns(patterns, line): ) return "P", None - def _post_finish(self, status, err_msg) -> None: + def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: """Do post-completion activities, such as preparing the results. Must be invoked by poll(), after the job outcome is determined. - status is the status of the job, either 'P', 'F' or 'K'. - err_msg is an instance of the named tuple ErrorMessage. + Args: + status: status of the job, either 'P', 'F' or 'K'. + err_msg: an ErrorMessage instance. + """ assert status in ["P", "F", "K"] self._link_odir(status) @@ -335,6 +373,7 @@ def _post_finish(self, status, err_msg) -> None: # Run the target-specific cleanup tasks regardless of the job's # outcome. self.deploy.post_finish(status) + except Exception as e: # If the job had already failed, then don't do anything. If it's # cleanup task failed, then mark the job as failed. 
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index 2c829fa0..85a6470b 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -17,8 +17,10 @@ from types import FrameType from typing import TYPE_CHECKING, Any +from pydantic import BaseModel, ConfigDict + from dvsim.job.deploy import Deploy -from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError +from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError from dvsim.logging import log from dvsim.utils.status_printer import get_status_printer from dvsim.utils.timer import Timer @@ -27,6 +29,15 @@ from dvsim.flow.base import FlowCfg +class CompletedJobStatus(BaseModel): + """Job status.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + status: str + fail_msg: ErrorMessage + + def total_sub_items( d: Mapping[str, Sequence[Deploy]] | Mapping["FlowCfg", Sequence[Deploy]], ) -> int: @@ -138,15 +149,21 @@ def __init__( # per-target. self.item_to_status: MutableMapping[Deploy, str] = {} + # TODO: Why is the deployment object asked about which launcher to use when + # the launcher class is explicitly passed? Either each deployment can have its + # own distinct Launcher class type or all deployments must have the same + # Launcher class. Both can't be true. + # Create the launcher instance for all items. - for item in self.items: - item.create_launcher() + self._launchers: Mapping[str, Launcher] = { + item.qual_name: launcher_cls(item) for item in self.items + } # The chosen launcher class. This allows us to access launcher # variant-specific settings such as max parallel jobs & poll rate. self.launcher_cls: type[Launcher] = launcher_cls - def run(self) -> Mapping[Deploy, str]: + def run(self) -> Mapping[str, CompletedJobStatus]: """Run all scheduled jobs and return the results. 
Returns the results (status) of all items dispatched for all @@ -211,7 +228,13 @@ def on_signal(signal_received: int, _: FrameType | None) -> None: self.status_printer.exit() # We got to the end without anything exploding. Return the results. - return self.item_to_status + return { + d.qual_name: CompletedJobStatus( + status=status, + fail_msg=self._launchers[d.qual_name].fail_msg, + ) + for d, status in self.item_to_status.items() + } def add_to_scheduled(self, items: Sequence[Deploy]) -> None: """Add items to the schedule. @@ -222,14 +245,14 @@ def add_to_scheduled(self, items: Sequence[Deploy]) -> None: """ for item in items: target_dict = self._scheduled.setdefault(item.target, {}) - cfg_list = target_dict.setdefault(item.sim_cfg, []) + cfg_list = target_dict.setdefault(item.flow, []) if item not in cfg_list: cfg_list.append(item) def _unschedule_item(self, item: Deploy) -> None: """Remove deploy item from the schedule.""" target_dict = self._scheduled[item.target] - cfg_list = target_dict.get(item.sim_cfg) + cfg_list = target_dict.get(item.flow) if cfg_list is not None: with contextlib.suppress(ValueError): cfg_list.remove(item) @@ -237,7 +260,7 @@ def _unschedule_item(self, item: Deploy) -> None: # When all items in _scheduled[target][cfg] are finally removed, # the cfg key is deleted. if not cfg_list: - del target_dict[item.sim_cfg] + del target_dict[item.flow] def _enqueue_successors(self, item: Deploy | None = None) -> None: """Move an item's successors from _scheduled to _queued. @@ -305,7 +328,7 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: if target is None: return [] - cfgs = {item.sim_cfg} + cfgs = {item.flow} # Find item's successors that can be enqueued. 
We assume here that # only the immediately succeeding target can be enqueued at this @@ -398,7 +421,7 @@ def _poll(self, hms: str) -> bool: self._running[target], self.last_item_polled_idx[target], ) - status = item.launcher.poll() + status = self._launchers[item.qual_name].poll() level = log.VERBOSE if status not in ["D", "P", "F", "E", "K"]: @@ -509,7 +532,7 @@ def _dispatch(self, hms: str) -> None: for item in to_dispatch: try: - item.launcher.launch() + self._launchers[item.qual_name].launch() except LauncherError: log.exception("Error launching %s", item) @@ -605,7 +628,7 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None: def _kill_item(self, item: Deploy) -> None: """Kill a running item and cancel all of its successors.""" - item.launcher.kill() + self._launchers[item.qual_name].kill() self.item_to_status[item] = "K" self._killed[item.target].add(item) self._running[item.target].remove(item) diff --git a/src/dvsim/sim_results.py b/src/dvsim/sim_results.py index ef7b4bb2..946a9dcf 100644 --- a/src/dvsim/sim_results.py +++ b/src/dvsim/sim_results.py @@ -6,7 +6,9 @@ import collections import re +from collections.abc import Mapping +from dvsim.scheduler import CompletedJobStatus from dvsim.testplan import Result _REGEX_REMOVE = [ @@ -81,18 +83,22 @@ def __init__(self, items, results) -> None: for item in items: self._add_item(item, results) - def _add_item(self, item, results) -> None: + def _add_item(self, item, results: Mapping[str, CompletedJobStatus]) -> None: """Recursively add a single item to the table of results.""" - status = results[item] - if status in ["F", "K"]: - bucket = self._bucketize(item.launcher.fail_msg.message) + job_status = results[item.qual_name] + if job_status.status in ["F", "K"]: + bucket = self._bucketize(job_status.fail_msg.message) self.buckets[bucket].append( - (item, item.launcher.fail_msg.line_number, item.launcher.fail_msg.context), + ( + item, + job_status.fail_msg.line_number, + 
job_status.fail_msg.context, + ), ) # Runs get added to the table directly if item.target == "run": - self._add_run(item, status) + self._add_run(item, job_status.status) def _add_run(self, item, status) -> None: """Add an entry to table for item."""