diff --git a/flake.lock b/flake.lock index 86cca31..946349e 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1760423683, - "narHash": "sha256-Tb+NYuJhWZieDZUxN6PgglB16yuqBYQeMJyYBGCXlt8=", + "lastModified": 1762233356, + "narHash": "sha256-cGS3lLTYusbEP/IJIWGgnkzIl+FA5xDvtiHyjalGr4k=", "owner": "nixos", "repo": "nixpkgs", - "rev": "a493e93b4a259cd9fea8073f89a7ed9b1c5a1da2", + "rev": "ca534a76c4afb2bdc07b681dbc11b453bab21af8", "type": "github" }, "original": { @@ -36,11 +36,11 @@ }, "nixpkgs-unstable": { "locked": { - "lastModified": 1760524057, - "narHash": "sha256-EVAqOteLBFmd7pKkb0+FIUyzTF61VKi7YmvP1tw4nEw=", + "lastModified": 1762111121, + "narHash": "sha256-4vhDuZ7OZaZmKKrnDpxLZZpGIJvAeMtK6FKLJYUtAdw=", "owner": "nixos", "repo": "nixpkgs", - "rev": "544961dfcce86422ba200ed9a0b00dd4b1486ec5", + "rev": "b3d51a0365f6695e7dd5cdf3e180604530ed33b4", "type": "github" }, "original": { @@ -63,11 +63,11 @@ ] }, "locked": { - "lastModified": 1759113590, - "narHash": "sha256-fgxP2RCN4cg0jYiMYoETYc7TZ2JjgyvJa2y9l8oSUFE=", + "lastModified": 1761781027, + "narHash": "sha256-YDvxPAm2WnxrznRqWwHLjryBGG5Ey1ATEJXrON+TWt8=", "owner": "pyproject-nix", "repo": "build-system-pkgs", - "rev": "dbfc0483b5952c6b86e36f8b3afeb9dde30ea4b5", + "rev": "795a980d25301e5133eca37adae37283ec3c8e66", "type": "github" }, "original": { @@ -83,11 +83,11 @@ ] }, "locked": { - "lastModified": 1760402624, - "narHash": "sha256-jF6UKLs2uGc2rtved8Vrt58oTWjTQoAssuYs/0578Z4=", + "lastModified": 1762427963, + "narHash": "sha256-CkPlAbIQ87wmjy5qHibfzk4DmMGBNqFer+lLfXjpP5M=", "owner": "nix-community", "repo": "pyproject.nix", - "rev": "84c4ea102127c77058ea1ed7be7300261fafc7d2", + "rev": "4540ea004e04fcd12dd2738d51383d10f956f7b9", "type": "github" }, "original": { @@ -131,11 +131,11 @@ ] }, "locked": { - "lastModified": 1760161183, - "narHash": "sha256-1USClOZthg+pGJp+p3ouVtTMO+ZY8Cd0+FbsNN/RpO8=", + "lastModified": 1761872265, + "narHash": "sha256-i25GRgp2vUOebY70L3NTAgkd+Pr1hnn5xM3qHxH0ONU=", "owner": "pyproject-nix", "repo": "uv2nix", - "rev": "b6ed0901aec29583532abe65117b18d86a49b617", + "rev": "74dfb62871be152ad3673b143b0cc56105a4f3c5", "type": "github" }, "original": { diff --git a/ruff-ci.toml b/ruff-ci.toml index f0c39ac..dca968d 100644 --- a/ruff-ci.toml +++ b/ruff-ci.toml @@ -92,7 +92,6 @@ ignore = [ "S103", "S202", "S311", - "S602", "S605", "SIM115", "SLF001", diff --git a/src/dvsim/cli.py b/src/dvsim/cli.py index 9690d27..8868dc3 100644 --- a/src/dvsim/cli.py +++ b/src/dvsim/cli.py @@ -82,7 +82,7 @@ def resolve_scratch_root(arg_scratch_root, proj_root): arg_scratch_root = os.path.realpath(arg_scratch_root) try: - os.makedirs(arg_scratch_root, exist_ok=True) + Path(arg_scratch_root).mkdir(exist_ok=True, parents=True) except PermissionError as e: log.fatal(f"Failed to create scratch root {arg_scratch_root}:\n{e}.") sys.exit(1) @@ -238,7 +238,7 @@ def copy_repo(src, dest) -> None: log.verbose("[copy_repo] [cmd]: \n%s", " ".join(cmd)) # Make sure the dest exists first. 
- os.makedirs(dest, exist_ok=True) + Path(dest).mkdir(exist_ok=True, parents=True) try: subprocess.run(cmd, check=True, capture_output=True) except subprocess.CalledProcessError as e: diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index ff126f0..2a12980 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -16,9 +16,10 @@ import hjson from dvsim.flow.hjson import set_target_attribute +from dvsim.job.data import CompletedJobStatus from dvsim.launcher.factory import get_launcher_cls from dvsim.logging import log -from dvsim.scheduler import CompletedJobStatus, Scheduler +from dvsim.scheduler import Scheduler from dvsim.utils import ( find_and_substitute_wildcards, md_results_to_html, @@ -403,7 +404,7 @@ def create_deploy_objects(self) -> None: for item in self.cfgs: item._create_deploy_objects() - def deploy_objects(self) -> Mapping[str, CompletedJobStatus]: + def deploy_objects(self) -> Sequence[CompletedJobStatus]: """Public facing API for deploying all available objects. Runs each job and returns a map from item to status. @@ -416,27 +417,36 @@ def deploy_objects(self) -> Mapping[str, CompletedJobStatus]: log.error("Nothing to run!") sys.exit(1) + jobs = [d.get_job_spec() for d in deploy] + if os.environ.get("DVSIM_DEPLOY_DUMP", "true"): filename = f"deploy_{self.branch}_{self.timestamp}.json" (Path(self.scratch_root) / filename).write_text( json.dumps( # Sort on full name to ensure consistent ordering sorted( - [d.dump() for d in deploy], - key=lambda d: d["full_name"], + [ + j.model_dump( + # callback functions can't be serialised + exclude={"pre_launch", "post_finish"}, + mode="json", + ) + for j in jobs + ], + key=lambda j: j["full_name"], ), indent=2, ), ) return Scheduler( - items=deploy, + items=jobs, launcher_cls=get_launcher_cls(), interactive=self.interactive, ).run() @abstractmethod - def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str: + def _gen_results(self, results: Sequence[CompletedJobStatus]) -> str: """Generate flow results. The function is called after the flow has completed. It collates @@ -446,18 +456,18 @@ def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str: report, which is in markdown format. Args: - results: dictionary mapping deployed item names to completed job status. + results: completed job status objects. Returns: Results as a formatted string """ - def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None: + def gen_results(self, results: Sequence[CompletedJobStatus]) -> None: """Generate flow results. Args: - results: dictionary mapping deployed item names to completed job status. + results: completed job status objects. 
""" for item in self.cfgs: @@ -476,7 +486,7 @@ def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None: self.write_results(self.results_html_name, self.results_summary_md) @abstractmethod - def gen_results_summary(self) -> None: + def gen_results_summary(self) -> str: """Public facing API to generate summary results for each IP/cfg file.""" def write_results(self, html_filename: str, text_md: str, json_str: str | None = None) -> None: diff --git a/src/dvsim/flow/formal.py b/src/dvsim/flow/formal.py index 716f161..5710ece 100644 --- a/src/dvsim/flow/formal.py +++ b/src/dvsim/flow/formal.py @@ -228,7 +228,7 @@ def _gen_results(self, results): "results.hjson", ) try: - with open(result_data) as results_file: + with Path(result_data).open() as results_file: self.result = hjson.load(results_file, use_decimal=True) except OSError as err: log.warning("%s", err) diff --git a/src/dvsim/flow/one_shot.py b/src/dvsim/flow/one_shot.py index 1a30f29..ea8dc15 100644 --- a/src/dvsim/flow/one_shot.py +++ b/src/dvsim/flow/one_shot.py @@ -4,7 +4,7 @@ """Class describing a one-shot build configuration object.""" -import os +import pathlib from collections import OrderedDict from dvsim.flow.base import FlowCfg @@ -135,7 +135,7 @@ def _create_dirs(self) -> None: """Create initial set of directories.""" for link in self.links: rm_path(self.links[link]) - os.makedirs(self.links[link]) + pathlib.Path(self.links[link]).mkdir(parents=True) def _create_deploy_objects(self) -> None: """Create deploy objects from build modes.""" diff --git a/src/dvsim/flow/sim.py b/src/dvsim/flow/sim.py index e492a9e..1624f9a 100644 --- a/src/dvsim/flow/sim.py +++ b/src/dvsim/flow/sim.py @@ -6,11 +6,10 @@ import fnmatch import json -import os import re import sys from collections import OrderedDict, defaultdict -from collections.abc import Mapping +from collections.abc import Sequence from datetime import datetime, timezone from pathlib import Path from typing import ClassVar @@ -18,13 +17,13 @@ from tabulate import tabulate from dvsim.flow.base import FlowCfg +from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.job.deploy import ( CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, - Deploy, RunTest, ) from dvsim.logging import log @@ -423,7 +422,7 @@ def _create_dirs(self) -> None: """Create initial set of directories.""" for link in self.links: rm_path(self.links[link]) - os.makedirs(self.links[link]) + Path(self.links[link]).mkdir(parents=True) def _expand_run_list(self, build_map): """Generate a list of tests to be run. @@ -540,8 +539,10 @@ def _create_deploy_objects(self) -> None: self._create_dirs() def _cov_analyze(self) -> None: - """Use the last regression coverage data to open up the GUI tool to - analyze the coverage. + """Open GUI tool for coverage analysis. + + Use the last regression coverage data to open up the GUI tool to analyze + the coverage. """ # Create initial set of directories, such as dispatched, passed etc. self._create_dirs() @@ -555,8 +556,10 @@ def cov_analyze(self) -> None: item._cov_analyze() def _cov_unr(self) -> None: - """Use the last regression coverage data to generate unreachable - coverage exclusions. + """Generate unreachable coverage exclusions. + + Use the last regression coverage data to generate unreachable coverage + exclusions. 
""" # TODO, Only support VCS if self.tool not in ["vcs", "xcelium"]: @@ -573,7 +576,7 @@ def cov_unr(self) -> None: for item in self.cfgs: item._cov_unr() - def _gen_json_results(self, run_results: Mapping[Deploy, str]) -> str: + def _gen_json_results(self, run_results: Sequence[CompletedJobStatus]) -> str: """Return the run results as json-formatted dictionary.""" def _pct_str_to_float(s: str) -> float | None: @@ -589,14 +592,8 @@ def _pct_str_to_float(s: str) -> float | None: def _test_result_to_dict(tr) -> dict: """Map a test result entry to a dict.""" - job_time_s = ( - tr.job_runtime.with_unit("s").get()[0] if tr.job_runtime is not None else None - ) - sim_time_us = ( - tr.simulated_time.with_unit("us").get()[0] - if tr.simulated_time is not None - else None - ) + job_time_s = tr.job_runtime + sim_time_us = tr.simulated_time pass_rate = tr.passing * 100.0 / tr.total if tr.total > 0 else 0 return { "name": tr.name, @@ -644,7 +641,7 @@ def _test_result_to_dict(tr) -> dict: # If the testplan does not yet have test results mapped to testpoints, # map them now. - sim_results = SimResults(self.deploy, run_results) + sim_results = SimResults(results=run_results) if not self.testplan.test_results_mapped: self.testplan.map_test_results(test_results=sim_results.table) @@ -707,12 +704,14 @@ def _test_result_to_dict(tr) -> dict: # Extract failure buckets. if sim_results.buckets: by_tests = sorted(sim_results.buckets.items(), key=lambda i: len(i[1]), reverse=True) + for bucket, tests in by_tests: unique_tests = defaultdict(list) for test, line, context in tests: - if not isinstance(test, RunTest): + if test.job_type != "RunTest": continue unique_tests[test.name].append((test, line, context)) + fts = [] for test_name, test_runs in unique_tests.items(): frs = [] @@ -721,12 +720,13 @@ def _test_result_to_dict(tr) -> dict: { "seed": str(test.seed), "failure_message": { - "log_file_path": test.get_log_path(), + "log_file_path": str(test.log_path), "log_file_line_num": line, "text": "".join(context), }, }, ) + fts.append( { "name": test_name, @@ -747,7 +747,7 @@ def _test_result_to_dict(tr) -> dict: # Return the `results` dictionary as json string. return json.dumps(self.results_dict) - def _gen_results(self, results: Mapping[Deploy, str]) -> str: + def _gen_results(self, results: Sequence[CompletedJobStatus]) -> str: """Generate simulation results. The function is called after the regression has completed. It collates the @@ -761,12 +761,12 @@ def _gen_results(self, results: Mapping[Deploy, str]) -> str: def indent_by(level: int) -> str: return " " * (4 * level) - def create_failure_message(test, line, context): + def create_failure_message(test: JobSpec, line, context): message = [f"{indent_by(2)}* {test.qual_name}\\"] if line: - message.append(f"{indent_by(2)} Line {line}, in log " + test.get_log_path()) + message.append(f"{indent_by(2)} Line {line}, in log {test.log_path}") else: - message.append(f"{indent_by(2)} Log {test.get_log_path()}") + message.append(f"{indent_by(2)} Log {test.log_path}") if context: message.append("") lines = [f"{indent_by(4)}{c.rstrip()}" for c in context] @@ -817,8 +817,7 @@ def create_bucket_report(buckets): fail_msgs.append("") return fail_msgs - deployed_items = self.deploy - sim_results = SimResults(deployed_items, results) + sim_results = SimResults(results=results) # Generate results table for runs. results_str = "## " + self.results_title + "\n" @@ -887,8 +886,17 @@ def create_bucket_report(buckets): # Append coverage results if coverage was enabled. 
if self.cov_report_deploy is not None: - report_status = results[self.cov_report_deploy.full_name] - if report_status == "P": + report_status: CompletedJobStatus | None = None + for job_status in results: + if job_status.full_name == self.cov_report_deploy.full_name: + report_status = job_status + break + + if report_status is None: + msg = f"Coverage report not found for {self.cov_report_deploy.full_name}" + raise KeyError(msg) + + if report_status.status == "P": results_str += "\n## Coverage Results\n" # Link the dashboard page using "cov_report_page" value. if hasattr(self, "cov_report_page"): @@ -907,7 +915,7 @@ def create_bucket_report(buckets): self.results_md = results_str return results_str - def gen_results_summary(self): + def gen_results_summary(self) -> str: """Generate the summary results table. This method is specific to the primary cfg. It summarizes the results diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py new file mode 100644 index 0000000..600b37f --- /dev/null +++ b/src/dvsim/job/data.py @@ -0,0 +1,154 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Job data models. + +The JobSpec is used to capture all the information required to be able to +schedule a job. Once the job has finished a CompletedJobStatus is used to +capture the results of the job run. +""" + +from collections.abc import Callable, Mapping, Sequence +from pathlib import Path + +from pydantic import BaseModel, ConfigDict + +from dvsim.launcher.base import ErrorMessage, Launcher + +__all__ = ( + "CompletedJobStatus", + "JobSpec", + "WorkspaceConfig", +) + + +class WorkspaceConfig(BaseModel): + """Workspace configuration.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + project: str + """Name of the project""" + timestamp: str + """Time stamp of the run.""" + + project_root: Path + """Path to the project root.""" + scratch_root: Path + """Path to the scratch directory root.""" + scratch_path: Path + """Path within the scratch directory to use for this run.""" + + +class JobSpec(BaseModel): + """Job specification.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + job_type: str + """Deployment type""" + + target: str + """run phase [build, run, ...]""" + flow: str + """Name of the flow config (e.g. tl_agent)""" + tool: str + """EDA tool used""" + + name: str + """Name of the job""" + seed: int | None + """Seed if there is one.""" + + full_name: str + """Full name disambiguates across multiple cfg being run (example: + 'aes:default', 'uart:default' builds. + """ + + qual_name: str + """Qualified name disambiguates the instance name with other instances + of the same class (example: 'uart_smoke' reseeded multiple times + needs to be disambiguated using the index -> '0.uart_smoke'. 
+ """ + + workspace_cfg: WorkspaceConfig + """Workspace configuration.""" + + dependencies: list[str] + """Full names of the other Jobs that this one depends on.""" + needs_all_dependencies_passing: bool + """Wait for dependent jobs to pass before scheduling.""" + weight: int + """Weight to apply to the scheduling priority.""" + timeout_mins: int | None + """Timeout to apply to the launched job.""" + + cmd: str + """Command to run to execute the job.""" + exports: Mapping[str, str] + """Environment variables to set in the context of the running job.""" + dry_run: bool + """Go through the motions but don't actually run the job.""" + interactive: bool + """Enable interactive mode.""" + gui: bool + """Enable GUI mode.""" + + odir: Path + """Output directory for the job results files.""" + log_path: Path + """Path for the job log file.""" + links: Mapping[str, Path] + """Path for links directories.""" + + # TODO: remove the need for these callables here + pre_launch: Callable[[Launcher], None] + """Callback function for pre-launch actions.""" + post_finish: Callable[[str], None] + """Callback function for tidy up actions once the job is finished.""" + + pass_patterns: Sequence[str] + """regex patterns to match on to determine if the job is successful.""" + fail_patterns: Sequence[str] + """regex patterns to match on to determine if the job has failed.""" + + +class CompletedJobStatus(BaseModel): + """Job status.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + job_type: str + """Deployment type""" + name: str + """Name of the job""" + seed: int | None + """Seed if there is one.""" + + full_name: str + """Full name disambiguates across multiple cfg being run (example: + 'aes:default', 'uart:default' builds. + """ + + qual_name: str + """Qualified name disambiguates the instance name with other instances + of the same class (example: 'uart_smoke' reseeded multiple times + needs to be disambiguated using the index -> '0.uart_smoke'. + """ + + target: str + """run phase [build, run, ...]""" + + log_path: Path + """Path for the job log file.""" + + job_runtime: float + """Duration of the job.""" + simulated_time: float + """Simulation time.""" + + status: str + """Job status string [P,F,K,...]""" + fail_msg: ErrorMessage + """Error message.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 511f157..c620ddb 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -7,18 +7,17 @@ import pprint import random import shlex -from collections.abc import Mapping +from collections.abc import Callable, Mapping from pathlib import Path from typing import TYPE_CHECKING, ClassVar -from pydantic import BaseModel -from pydantic.config import ConfigDict from tabulate import tabulate +from dvsim.job.data import JobSpec, WorkspaceConfig from dvsim.job.time import JobTime from dvsim.launcher.base import Launcher from dvsim.logging import log -from dvsim.sim_utils import get_cov_summary_table, get_job_runtime, get_simulated_time +from dvsim.sim_utils import get_cov_summary_table from dvsim.utils import ( clean_odirs, find_and_substitute_wildcards, @@ -31,19 +30,6 @@ from dvsim.modes import BuildMode -class WorkspaceConfig(BaseModel): - """Workspace configuration.""" - - model_config = ConfigDict(frozen=True, extra="forbid") - - project: str - timestamp: str - - project_root: Path - scratch_root: Path - scratch_path: Path - - __all__ = ( "CompileSim", "Deploy", @@ -115,9 +101,6 @@ def __init__(self, sim_cfg: "SimCfg") -> None: # Construct the job's command. 
self.cmd = self._construct_cmd() - # Job's wall clock time (a.k.a CPU time, or runtime). - self.job_runtime = JobTime() - self.workspace_cfg = WorkspaceConfig( project=sim_cfg.name, project_root=sim_cfg.proj_root, @@ -126,6 +109,36 @@ def __init__(self, sim_cfg: "SimCfg") -> None: timestamp=sim_cfg.args.timestamp, ) + def get_job_spec(self) -> "JobSpec": + """Get the job spec for this deployment.""" + return JobSpec( + job_type=self.__class__.__name__, + target=self.target, + flow=self.flow, + tool=self.sim_cfg.tool, + name=self.name, + seed=getattr(self, "seed", None), + full_name=self.full_name, + qual_name=self.qual_name, + workspace_cfg=self.workspace_cfg, + dependencies=[d.full_name for d in self.dependencies], + weight=self.weight, + timeout_mins=self.get_timeout_mins(), + cmd=self.cmd, + exports=self.exports, + dry_run=self.dry_run, + interactive=self.sim_cfg.interactive, + gui=self.gui, + needs_all_dependencies_passing=self.needs_all_dependencies_passing, + pre_launch=self.pre_launch(), + post_finish=self.post_finish(), + odir=self.odir, + links=self.sim_cfg.links, + log_path=Path(f"{self.odir}/{self.target}.log"), + pass_patterns=self.pass_patterns, + fail_patterns=self.fail_patterns, + ) + def _define_attrs(self) -> None: """Define the attributes this instance needs to have. @@ -312,64 +325,30 @@ def is_equivalent_job(self, item: "Deploy") -> bool: log.verbose('Deploy job "%s" is equivalent to "%s"', item.name, self.name) return True - def pre_launch(self, launcher: Launcher) -> None: - """Perform additional pre-launch activities (callback). + def pre_launch(self) -> Callable[[Launcher], None]: + """Get pre-launch callback.""" - This is invoked by launcher::_pre_launch(). - """ + def callback(launcher: Launcher) -> None: + """Perform additional pre-launch activities (callback). - def post_finish(self, status: str) -> None: - """Perform additional post-finish activities (callback). + This is invoked by launcher::_pre_launch(). + """ - This is invoked by launcher::_post_finish(). - """ + return callback - def get_log_path(self) -> str: - """Return the log file path.""" - return f"{self.odir}/{self.target}.log" + def post_finish(self) -> Callable[[str], None]: + """Get post finish callback.""" - def get_timeout_mins(self) -> float | None: - """Return the timeout in minutes.""" - - def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: - """Extract information pertaining to the job from its log. + def callback(status: str) -> None: + """Perform additional post-finish activities (callback). - This method parses the log text after the job has completed, for the - extraction of information pertaining to the job's performance. This - base class method extracts the job's runtime (i.e. the wall clock time) - as reported by the tool. The tool reported runtime is the most accurate - since it is devoid of the delays incurred due to infrastructure and - setup overhead. + This is invoked by launcher::_post_finish(). + """ - The extended classes may override this method to extract other pieces - of information from the log. - - `log_text` is the job's log file contents as a list of lines. - """ - try: - time, unit = get_job_runtime(log_text, self.sim_cfg.tool) - self.job_runtime.set(time, unit) - except RuntimeError as e: - log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.") - self.job_runtime.set(job_runtime_secs, "s") + return callback - def dump(self) -> Mapping: - """Dump the deployment object to mapping object. 
- - Returns: - Representation of a deployment object as a dict. - - """ - return { - "full_name": self.full_name, - "type": self.__class__.__name__, - "exports": self.exports, - "interactive": self.sim_cfg.interactive, - "log_path": self.get_log_path(), - "timeout_mins": self.get_timeout_mins(), - "cmd": self.cmd, - "gui": self.gui, - } + def get_timeout_mins(self) -> float | None: + """Return the timeout in minutes.""" class CompileSim(Deploy): @@ -454,11 +433,16 @@ def _set_attrs(self) -> None: if self.sim_cfg.args.build_timeout_mins is not None: self.build_timeout_mins = self.sim_cfg.args.build_timeout_mins - def pre_launch(self, launcher: Launcher) -> None: - """Perform pre-launch tasks.""" - # Delete old coverage database directories before building again. We - # need to do this because the build directory is not 'renewed'. - rm_path(self.cov_db_dir) + def pre_launch(self) -> Callable[[Launcher], None]: + """Get pre-launch callback.""" + + def callback(_: Launcher) -> None: + """Perform pre-launch tasks.""" + # Delete old coverage database directories before building again. We + # need to do this because the build directory is not 'renewed'. + rm_path(self.cov_db_dir) + + return callback def get_timeout_mins(self) -> float: """Return the timeout in minutes. @@ -637,15 +621,25 @@ def _set_attrs(self) -> None: self.run_timeout_multiplier, ) - def pre_launch(self, launcher: Launcher) -> None: + def pre_launch(self) -> Callable[[Launcher], None]: """Perform pre-launch tasks.""" - launcher.renew_odir = True - def post_finish(self, status) -> None: - """Perform tidy up tasks.""" - if status != "P": - # Delete the coverage data if available. - rm_path(self.cov_db_test_dir) + def callback(launcher: Launcher) -> None: + """Perform pre-launch tasks.""" + launcher.renew_odir = True + + return callback + + def post_finish(self) -> Callable[[str], None]: + """Get post finish callback.""" + + def callback(status: str) -> None: + """Perform tidy up tasks.""" + if status != "P": + # Delete the coverage data if available. + rm_path(self.cov_db_test_dir) + + return callback @staticmethod def get_seed() -> int: @@ -668,20 +662,6 @@ def get_timeout_mins(self): """ return self.run_timeout_mins if self.run_timeout_mins is not None else 60 - def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: - """Extract the time the design was simulated for, from the log.""" - super().extract_info_from_log( - job_runtime_secs=job_runtime_secs, - log_text=log_text, - ) - - try: - time, unit = get_simulated_time(log_text, self.sim_cfg.tool) - self.simulated_time.set(time, unit) - - except RuntimeError as e: - log.debug(f"{self.full_name}: {e}") - class CovUnr(Deploy): """Abstraction for coverage UNR flow.""" @@ -821,21 +801,28 @@ def _set_attrs(self) -> None: self.cov_results = "" self.cov_results_dict = {} - def post_finish(self, status) -> None: - """Extract the coverage results summary for the dashboard. + def post_finish(self) -> Callable[[str], None]: + """Get post finish callback.""" - If the extraction fails, an appropriate exception is raised, which must - be caught by the caller to mark the job as a failure. - """ - if self.dry_run or status != "P": - return + def callback(status: str) -> None: + """Extract the coverage results summary for the dashboard. + + If the extraction fails, an appropriate exception is raised, which must + be caught by the caller to mark the job as a failure. 
+ """ + if self.dry_run or status != "P": + return - results, self.cov_total = get_cov_summary_table(self.cov_report_txt, self.sim_cfg.tool) + results, self.cov_total = get_cov_summary_table(self.cov_report_txt, self.sim_cfg.tool) + + colalign = ("center",) * len(results[0]) + self.cov_results = tabulate( + results, headers="firstrow", tablefmt="pipe", colalign=colalign + ) + for tup in zip(*results, strict=False): + self.cov_results_dict[tup[0]] = tup[1] - colalign = ("center",) * len(results[0]) - self.cov_results = tabulate(results, headers="firstrow", tablefmt="pipe", colalign=colalign) - for tup in zip(*results, strict=False): - self.cov_results_dict[tup[0]] = tup[1] + return callback class CovAnalyze(Deploy): diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py index b852442..69850bc 100644 --- a/src/dvsim/launcher/base.py +++ b/src/dvsim/launcher/base.py @@ -15,11 +15,13 @@ from pydantic import BaseModel, ConfigDict +from dvsim.job.time import JobTime from dvsim.logging import log +from dvsim.sim_utils import get_job_runtime, get_simulated_time from dvsim.utils import clean_odirs, mk_symlink, rm_path if TYPE_CHECKING: - from dvsim.job.deploy import Deploy, WorkspaceConfig + from dvsim.job.data import JobSpec, WorkspaceConfig class LauncherError(Exception): @@ -90,14 +92,14 @@ class Launcher(ABC): context=[], ) - def __init__(self, deploy: "Deploy") -> None: + def __init__(self, job_spec: "JobSpec") -> None: """Initialise launcher. Args: - deploy: deployment object that will be launched. + job_spec: job specification for the job to be launched. """ - workspace_cfg = deploy.workspace_cfg + workspace_cfg = job_spec.workspace_cfg # One-time preparation of the workspace. if not Launcher.workspace_prepared: @@ -110,8 +112,7 @@ def __init__(self, deploy: "Deploy") -> None: self.prepare_workspace_for_cfg(workspace_cfg) Launcher.workspace_prepared_for_cfg.add(project) - # Store the deploy object handle. - self.deploy = deploy + self.job_spec = job_spec # Status of the job. This is primarily determined by the # _check_status() method, but eventually updated by the _post_finish() @@ -132,6 +133,16 @@ def __init__(self, deploy: "Deploy") -> None: # The actual job runtime computed by dvsim, in seconds. self.job_runtime_secs = 0 + # Job's wall clock time (a.k.a CPU time, or runtime). + # Field copied over from Deploy + # TODO: factor this out + self.job_runtime = JobTime() + self.simulated_time = JobTime() + + def __str__(self) -> str: + """Get a string representation.""" + return self.job_spec.full_name + ":launcher" + @staticmethod def set_pyvenv(project: str) -> None: """Activate a python virtualenv if available. @@ -191,30 +202,26 @@ def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """ - def __str__(self) -> str: - """Get a string representation.""" - return self.deploy.full_name + ":launcher" - def _make_odir(self) -> None: """Create the output directory.""" # If renew_odir flag is True - then move it. if self.renew_odir: - clean_odirs(odir=self.deploy.odir, max_odirs=self.max_odirs) + clean_odirs(odir=self.job_spec.odir, max_odirs=self.max_odirs) - Path(self.deploy.odir).mkdir(exist_ok=True, parents=True) + Path(self.job_spec.odir).mkdir(exist_ok=True, parents=True) - def _link_odir(self, status) -> None: + def _link_odir(self, status: str) -> None: """Soft-links the job's directory based on job's status. The dispatched, passed and failed directories in the scratch area provide a quick way to get to the job that was executed. 
""" - dest = Path(self.deploy.sim_cfg.links[status], self.deploy.qual_name) - mk_symlink(path=self.deploy.odir, link=dest) + dest = Path(self.job_spec.links[status], self.job_spec.qual_name) + mk_symlink(path=self.job_spec.odir, link=dest) # Delete the symlink from dispatched directory if it exists. if status != "D": - old = Path(self.deploy.sim_cfg.links["D"], self.deploy.qual_name) + old = Path(self.job_spec.links["D"], self.job_spec.qual_name) rm_path(old) def _dump_env_vars(self, exports: Mapping[str, str]) -> None: @@ -223,8 +230,7 @@ def _dump_env_vars(self, exports: Mapping[str, str]) -> None: Each extended class computes the list of exports and invokes this method right before launching the job. """ - with open( - self.deploy.odir + "/env_vars", + with (self.job_spec.odir / "env_vars").open( "w", encoding="UTF-8", errors="surrogateescape", @@ -238,7 +244,7 @@ def _pre_launch(self) -> None: old runs, creating the output directory, dumping all env variables etc. This method is already invoked by launch() as the first step. """ - self.deploy.pre_launch(self) + self.job_spec.pre_launch(self) self._make_odir() self.start_time = datetime.datetime.now() @@ -300,20 +306,19 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: return pattern return None - if self.deploy.dry_run: + if self.job_spec.dry_run: return "P", None # Only one fail pattern needs to be seen. - chk_failed = bool(self.deploy.fail_patterns) + chk_failed = bool(self.job_spec.fail_patterns) # All pass patterns need to be seen, so we replicate the list and remove # patterns as we encounter them. - pass_patterns = self.deploy.pass_patterns.copy() + pass_patterns = list(self.job_spec.pass_patterns).copy() chk_passed = bool(pass_patterns) and (self.exit_code == 0) try: - with open( - self.deploy.get_log_path(), + with self.job_spec.log_path.open( encoding="UTF-8", errors="surrogateescape", ) as f: @@ -321,21 +326,46 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: except OSError as e: return "F", ErrorMessage( line_number=None, - message=f"Error opening file {self.deploy.get_log_path()}:\n{e}", + message=f"Error opening file {self.job_spec.log_path}:\n{e}", context=[], ) # Since the log file is already opened and read to assess the job's # status, use this opportunity to also extract other pieces of # information. - self.deploy.extract_info_from_log( - job_runtime_secs=self.job_runtime_secs, - log_text=lines, - ) + + # Extracts the job's runtime (i.e. the wall clock time) + # as reported by the tool. The tool reported runtime is the most accurate + # since it is devoid of the delays incurred due to infrastructure and + # setup overhead. + + try: + time, unit = get_job_runtime( + log_text=lines, + tool=self.job_spec.tool, + ) + self.job_runtime.set(time, unit) + + except RuntimeError as e: + log.warning( + f"{self.job_spec.full_name}: {e} Using dvsim-maintained job_runtime instead." + ) + self.job_runtime.set(self.job_runtime_secs, "s") + + if self.job_spec.job_type == "RunTest": + try: + time, unit = get_simulated_time( + log_text=lines, + tool=self.job_spec.tool, + ) + self.simulated_time.set(time, unit) + + except RuntimeError as e: + log.debug(f"{self.job_spec.full_name}: {e}") if chk_failed or chk_passed: for cnt, line in enumerate(lines): - if chk_failed and _find_patterns(self.deploy.fail_patterns, line): + if chk_failed and _find_patterns(self.job_spec.fail_patterns, line): # If failed, then nothing else to do. Just return. 
# Provide some extra lines for context. end = cnt + 5 @@ -385,7 +415,7 @@ def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: try: # Run the target-specific cleanup tasks regardless of the job's # outcome. - self.deploy.post_finish(status) + self.job_spec.post_finish(status) except Exception as e: # If the job had already failed, then don't do anything. If it's diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py index 82e1da9..066b31d 100644 --- a/src/dvsim/launcher/fake.py +++ b/src/dvsim/launcher/fake.py @@ -4,37 +4,39 @@ """Fake Launcher that returns random results.""" -from random import choice, random +from random import choice from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher if TYPE_CHECKING: - from dvsim.job.deploy import CovReport, Deploy, RunTest, WorkspaceConfig + from dvsim.job.data import JobSpec, WorkspaceConfig __all__ = ("FakeLauncher",) -def _run_test_handler(deploy: "RunTest") -> str: +def _run_test_handler(job_spec: "JobSpec") -> str: """Handle a RunTest deploy job.""" return choice(("P", "F")) -def _cov_report_handler(deploy: "CovReport") -> str: +def _cov_report_handler(job_spec: "JobSpec") -> str: """Handle a CompileSim deploy job.""" - keys = [ - "score", - "line", - "cond", - "toggle", - "fsm", - "branch", - "assert", - "group", - ] - - deploy.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} + # TODO: this hack doesn't work any more and needs implementing by writing + # a file that can be parsed as if it's been generated by the tool. + # + # keys = [ + # "score", + # "line", + # "cond", + # "toggle", + # "fsm", + # "branch", + # "assert", + # "group", + # ] + # job_spec.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} return "P" @@ -51,18 +53,14 @@ class FakeLauncher(Launcher): # Poll job's completion status every this many seconds poll_freq = 0 - def __init__(self, deploy: "Deploy") -> None: - """Initialize common class members.""" - super().__init__(deploy) - def _do_launch(self) -> None: """Do the launch.""" def poll(self) -> str | None: """Check status of the running process.""" - deploy_cls = self.deploy.__class__.__name__ + deploy_cls = self.job_spec.job_type if deploy_cls in _DEPLOY_HANDLER: - return _DEPLOY_HANDLER[deploy_cls](deploy=self.deploy) + return _DEPLOY_HANDLER[deploy_cls](job_spec=self.job_spec) # Default result is Pass return "P" diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index bc703f4..d0f2c14 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -8,13 +8,13 @@ import os import shlex import subprocess -from pathlib import Path from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError if TYPE_CHECKING: - from dvsim.job.deploy import Deploy, WorkspaceConfig + from dvsim.job.data import JobSpec + from dvsim.job.deploy import WorkspaceConfig class LocalLauncher(Launcher): @@ -23,9 +23,9 @@ class LocalLauncher(Launcher): # Poll job's completion status every this many seconds poll_freq = 0.025 - def __init__(self, deploy: "Deploy") -> None: + def __init__(self, job_spec: "JobSpec") -> None: """Initialize common class members.""" - super().__init__(deploy) + super().__init__(job_spec) # Popen object when launching the job. self._process = None @@ -35,7 +35,7 @@ def _do_launch(self) -> None: # Update the shell's env vars with self.exports. Values in exports must # replace the values in the shell's env vars if the keys match. 
exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -47,9 +47,9 @@ def _do_launch(self) -> None: self._dump_env_vars(exports) - if not self.deploy.sim_cfg.interactive: - log_path = Path(self.deploy.get_log_path()) - timeout_mins = self.deploy.get_timeout_mins() + if not self.job_spec.interactive: + log_path = self.job_spec.log_path + timeout_mins = self.job_spec.timeout_mins self.timeout_secs = timeout_mins * 60 if timeout_mins else None @@ -59,11 +59,11 @@ def _do_launch(self) -> None: encoding="UTF-8", errors="surrogateescape", ) - self._log_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n") + self._log_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") self._log_file.flush() self._process = subprocess.Popen( - shlex.split(self.deploy.cmd), + shlex.split(self.job_spec.cmd), bufsize=4096, universal_newlines=True, stdout=self._log_file, @@ -88,7 +88,7 @@ def _do_launch(self) -> None: # Interactive. stdin / stdout are transparent # no timeout and blocking op as user controls the flow self._process = subprocess.Popen( - shlex.split(self.deploy.cmd), + shlex.split(self.job_spec.cmd), stdin=None, stdout=None, stderr=subprocess.STDOUT, @@ -121,10 +121,10 @@ def poll(self) -> str | None: if ( self.timeout_secs and (self.job_runtime_secs > self.timeout_secs) - and not (self.deploy.gui) + and not (self.job_spec.gui) ): self._kill() - timeout_mins = self.deploy.get_timeout_mins() + timeout_mins = self.job_spec.timeout_mins timeout_message = f"Job timed out after {timeout_mins} minutes" self._post_finish( "K", diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py index e8d5277..c7715e0 100644 --- a/src/dvsim/launcher/lsf.py +++ b/src/dvsim/launcher/lsf.py @@ -92,7 +92,7 @@ def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: # Create the job dir. LsfLauncher.jobs_dir[cfg.project] = cfg.scratch_path / "lsf" / cfg.timestamp clean_odirs(odir=LsfLauncher.jobs_dir[cfg.project], max_odirs=2) - os.makedirs(Path(LsfLauncher.jobs_dir[cfg.project]), exist_ok=True) + Path(LsfLauncher.jobs_dir[cfg.project]).mkdir(exist_ok=True, parents=True) @staticmethod def make_job_script(cfg: "WorkspaceConfig", job_name: str): @@ -124,7 +124,7 @@ def make_job_script(cfg: "WorkspaceConfig", job_name: str): lines += ["case $1 in\n"] for job in LsfLauncher.jobs[cfg][job_name]: # Redirect the job's stdout and stderr to its log file. - cmd = f"{job.deploy.cmd} > {job.deploy.get_log_path()} 2>&1" + cmd = f"{job.deploy.cmd} > {job.deploy.log_path} 2>&1" lines += [f" {job.index})\n", f" {cmd};;\n"] # Throw error as a sanity check if the job index is invalid. @@ -134,7 +134,7 @@ def make_job_script(cfg: "WorkspaceConfig", job_name: str): job_script = LsfLauncher.jobs_dir[cfg] / job_name try: - with open(job_script, "w", encoding="utf-8") as f: + with Path(job_script).open("w", encoding="utf-8") as f: f.writelines(lines) except OSError as e: @@ -182,8 +182,8 @@ def __init__(self, deploy: "Deploy") -> None: def _do_launch(self) -> None: """Launch the job.""" # Add self to the list of jobs. 
- job_name = self.deploy.job_name - cfg = self.deploy.workspace_cfg + job_name = self.job_spec.job_name + cfg = self.job_spec.workspace_cfg job_total = len(LsfLauncher.jobs[cfg.project][job_name]) # The actual launching of the bsub command cannot happen until the @@ -196,7 +196,7 @@ def _do_launch(self) -> None: # Update the shell's env vars with self.exports. Values in exports must # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -214,10 +214,10 @@ def _do_launch(self) -> None: job_array += "%100" # TODO: This needs to be moved to a HJson. - if self.deploy.sim_cfg.tool == "vcs": + if self.job_spec.tool == "vcs": job_rusage = "'rusage[vcssim=1,vcssim_dynamic=1:duration=1]'" - elif self.deploy.sim_cfg.tool == "xcelium": + elif self.job_spec.tool == "xcelium": job_rusage = "'rusage[xcelium=1,xcelium_dynamic=1:duration=1]'" else: @@ -236,8 +236,8 @@ def _do_launch(self) -> None: "-eo", f"{job_script}.%I.out", ] - if self.deploy.get_timeout_mins(): - cmd += ["-c", self.deploy.get_timeout_mins()] + if self.job_spec.timeout_mins: + cmd += ["-c", self.job_spec.timeout_mins] if job_rusage: cmd += ["-R", job_rusage] @@ -303,7 +303,7 @@ def poll(self) -> str | None: # If we got to this point, we can now open the job script output # file for reading. try: - self.bsub_out_fd = open(self.bsub_out) + self.bsub_out_fd = Path(self.bsub_out).open() except OSError as e: self._post_finish( @@ -435,7 +435,7 @@ def kill(self) -> None: except subprocess.CalledProcessError as e: log.exception("Failed to kill job: {}".format(e.stderr.decode("utf-8").strip())) else: - log.error("Job ID for %s not found", self.deploy.full_name) + log.error("Job ID for %s not found", self.job_spec.full_name) self._post_finish("K", ErrorMessage(message="Job killed!", context=[])) diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py index ba7cc86..4b41bec 100644 --- a/src/dvsim/launcher/nc.py +++ b/src/dvsim/launcher/nc.py @@ -53,10 +53,10 @@ def create_run_sh(self, full_path, cmd) -> None: pathlib.Path(run_file).chmod(0o755) def get_submit_cmd(self): - exetool = self.deploy.sim_cfg.tool - job_name = self.deploy.full_name - cmd = self.deploy.cmd - odir = self.deploy.odir + exetool = self.job_spec.tool + job_name = self.job_spec.full_name + cmd = self.job_spec.cmd + odir = self.job_spec.odir # TODO: These tool-specific names need moving into an hjson config # file. @@ -96,7 +96,7 @@ def _do_launch(self) -> None: # Compute the environment for the subprocess by overriding environment # variables of this process with matching ones from self.deploy.exports exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -111,16 +111,16 @@ def _do_launch(self) -> None: # For reruns, delete the log file of the past run to avoid any race # condition between the log file getting updated for the new run # versus the logic that distinguishes the job wait versus run times. 
- rm_path(self.deploy.get_log_path()) + rm_path(self.job_spec.log_path) # using os.open instead of fopen as this allows # sharing of file descriptors across processes - fd = os.open(self.deploy.get_log_path(), os.O_WRONLY | os.O_CREAT) + fd = os.open(self.job_spec.log_path, os.O_WRONLY | os.O_CREAT) fobj = os.fdopen(fd, "w", encoding="UTF-8") os.set_inheritable(fd, True) - message = f"[Executing]:\n{self.deploy.cmd}\n\n" + message = f"[Executing]:\n{self.job_spec.cmd}\n\n" fobj.write(message) fobj.flush() - if self.deploy.sim_cfg.interactive: + if self.job_spec.interactive: # Interactive: Set RUN_INTERACTIVE to 1 exports["RUN_INTERACTIVE"] = "1" # Line buffering (buf_size = 1) chosen to enable @@ -147,9 +147,9 @@ def _do_launch(self) -> None: stdout=std_out, stderr=std_err, env=exports, - cwd=self.deploy.odir, + cwd=self.job_spec.odir, ) - if self.deploy.sim_cfg.interactive: + if self.job_spec.interactive: for line in self.process.stdout: fobj.write(line) sys.stdout.write(line) @@ -157,7 +157,7 @@ def _do_launch(self) -> None: # the subprocess closes the stdout but still keeps running self.process.wait() except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {self.deploy.get_log_path()}" + msg = f"IO Error: {e}\nSee {self.job_spec.log_path}" raise LauncherError(msg) finally: self._close_process() @@ -179,8 +179,8 @@ def poll(self): """ assert self.process is not None if self.process.poll() is None: - run_timeout_mins = self.deploy.get_timeout_mins() - if run_timeout_mins is not None and not self.deploy.gui: + run_timeout_mins = self.job_spec.timeout_mins + if run_timeout_mins is not None and not self.job_spec.gui: wait_timeout_mins = 180 # max wait time in job / license queue # We consider the job to have started once its log file contains # something. file_size_thresh_bytes is a threshold: once the log @@ -188,7 +188,7 @@ def poll(self): file_size_thresh_bytes = 5120 # log file size threshold # query the log file size - f_size = os.path.getsize(self.deploy.get_log_path()) # noqa: PTH202 + f_size = os.path.getsize(self.job_spec.log_path) # noqa: PTH202 if f_size >= file_size_thresh_bytes: # noqa: SIM102 if self.nc_job_state == "waiting": @@ -238,9 +238,9 @@ def _kill(self) -> None: and SIGKILL. """ try: - log.verbose(f"[Stopping] : {self.deploy.full_name}") + log.verbose(f"[Stopping] : {self.job_spec.full_name}") subprocess.run( - ["nc", "stop", "-set", self.deploy.full_name, "-sig", "TERM,KILL"], + ["nc", "stop", "-set", self.job_spec.full_name, "-sig", "TERM,KILL"], check=True, capture_output=True, ) diff --git a/src/dvsim/launcher/sge/engine.py b/src/dvsim/launcher/sge/engine.py index d79e565..1858430 100644 --- a/src/dvsim/launcher/sge/engine.py +++ b/src/dvsim/launcher/sge/engine.py @@ -11,6 +11,8 @@ import re import subprocess import time +from collections.abc import Generator, Sequence +from pathlib import Path from dvsim.logging import log @@ -18,7 +20,13 @@ class _JobData: """Internal helper class to manage job data from qstat.""" - def __init__(self, qstat_job_line) -> None: + def __init__(self, qstat_job_line: str) -> None: + """Initialise a JobData object for qstat job status parsing. 
+ + Args: + qstat_job_line: string from qstat call + + """ # The format of the line goes like # job-ID prior name user state submit/start at queue slots ja-task-ID @@ -88,32 +96,33 @@ def __repr__(self) -> str: class JobList: """Internal helper class to manage job lists.""" - def __init__(self, qstat_output=None) -> None: + def __init__(self, qstat_output: str) -> None: + """Initialise JobList.""" self._joblist = [] for line in qstat_output.split("\n")[2:-1]: self._joblist.append(_JobData(line)) - def __iter__(self): + def __iter__(self) -> Generator[_JobData]: + """Get an Iterator over the job list.""" yield from self._joblist def __repr__(self) -> str: + """Get string representation of the job list.""" return "\n".join([str(job) for job in self._joblist]) class SGE: """External system call handler for Sun Grid Engine environment.""" - def __init__( - self, - q=None, - path="", - ) -> None: + def __init__(self, q=None, path: str | Path = "") -> None: + """Initialise Sun Grid Engine proxy.""" + path = Path(path) if q is None: # No queue specified. By default, submit to all available queues. - self.cmd_qconf = os.path.join(path, "qconf") + self.cmd_qconf = path / "qconf" try: - qliststr = _exec(self.cmd_qconf + " -sql") + qliststr = _exec(f"{self.cmd_qconf} -sql") except OSError: error_msg = "Error querying queue configuration" log.exception(error_msg) @@ -122,31 +131,42 @@ def __init__( self.q = qliststr.replace("\n", ",")[:-1] log.info( - """Sun Grid Engine handler initialized -Queues detected: %s""", + "Sun Grid Engine handler initialized\nQueues detected: %s", self.q, ) else: self.q = q - self.cmd_qsub = os.path.join(path, "qsub") - self.cmd_qstat = os.path.join(path, "qstat") + self.cmd_qsub = path / "qsub" + self.cmd_qstat = path / "qstat" - def wait(self, jobid, interval=10, name=None, pbar=None, pbar_mode=None) -> None: - """Waits for job running on SGE Grid Engine environment to finish. + def wait( + self, + jobid: int, + interval: int = 10, + name: str | None = None, + pbar: None = None, + ) -> None: + """Wait for job running on SGE Grid Engine environment to finish. If you are just waiting for one job, this becomes a dumb substitute for the -sync y option which can be specified to qsub. - Inputs: + Args: + jobid: job identifier + interval: polling interval of SGE queue, in seconds. (Default: 10) + name: name of the job + pbar: progress bar (not implemented?) - jobid - interval - Polling interval of SGE queue, in seconds. (Default: 10) """ dowait = True while dowait: - p = subprocess.Popen(self.cmd_qstat, shell=True, stdout=subprocess.PIPE) + p = subprocess.Popen( # noqa: S602 + self.cmd_qstat, + shell=True, + stdout=subprocess.PIPE, + ) pout, _ = p.communicate() if pbar is not None: @@ -192,9 +212,12 @@ def submit( nproc=1, wait=True, lammpi=True, - ): - """Submits a job to SGE - Returns jobid as a number. + ) -> int: + """Submit a job to SGE. + + Returns: + jobid as a number. + """ log.info( "Submitting job: " @@ -212,20 +235,28 @@ def submit( if name is not None: qsuboptslist.append("-N " + name) + if stdin is not None: qsuboptslist.append("-i " + stdin) + if stdout is not None: qsuboptslist.append("-o " + stdout) + if stderr is not None: qsuboptslist.append("-e " + stderr) + if joinstdouterr: qsuboptslist.append("-j") + if wait: qsuboptslist.append("-sync y") + if usecwd: qsuboptslist.append("-cwd") + if useenvironment: qsuboptslist.append("-V") + if array is not False: try: n = int(array[0]) @@ -237,6 +268,7 @@ def submit( error_msg = "array[0] being an out of bounds access." 
log.exception(error_msg) raise ValueError(error_msg) + try: m = int(array[1]) except ValueError: @@ -247,6 +279,7 @@ def submit( m = None msg = "array[1] being an out of bounds access." raise IndexError(msg) + try: s = int(array[2]) except IndexError: @@ -257,22 +290,28 @@ def submit( s = None msg = "Could not convert data to an integer." raise ValueError(msg) + if m == s is None: - qsuboptslist.append("-t %d" % n) + qsuboptslist.append(f"-t {n}") elif s is None: - qsuboptslist.append("-t %d-%d" % (n, m)) + qsuboptslist.append(f"-t {n}-{m}") else: - qsuboptslist.append("-t %d-%d:%d" % (n, m, s)) + qsuboptslist.append(f"-t {n}-{m}:{s}") qsubopts = " ".join(qsuboptslist) - pout = _exec(self.cmd_qsub, stdin=qsubopts + "\n" + job, print_command=False) + pout = _exec( + command=str(self.cmd_qsub), + stdin=qsubopts + "\n" + job, + print_command=False, + ) try: # Next to last line should be # "Your job 1389 (name) has been submitted" # parse for job id return int(pout.split("\n")[-2].split()[2]) + # except (ValueErrorValueError, IndexError, AttributeError) (e): except (ValueError, IndexError, AttributeError): error_msg = f"""Error submitting SGE job: @@ -284,21 +323,39 @@ def submit( log.exception(error_msg) raise OSError(error_msg) - def getuserjobs(self, user=pwd.getpwuid(os.getuid())[0]): - """Returns a list of SGE jobids run by a specific user. + def getuserjobs(self, user=pwd.getpwuid(os.getuid())[0]) -> Sequence[_JobData]: + """Return a list of SGE jobids run by a specific user. + + Args: + user: SGE user to poll (Default = '', i.e. current user) + qstat: path to qstat binary (Default = 'qstat') - Inputs - user - SGE user to poll (Default = '', i.e. current user) - qstat - path to qstat binary (Default = 'qstat') """ - p = subprocess.Popen(self.cmd_qstat + " -u " + user, shell=True, stdout=subprocess.PIPE) + p = subprocess.Popen( # noqa: S602 + f"{self.cmd_qstat} -u {user}", + shell=True, + stdout=subprocess.PIPE, + ) qstat_output, _ = p.communicate() joblist = JobList(qstat_output) return [job for job in joblist if job.user == user] - def run_job(self, command, name="default", logfnm="default.log", wait=True): + def run_job( + self, + command: str, + name: str = "default", + log_filename: str = "default.log", + *, + wait: bool = True, + ) -> int: """Run job on SGE with piped logging.""" - return self.submit(command, name=name, stdout=logfnm, stderr=logfnm, wait=wait) + return self.submit( + command, + name=name, + stdout=log_filename, + stderr=log_filename, + wait=wait, + ) def get_queue_instance_status(self): """Get loads for each queue instance.""" @@ -319,11 +376,17 @@ def get_queue_instance_status(self): return data -def _exec(command, print_to_screen=False, logfnm=None, stdin="", print_command=False): - """Runs command line using subprocess, optionally returning stdout.""" +def _exec( + command: str, + log_path: str | None = None, + stdin: str = "", + *, + print_command: bool = False, +) -> bytes: + """Run command line using subprocess, optionally returning stdout.""" - def _call_cmd(command, stdin=""): - p = subprocess.Popen( + def _call_cmd(command: str, stdin: str = "") -> bytes: + p = subprocess.Popen( # noqa: S602 command, shell=True, stdin=subprocess.PIPE, @@ -334,26 +397,26 @@ def _call_cmd(command, stdin=""): return output if print_command: - log.info("Executing process: \x1b[1;92m%-50s\x1b[0m Logfile: %s", command, logfnm) + log.info("Executing process: \x1b[1;92m%-50s\x1b[0m Logfile: %s", command, log_path) output = "" - if logfnm is not None: + if log_path is not None: try: 
- with open(logfnm, "a") as f: + with Path(log_path).open("a") as f: if print_command: pass + output = _call_cmd(command, stdin) f.write(output) + except OSError: - error_msg = "Error: File: " + str(logfnm) + " does not appear to exist." + error_msg = "Error: File: " + str(log_path) + " does not appear to exist." log.exception(error_msg) raise OSError(error_msg) + else: output = _call_cmd(command, stdin) log.info("Output of command is:\n%s", output) - if print_to_screen: - pass - return output diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index 3989d17..2eab022 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -5,9 +5,9 @@ """SgeLauncher Class.""" import os -import pathlib import shlex import subprocess +from pathlib import Path from subprocess import PIPE, Popen from typing import TYPE_CHECKING @@ -37,7 +37,7 @@ def _do_launch(self) -> None: # Update the shell's env vars with self.exports. Values in exports must # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -50,15 +50,14 @@ def _do_launch(self) -> None: self._dump_env_vars(exports) try: - f = pathlib.Path(self.deploy.get_log_path()).open( - "w", encoding="UTF-8", errors="surrogateescape" - ) - f.write(f"[Executing]:\n{self.deploy.cmd}\n\n") + f = self.job_spec.log_path.open("w", encoding="UTF-8", errors="surrogateescape") + f.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") f.flush() + # ---------- prepare SGE job struct ----- sge_job = SGE.QSubOptions() # noqa: F405 sge_job.args.N = "VCS_RUN_" + str(pid) # Name of Grid Engine job - if "build.log" in self.deploy.get_log_path(): + if "build.log" in self.job_spec.log_path: sge_job.args.N = "VCS_BUILD_" + str(pid) # Name of Grid Engine job job_name = sge_job.args.N @@ -68,14 +67,15 @@ def _do_launch(self) -> None: sge_job.args.q = "vcs_q" # Define the sge queue name sge_job.args.p = "0" # Set priority to 0 sge_job.args.ll = "mf=20G" # memory req,request the given resources + # pecifies a range of priorities from -1023 to 1024. # The higher the number, the higher the priority. # The default priority for jobs is zero - sge_job.args.command = '"' + self.deploy.cmd + '"' + sge_job.args.command = '"' + self.job_spec.cmd + '"' sge_job.args.b = "y" # This is a binary file - sge_job.args.o = self.deploy.get_log_path() + ".sge" + sge_job.args.o = f"{self.job_spec.log_path}.sge" cmd = str(sge_job.execute(mode="echo")) - # --------------- + self.process = subprocess.Popen( shlex.split(cmd), bufsize=4096, @@ -85,16 +85,18 @@ def _do_launch(self) -> None: env=exports, ) f.close() + except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {self.deploy.get_log_path()}" + msg = f"IO Error: {e}\nSee {self.job_spec.log_path}" raise LauncherError(msg) + finally: self._close_process() self._link_odir("D") f.close() - def poll(self): + def poll(self) -> str: """Check status of the running process. This returns 'D', 'P' or 'F'. If 'D', the job is still running. 
If 'P', @@ -107,17 +109,16 @@ def poll(self): if self.process.poll() is None: return "D" # ------------------------------------- - # copy SGE jobb results to log file - if pathlib.Path(self.deploy.get_log_path() + ".sge").exists(): - file1 = pathlib.Path(self.deploy.get_log_path() + ".sge").open(errors="replace") + # copy SGE job results to log file + sge_log_path = Path(f"{self.job_spec.log_path}.sge") + if sge_log_path.exists(): + file1 = sge_log_path.open(errors="replace") lines = file1.readlines() file1.close() - f = pathlib.Path(self.deploy.get_log_path()).open( - "a", encoding="UTF-8", errors="surrogateescape" - ) + f = self.job_spec.log_path.open("a", encoding="UTF-8", errors="surrogateescape") f.writelines(lines) f.flush() - pathlib.Path(self.deploy.get_log_path() + ".sge").unlink() + sge_log_path.unlink() f.close() # ------------------------------------- @@ -146,13 +147,13 @@ def kill(self) -> None: # ---------------------------- # qdel -f kill sge job_name cmd = "qstatus -a | grep " + job_name - with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: + with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: # noqa: S602 output = process.communicate()[0].decode("utf-8") output = output.rstrip("\n") if output != "": output_l = output.split() cmd = "qdel " + output_l[0] - with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: + with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: # noqa: S602 output = process.communicate()[0].decode("utf-8") output = output.rstrip("\n") # ---------------------------- diff --git a/src/dvsim/launcher/sge/qsubopts.py b/src/dvsim/launcher/sge/qsubopts.py index 4c593f9..0c4d033 100644 --- a/src/dvsim/launcher/sge/qsubopts.py +++ b/src/dvsim/launcher/sge/qsubopts.py @@ -1893,7 +1893,7 @@ def write_qsub_script(self, filename, echo=False) -> None: if echo: pass - f = open(filename, "w") + f = pathlib.Path(filename).open("w") f.write("\n".join(buf)) f.close() @@ -1937,7 +1937,7 @@ def execute(self, mode="local", path=""): cwd = pathlib.Path.cwd() command_file = cwd + "/command_file_" + str(os.getpid()) + "_" + test_id try: - with open(command_file, "w") as f_command: + with pathlib.Path(command_file).open("w") as f_command: command_temp = str(self.args.command) command_temp = command_temp.replace('"', "") f_command.write(command_temp + "\n/bin/rm -f " + command_file) @@ -1957,7 +1957,7 @@ def execute(self, mode="local", path=""): if mode == "local": import subprocess - subprocess.Popen( + subprocess.Popen( # noqa: S602 command_file, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index d246d31..deab4bc 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -33,12 +33,12 @@ def __init__(self, deploy) -> None: # Popen object when launching the job. self.process = None - self.slurm_log_file = self.deploy.get_log_path() + ".slurm" + self.slurm_log_file = f"{self.job_spec.log_path}.slurm" def _do_launch(self) -> None: # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. 
         # variable is used by recursive Make calls to pass variables from one
@@ -59,12 +59,12 @@ def _do_launch(self) -> None:
         slurm_cmd = (
             f"srun -p {SLURM_QUEUE} --mem={SLURM_MEM} --mincpus={SLURM_MINCPUS} "
             f"--time={SLURM_TIMEOUT} --cpus-per-task={SLURM_CPUS_PER_TASK} "
-            f'bash -c "{slurm_setup_cmd} {self.deploy.cmd}"'
+            f'bash -c "{slurm_setup_cmd} {self.job_spec.cmd}"'
         )

         try:
             with pathlib.Path(self.slurm_log_file).open("w") as out_file:
-                out_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n")
+                out_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n")
                 out_file.flush()

                 log.info(f"Executing slurm command: {slurm_cmd}")
@@ -80,7 +80,7 @@ def _do_launch(self) -> None:
             msg = f"File Error: {e}\nError while handling {self.slurm_log_file}"
             raise LauncherError(msg)
         except subprocess.SubprocessError as e:
-            msg = f"IO Error: {e}\nSee {self.deploy.get_log_path()}"
+            msg = f"IO Error: {e}\nSee {self.job_spec.log_path}"
             raise LauncherError(msg)
         finally:
             self._close_process()
@@ -105,10 +105,10 @@ def poll(self):
         try:
             with pathlib.Path(self.slurm_log_file).open() as slurm_file:
                 try:
-                    with pathlib.Path(self.deploy.get_log_path()).open("a") as out_file:
+                    with self.job_spec.log_path.open("a") as out_file:
                         shutil.copyfileobj(slurm_file, out_file)
                 except OSError as e:
-                    msg = f"File Error: {e} when handling {self.deploy.get_log_path()}"
+                    msg = f"File Error: {e} when handling {self.job_spec.log_path}"
                     raise LauncherError(
                         msg,
                     )
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py
index de5f904..28306b6 100644
--- a/src/dvsim/scheduler.py
+++ b/src/dvsim/scheduler.py
@@ -17,10 +17,8 @@
 from types import FrameType
 from typing import TYPE_CHECKING, Any

-from pydantic import BaseModel, ConfigDict
-
-from dvsim.job.deploy import Deploy
-from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError
+from dvsim.job.data import CompletedJobStatus, JobSpec
+from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError
 from dvsim.logging import log
 from dvsim.utils.status_printer import get_status_printer
 from dvsim.utils.timer import Timer
@@ -29,17 +27,8 @@
     from dvsim.flow.base import FlowCfg


-class CompletedJobStatus(BaseModel):
-    """Job status."""
-
-    model_config = ConfigDict(frozen=True, extra="forbid")
-
-    status: str
-    fail_msg: ErrorMessage
-
-
 def total_sub_items(
-    d: Mapping[str, Sequence[Deploy]] | Mapping["FlowCfg", Sequence[Deploy]],
+    d: Mapping[str, Sequence[JobSpec]] | Mapping["FlowCfg", Sequence[JobSpec]],
 ) -> int:
     """Return the total number of sub items in a mapping.

@@ -82,11 +71,11 @@ def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]:


 class Scheduler:
-    """An object that runs one or more Deploy items."""
+    """An object that runs one or more jobs from JobSpec items."""

     def __init__(
         self,
-        items: Sequence[Deploy],
+        items: Sequence[JobSpec],
         launcher_cls: type[Launcher],
         *,
         interactive: bool,
@@ -99,25 +88,25 @@ def __init__(
             interactive: launch the tools in interactive mode.

         """
-        self.items: Sequence[Deploy] = items
+        self._jobs: Mapping[str, JobSpec] = {i.full_name: i for i in items}

-        # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen
+        # 'scheduled[target][cfg]' is a list of JobSpec object names for the chosen
         # target and cfg. As items in _scheduled are ready to be run (once
         # their dependencies pass), they are moved to the _queued list, where
         # they wait until slots are available for them to be dispatched.
         # When all items (in all cfgs) of a target are done, it is removed from
         # this dictionary.
-        self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[Deploy]]] = {}
-        self.add_to_scheduled(items)
+        self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[str]]] = {}
+        self.add_to_scheduled(jobs=self._jobs)

         # Print status periodically using an external status printer.
-        self.status_printer = get_status_printer(interactive)
-        self.status_printer.print_header(
+        self._status_printer = get_status_printer(interactive)
+        self._status_printer.print_header(
             msg="Q: queued, D: dispatched, P: passed, F: failed, K: killed, T: total",
         )

         # Sets of items, split up by their current state. The sets are
-        # disjoint and their union equals the keys of self.item_to_status.
+        # disjoint and their union equals the keys of self.job_status.
         # _queued is a list so that we dispatch things in order (relevant
         # for things like tests where we have ordered things cleverly to
         # try to see failures early). They are maintained for each target.
@@ -128,49 +117,55 @@ def __init__(
         # the entire regression. We keep rotating through our list of running
         # items, picking up where we left off on the last poll.
         self._targets: Sequence[str] = list(self._scheduled.keys())
-        self._queued: MutableMapping[str, MutableSequence[Deploy]] = {}
-        self._running: MutableMapping[str, MutableSequence[Deploy]] = {}
-        self._passed: MutableMapping[str, MutableSet[Deploy]] = {}
-        self._failed: MutableMapping[str, MutableSet[Deploy]] = {}
-        self._killed: MutableMapping[str, MutableSet[Deploy]] = {}
-        self._total = {}
-        self.last_target_polled_idx = -1
-        self.last_item_polled_idx = {}
+        self._total: MutableMapping[str, int] = {}
+
+        self._queued: MutableMapping[str, MutableSequence[str]] = {}
+        self._running: MutableMapping[str, MutableSequence[str]] = {}
+
+        self._passed: MutableMapping[str, MutableSet[str]] = {}
+        self._failed: MutableMapping[str, MutableSet[str]] = {}
+        self._killed: MutableMapping[str, MutableSet[str]] = {}
+
+        self._last_target_polled_idx = -1
+        self._last_item_polled_idx = {}
+
         for target in self._scheduled:
             self._queued[target] = []
             self._running[target] = []
+
             self._passed[target] = set()
             self._failed[target] = set()
             self._killed[target] = set()
+
             self._total[target] = total_sub_items(self._scheduled[target])
-            self.last_item_polled_idx[target] = -1
+            self._last_item_polled_idx[target] = -1

             # Stuff for printing the status.
             width = len(str(self._total[target]))
             field_fmt = f"{{:0{width}d}}"
-            self.msg_fmt = (
+            self._msg_fmt = (
                 f"Q: {field_fmt}, D: {field_fmt}, P: {field_fmt}, "
                 f"F: {field_fmt}, K: {field_fmt}, T: {field_fmt}"
             )
-            msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
-            self.status_printer.init_target(target=target, msg=msg)
+            msg = self._msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
+            self._status_printer.init_target(target=target, msg=msg)

-        # A map from the Deployment names tracked by this class to their
+        # A map from the job names tracked by this class to their
         # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
         # corresponding to membership in the dicts above. This is not
         # per-target.
-        self.item_status: MutableMapping[str, str] = {}
+        self.job_status: MutableMapping[str, str] = {}

         # Create the launcher instance for all items.
         self._launchers: Mapping[str, Launcher] = {
-            item.full_name: launcher_cls(item) for item in self.items
+            full_name: launcher_cls(job_spec) for full_name, job_spec in self._jobs.items()
         }

         # The chosen launcher class. This allows us to access launcher
         # variant-specific settings such as max parallel jobs & poll rate.
-        self.launcher_cls: type[Launcher] = launcher_cls
+        self._launcher_cls: type[Launcher] = launcher_cls

-    def run(self) -> Mapping[str, CompletedJobStatus]:
+    def run(self) -> Sequence[CompletedJobStatus]:
         """Run all scheduled jobs and return the results.

         Returns the results (status) of all items dispatched for all
@@ -226,50 +221,68 @@ def on_signal(signal_received: int, _: FrameType | None) -> None:
                 # polling loop. But we do it with a bounded wait on stop_now so
                 # that we jump back to the polling loop immediately on a
                 # signal.
-                stop_now.wait(timeout=self.launcher_cls.poll_freq)
+                stop_now.wait(timeout=self._launcher_cls.poll_freq)

         finally:
             signal(SIGINT, old_handler)

             # Cleanup the status printer.
-            self.status_printer.exit()
+            self._status_printer.exit()

         # We got to the end without anything exploding. Return the results.
-        return {
-            name: CompletedJobStatus(
-                status=status,
-                fail_msg=self._launchers[name].fail_msg,
+        results = []
+        for name, status in self.job_status.items():
+            launcher = self._launchers[name]
+            job_spec = self._jobs[name]
+
+            results.append(
+                CompletedJobStatus(
+                    job_type=job_spec.job_type,
+                    name=job_spec.name,
+                    seed=job_spec.seed,
+                    full_name=name,
+                    qual_name=job_spec.qual_name,
+                    target=job_spec.target,
+                    status=status,
+                    fail_msg=launcher.fail_msg,
+                    job_runtime=launcher.job_runtime.with_unit("s").get()[0],
+                    simulated_time=launcher.simulated_time.with_unit("us").get()[0],
+                    log_path=job_spec.log_path,
+                )
             )
-            for name, status in self.item_status.items()
-        }

-    def add_to_scheduled(self, items: Sequence[Deploy]) -> None:
-        """Add items to the schedule.
+        return results
+
+    def add_to_scheduled(self, jobs: Mapping[str, JobSpec]) -> None:
+        """Add jobs to the schedule.

         Args:
-            items: Deploy objects to add to the schedule.
+            jobs: the jobs to add to the schedule.

         """
-        for item in items:
-            target_dict = self._scheduled.setdefault(item.target, {})
-            cfg_list = target_dict.setdefault(item.flow, [])
-            if item not in cfg_list:
-                cfg_list.append(item)
+        for full_name, job_spec in jobs.items():
+            target_dict = self._scheduled.setdefault(job_spec.target, {})
+            cfg_list = target_dict.setdefault(job_spec.flow, [])

-    def _unschedule_item(self, item: Deploy) -> None:
+            if job_spec not in cfg_list:
+                cfg_list.append(full_name)
+
+    def _unschedule_item(self, job_name: str) -> None:
         """Remove deploy item from the schedule."""
-        target_dict = self._scheduled[item.target]
-        cfg_list = target_dict.get(item.flow)
+        job = self._jobs[job_name]
+        target_dict = self._scheduled[job.target]
+        cfg_list = target_dict.get(job.flow)
+
         if cfg_list is not None:
             with contextlib.suppress(ValueError):
-                cfg_list.remove(item)
+                cfg_list.remove(job_name)

             # When all items in _scheduled[target][cfg] are finally removed,
             # the cfg key is deleted.
             if not cfg_list:
-                del target_dict[item.flow]
+                del target_dict[job.flow]

-    def _enqueue_successors(self, item: Deploy | None = None) -> None:
+    def _enqueue_successors(self, job_name: str | None = None) -> None:
         """Move an item's successors from _scheduled to _queued.

         'item' is the recently run job that has completed. If None, then we
@@ -277,34 +290,32 @@ def _enqueue_successors(self, item: Deploy | None = None) -> None:
         target. If 'item' is specified, then we find its successors and move
         them to _queued.
""" - for next_item in self._get_successors(item): - if ( - next_item.full_name in self.item_status - or next_item in self._queued[next_item.target] - ): - msg = f"Job {next_item.full_name} already scheduled" + for next_job_name in self._get_successors(job_name): + target = self._jobs[next_job_name].target + if next_job_name in self.job_status or next_job_name in self._queued[target]: + msg = f"Job {next_job_name} already scheduled" raise RuntimeError(msg) - self.item_status[next_item.full_name] = "Q" - self._queued[next_item.target].append(next_item) - self._unschedule_item(next_item) + self.job_status[next_job_name] = "Q" + self._queued[target].append(next_job_name) + self._unschedule_item(next_job_name) - def _cancel_successors(self, item: Deploy) -> None: + def _cancel_successors(self, job_name: str) -> None: """Cancel an item's successors. Recursively move them from _scheduled or _queued to _killed. Args: - item: job whose successors are to be canceled. + job_name: job whose successors are to be canceled. """ - items = list(self._get_successors(item)) + items = list(self._get_successors(job_name)) while items: next_item = items.pop() self._cancel_item(next_item, cancel_successors=False) items.extend(self._get_successors(next_item)) - def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: + def _get_successors(self, job_name: str | None = None) -> Sequence[str]: """Find immediate successors of an item. We choose the target that follows the 'item''s current target and find @@ -313,13 +324,13 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: from the cfg to which the item belongs. Args: - item: is a job that has completed. + job_name: name of the job Returns: - list of item's successors, or an empty list if there are none. + list of the jobs successors, or an empty list if there are none. """ - if item is None: + if job_name is None: target = next(iter(self._scheduled)) if target is None: @@ -328,8 +339,10 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: cfgs = set(self._scheduled[target]) else: - if item.target not in self._scheduled: - msg = f"Scheduler does not contain target {item.target}" + job: JobSpec = self._jobs[job_name] + + if job.target not in self._scheduled: + msg = f"Scheduler does not contain target {job.target}" raise KeyError(msg) target_iterator = iter(self._scheduled) @@ -337,7 +350,7 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: found = False while not found: - if target == item.target: + if target == job.target: found = True try: @@ -349,7 +362,7 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: if target is None: return [] - cfgs = {item.flow} + cfgs = {job.flow} # Find item's successors that can be enqueued. We assume here that # only the immediately succeeding target can be enqueued at this @@ -357,11 +370,12 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: successors = [] for cfg in cfgs: for next_item in self._scheduled[target][cfg]: - if item is not None: + if job_name is not None: + job = self._jobs[next_item] # Something is terribly wrong if item exists but the # next_item's dependency list is empty. 
-                    assert next_item.dependencies
-                    if item not in next_item.dependencies:
+                    assert job.dependencies
+                    if job_name not in job.dependencies:
                         continue

                 if self._ok_to_enqueue(next_item):
@@ -369,32 +383,32 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]:

         return successors

-    def _ok_to_enqueue(self, item: Deploy) -> bool:
+    def _ok_to_enqueue(self, job_name: str) -> bool:
         """Check if all dependencies jobs are completed.

         Args:
-            item: is a deployment job.
+            job_name: name of job.

         Returns:
             true if ALL dependencies of item are complete.

         """
-        for dep in item.dependencies:
+        for dep in self._jobs[job_name].dependencies:
             # Ignore dependencies that were not scheduled to run.
-            if dep not in self.items:
+            if dep not in self._jobs:
                 continue

             # Has the dep even been enqueued?
-            if dep.full_name not in self.item_status:
+            if dep not in self.job_status:
                 return False

             # Has the dep completed?
-            if self.item_status[dep.full_name] not in ["P", "F", "K"]:
+            if self.job_status[dep] not in ["P", "F", "K"]:
                 return False

         return True

-    def _ok_to_run(self, item: Deploy) -> bool:
+    def _ok_to_run(self, job_name: str) -> bool:
         """Check if a job is ready to start.

         The item's needs_all_dependencies_passing setting is used to figure
@@ -402,31 +416,32 @@ def _ok_to_run(self, item: Deploy) -> bool:
         statuses.

         Args:
-            item: is a deployment job.
+            job_name: name of the job to check

         Returns:
             true if the required dependencies have passed.

         """
+        job: JobSpec = self._jobs[job_name]
         # 'item' can run only if its dependencies have passed (their results
         # should already show up in the item to status map).
-        for dep in item.dependencies:
+        for dep_name in job.dependencies:
             # Ignore dependencies that were not scheduled to run.
-            if dep not in self.items:
+            if dep_name not in self._jobs:
                 continue

-            dep_status = self.item_status[dep.full_name]
+            dep_status = self.job_status[dep_name]

             if dep_status not in ["P", "F", "K"]:
                 raise ValueError("Status must be one of P, F, or K")

-            if item.needs_all_dependencies_passing:
+            if job.needs_all_dependencies_passing:
                 if dep_status in ["F", "K"]:
                     return False
             elif dep_status in ["P"]:
                 return True

-        return item.needs_all_dependencies_passing
+        return job.needs_all_dependencies_passing

     def _poll(self, hms: str) -> bool:
         """Check for running items that have finished.
@@ -436,7 +451,7 @@ def _poll(self, hms: str) -> bool:

         """
         max_poll = min(
-            self.launcher_cls.max_poll,
+            self._launcher_cls.max_poll,
             total_sub_items(self._running),
         )

@@ -448,18 +463,18 @@ def _poll(self, hms: str) -> bool:
         changed = False
         while max_poll:
-            target, self.last_target_polled_idx = get_next_item(
+            target, self._last_target_polled_idx = get_next_item(
                 self._targets,
-                self.last_target_polled_idx,
+                self._last_target_polled_idx,
             )

             while self._running[target] and max_poll:
                 max_poll -= 1

-                item, self.last_item_polled_idx[target] = get_next_item(
+                job_name, self._last_item_polled_idx[target] = get_next_item(
                     self._running[target],
-                    self.last_item_polled_idx[target],
+                    self._last_item_polled_idx[target],
                 )

-                status = self._launchers[item.full_name].poll()
+                status = self._launchers[job_name].poll()
                 level = log.VERBOSE
                 if status not in ["D", "P", "F", "E", "K"]:
@@ -470,27 +485,27 @@ def _poll(self, hms: str) -> bool:
                     continue

                 if status == "P":
-                    self._passed[target].add(item)
+                    self._passed[target].add(job_name)

                 elif status == "F":
-                    self._failed[target].add(item)
+                    self._failed[target].add(job_name)
                     level = log.ERROR

                 else:  # Killed or Error dispatching
-                    self._killed[target].add(item)
+                    self._killed[target].add(job_name)
                     level = log.ERROR

-                self._running[target].pop(self.last_item_polled_idx[target])
-                self.last_item_polled_idx[target] -= 1
-                self.item_status[item.full_name] = status
+                self._running[target].pop(self._last_item_polled_idx[target])
+                self._last_item_polled_idx[target] -= 1
+                self.job_status[job_name] = status

                 log.log(
                     level,
                     "[%s]: [%s]: [status] [%s: %s]",
                     hms,
                     target,
-                    item.full_name,
+                    job_name,
                     status,
                 )

@@ -501,14 +516,14 @@ def _poll(self, hms: str) -> bool:
                 # jobs). Hence we enqueue all successors rather than canceling
                 # them right here. We leave it to _dispatch() to figure out
                 # whether an enqueued item can be run or not.
-                self._enqueue_successors(item)
+                self._enqueue_successors(job_name)

                 changed = True

         return changed

     def _dispatch(self, hms: str) -> None:
         """Dispatch some queued items if possible."""
-        slots = self.launcher_cls.max_parallel - total_sub_items(self._running)
+        slots = self._launcher_cls.max_parallel - total_sub_items(self._running)
         if slots <= 0:
             return
@@ -516,7 +531,9 @@ def _dispatch(self, hms: str) -> None:
         # weights.
         sum_weight = 0
         slots_filled = 0
-        total_weight = sum(self._queued[t][0].weight for t in self._queued if self._queued[t])
+        total_weight = sum(
+            self._jobs[self._queued[t][0]].weight for t in self._queued if self._queued[t]
+        )

         for target in self._scheduled:
             if not self._queued[target]:
@@ -541,7 +558,7 @@ def _dispatch(self, hms: str) -> None:
             # solution, except that it prioritizes the slot allocation to
             # targets that are earlier in the list such that in the end, all
             # slots are fully consumed.
-            sum_weight += self._queued[target][0].weight
+            sum_weight += self._jobs[self._queued[target][0]].weight
             target_slots = round((slots * sum_weight) / total_weight) - slots_filled
             if target_slots <= 0:
                 continue
@@ -565,32 +582,32 @@ def _dispatch(self, hms: str) -> None:
                 "[%s]: [%s]: [dispatch]:\n%s",
                 hms,
                 target,
-                ", ".join(item.full_name for item in to_dispatch),
+                ", ".join(job_name for job_name in to_dispatch),
             )

-            for item in to_dispatch:
+            for job_name in to_dispatch:
                 try:
-                    self._launchers[item.full_name].launch()
+                    self._launchers[job_name].launch()
                 except LauncherError:
-                    log.exception("Error launching %s", item)
-                    self._kill_item(item)
+                    log.exception("Error launching %s", job_name)
+                    self._kill_item(job_name)
                 except LauncherBusyError:
                     log.exception("Launcher busy")
-                    self._queued[target].append(item)
+                    self._queued[target].append(job_name)

                     log.verbose(
                         "[%s]: [%s]: [reqeued]: %s",
                         hms,
                         target,
-                        item.full_name,
+                        job_name,
                     )
                     continue

-                self._running[target].append(item)
-                self.item_status[item.full_name] = "D"
+                self._running[target].append(job_name)
+                self.job_status[job_name] = "D"

     def _kill(self) -> None:
         """Kill any running items and cancel any that are waiting."""
@@ -629,9 +646,9 @@ def _check_if_done(self, hms: str) -> bool:
             perc = done_cnt / self._total[target] * 100

             running = ", ".join(
-                [f"{item.full_name}" for item in self._running[target]],
+                [f"{job_name}" for job_name in self._running[target]],
             )
-            msg = self.msg_fmt.format(
+            msg = self._msg_fmt.format(
                 len(self._queued[target]),
                 len(self._running[target]),
                 len(self._passed[target]),
@@ -639,7 +656,7 @@ def _check_if_done(self, hms: str) -> bool:
                 len(self._killed[target]),
                 self._total[target],
             )
-            self.status_printer.update_target(
+            self._status_printer.update_target(
                 target=target,
                 msg=msg,
                 hms=hms,
@@ -648,36 +665,38 @@ def _check_if_done(self, hms: str) -> bool:
             )

         return done

-    def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:
+    def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None:
         """Cancel an item and optionally all of its successors.

         Supplied item may be in _scheduled list or the _queued list. From
         either, we move it straight to _killed.

         Args:
-            item: is a deployment job.
+            job_name: name of the job to cancel
             cancel_successors: if set then cancel successors as well (True).

         """
-        self.item_status[item.full_name] = "K"
-        self._killed[item.target].add(item)
-        if item in self._queued[item.target]:
-            self._queued[item.target].remove(item)
+        target = self._jobs[job_name].target
+        self.job_status[job_name] = "K"
+        self._killed[target].add(job_name)
+        if job_name in self._queued[target]:
+            self._queued[target].remove(job_name)
         else:
-            self._unschedule_item(item)
+            self._unschedule_item(job_name)

         if cancel_successors:
-            self._cancel_successors(item)
+            self._cancel_successors(job_name)

-    def _kill_item(self, item: Deploy) -> None:
+    def _kill_item(self, job_name: str) -> None:
         """Kill a running item and cancel all of its successors.

         Args:
-            item: is a deployment job.
+            job_name: name of the job to kill

         """
-        self._launchers[item.full_name].kill()
-        self.item_status[item.full_name] = "K"
-        self._killed[item.target].add(item)
-        self._running[item.target].remove(item)
-        self._cancel_successors(item)
+        target = self._jobs[job_name].target
+        self._launchers[job_name].kill()
+        self.job_status[job_name] = "K"
+        self._killed[target].add(job_name)
+        self._running[target].remove(job_name)
+        self._cancel_successors(job_name)
diff --git a/src/dvsim/sim_results.py b/src/dvsim/sim_results.py
index 7b4765f..10ae09e 100644
--- a/src/dvsim/sim_results.py
+++ b/src/dvsim/sim_results.py
@@ -6,9 +6,9 @@

 import collections
 import re
-from collections.abc import Mapping
+from collections.abc import Sequence

-from dvsim.scheduler import CompletedJobStatus
+from dvsim.job.data import CompletedJobStatus
 from dvsim.testplan import Result

 _REGEX_REMOVE = [
@@ -76,48 +76,47 @@ class SimResults:
     holding all failing tests with the same signature.

     """

-    def __init__(self, items, results) -> None:
+    def __init__(self, results: Sequence[CompletedJobStatus]) -> None:
         self.table = []
         self.buckets = collections.defaultdict(list)
         self._name_to_row = {}
-        for item in items:
-            self._add_item(item, results)
+        for job_status in results:
+            self._add_item(job_status=job_status)

-    def _add_item(self, item, results: Mapping[str, CompletedJobStatus]) -> None:
+    def _add_item(self, job_status: CompletedJobStatus) -> None:
         """Recursively add a single item to the table of results."""
-        job_status = results[item.full_name]
         if job_status.status in ["F", "K"]:
             bucket = self._bucketize(job_status.fail_msg.message)
             self.buckets[bucket].append(
                 (
-                    item,
+                    job_status,
                     job_status.fail_msg.line_number,
                     job_status.fail_msg.context,
                 ),
             )

         # Runs get added to the table directly
-        if item.target == "run":
-            self._add_run(item, job_status.status)
+        if job_status.target == "run":
+            self._add_run(job_status)

-    def _add_run(self, item, status) -> None:
+    def _add_run(self, job_status: CompletedJobStatus) -> None:
         """Add an entry to table for item."""
-        row = self._name_to_row.get(item.name)
+        row = self._name_to_row.get(job_status.name)
         if row is None:
             row = Result(
-                item.name,
-                job_runtime=item.job_runtime,
-                simulated_time=item.simulated_time,
+                job_status.name,
+                job_runtime=job_status.job_runtime,
+                simulated_time=job_status.simulated_time,
             )
             self.table.append(row)
-            self._name_to_row[item.name] = row
+            self._name_to_row[job_status.name] = row

         # Record the max job_runtime of all reseeds.
-        elif item.job_runtime > row.job_runtime:
-            row.job_runtime = item.job_runtime
-            row.simulated_time = item.simulated_time
+        elif job_status.job_runtime > row.job_runtime:
+            row.job_runtime = job_status.job_runtime
+            row.simulated_time = job_status.simulated_time

-        if status == "P":
+        if job_status.status == "P":
             row.passing += 1
         row.total += 1
diff --git a/src/dvsim/testplan.py b/src/dvsim/testplan.py
index e47de6f..4b76f6c 100644
--- a/src/dvsim/testplan.py
+++ b/src/dvsim/testplan.py
@@ -248,7 +248,7 @@ class Testplan:
     def _parse_hjson(filename):
         """Parses an input file with HJson and returns a dict."""
         try:
-            return hjson.load(open(filename))
+            return hjson.load(Path(filename).open())
         except OSError:
             pass
         except hjson.scanner.HjsonDecodeError: