From 4aea952b58d81bd369b357f3f84ef3ac5f7079f1 Mon Sep 17 00:00:00 2001
From: James McCorrie
Date: Thu, 30 Oct 2025 09:18:40 +0000
Subject: [PATCH 1/3] refactor: add JobSpec common abstraction

The `Deploy` classes are coupled to the scheduler and to the flow
objects in ways that are not clear. It is also not clear which of the
public attributes are depended on externally and which are internal
working state. The intention is to make explicit the interface the
scheduler depends on, and the interface the flow objects depend on for
the results of job runs.

Initially I tried to refactor the `Deploy` objects themselves to make
the existing interfaces clear; however, this failed due to too many
moving parts and the level of coupling.

This commit introduces a `JobSpec` pydantic data model: a fully typed,
runtime-validated model containing everything the scheduler needs to
run a job. The scheduler no longer receives `Deploy` objects, but
instead `JobSpec` objects created from them, which means `Deploy`
objects become `JobSpec` factories.

In addition, the obvious dependencies on `Deploy` objects within the
flow objects have been replaced with an expanded `CompletedJobStatus`
pydantic model. Some dependencies may still have been missed, but the
main ones in the results-processing path have been removed.

This opens the way for writing tests for the scheduler, and potentially
the launchers, against clear interfaces using data classes as inputs
and outputs.

There is room for improvement in the data classes, as some attributes
can potentially be removed with further refactoring.

Signed-off-by: James McCorrie
---
 ruff-ci.toml | 1 -
 src/dvsim/cli.py | 4 +-
 src/dvsim/flow/base.py | 17 +-
 src/dvsim/flow/formal.py | 2 +-
 src/dvsim/flow/one_shot.py | 4 +-
 src/dvsim/flow/sim.py | 66 ++++---
 src/dvsim/job/data.py | 154 +++++++++++++++
 src/dvsim/job/deploy.py | 200 ++++++++++---------
 src/dvsim/launcher/base.py | 92 ++++++---
 src/dvsim/launcher/fake.py | 42 ++--
 src/dvsim/launcher/local.py | 26 +--
 src/dvsim/launcher/lsf.py | 24 +--
 src/dvsim/launcher/nc.py | 34 ++--
 src/dvsim/launcher/sge/engine.py | 155 ++++++++++-----
 src/dvsim/launcher/sge/launcher.py | 43 ++--
 src/dvsim/launcher/sge/qsubopts.py | 6 +-
 src/dvsim/launcher/slurm.py | 14 +-
 src/dvsim/scheduler.py | 303 +++++++++++++++--------------
 src/dvsim/sim_results.py | 39 ++--
 src/dvsim/testplan.py | 2 +-
 20 files changed, 753 insertions(+), 475 deletions(-)
 create mode 100644 src/dvsim/job/data.py

diff --git a/ruff-ci.toml b/ruff-ci.toml
index f0c39acd..dca968d7 100644
--- a/ruff-ci.toml
+++ b/ruff-ci.toml
@@ -92,7 +92,6 @@ ignore = [
     "S103",
     "S202",
     "S311",
-    "S602",
     "S605",
     "SIM115",
     "SLF001",
diff --git a/src/dvsim/cli.py b/src/dvsim/cli.py
index 9690d275..8868dc30 100644
--- a/src/dvsim/cli.py
+++ b/src/dvsim/cli.py
@@ -82,7 +82,7 @@ def resolve_scratch_root(arg_scratch_root, proj_root):
     arg_scratch_root = os.path.realpath(arg_scratch_root)

     try:
-        os.makedirs(arg_scratch_root, exist_ok=True)
+        Path(arg_scratch_root).mkdir(exist_ok=True, parents=True)
     except PermissionError as e:
         log.fatal(f"Failed to create scratch root {arg_scratch_root}:\n{e}.")
         sys.exit(1)
@@ -238,7 +238,7 @@ def copy_repo(src, dest) -> None:
     log.verbose("[copy_repo] [cmd]: \n%s", " ".join(cmd))

     # Make sure the dest exists first.
- os.makedirs(dest, exist_ok=True) + Path(dest).mkdir(exist_ok=True, parents=True) try: subprocess.run(cmd, check=True, capture_output=True) except subprocess.CalledProcessError as e: diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py index ff126f08..6f2389ac 100644 --- a/src/dvsim/flow/base.py +++ b/src/dvsim/flow/base.py @@ -16,9 +16,10 @@ import hjson from dvsim.flow.hjson import set_target_attribute +from dvsim.job.data import CompletedJobStatus from dvsim.launcher.factory import get_launcher_cls from dvsim.logging import log -from dvsim.scheduler import CompletedJobStatus, Scheduler +from dvsim.scheduler import Scheduler from dvsim.utils import ( find_and_substitute_wildcards, md_results_to_html, @@ -403,7 +404,7 @@ def create_deploy_objects(self) -> None: for item in self.cfgs: item._create_deploy_objects() - def deploy_objects(self) -> Mapping[str, CompletedJobStatus]: + def deploy_objects(self) -> Sequence[CompletedJobStatus]: """Public facing API for deploying all available objects. Runs each job and returns a map from item to status. @@ -430,13 +431,13 @@ def deploy_objects(self) -> Mapping[str, CompletedJobStatus]: ) return Scheduler( - items=deploy, + items=[d.get_job_spec() for d in deploy], launcher_cls=get_launcher_cls(), interactive=self.interactive, ).run() @abstractmethod - def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str: + def _gen_results(self, results: Sequence[CompletedJobStatus]) -> str: """Generate flow results. The function is called after the flow has completed. It collates @@ -446,18 +447,18 @@ def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str: report, which is in markdown format. Args: - results: dictionary mapping deployed item names to completed job status. + results: completed job status objects. Returns: Results as a formatted string """ - def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None: + def gen_results(self, results: Sequence[CompletedJobStatus]) -> None: """Generate flow results. Args: - results: dictionary mapping deployed item names to completed job status. + results: completed job status objects. 
""" for item in self.cfgs: @@ -476,7 +477,7 @@ def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None: self.write_results(self.results_html_name, self.results_summary_md) @abstractmethod - def gen_results_summary(self) -> None: + def gen_results_summary(self) -> str: """Public facing API to generate summary results for each IP/cfg file.""" def write_results(self, html_filename: str, text_md: str, json_str: str | None = None) -> None: diff --git a/src/dvsim/flow/formal.py b/src/dvsim/flow/formal.py index 716f1610..5710ece5 100644 --- a/src/dvsim/flow/formal.py +++ b/src/dvsim/flow/formal.py @@ -228,7 +228,7 @@ def _gen_results(self, results): "results.hjson", ) try: - with open(result_data) as results_file: + with Path(result_data).open() as results_file: self.result = hjson.load(results_file, use_decimal=True) except OSError as err: log.warning("%s", err) diff --git a/src/dvsim/flow/one_shot.py b/src/dvsim/flow/one_shot.py index 1a30f295..ea8dc15d 100644 --- a/src/dvsim/flow/one_shot.py +++ b/src/dvsim/flow/one_shot.py @@ -4,7 +4,7 @@ """Class describing a one-shot build configuration object.""" -import os +import pathlib from collections import OrderedDict from dvsim.flow.base import FlowCfg @@ -135,7 +135,7 @@ def _create_dirs(self) -> None: """Create initial set of directories.""" for link in self.links: rm_path(self.links[link]) - os.makedirs(self.links[link]) + pathlib.Path(self.links[link]).mkdir(parents=True) def _create_deploy_objects(self) -> None: """Create deploy objects from build modes.""" diff --git a/src/dvsim/flow/sim.py b/src/dvsim/flow/sim.py index e492a9e1..1624f9a0 100644 --- a/src/dvsim/flow/sim.py +++ b/src/dvsim/flow/sim.py @@ -6,11 +6,10 @@ import fnmatch import json -import os import re import sys from collections import OrderedDict, defaultdict -from collections.abc import Mapping +from collections.abc import Sequence from datetime import datetime, timezone from pathlib import Path from typing import ClassVar @@ -18,13 +17,13 @@ from tabulate import tabulate from dvsim.flow.base import FlowCfg +from dvsim.job.data import CompletedJobStatus, JobSpec from dvsim.job.deploy import ( CompileSim, CovAnalyze, CovMerge, CovReport, CovUnr, - Deploy, RunTest, ) from dvsim.logging import log @@ -423,7 +422,7 @@ def _create_dirs(self) -> None: """Create initial set of directories.""" for link in self.links: rm_path(self.links[link]) - os.makedirs(self.links[link]) + Path(self.links[link]).mkdir(parents=True) def _expand_run_list(self, build_map): """Generate a list of tests to be run. @@ -540,8 +539,10 @@ def _create_deploy_objects(self) -> None: self._create_dirs() def _cov_analyze(self) -> None: - """Use the last regression coverage data to open up the GUI tool to - analyze the coverage. + """Open GUI tool for coverage analysis. + + Use the last regression coverage data to open up the GUI tool to analyze + the coverage. """ # Create initial set of directories, such as dispatched, passed etc. self._create_dirs() @@ -555,8 +556,10 @@ def cov_analyze(self) -> None: item._cov_analyze() def _cov_unr(self) -> None: - """Use the last regression coverage data to generate unreachable - coverage exclusions. + """Generate unreachable coverage exclusions. + + Use the last regression coverage data to generate unreachable coverage + exclusions. 
""" # TODO, Only support VCS if self.tool not in ["vcs", "xcelium"]: @@ -573,7 +576,7 @@ def cov_unr(self) -> None: for item in self.cfgs: item._cov_unr() - def _gen_json_results(self, run_results: Mapping[Deploy, str]) -> str: + def _gen_json_results(self, run_results: Sequence[CompletedJobStatus]) -> str: """Return the run results as json-formatted dictionary.""" def _pct_str_to_float(s: str) -> float | None: @@ -589,14 +592,8 @@ def _pct_str_to_float(s: str) -> float | None: def _test_result_to_dict(tr) -> dict: """Map a test result entry to a dict.""" - job_time_s = ( - tr.job_runtime.with_unit("s").get()[0] if tr.job_runtime is not None else None - ) - sim_time_us = ( - tr.simulated_time.with_unit("us").get()[0] - if tr.simulated_time is not None - else None - ) + job_time_s = tr.job_runtime + sim_time_us = tr.simulated_time pass_rate = tr.passing * 100.0 / tr.total if tr.total > 0 else 0 return { "name": tr.name, @@ -644,7 +641,7 @@ def _test_result_to_dict(tr) -> dict: # If the testplan does not yet have test results mapped to testpoints, # map them now. - sim_results = SimResults(self.deploy, run_results) + sim_results = SimResults(results=run_results) if not self.testplan.test_results_mapped: self.testplan.map_test_results(test_results=sim_results.table) @@ -707,12 +704,14 @@ def _test_result_to_dict(tr) -> dict: # Extract failure buckets. if sim_results.buckets: by_tests = sorted(sim_results.buckets.items(), key=lambda i: len(i[1]), reverse=True) + for bucket, tests in by_tests: unique_tests = defaultdict(list) for test, line, context in tests: - if not isinstance(test, RunTest): + if test.job_type != "RunTest": continue unique_tests[test.name].append((test, line, context)) + fts = [] for test_name, test_runs in unique_tests.items(): frs = [] @@ -721,12 +720,13 @@ def _test_result_to_dict(tr) -> dict: { "seed": str(test.seed), "failure_message": { - "log_file_path": test.get_log_path(), + "log_file_path": str(test.log_path), "log_file_line_num": line, "text": "".join(context), }, }, ) + fts.append( { "name": test_name, @@ -747,7 +747,7 @@ def _test_result_to_dict(tr) -> dict: # Return the `results` dictionary as json string. return json.dumps(self.results_dict) - def _gen_results(self, results: Mapping[Deploy, str]) -> str: + def _gen_results(self, results: Sequence[CompletedJobStatus]) -> str: """Generate simulation results. The function is called after the regression has completed. It collates the @@ -761,12 +761,12 @@ def _gen_results(self, results: Mapping[Deploy, str]) -> str: def indent_by(level: int) -> str: return " " * (4 * level) - def create_failure_message(test, line, context): + def create_failure_message(test: JobSpec, line, context): message = [f"{indent_by(2)}* {test.qual_name}\\"] if line: - message.append(f"{indent_by(2)} Line {line}, in log " + test.get_log_path()) + message.append(f"{indent_by(2)} Line {line}, in log {test.log_path}") else: - message.append(f"{indent_by(2)} Log {test.get_log_path()}") + message.append(f"{indent_by(2)} Log {test.log_path}") if context: message.append("") lines = [f"{indent_by(4)}{c.rstrip()}" for c in context] @@ -817,8 +817,7 @@ def create_bucket_report(buckets): fail_msgs.append("") return fail_msgs - deployed_items = self.deploy - sim_results = SimResults(deployed_items, results) + sim_results = SimResults(results=results) # Generate results table for runs. results_str = "## " + self.results_title + "\n" @@ -887,8 +886,17 @@ def create_bucket_report(buckets): # Append coverage results if coverage was enabled. 
if self.cov_report_deploy is not None: - report_status = results[self.cov_report_deploy.full_name] - if report_status == "P": + report_status: CompletedJobStatus | None = None + for job_status in results: + if job_status.full_name == self.cov_report_deploy.full_name: + report_status = job_status + break + + if report_status is None: + msg = f"Coverage report not found for {self.cov_report_deploy.full_name}" + raise KeyError(msg) + + if report_status.status == "P": results_str += "\n## Coverage Results\n" # Link the dashboard page using "cov_report_page" value. if hasattr(self, "cov_report_page"): @@ -907,7 +915,7 @@ def create_bucket_report(buckets): self.results_md = results_str return results_str - def gen_results_summary(self): + def gen_results_summary(self) -> str: """Generate the summary results table. This method is specific to the primary cfg. It summarizes the results diff --git a/src/dvsim/job/data.py b/src/dvsim/job/data.py new file mode 100644 index 00000000..600b37f6 --- /dev/null +++ b/src/dvsim/job/data.py @@ -0,0 +1,154 @@ +# Copyright lowRISC contributors (OpenTitan project). +# Licensed under the Apache License, Version 2.0, see LICENSE for details. +# SPDX-License-Identifier: Apache-2.0 + +"""Job data models. + +The JobSpec is used to capture all the information required to be able to +schedule a job. Once the job has finished a CompletedJobStatus is used to +capture the results of the job run. +""" + +from collections.abc import Callable, Mapping, Sequence +from pathlib import Path + +from pydantic import BaseModel, ConfigDict + +from dvsim.launcher.base import ErrorMessage, Launcher + +__all__ = ( + "CompletedJobStatus", + "JobSpec", + "WorkspaceConfig", +) + + +class WorkspaceConfig(BaseModel): + """Workspace configuration.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + project: str + """Name of the project""" + timestamp: str + """Time stamp of the run.""" + + project_root: Path + """Path to the project root.""" + scratch_root: Path + """Path to the scratch directory root.""" + scratch_path: Path + """Path within the scratch directory to use for this run.""" + + +class JobSpec(BaseModel): + """Job specification.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + job_type: str + """Deployment type""" + + target: str + """run phase [build, run, ...]""" + flow: str + """Name of the flow config (e.g. tl_agent)""" + tool: str + """EDA tool used""" + + name: str + """Name of the job""" + seed: int | None + """Seed if there is one.""" + + full_name: str + """Full name disambiguates across multiple cfg being run (example: + 'aes:default', 'uart:default' builds. + """ + + qual_name: str + """Qualified name disambiguates the instance name with other instances + of the same class (example: 'uart_smoke' reseeded multiple times + needs to be disambiguated using the index -> '0.uart_smoke'. 
+ """ + + workspace_cfg: WorkspaceConfig + """Workspace configuration.""" + + dependencies: list[str] + """Full names of the other Jobs that this one depends on.""" + needs_all_dependencies_passing: bool + """Wait for dependent jobs to pass before scheduling.""" + weight: int + """Weight to apply to the scheduling priority.""" + timeout_mins: int | None + """Timeout to apply to the launched job.""" + + cmd: str + """Command to run to execute the job.""" + exports: Mapping[str, str] + """Environment variables to set in the context of the running job.""" + dry_run: bool + """Go through the motions but don't actually run the job.""" + interactive: bool + """Enable interactive mode.""" + gui: bool + """Enable GUI mode.""" + + odir: Path + """Output directory for the job results files.""" + log_path: Path + """Path for the job log file.""" + links: Mapping[str, Path] + """Path for links directories.""" + + # TODO: remove the need for these callables here + pre_launch: Callable[[Launcher], None] + """Callback function for pre-launch actions.""" + post_finish: Callable[[str], None] + """Callback function for tidy up actions once the job is finished.""" + + pass_patterns: Sequence[str] + """regex patterns to match on to determine if the job is successful.""" + fail_patterns: Sequence[str] + """regex patterns to match on to determine if the job has failed.""" + + +class CompletedJobStatus(BaseModel): + """Job status.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + job_type: str + """Deployment type""" + name: str + """Name of the job""" + seed: int | None + """Seed if there is one.""" + + full_name: str + """Full name disambiguates across multiple cfg being run (example: + 'aes:default', 'uart:default' builds. + """ + + qual_name: str + """Qualified name disambiguates the instance name with other instances + of the same class (example: 'uart_smoke' reseeded multiple times + needs to be disambiguated using the index -> '0.uart_smoke'. + """ + + target: str + """run phase [build, run, ...]""" + + log_path: Path + """Path for the job log file.""" + + job_runtime: float + """Duration of the job.""" + simulated_time: float + """Simulation time.""" + + status: str + """Job status string [P,F,K,...]""" + fail_msg: ErrorMessage + """Error message.""" diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index 511f157a..f7a82bd0 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -7,18 +7,17 @@ import pprint import random import shlex -from collections.abc import Mapping +from collections.abc import Callable, Mapping from pathlib import Path from typing import TYPE_CHECKING, ClassVar -from pydantic import BaseModel -from pydantic.config import ConfigDict from tabulate import tabulate +from dvsim.job.data import JobSpec, WorkspaceConfig from dvsim.job.time import JobTime from dvsim.launcher.base import Launcher from dvsim.logging import log -from dvsim.sim_utils import get_cov_summary_table, get_job_runtime, get_simulated_time +from dvsim.sim_utils import get_cov_summary_table from dvsim.utils import ( clean_odirs, find_and_substitute_wildcards, @@ -31,19 +30,6 @@ from dvsim.modes import BuildMode -class WorkspaceConfig(BaseModel): - """Workspace configuration.""" - - model_config = ConfigDict(frozen=True, extra="forbid") - - project: str - timestamp: str - - project_root: Path - scratch_root: Path - scratch_path: Path - - __all__ = ( "CompileSim", "Deploy", @@ -115,9 +101,6 @@ def __init__(self, sim_cfg: "SimCfg") -> None: # Construct the job's command. 
self.cmd = self._construct_cmd() - # Job's wall clock time (a.k.a CPU time, or runtime). - self.job_runtime = JobTime() - self.workspace_cfg = WorkspaceConfig( project=sim_cfg.name, project_root=sim_cfg.proj_root, @@ -126,6 +109,36 @@ def __init__(self, sim_cfg: "SimCfg") -> None: timestamp=sim_cfg.args.timestamp, ) + def get_job_spec(self) -> "JobSpec": + """Get the job spec for this deployment.""" + return JobSpec( + job_type=self.__class__.__name__, + target=self.target, + flow=self.flow, + tool=self.sim_cfg.tool, + name=self.name, + seed=getattr(self, "seed", None), + full_name=self.full_name, + qual_name=self.qual_name, + workspace_cfg=self.workspace_cfg, + dependencies=[d.full_name for d in self.dependencies], + weight=self.weight, + timeout_mins=self.get_timeout_mins(), + cmd=self.cmd, + exports=self.exports, + dry_run=self.dry_run, + interactive=self.sim_cfg.interactive, + gui=self.gui, + needs_all_dependencies_passing=self.needs_all_dependencies_passing, + pre_launch=self.pre_launch(), + post_finish=self.post_finish(), + odir=self.odir, + links=self.sim_cfg.links, + log_path=Path(f"{self.odir}/{self.target}.log"), + pass_patterns=self.pass_patterns, + fail_patterns=self.fail_patterns, + ) + def _define_attrs(self) -> None: """Define the attributes this instance needs to have. @@ -312,46 +325,30 @@ def is_equivalent_job(self, item: "Deploy") -> bool: log.verbose('Deploy job "%s" is equivalent to "%s"', item.name, self.name) return True - def pre_launch(self, launcher: Launcher) -> None: - """Perform additional pre-launch activities (callback). + def pre_launch(self) -> Callable[[Launcher], None]: + """Get pre-launch callback.""" - This is invoked by launcher::_pre_launch(). - """ - - def post_finish(self, status: str) -> None: - """Perform additional post-finish activities (callback). + def callback(launcher: Launcher) -> None: + """Perform additional pre-launch activities (callback). - This is invoked by launcher::_post_finish(). - """ + This is invoked by launcher::_pre_launch(). + """ - def get_log_path(self) -> str: - """Return the log file path.""" - return f"{self.odir}/{self.target}.log" + return callback - def get_timeout_mins(self) -> float | None: - """Return the timeout in minutes.""" + def post_finish(self) -> Callable[[str], None]: + """Get post finish callback.""" - def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: - """Extract information pertaining to the job from its log. + def callback(status: str) -> None: + """Perform additional post-finish activities (callback). - This method parses the log text after the job has completed, for the - extraction of information pertaining to the job's performance. This - base class method extracts the job's runtime (i.e. the wall clock time) - as reported by the tool. The tool reported runtime is the most accurate - since it is devoid of the delays incurred due to infrastructure and - setup overhead. + This is invoked by launcher::_post_finish(). + """ - The extended classes may override this method to extract other pieces - of information from the log. + return callback - `log_text` is the job's log file contents as a list of lines. 
- """ - try: - time, unit = get_job_runtime(log_text, self.sim_cfg.tool) - self.job_runtime.set(time, unit) - except RuntimeError as e: - log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.") - self.job_runtime.set(job_runtime_secs, "s") + def get_timeout_mins(self) -> float | None: + """Return the timeout in minutes.""" def dump(self) -> Mapping: """Dump the deployment object to mapping object. @@ -360,15 +357,16 @@ def dump(self) -> Mapping: Representation of a deployment object as a dict. """ + job_spec = self.get_job_spec() return { - "full_name": self.full_name, + "full_name": job_spec.full_name, "type": self.__class__.__name__, - "exports": self.exports, - "interactive": self.sim_cfg.interactive, - "log_path": self.get_log_path(), - "timeout_mins": self.get_timeout_mins(), - "cmd": self.cmd, - "gui": self.gui, + "exports": job_spec.exports, + "interactive": job_spec.interactive, + "log_path": str(job_spec.log_path), + "timeout_mins": job_spec.timeout_mins, + "cmd": job_spec.cmd, + "gui": job_spec.gui, } @@ -454,11 +452,16 @@ def _set_attrs(self) -> None: if self.sim_cfg.args.build_timeout_mins is not None: self.build_timeout_mins = self.sim_cfg.args.build_timeout_mins - def pre_launch(self, launcher: Launcher) -> None: - """Perform pre-launch tasks.""" - # Delete old coverage database directories before building again. We - # need to do this because the build directory is not 'renewed'. - rm_path(self.cov_db_dir) + def pre_launch(self) -> Callable[[Launcher], None]: + """Get pre-launch callback.""" + + def callback(_: Launcher) -> None: + """Perform pre-launch tasks.""" + # Delete old coverage database directories before building again. We + # need to do this because the build directory is not 'renewed'. + rm_path(self.cov_db_dir) + + return callback def get_timeout_mins(self) -> float: """Return the timeout in minutes. @@ -637,15 +640,25 @@ def _set_attrs(self) -> None: self.run_timeout_multiplier, ) - def pre_launch(self, launcher: Launcher) -> None: + def pre_launch(self) -> Callable[[Launcher], None]: """Perform pre-launch tasks.""" - launcher.renew_odir = True - def post_finish(self, status) -> None: - """Perform tidy up tasks.""" - if status != "P": - # Delete the coverage data if available. - rm_path(self.cov_db_test_dir) + def callback(launcher: Launcher) -> None: + """Perform pre-launch tasks.""" + launcher.renew_odir = True + + return callback + + def post_finish(self) -> Callable[[str], None]: + """Get post finish callback.""" + + def callback(status: str) -> None: + """Perform tidy up tasks.""" + if status != "P": + # Delete the coverage data if available. 
+ rm_path(self.cov_db_test_dir) + + return callback @staticmethod def get_seed() -> int: @@ -668,20 +681,6 @@ def get_timeout_mins(self): """ return self.run_timeout_mins if self.run_timeout_mins is not None else 60 - def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: - """Extract the time the design was simulated for, from the log.""" - super().extract_info_from_log( - job_runtime_secs=job_runtime_secs, - log_text=log_text, - ) - - try: - time, unit = get_simulated_time(log_text, self.sim_cfg.tool) - self.simulated_time.set(time, unit) - - except RuntimeError as e: - log.debug(f"{self.full_name}: {e}") - class CovUnr(Deploy): """Abstraction for coverage UNR flow.""" @@ -821,21 +820,28 @@ def _set_attrs(self) -> None: self.cov_results = "" self.cov_results_dict = {} - def post_finish(self, status) -> None: - """Extract the coverage results summary for the dashboard. + def post_finish(self) -> Callable[[str], None]: + """Get post finish callback.""" - If the extraction fails, an appropriate exception is raised, which must - be caught by the caller to mark the job as a failure. - """ - if self.dry_run or status != "P": - return + def callback(status: str) -> None: + """Extract the coverage results summary for the dashboard. - results, self.cov_total = get_cov_summary_table(self.cov_report_txt, self.sim_cfg.tool) + If the extraction fails, an appropriate exception is raised, which must + be caught by the caller to mark the job as a failure. + """ + if self.dry_run or status != "P": + return + + results, self.cov_total = get_cov_summary_table(self.cov_report_txt, self.sim_cfg.tool) + + colalign = ("center",) * len(results[0]) + self.cov_results = tabulate( + results, headers="firstrow", tablefmt="pipe", colalign=colalign + ) + for tup in zip(*results, strict=False): + self.cov_results_dict[tup[0]] = tup[1] - colalign = ("center",) * len(results[0]) - self.cov_results = tabulate(results, headers="firstrow", tablefmt="pipe", colalign=colalign) - for tup in zip(*results, strict=False): - self.cov_results_dict[tup[0]] = tup[1] + return callback class CovAnalyze(Deploy): diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py index b852442f..69850bcb 100644 --- a/src/dvsim/launcher/base.py +++ b/src/dvsim/launcher/base.py @@ -15,11 +15,13 @@ from pydantic import BaseModel, ConfigDict +from dvsim.job.time import JobTime from dvsim.logging import log +from dvsim.sim_utils import get_job_runtime, get_simulated_time from dvsim.utils import clean_odirs, mk_symlink, rm_path if TYPE_CHECKING: - from dvsim.job.deploy import Deploy, WorkspaceConfig + from dvsim.job.data import JobSpec, WorkspaceConfig class LauncherError(Exception): @@ -90,14 +92,14 @@ class Launcher(ABC): context=[], ) - def __init__(self, deploy: "Deploy") -> None: + def __init__(self, job_spec: "JobSpec") -> None: """Initialise launcher. Args: - deploy: deployment object that will be launched. + job_spec: job specification for the job to be launched. """ - workspace_cfg = deploy.workspace_cfg + workspace_cfg = job_spec.workspace_cfg # One-time preparation of the workspace. if not Launcher.workspace_prepared: @@ -110,8 +112,7 @@ def __init__(self, deploy: "Deploy") -> None: self.prepare_workspace_for_cfg(workspace_cfg) Launcher.workspace_prepared_for_cfg.add(project) - # Store the deploy object handle. - self.deploy = deploy + self.job_spec = job_spec # Status of the job. 
This is primarily determined by the # _check_status() method, but eventually updated by the _post_finish() @@ -132,6 +133,16 @@ def __init__(self, deploy: "Deploy") -> None: # The actual job runtime computed by dvsim, in seconds. self.job_runtime_secs = 0 + # Job's wall clock time (a.k.a CPU time, or runtime). + # Field copied over from Deploy + # TODO: factor this out + self.job_runtime = JobTime() + self.simulated_time = JobTime() + + def __str__(self) -> str: + """Get a string representation.""" + return self.job_spec.full_name + ":launcher" + @staticmethod def set_pyvenv(project: str) -> None: """Activate a python virtualenv if available. @@ -191,30 +202,26 @@ def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: """ - def __str__(self) -> str: - """Get a string representation.""" - return self.deploy.full_name + ":launcher" - def _make_odir(self) -> None: """Create the output directory.""" # If renew_odir flag is True - then move it. if self.renew_odir: - clean_odirs(odir=self.deploy.odir, max_odirs=self.max_odirs) + clean_odirs(odir=self.job_spec.odir, max_odirs=self.max_odirs) - Path(self.deploy.odir).mkdir(exist_ok=True, parents=True) + Path(self.job_spec.odir).mkdir(exist_ok=True, parents=True) - def _link_odir(self, status) -> None: + def _link_odir(self, status: str) -> None: """Soft-links the job's directory based on job's status. The dispatched, passed and failed directories in the scratch area provide a quick way to get to the job that was executed. """ - dest = Path(self.deploy.sim_cfg.links[status], self.deploy.qual_name) - mk_symlink(path=self.deploy.odir, link=dest) + dest = Path(self.job_spec.links[status], self.job_spec.qual_name) + mk_symlink(path=self.job_spec.odir, link=dest) # Delete the symlink from dispatched directory if it exists. if status != "D": - old = Path(self.deploy.sim_cfg.links["D"], self.deploy.qual_name) + old = Path(self.job_spec.links["D"], self.job_spec.qual_name) rm_path(old) def _dump_env_vars(self, exports: Mapping[str, str]) -> None: @@ -223,8 +230,7 @@ def _dump_env_vars(self, exports: Mapping[str, str]) -> None: Each extended class computes the list of exports and invokes this method right before launching the job. """ - with open( - self.deploy.odir + "/env_vars", + with (self.job_spec.odir / "env_vars").open( "w", encoding="UTF-8", errors="surrogateescape", @@ -238,7 +244,7 @@ def _pre_launch(self) -> None: old runs, creating the output directory, dumping all env variables etc. This method is already invoked by launch() as the first step. """ - self.deploy.pre_launch(self) + self.job_spec.pre_launch(self) self._make_odir() self.start_time = datetime.datetime.now() @@ -300,20 +306,19 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: return pattern return None - if self.deploy.dry_run: + if self.job_spec.dry_run: return "P", None # Only one fail pattern needs to be seen. - chk_failed = bool(self.deploy.fail_patterns) + chk_failed = bool(self.job_spec.fail_patterns) # All pass patterns need to be seen, so we replicate the list and remove # patterns as we encounter them. 
- pass_patterns = self.deploy.pass_patterns.copy() + pass_patterns = list(self.job_spec.pass_patterns).copy() chk_passed = bool(pass_patterns) and (self.exit_code == 0) try: - with open( - self.deploy.get_log_path(), + with self.job_spec.log_path.open( encoding="UTF-8", errors="surrogateescape", ) as f: @@ -321,21 +326,46 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: except OSError as e: return "F", ErrorMessage( line_number=None, - message=f"Error opening file {self.deploy.get_log_path()}:\n{e}", + message=f"Error opening file {self.job_spec.log_path}:\n{e}", context=[], ) # Since the log file is already opened and read to assess the job's # status, use this opportunity to also extract other pieces of # information. - self.deploy.extract_info_from_log( - job_runtime_secs=self.job_runtime_secs, - log_text=lines, - ) + + # Extracts the job's runtime (i.e. the wall clock time) + # as reported by the tool. The tool reported runtime is the most accurate + # since it is devoid of the delays incurred due to infrastructure and + # setup overhead. + + try: + time, unit = get_job_runtime( + log_text=lines, + tool=self.job_spec.tool, + ) + self.job_runtime.set(time, unit) + + except RuntimeError as e: + log.warning( + f"{self.job_spec.full_name}: {e} Using dvsim-maintained job_runtime instead." + ) + self.job_runtime.set(self.job_runtime_secs, "s") + + if self.job_spec.job_type == "RunTest": + try: + time, unit = get_simulated_time( + log_text=lines, + tool=self.job_spec.tool, + ) + self.simulated_time.set(time, unit) + + except RuntimeError as e: + log.debug(f"{self.job_spec.full_name}: {e}") if chk_failed or chk_passed: for cnt, line in enumerate(lines): - if chk_failed and _find_patterns(self.deploy.fail_patterns, line): + if chk_failed and _find_patterns(self.job_spec.fail_patterns, line): # If failed, then nothing else to do. Just return. # Provide some extra lines for context. end = cnt + 5 @@ -385,7 +415,7 @@ def _post_finish(self, status: str, err_msg: ErrorMessage) -> None: try: # Run the target-specific cleanup tasks regardless of the job's # outcome. - self.deploy.post_finish(status) + self.job_spec.post_finish(status) except Exception as e: # If the job had already failed, then don't do anything. If it's diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py index 82e1da96..066b31df 100644 --- a/src/dvsim/launcher/fake.py +++ b/src/dvsim/launcher/fake.py @@ -4,37 +4,39 @@ """Fake Launcher that returns random results.""" -from random import choice, random +from random import choice from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher if TYPE_CHECKING: - from dvsim.job.deploy import CovReport, Deploy, RunTest, WorkspaceConfig + from dvsim.job.data import JobSpec, WorkspaceConfig __all__ = ("FakeLauncher",) -def _run_test_handler(deploy: "RunTest") -> str: +def _run_test_handler(job_spec: "JobSpec") -> str: """Handle a RunTest deploy job.""" return choice(("P", "F")) -def _cov_report_handler(deploy: "CovReport") -> str: +def _cov_report_handler(job_spec: "JobSpec") -> str: """Handle a CompileSim deploy job.""" - keys = [ - "score", - "line", - "cond", - "toggle", - "fsm", - "branch", - "assert", - "group", - ] - - deploy.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} + # TODO: this hack doesn't work any more and needs implementing by writing + # a file that can be parsed as if it's been generated by the tool. 
+ # + # keys = [ + # "score", + # "line", + # "cond", + # "toggle", + # "fsm", + # "branch", + # "assert", + # "group", + # ] + # job_spec.cov_results_dict = {k: f"{random() * 100:.2f} %" for k in keys} return "P" @@ -51,18 +53,14 @@ class FakeLauncher(Launcher): # Poll job's completion status every this many seconds poll_freq = 0 - def __init__(self, deploy: "Deploy") -> None: - """Initialize common class members.""" - super().__init__(deploy) - def _do_launch(self) -> None: """Do the launch.""" def poll(self) -> str | None: """Check status of the running process.""" - deploy_cls = self.deploy.__class__.__name__ + deploy_cls = self.job_spec.job_type if deploy_cls in _DEPLOY_HANDLER: - return _DEPLOY_HANDLER[deploy_cls](deploy=self.deploy) + return _DEPLOY_HANDLER[deploy_cls](job_spec=self.job_spec) # Default result is Pass return "P" diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index bc703f4d..d0f2c14c 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -8,13 +8,13 @@ import os import shlex import subprocess -from pathlib import Path from typing import TYPE_CHECKING from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError if TYPE_CHECKING: - from dvsim.job.deploy import Deploy, WorkspaceConfig + from dvsim.job.data import JobSpec + from dvsim.job.deploy import WorkspaceConfig class LocalLauncher(Launcher): @@ -23,9 +23,9 @@ class LocalLauncher(Launcher): # Poll job's completion status every this many seconds poll_freq = 0.025 - def __init__(self, deploy: "Deploy") -> None: + def __init__(self, job_spec: "JobSpec") -> None: """Initialize common class members.""" - super().__init__(deploy) + super().__init__(job_spec) # Popen object when launching the job. self._process = None @@ -35,7 +35,7 @@ def _do_launch(self) -> None: # Update the shell's env vars with self.exports. Values in exports must # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -47,9 +47,9 @@ def _do_launch(self) -> None: self._dump_env_vars(exports) - if not self.deploy.sim_cfg.interactive: - log_path = Path(self.deploy.get_log_path()) - timeout_mins = self.deploy.get_timeout_mins() + if not self.job_spec.interactive: + log_path = self.job_spec.log_path + timeout_mins = self.job_spec.timeout_mins self.timeout_secs = timeout_mins * 60 if timeout_mins else None @@ -59,11 +59,11 @@ def _do_launch(self) -> None: encoding="UTF-8", errors="surrogateescape", ) - self._log_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n") + self._log_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") self._log_file.flush() self._process = subprocess.Popen( - shlex.split(self.deploy.cmd), + shlex.split(self.job_spec.cmd), bufsize=4096, universal_newlines=True, stdout=self._log_file, @@ -88,7 +88,7 @@ def _do_launch(self) -> None: # Interactive. 
stdin / stdout are transparent # no timeout and blocking op as user controls the flow self._process = subprocess.Popen( - shlex.split(self.deploy.cmd), + shlex.split(self.job_spec.cmd), stdin=None, stdout=None, stderr=subprocess.STDOUT, @@ -121,10 +121,10 @@ def poll(self) -> str | None: if ( self.timeout_secs and (self.job_runtime_secs > self.timeout_secs) - and not (self.deploy.gui) + and not (self.job_spec.gui) ): self._kill() - timeout_mins = self.deploy.get_timeout_mins() + timeout_mins = self.job_spec.timeout_mins timeout_message = f"Job timed out after {timeout_mins} minutes" self._post_finish( "K", diff --git a/src/dvsim/launcher/lsf.py b/src/dvsim/launcher/lsf.py index e8d5277c..c7715e06 100644 --- a/src/dvsim/launcher/lsf.py +++ b/src/dvsim/launcher/lsf.py @@ -92,7 +92,7 @@ def prepare_workspace_for_cfg(cfg: "WorkspaceConfig") -> None: # Create the job dir. LsfLauncher.jobs_dir[cfg.project] = cfg.scratch_path / "lsf" / cfg.timestamp clean_odirs(odir=LsfLauncher.jobs_dir[cfg.project], max_odirs=2) - os.makedirs(Path(LsfLauncher.jobs_dir[cfg.project]), exist_ok=True) + Path(LsfLauncher.jobs_dir[cfg.project]).mkdir(exist_ok=True, parents=True) @staticmethod def make_job_script(cfg: "WorkspaceConfig", job_name: str): @@ -124,7 +124,7 @@ def make_job_script(cfg: "WorkspaceConfig", job_name: str): lines += ["case $1 in\n"] for job in LsfLauncher.jobs[cfg][job_name]: # Redirect the job's stdout and stderr to its log file. - cmd = f"{job.deploy.cmd} > {job.deploy.get_log_path()} 2>&1" + cmd = f"{job.deploy.cmd} > {job.deploy.log_path} 2>&1" lines += [f" {job.index})\n", f" {cmd};;\n"] # Throw error as a sanity check if the job index is invalid. @@ -134,7 +134,7 @@ def make_job_script(cfg: "WorkspaceConfig", job_name: str): job_script = LsfLauncher.jobs_dir[cfg] / job_name try: - with open(job_script, "w", encoding="utf-8") as f: + with Path(job_script).open("w", encoding="utf-8") as f: f.writelines(lines) except OSError as e: @@ -182,8 +182,8 @@ def __init__(self, deploy: "Deploy") -> None: def _do_launch(self) -> None: """Launch the job.""" # Add self to the list of jobs. - job_name = self.deploy.job_name - cfg = self.deploy.workspace_cfg + job_name = self.job_spec.job_name + cfg = self.job_spec.workspace_cfg job_total = len(LsfLauncher.jobs[cfg.project][job_name]) # The actual launching of the bsub command cannot happen until the @@ -196,7 +196,7 @@ def _do_launch(self) -> None: # Update the shell's env vars with self.exports. Values in exports must # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -214,10 +214,10 @@ def _do_launch(self) -> None: job_array += "%100" # TODO: This needs to be moved to a HJson. 
- if self.deploy.sim_cfg.tool == "vcs": + if self.job_spec.tool == "vcs": job_rusage = "'rusage[vcssim=1,vcssim_dynamic=1:duration=1]'" - elif self.deploy.sim_cfg.tool == "xcelium": + elif self.job_spec.tool == "xcelium": job_rusage = "'rusage[xcelium=1,xcelium_dynamic=1:duration=1]'" else: @@ -236,8 +236,8 @@ def _do_launch(self) -> None: "-eo", f"{job_script}.%I.out", ] - if self.deploy.get_timeout_mins(): - cmd += ["-c", self.deploy.get_timeout_mins()] + if self.job_spec.timeout_mins: + cmd += ["-c", self.job_spec.timeout_mins] if job_rusage: cmd += ["-R", job_rusage] @@ -303,7 +303,7 @@ def poll(self) -> str | None: # If we got to this point, we can now open the job script output # file for reading. try: - self.bsub_out_fd = open(self.bsub_out) + self.bsub_out_fd = Path(self.bsub_out).open() except OSError as e: self._post_finish( @@ -435,7 +435,7 @@ def kill(self) -> None: except subprocess.CalledProcessError as e: log.exception("Failed to kill job: {}".format(e.stderr.decode("utf-8").strip())) else: - log.error("Job ID for %s not found", self.deploy.full_name) + log.error("Job ID for %s not found", self.job_spec.full_name) self._post_finish("K", ErrorMessage(message="Job killed!", context=[])) diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py index ba7cc865..4b41bec7 100644 --- a/src/dvsim/launcher/nc.py +++ b/src/dvsim/launcher/nc.py @@ -53,10 +53,10 @@ def create_run_sh(self, full_path, cmd) -> None: pathlib.Path(run_file).chmod(0o755) def get_submit_cmd(self): - exetool = self.deploy.sim_cfg.tool - job_name = self.deploy.full_name - cmd = self.deploy.cmd - odir = self.deploy.odir + exetool = self.job_spec.tool + job_name = self.job_spec.full_name + cmd = self.job_spec.cmd + odir = self.job_spec.odir # TODO: These tool-specific names need moving into an hjson config # file. @@ -96,7 +96,7 @@ def _do_launch(self) -> None: # Compute the environment for the subprocess by overriding environment # variables of this process with matching ones from self.deploy.exports exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -111,16 +111,16 @@ def _do_launch(self) -> None: # For reruns, delete the log file of the past run to avoid any race # condition between the log file getting updated for the new run # versus the logic that distinguishes the job wait versus run times. 
- rm_path(self.deploy.get_log_path()) + rm_path(self.job_spec.log_path) # using os.open instead of fopen as this allows # sharing of file descriptors across processes - fd = os.open(self.deploy.get_log_path(), os.O_WRONLY | os.O_CREAT) + fd = os.open(self.job_spec.log_path, os.O_WRONLY | os.O_CREAT) fobj = os.fdopen(fd, "w", encoding="UTF-8") os.set_inheritable(fd, True) - message = f"[Executing]:\n{self.deploy.cmd}\n\n" + message = f"[Executing]:\n{self.job_spec.cmd}\n\n" fobj.write(message) fobj.flush() - if self.deploy.sim_cfg.interactive: + if self.job_spec.interactive: # Interactive: Set RUN_INTERACTIVE to 1 exports["RUN_INTERACTIVE"] = "1" # Line buffering (buf_size = 1) chosen to enable @@ -147,9 +147,9 @@ def _do_launch(self) -> None: stdout=std_out, stderr=std_err, env=exports, - cwd=self.deploy.odir, + cwd=self.job_spec.odir, ) - if self.deploy.sim_cfg.interactive: + if self.job_spec.interactive: for line in self.process.stdout: fobj.write(line) sys.stdout.write(line) @@ -157,7 +157,7 @@ def _do_launch(self) -> None: # the subprocess closes the stdout but still keeps running self.process.wait() except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {self.deploy.get_log_path()}" + msg = f"IO Error: {e}\nSee {self.job_spec.log_path}" raise LauncherError(msg) finally: self._close_process() @@ -179,8 +179,8 @@ def poll(self): """ assert self.process is not None if self.process.poll() is None: - run_timeout_mins = self.deploy.get_timeout_mins() - if run_timeout_mins is not None and not self.deploy.gui: + run_timeout_mins = self.job_spec.timeout_mins + if run_timeout_mins is not None and not self.job_spec.gui: wait_timeout_mins = 180 # max wait time in job / license queue # We consider the job to have started once its log file contains # something. file_size_thresh_bytes is a threshold: once the log @@ -188,7 +188,7 @@ def poll(self): file_size_thresh_bytes = 5120 # log file size threshold # query the log file size - f_size = os.path.getsize(self.deploy.get_log_path()) # noqa: PTH202 + f_size = os.path.getsize(self.job_spec.log_path) # noqa: PTH202 if f_size >= file_size_thresh_bytes: # noqa: SIM102 if self.nc_job_state == "waiting": @@ -238,9 +238,9 @@ def _kill(self) -> None: and SIGKILL. """ try: - log.verbose(f"[Stopping] : {self.deploy.full_name}") + log.verbose(f"[Stopping] : {self.job_spec.full_name}") subprocess.run( - ["nc", "stop", "-set", self.deploy.full_name, "-sig", "TERM,KILL"], + ["nc", "stop", "-set", self.job_spec.full_name, "-sig", "TERM,KILL"], check=True, capture_output=True, ) diff --git a/src/dvsim/launcher/sge/engine.py b/src/dvsim/launcher/sge/engine.py index d79e5652..18584308 100644 --- a/src/dvsim/launcher/sge/engine.py +++ b/src/dvsim/launcher/sge/engine.py @@ -11,6 +11,8 @@ import re import subprocess import time +from collections.abc import Generator, Sequence +from pathlib import Path from dvsim.logging import log @@ -18,7 +20,13 @@ class _JobData: """Internal helper class to manage job data from qstat.""" - def __init__(self, qstat_job_line) -> None: + def __init__(self, qstat_job_line: str) -> None: + """Initialise a JobData object for qstat job status parsing. 
+ + Args: + qstat_job_line: string from qstat call + + """ # The format of the line goes like # job-ID prior name user state submit/start at queue slots ja-task-ID @@ -88,32 +96,33 @@ def __repr__(self) -> str: class JobList: """Internal helper class to manage job lists.""" - def __init__(self, qstat_output=None) -> None: + def __init__(self, qstat_output: str) -> None: + """Initialise JobList.""" self._joblist = [] for line in qstat_output.split("\n")[2:-1]: self._joblist.append(_JobData(line)) - def __iter__(self): + def __iter__(self) -> Generator[_JobData]: + """Get an Iterator over the job list.""" yield from self._joblist def __repr__(self) -> str: + """Get string representation of the job list.""" return "\n".join([str(job) for job in self._joblist]) class SGE: """External system call handler for Sun Grid Engine environment.""" - def __init__( - self, - q=None, - path="", - ) -> None: + def __init__(self, q=None, path: str | Path = "") -> None: + """Initialise Sun Grid Engine proxy.""" + path = Path(path) if q is None: # No queue specified. By default, submit to all available queues. - self.cmd_qconf = os.path.join(path, "qconf") + self.cmd_qconf = path / "qconf" try: - qliststr = _exec(self.cmd_qconf + " -sql") + qliststr = _exec(f"{self.cmd_qconf} -sql") except OSError: error_msg = "Error querying queue configuration" log.exception(error_msg) @@ -122,31 +131,42 @@ def __init__( self.q = qliststr.replace("\n", ",")[:-1] log.info( - """Sun Grid Engine handler initialized -Queues detected: %s""", + "Sun Grid Engine handler initialized\nQueues detected: %s", self.q, ) else: self.q = q - self.cmd_qsub = os.path.join(path, "qsub") - self.cmd_qstat = os.path.join(path, "qstat") + self.cmd_qsub = path / "qsub" + self.cmd_qstat = path / "qstat" - def wait(self, jobid, interval=10, name=None, pbar=None, pbar_mode=None) -> None: - """Waits for job running on SGE Grid Engine environment to finish. + def wait( + self, + jobid: int, + interval: int = 10, + name: str | None = None, + pbar: None = None, + ) -> None: + """Wait for job running on SGE Grid Engine environment to finish. If you are just waiting for one job, this becomes a dumb substitute for the -sync y option which can be specified to qsub. - Inputs: + Args: + jobid: job identifier + interval: polling interval of SGE queue, in seconds. (Default: 10) + name: name of the job + pbar: progress bar (not implemented?) - jobid - interval - Polling interval of SGE queue, in seconds. (Default: 10) """ dowait = True while dowait: - p = subprocess.Popen(self.cmd_qstat, shell=True, stdout=subprocess.PIPE) + p = subprocess.Popen( # noqa: S602 + self.cmd_qstat, + shell=True, + stdout=subprocess.PIPE, + ) pout, _ = p.communicate() if pbar is not None: @@ -192,9 +212,12 @@ def submit( nproc=1, wait=True, lammpi=True, - ): - """Submits a job to SGE - Returns jobid as a number. + ) -> int: + """Submit a job to SGE. + + Returns: + jobid as a number. + """ log.info( "Submitting job: " @@ -212,20 +235,28 @@ def submit( if name is not None: qsuboptslist.append("-N " + name) + if stdin is not None: qsuboptslist.append("-i " + stdin) + if stdout is not None: qsuboptslist.append("-o " + stdout) + if stderr is not None: qsuboptslist.append("-e " + stderr) + if joinstdouterr: qsuboptslist.append("-j") + if wait: qsuboptslist.append("-sync y") + if usecwd: qsuboptslist.append("-cwd") + if useenvironment: qsuboptslist.append("-V") + if array is not False: try: n = int(array[0]) @@ -237,6 +268,7 @@ def submit( error_msg = "array[0] being an out of bounds access." 
log.exception(error_msg) raise ValueError(error_msg) + try: m = int(array[1]) except ValueError: @@ -247,6 +279,7 @@ def submit( m = None msg = "array[1] being an out of bounds access." raise IndexError(msg) + try: s = int(array[2]) except IndexError: @@ -257,22 +290,28 @@ def submit( s = None msg = "Could not convert data to an integer." raise ValueError(msg) + if m == s is None: - qsuboptslist.append("-t %d" % n) + qsuboptslist.append(f"-t {n}") elif s is None: - qsuboptslist.append("-t %d-%d" % (n, m)) + qsuboptslist.append(f"-t {n}-{m}") else: - qsuboptslist.append("-t %d-%d:%d" % (n, m, s)) + qsuboptslist.append(f"-t {n}-{m}:{s}") qsubopts = " ".join(qsuboptslist) - pout = _exec(self.cmd_qsub, stdin=qsubopts + "\n" + job, print_command=False) + pout = _exec( + command=str(self.cmd_qsub), + stdin=qsubopts + "\n" + job, + print_command=False, + ) try: # Next to last line should be # "Your job 1389 (name) has been submitted" # parse for job id return int(pout.split("\n")[-2].split()[2]) + # except (ValueErrorValueError, IndexError, AttributeError) (e): except (ValueError, IndexError, AttributeError): error_msg = f"""Error submitting SGE job: @@ -284,21 +323,39 @@ def submit( log.exception(error_msg) raise OSError(error_msg) - def getuserjobs(self, user=pwd.getpwuid(os.getuid())[0]): - """Returns a list of SGE jobids run by a specific user. + def getuserjobs(self, user=pwd.getpwuid(os.getuid())[0]) -> Sequence[_JobData]: + """Return a list of SGE jobids run by a specific user. + + Args: + user: SGE user to poll (Default = '', i.e. current user) + qstat: path to qstat binary (Default = 'qstat') - Inputs - user - SGE user to poll (Default = '', i.e. current user) - qstat - path to qstat binary (Default = 'qstat') """ - p = subprocess.Popen(self.cmd_qstat + " -u " + user, shell=True, stdout=subprocess.PIPE) + p = subprocess.Popen( # noqa: S602 + f"{self.cmd_qstat} -u {user}", + shell=True, + stdout=subprocess.PIPE, + ) qstat_output, _ = p.communicate() joblist = JobList(qstat_output) return [job for job in joblist if job.user == user] - def run_job(self, command, name="default", logfnm="default.log", wait=True): + def run_job( + self, + command: str, + name: str = "default", + log_filename: str = "default.log", + *, + wait: bool = True, + ) -> int: """Run job on SGE with piped logging.""" - return self.submit(command, name=name, stdout=logfnm, stderr=logfnm, wait=wait) + return self.submit( + command, + name=name, + stdout=log_filename, + stderr=log_filename, + wait=wait, + ) def get_queue_instance_status(self): """Get loads for each queue instance.""" @@ -319,11 +376,17 @@ def get_queue_instance_status(self): return data -def _exec(command, print_to_screen=False, logfnm=None, stdin="", print_command=False): - """Runs command line using subprocess, optionally returning stdout.""" +def _exec( + command: str, + log_path: str | None = None, + stdin: str = "", + *, + print_command: bool = False, +) -> bytes: + """Run command line using subprocess, optionally returning stdout.""" - def _call_cmd(command, stdin=""): - p = subprocess.Popen( + def _call_cmd(command: str, stdin: str = "") -> bytes: + p = subprocess.Popen( # noqa: S602 command, shell=True, stdin=subprocess.PIPE, @@ -334,26 +397,26 @@ def _call_cmd(command, stdin=""): return output if print_command: - log.info("Executing process: \x1b[1;92m%-50s\x1b[0m Logfile: %s", command, logfnm) + log.info("Executing process: \x1b[1;92m%-50s\x1b[0m Logfile: %s", command, log_path) output = "" - if logfnm is not None: + if log_path is not None: try: 
- with open(logfnm, "a") as f: + with Path(log_path).open("a") as f: if print_command: pass + output = _call_cmd(command, stdin) f.write(output) + except OSError: - error_msg = "Error: File: " + str(logfnm) + " does not appear to exist." + error_msg = "Error: File: " + str(log_path) + " does not appear to exist." log.exception(error_msg) raise OSError(error_msg) + else: output = _call_cmd(command, stdin) log.info("Output of command is:\n%s", output) - if print_to_screen: - pass - return output diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index 3989d175..2eab0228 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -5,9 +5,9 @@ """SgeLauncher Class.""" import os -import pathlib import shlex import subprocess +from pathlib import Path from subprocess import PIPE, Popen from typing import TYPE_CHECKING @@ -37,7 +37,7 @@ def _do_launch(self) -> None: # Update the shell's env vars with self.exports. Values in exports must # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. This # variable is used by recursive Make calls to pass variables from one @@ -50,15 +50,14 @@ def _do_launch(self) -> None: self._dump_env_vars(exports) try: - f = pathlib.Path(self.deploy.get_log_path()).open( - "w", encoding="UTF-8", errors="surrogateescape" - ) - f.write(f"[Executing]:\n{self.deploy.cmd}\n\n") + f = self.job_spec.log_path.open("w", encoding="UTF-8", errors="surrogateescape") + f.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") f.flush() + # ---------- prepare SGE job struct ----- sge_job = SGE.QSubOptions() # noqa: F405 sge_job.args.N = "VCS_RUN_" + str(pid) # Name of Grid Engine job - if "build.log" in self.deploy.get_log_path(): + if "build.log" in self.job_spec.log_path: sge_job.args.N = "VCS_BUILD_" + str(pid) # Name of Grid Engine job job_name = sge_job.args.N @@ -68,14 +67,15 @@ def _do_launch(self) -> None: sge_job.args.q = "vcs_q" # Define the sge queue name sge_job.args.p = "0" # Set priority to 0 sge_job.args.ll = "mf=20G" # memory req,request the given resources + # pecifies a range of priorities from -1023 to 1024. # The higher the number, the higher the priority. # The default priority for jobs is zero - sge_job.args.command = '"' + self.deploy.cmd + '"' + sge_job.args.command = '"' + self.job_spec.cmd + '"' sge_job.args.b = "y" # This is a binary file - sge_job.args.o = self.deploy.get_log_path() + ".sge" + sge_job.args.o = f"{self.job_spec.log_path}.sge" cmd = str(sge_job.execute(mode="echo")) - # --------------- + self.process = subprocess.Popen( shlex.split(cmd), bufsize=4096, @@ -85,16 +85,18 @@ def _do_launch(self) -> None: env=exports, ) f.close() + except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {self.deploy.get_log_path()}" + msg = f"IO Error: {e}\nSee {self.job_spec.log_path}" raise LauncherError(msg) + finally: self._close_process() self._link_odir("D") f.close() - def poll(self): + def poll(self) -> str: """Check status of the running process. This returns 'D', 'P' or 'F'. If 'D', the job is still running. 
If 'P', @@ -107,17 +109,16 @@ def poll(self): if self.process.poll() is None: return "D" # ------------------------------------- - # copy SGE jobb results to log file - if pathlib.Path(self.deploy.get_log_path() + ".sge").exists(): - file1 = pathlib.Path(self.deploy.get_log_path() + ".sge").open(errors="replace") + # copy SGE job results to log file + sge_log_path = Path(f"{self.job_spec.log_path}.sge") + if sge_log_path.exists(): + file1 = sge_log_path.open(errors="replace") lines = file1.readlines() file1.close() - f = pathlib.Path(self.deploy.get_log_path()).open( - "a", encoding="UTF-8", errors="surrogateescape" - ) + f = self.job_spec.log_path.open("a", encoding="UTF-8", errors="surrogateescape") f.writelines(lines) f.flush() - pathlib.Path(self.deploy.get_log_path() + ".sge").unlink() + sge_log_path.unlink() f.close() # ------------------------------------- @@ -146,13 +147,13 @@ def kill(self) -> None: # ---------------------------- # qdel -f kill sge job_name cmd = "qstatus -a | grep " + job_name - with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: + with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: # noqa: S602 output = process.communicate()[0].decode("utf-8") output = output.rstrip("\n") if output != "": output_l = output.split() cmd = "qdel " + output_l[0] - with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: + with Popen(cmd, stdout=PIPE, stderr=None, shell=True) as process: # noqa: S602 output = process.communicate()[0].decode("utf-8") output = output.rstrip("\n") # ---------------------------- diff --git a/src/dvsim/launcher/sge/qsubopts.py b/src/dvsim/launcher/sge/qsubopts.py index 4c593f9b..0c4d0335 100644 --- a/src/dvsim/launcher/sge/qsubopts.py +++ b/src/dvsim/launcher/sge/qsubopts.py @@ -1893,7 +1893,7 @@ def write_qsub_script(self, filename, echo=False) -> None: if echo: pass - f = open(filename, "w") + f = pathlib.Path(filename).open("w") f.write("\n".join(buf)) f.close() @@ -1937,7 +1937,7 @@ def execute(self, mode="local", path=""): cwd = pathlib.Path.cwd() command_file = cwd + "/command_file_" + str(os.getpid()) + "_" + test_id try: - with open(command_file, "w") as f_command: + with pathlib.Path(command_file).open("w") as f_command: command_temp = str(self.args.command) command_temp = command_temp.replace('"', "") f_command.write(command_temp + "\n/bin/rm -f " + command_file) @@ -1957,7 +1957,7 @@ def execute(self, mode="local", path=""): if mode == "local": import subprocess - subprocess.Popen( + subprocess.Popen( # noqa: S602 command_file, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index d246d31d..deab4bc5 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -33,12 +33,12 @@ def __init__(self, deploy) -> None: # Popen object when launching the job. self.process = None - self.slurm_log_file = self.deploy.get_log_path() + ".slurm" + self.slurm_log_file = f"{self.job_spec.log_path}.slurm" def _do_launch(self) -> None: # replace the values in the shell's env vars if the keys match. exports = os.environ.copy() - exports.update(self.deploy.exports) + exports.update(self.job_spec.exports) # Clear the magic MAKEFLAGS variable from exports if necessary. 
This # variable is used by recursive Make calls to pass variables from one @@ -59,12 +59,12 @@ def _do_launch(self) -> None: slurm_cmd = ( f"srun -p {SLURM_QUEUE} --mem={SLURM_MEM} --mincpus={SLURM_MINCPUS} " f"--time={SLURM_TIMEOUT} --cpus-per-task={SLURM_CPUS_PER_TASK} " - f'bash -c "{slurm_setup_cmd} {self.deploy.cmd}"' + f'bash -c "{slurm_setup_cmd} {self.job_spec.cmd}"' ) try: with pathlib.Path(self.slurm_log_file).open("w") as out_file: - out_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n") + out_file.write(f"[Executing]:\n{self.job_spec.cmd}\n\n") out_file.flush() log.info(f"Executing slurm command: {slurm_cmd}") @@ -80,7 +80,7 @@ def _do_launch(self) -> None: msg = f"File Error: {e}\nError while handling {self.slurm_log_file}" raise LauncherError(msg) except subprocess.SubprocessError as e: - msg = f"IO Error: {e}\nSee {self.deploy.get_log_path()}" + msg = f"IO Error: {e}\nSee {self.job_spec.log_path}" raise LauncherError(msg) finally: self._close_process() @@ -105,10 +105,10 @@ def poll(self): try: with pathlib.Path(self.slurm_log_file).open() as slurm_file: try: - with pathlib.Path(self.deploy.get_log_path()).open("a") as out_file: + with self.job_spec.log_path.open("a") as out_file: shutil.copyfileobj(slurm_file, out_file) except OSError as e: - msg = f"File Error: {e} when handling {self.deploy.get_log_path()}" + msg = f"File Error: {e} when handling {self.job_spec.log_path}" raise LauncherError( msg, ) diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py index de5f9041..28306b60 100644 --- a/src/dvsim/scheduler.py +++ b/src/dvsim/scheduler.py @@ -17,10 +17,8 @@ from types import FrameType from typing import TYPE_CHECKING, Any -from pydantic import BaseModel, ConfigDict - -from dvsim.job.deploy import Deploy -from dvsim.launcher.base import ErrorMessage, Launcher, LauncherBusyError, LauncherError +from dvsim.job.data import CompletedJobStatus, JobSpec +from dvsim.launcher.base import Launcher, LauncherBusyError, LauncherError from dvsim.logging import log from dvsim.utils.status_printer import get_status_printer from dvsim.utils.timer import Timer @@ -29,17 +27,8 @@ from dvsim.flow.base import FlowCfg -class CompletedJobStatus(BaseModel): - """Job status.""" - - model_config = ConfigDict(frozen=True, extra="forbid") - - status: str - fail_msg: ErrorMessage - - def total_sub_items( - d: Mapping[str, Sequence[Deploy]] | Mapping["FlowCfg", Sequence[Deploy]], + d: Mapping[str, Sequence[JobSpec]] | Mapping["FlowCfg", Sequence[JobSpec]], ) -> int: """Return the total number of sub items in a mapping. @@ -82,11 +71,11 @@ def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]: class Scheduler: - """An object that runs one or more Deploy items.""" + """An object that runs one or more jobs from JobSpec items.""" def __init__( self, - items: Sequence[Deploy], + items: Sequence[JobSpec], launcher_cls: type[Launcher], *, interactive: bool, @@ -99,25 +88,25 @@ def __init__( interactive: launch the tools in interactive mode. """ - self.items: Sequence[Deploy] = items + self._jobs: Mapping[str, JobSpec] = {i.full_name: i for i in items} - # 'scheduled[target][cfg]' is a list of Deploy objects for the chosen + # 'scheduled[target][cfg]' is a list of JobSpec object names for the chosen # target and cfg. As items in _scheduled are ready to be run (once # their dependencies pass), they are moved to the _queued list, where # they wait until slots are available for them to be dispatched. # When all items (in all cfgs) of a target are done, it is removed from # this dictionary. 
- self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[Deploy]]] = {} - self.add_to_scheduled(items) + self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[str]]] = {} + self.add_to_scheduled(jobs=self._jobs) # Print status periodically using an external status printer. - self.status_printer = get_status_printer(interactive) - self.status_printer.print_header( + self._status_printer = get_status_printer(interactive) + self._status_printer.print_header( msg="Q: queued, D: dispatched, P: passed, F: failed, K: killed, T: total", ) # Sets of items, split up by their current state. The sets are - # disjoint and their union equals the keys of self.item_to_status. + # disjoint and their union equals the keys of self.item_status. # _queued is a list so that we dispatch things in order (relevant # for things like tests where we have ordered things cleverly to # try to see failures early). They are maintained for each target. @@ -128,49 +117,55 @@ def __init__( # the entire regression. We keep rotating through our list of running # items, picking up where we left off on the last poll. self._targets: Sequence[str] = list(self._scheduled.keys()) - self._queued: MutableMapping[str, MutableSequence[Deploy]] = {} - self._running: MutableMapping[str, MutableSequence[Deploy]] = {} - self._passed: MutableMapping[str, MutableSet[Deploy]] = {} - self._failed: MutableMapping[str, MutableSet[Deploy]] = {} - self._killed: MutableMapping[str, MutableSet[Deploy]] = {} - self._total = {} - self.last_target_polled_idx = -1 - self.last_item_polled_idx = {} + self._total: MutableMapping[str, int] = {} + + self._queued: MutableMapping[str, MutableSequence[str]] = {} + self._running: MutableMapping[str, MutableSequence[str]] = {} + + self._passed: MutableMapping[str, MutableSet[str]] = {} + self._failed: MutableMapping[str, MutableSet[str]] = {} + self._killed: MutableMapping[str, MutableSet[str]] = {} + + self._last_target_polled_idx = -1 + self._last_item_polled_idx = {} + for target in self._scheduled: self._queued[target] = [] self._running[target] = [] + self._passed[target] = set() self._failed[target] = set() self._killed[target] = set() + self._total[target] = total_sub_items(self._scheduled[target]) - self.last_item_polled_idx[target] = -1 + self._last_item_polled_idx[target] = -1 # Stuff for printing the status. width = len(str(self._total[target])) field_fmt = f"{{:0{width}d}}" - self.msg_fmt = ( + self._msg_fmt = ( f"Q: {field_fmt}, D: {field_fmt}, P: {field_fmt}, " f"F: {field_fmt}, K: {field_fmt}, T: {field_fmt}" ) - msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target]) - self.status_printer.init_target(target=target, msg=msg) + msg = self._msg_fmt.format(0, 0, 0, 0, 0, self._total[target]) + self._status_printer.init_target(target=target, msg=msg) - # A map from the Deployment names tracked by this class to their + # A map from the job names tracked by this class to their # current status. This status is 'Q', 'D', 'P', 'F' or 'K', # corresponding to membership in the dicts above. This is not # per-target. - self.item_status: MutableMapping[str, str] = {} + self.job_status: MutableMapping[str, str] = {} # Create the launcher instance for all items. self._launchers: Mapping[str, Launcher] = { - item.full_name: launcher_cls(item) for item in self.items + full_name: launcher_cls(job_spec) for full_name, job_spec in self._jobs.items() } # The chosen launcher class. 
This allows us to access launcher
        # variant-specific settings such as max parallel jobs & poll rate.
-        self.launcher_cls: type[Launcher] = launcher_cls
+        self._launcher_cls: type[Launcher] = launcher_cls
 
-    def run(self) -> Mapping[str, CompletedJobStatus]:
+    def run(self) -> Sequence[CompletedJobStatus]:
         """Run all scheduled jobs and return the results.
 
         Returns the results (status) of all items dispatched for all
@@ -226,50 +221,68 @@ def on_signal(signal_received: int, _: FrameType | None) -> None:
             # polling loop. But we do it with a bounded wait on stop_now so
             # that we jump back to the polling loop immediately on a
             # signal.
-            stop_now.wait(timeout=self.launcher_cls.poll_freq)
+            stop_now.wait(timeout=self._launcher_cls.poll_freq)
         finally:
             signal(SIGINT, old_handler)
 
         # Cleanup the status printer.
-        self.status_printer.exit()
+        self._status_printer.exit()
 
         # We got to the end without anything exploding. Return the results.
-        return {
-            name: CompletedJobStatus(
-                status=status,
-                fail_msg=self._launchers[name].fail_msg,
+        results = []
+        for name, status in self.job_status.items():
+            launcher = self._launchers[name]
+            job_spec = self._jobs[name]
+
+            results.append(
+                CompletedJobStatus(
+                    job_type=job_spec.job_type,
+                    name=job_spec.name,
+                    seed=job_spec.seed,
+                    full_name=name,
+                    qual_name=job_spec.qual_name,
+                    target=job_spec.target,
+                    status=status,
+                    fail_msg=launcher.fail_msg,
+                    job_runtime=launcher.job_runtime.with_unit("s").get()[0],
+                    simulated_time=launcher.simulated_time.with_unit("us").get()[0],
+                    log_path=job_spec.log_path,
+                )
             )
-            for name, status in self.item_status.items()
-        }
-
-    def add_to_scheduled(self, items: Sequence[Deploy]) -> None:
-        """Add items to the schedule.
+        return results
+
+    def add_to_scheduled(self, jobs: Mapping[str, JobSpec]) -> None:
+        """Add jobs to the schedule.
 
         Args:
-            items: Deploy objects to add to the schedule.
+            jobs: mapping from job full name to the JobSpec to add to the schedule.
 
         """
-        for item in items:
-            target_dict = self._scheduled.setdefault(item.target, {})
-            cfg_list = target_dict.setdefault(item.flow, [])
-            if item not in cfg_list:
-                cfg_list.append(item)
+        for full_name, job_spec in jobs.items():
+            target_dict = self._scheduled.setdefault(job_spec.target, {})
+            cfg_list = target_dict.setdefault(job_spec.flow, [])
 
-    def _unschedule_item(self, item: Deploy) -> None:
+            if full_name not in cfg_list:
+                cfg_list.append(full_name)
+
+    def _unschedule_item(self, job_name: str) -> None:
         """Remove deploy item from the schedule."""
-        target_dict = self._scheduled[item.target]
-        cfg_list = target_dict.get(item.flow)
+        job = self._jobs[job_name]
+        target_dict = self._scheduled[job.target]
+        cfg_list = target_dict.get(job.flow)
+
         if cfg_list is not None:
             with contextlib.suppress(ValueError):
-                cfg_list.remove(item)
+                cfg_list.remove(job_name)
 
             # When all items in _scheduled[target][cfg] are finally removed,
            # the cfg key is deleted.
            if not cfg_list:
-                del target_dict[item.flow]
+                del target_dict[job.flow]
 
-    def _enqueue_successors(self, item: Deploy | None = None) -> None:
+    def _enqueue_successors(self, job_name: str | None = None) -> None:
         """Move an item's successors from _scheduled to _queued.
 
         'item' is the recently run job that has completed. If None, then we
@@ -277,34 +290,32 @@ def _enqueue_successors(self, item: Deploy | None = None) -> None:
         target. If 'item' is specified, then we find its successors and move
         them to _queued.
""" - for next_item in self._get_successors(item): - if ( - next_item.full_name in self.item_status - or next_item in self._queued[next_item.target] - ): - msg = f"Job {next_item.full_name} already scheduled" + for next_job_name in self._get_successors(job_name): + target = self._jobs[next_job_name].target + if next_job_name in self.job_status or next_job_name in self._queued[target]: + msg = f"Job {next_job_name} already scheduled" raise RuntimeError(msg) - self.item_status[next_item.full_name] = "Q" - self._queued[next_item.target].append(next_item) - self._unschedule_item(next_item) + self.job_status[next_job_name] = "Q" + self._queued[target].append(next_job_name) + self._unschedule_item(next_job_name) - def _cancel_successors(self, item: Deploy) -> None: + def _cancel_successors(self, job_name: str) -> None: """Cancel an item's successors. Recursively move them from _scheduled or _queued to _killed. Args: - item: job whose successors are to be canceled. + job_name: job whose successors are to be canceled. """ - items = list(self._get_successors(item)) + items = list(self._get_successors(job_name)) while items: next_item = items.pop() self._cancel_item(next_item, cancel_successors=False) items.extend(self._get_successors(next_item)) - def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: + def _get_successors(self, job_name: str | None = None) -> Sequence[str]: """Find immediate successors of an item. We choose the target that follows the 'item''s current target and find @@ -313,13 +324,13 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: from the cfg to which the item belongs. Args: - item: is a job that has completed. + job_name: name of the job Returns: - list of item's successors, or an empty list if there are none. + list of the jobs successors, or an empty list if there are none. """ - if item is None: + if job_name is None: target = next(iter(self._scheduled)) if target is None: @@ -328,8 +339,10 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: cfgs = set(self._scheduled[target]) else: - if item.target not in self._scheduled: - msg = f"Scheduler does not contain target {item.target}" + job: JobSpec = self._jobs[job_name] + + if job.target not in self._scheduled: + msg = f"Scheduler does not contain target {job.target}" raise KeyError(msg) target_iterator = iter(self._scheduled) @@ -337,7 +350,7 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: found = False while not found: - if target == item.target: + if target == job.target: found = True try: @@ -349,7 +362,7 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: if target is None: return [] - cfgs = {item.flow} + cfgs = {job.flow} # Find item's successors that can be enqueued. We assume here that # only the immediately succeeding target can be enqueued at this @@ -357,11 +370,12 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: successors = [] for cfg in cfgs: for next_item in self._scheduled[target][cfg]: - if item is not None: + if job_name is not None: + job = self._jobs[next_item] # Something is terribly wrong if item exists but the # next_item's dependency list is empty. 
- assert next_item.dependencies - if item not in next_item.dependencies: + assert job.dependencies + if job_name not in job.dependencies: continue if self._ok_to_enqueue(next_item): @@ -369,32 +383,32 @@ def _get_successors(self, item: Deploy | None = None) -> Sequence[Deploy]: return successors - def _ok_to_enqueue(self, item: Deploy) -> bool: + def _ok_to_enqueue(self, job_name: str) -> bool: """Check if all dependencies jobs are completed. Args: - item: is a deployment job. + job_name: name of job. Returns: true if ALL dependencies of item are complete. """ - for dep in item.dependencies: + for dep in self._jobs[job_name].dependencies: # Ignore dependencies that were not scheduled to run. - if dep not in self.items: + if dep not in self._jobs: continue # Has the dep even been enqueued? - if dep.full_name not in self.item_status: + if dep not in self.job_status: return False # Has the dep completed? - if self.item_status[dep.full_name] not in ["P", "F", "K"]: + if self.job_status[dep] not in ["P", "F", "K"]: return False return True - def _ok_to_run(self, item: Deploy) -> bool: + def _ok_to_run(self, job_name: str) -> bool: """Check if a job is ready to start. The item's needs_all_dependencies_passing setting is used to figure @@ -402,31 +416,32 @@ def _ok_to_run(self, item: Deploy) -> bool: statuses. Args: - item: is a deployment job. + job_name: name of the job to check Returns: true if the required dependencies have passed. """ + job: JobSpec = self._jobs[job_name] # 'item' can run only if its dependencies have passed (their results # should already show up in the item to status map). - for dep in item.dependencies: + for dep_name in job.dependencies: # Ignore dependencies that were not scheduled to run. - if dep not in self.items: + if dep_name not in self._jobs: continue - dep_status = self.item_status[dep.full_name] + dep_status = self.job_status[dep_name] if dep_status not in ["P", "F", "K"]: raise ValueError("Status must be one of P, F, or K") - if item.needs_all_dependencies_passing: + if job.needs_all_dependencies_passing: if dep_status in ["F", "K"]: return False elif dep_status in ["P"]: return True - return item.needs_all_dependencies_passing + return job.needs_all_dependencies_passing def _poll(self, hms: str) -> bool: """Check for running items that have finished. 
@@ -436,7 +451,7 @@ def _poll(self, hms: str) -> bool: """ max_poll = min( - self.launcher_cls.max_poll, + self._launcher_cls.max_poll, total_sub_items(self._running), ) @@ -448,18 +463,18 @@ def _poll(self, hms: str) -> bool: changed = False while max_poll: - target, self.last_target_polled_idx = get_next_item( + target, self._last_target_polled_idx = get_next_item( self._targets, - self.last_target_polled_idx, + self._last_target_polled_idx, ) while self._running[target] and max_poll: max_poll -= 1 - item, self.last_item_polled_idx[target] = get_next_item( + job_name, self._last_item_polled_idx[target] = get_next_item( self._running[target], - self.last_item_polled_idx[target], + self._last_item_polled_idx[target], ) - status = self._launchers[item.full_name].poll() + status = self._launchers[job_name].poll() level = log.VERBOSE if status not in ["D", "P", "F", "E", "K"]: @@ -470,27 +485,27 @@ def _poll(self, hms: str) -> bool: continue if status == "P": - self._passed[target].add(item) + self._passed[target].add(job_name) elif status == "F": - self._failed[target].add(item) + self._failed[target].add(job_name) level = log.ERROR else: # Killed or Error dispatching - self._killed[target].add(item) + self._killed[target].add(job_name) level = log.ERROR - self._running[target].pop(self.last_item_polled_idx[target]) - self.last_item_polled_idx[target] -= 1 - self.item_status[item.full_name] = status + self._running[target].pop(self._last_item_polled_idx[target]) + self._last_item_polled_idx[target] -= 1 + self.job_status[job_name] = status log.log( level, "[%s]: [%s]: [status] [%s: %s]", hms, target, - item.full_name, + job_name, status, ) @@ -501,14 +516,14 @@ def _poll(self, hms: str) -> bool: # jobs). Hence we enqueue all successors rather than canceling # them right here. We leave it to _dispatch() to figure out # whether an enqueued item can be run or not. - self._enqueue_successors(item) + self._enqueue_successors(job_name) changed = True return changed def _dispatch(self, hms: str) -> None: """Dispatch some queued items if possible.""" - slots = self.launcher_cls.max_parallel - total_sub_items(self._running) + slots = self._launcher_cls.max_parallel - total_sub_items(self._running) if slots <= 0: return @@ -516,7 +531,9 @@ def _dispatch(self, hms: str) -> None: # weights. sum_weight = 0 slots_filled = 0 - total_weight = sum(self._queued[t][0].weight for t in self._queued if self._queued[t]) + total_weight = sum( + self._jobs[self._queued[t][0]].weight for t in self._queued if self._queued[t] + ) for target in self._scheduled: if not self._queued[target]: @@ -541,7 +558,7 @@ def _dispatch(self, hms: str) -> None: # solution, except that it prioritizes the slot allocation to # targets that are earlier in the list such that in the end, all # slots are fully consumed. 
- sum_weight += self._queued[target][0].weight + sum_weight += self._jobs[self._queued[target][0]].weight target_slots = round((slots * sum_weight) / total_weight) - slots_filled if target_slots <= 0: continue @@ -565,32 +582,32 @@ def _dispatch(self, hms: str) -> None: "[%s]: [%s]: [dispatch]:\n%s", hms, target, - ", ".join(item.full_name for item in to_dispatch), + ", ".join(job_name for job_name in to_dispatch), ) - for item in to_dispatch: + for job_name in to_dispatch: try: - self._launchers[item.full_name].launch() + self._launchers[job_name].launch() except LauncherError: - log.exception("Error launching %s", item) - self._kill_item(item) + log.exception("Error launching %s", job_name) + self._kill_item(job_name) except LauncherBusyError: log.exception("Launcher busy") - self._queued[target].append(item) + self._queued[target].append(job_name) log.verbose( "[%s]: [%s]: [reqeued]: %s", hms, target, - item.full_name, + job_name, ) continue - self._running[target].append(item) - self.item_status[item.full_name] = "D" + self._running[target].append(job_name) + self.job_status[job_name] = "D" def _kill(self) -> None: """Kill any running items and cancel any that are waiting.""" @@ -629,9 +646,9 @@ def _check_if_done(self, hms: str) -> bool: perc = done_cnt / self._total[target] * 100 running = ", ".join( - [f"{item.full_name}" for item in self._running[target]], + [f"{job_name}" for job_name in self._running[target]], ) - msg = self.msg_fmt.format( + msg = self._msg_fmt.format( len(self._queued[target]), len(self._running[target]), len(self._passed[target]), @@ -639,7 +656,7 @@ def _check_if_done(self, hms: str) -> bool: len(self._killed[target]), self._total[target], ) - self.status_printer.update_target( + self._status_printer.update_target( target=target, msg=msg, hms=hms, @@ -648,36 +665,38 @@ def _check_if_done(self, hms: str) -> bool: ) return done - def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None: + def _cancel_item(self, job_name: str, *, cancel_successors: bool = True) -> None: """Cancel an item and optionally all of its successors. Supplied item may be in _scheduled list or the _queued list. From either, we move it straight to _killed. Args: - item: is a deployment job. + job_name: name of the job to cancel cancel_successors: if set then cancel successors as well (True). """ - self.item_status[item.full_name] = "K" - self._killed[item.target].add(item) - if item in self._queued[item.target]: - self._queued[item.target].remove(item) + target = self._jobs[job_name].target + self.job_status[job_name] = "K" + self._killed[target].add(job_name) + if job_name in self._queued[target]: + self._queued[target].remove(job_name) else: - self._unschedule_item(item) + self._unschedule_item(job_name) if cancel_successors: - self._cancel_successors(item) + self._cancel_successors(job_name) - def _kill_item(self, item: Deploy) -> None: + def _kill_item(self, job_name: str) -> None: """Kill a running item and cancel all of its successors. Args: - item: is a deployment job. 
+ job_name: name of the job to kill """ - self._launchers[item.full_name].kill() - self.item_status[item.full_name] = "K" - self._killed[item.target].add(item) - self._running[item.target].remove(item) - self._cancel_successors(item) + target = self._jobs[job_name].target + self._launchers[job_name].kill() + self.job_status[job_name] = "K" + self._killed[target].add(job_name) + self._running[target].remove(job_name) + self._cancel_successors(job_name) diff --git a/src/dvsim/sim_results.py b/src/dvsim/sim_results.py index 7b4765f7..10ae09ea 100644 --- a/src/dvsim/sim_results.py +++ b/src/dvsim/sim_results.py @@ -6,9 +6,9 @@ import collections import re -from collections.abc import Mapping +from collections.abc import Sequence -from dvsim.scheduler import CompletedJobStatus +from dvsim.job.data import CompletedJobStatus from dvsim.testplan import Result _REGEX_REMOVE = [ @@ -76,48 +76,47 @@ class SimResults: holding all failing tests with the same signature. """ - def __init__(self, items, results) -> None: + def __init__(self, results: Sequence[CompletedJobStatus]) -> None: self.table = [] self.buckets = collections.defaultdict(list) self._name_to_row = {} - for item in items: - self._add_item(item, results) + for job_status in results: + self._add_item(job_status=job_status) - def _add_item(self, item, results: Mapping[str, CompletedJobStatus]) -> None: + def _add_item(self, job_status: CompletedJobStatus) -> None: """Recursively add a single item to the table of results.""" - job_status = results[item.full_name] if job_status.status in ["F", "K"]: bucket = self._bucketize(job_status.fail_msg.message) self.buckets[bucket].append( ( - item, + job_status, job_status.fail_msg.line_number, job_status.fail_msg.context, ), ) # Runs get added to the table directly - if item.target == "run": - self._add_run(item, job_status.status) + if job_status.target == "run": + self._add_run(job_status) - def _add_run(self, item, status) -> None: + def _add_run(self, job_status: CompletedJobStatus) -> None: """Add an entry to table for item.""" - row = self._name_to_row.get(item.name) + row = self._name_to_row.get(job_status.name) if row is None: row = Result( - item.name, - job_runtime=item.job_runtime, - simulated_time=item.simulated_time, + job_status.name, + job_runtime=job_status.job_runtime, + simulated_time=job_status.simulated_time, ) self.table.append(row) - self._name_to_row[item.name] = row + self._name_to_row[job_status.name] = row # Record the max job_runtime of all reseeds. 
- elif item.job_runtime > row.job_runtime: - row.job_runtime = item.job_runtime - row.simulated_time = item.simulated_time + elif job_status.job_runtime > row.job_runtime: + row.job_runtime = job_status.job_runtime + row.simulated_time = job_status.simulated_time - if status == "P": + if job_status.status == "P": row.passing += 1 row.total += 1 diff --git a/src/dvsim/testplan.py b/src/dvsim/testplan.py index e47de6f1..4b76f6c8 100644 --- a/src/dvsim/testplan.py +++ b/src/dvsim/testplan.py @@ -248,7 +248,7 @@ class Testplan: def _parse_hjson(filename): """Parses an input file with HJson and returns a dict.""" try: - return hjson.load(open(filename)) + return hjson.load(Path(filename).open()) except OSError: pass except hjson.scanner.HjsonDecodeError: From ebccaedc232b7da185f24d12713849699f24f22e Mon Sep 17 00:00:00 2001 From: James McCorrie Date: Thu, 6 Nov 2025 17:19:59 +0000 Subject: [PATCH 2/3] chore: nix flake update Signed-off-by: James McCorrie --- flake.lock | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/flake.lock b/flake.lock index 86cca316..946349e6 100644 --- a/flake.lock +++ b/flake.lock @@ -20,11 +20,11 @@ }, "nixpkgs": { "locked": { - "lastModified": 1760423683, - "narHash": "sha256-Tb+NYuJhWZieDZUxN6PgglB16yuqBYQeMJyYBGCXlt8=", + "lastModified": 1762233356, + "narHash": "sha256-cGS3lLTYusbEP/IJIWGgnkzIl+FA5xDvtiHyjalGr4k=", "owner": "nixos", "repo": "nixpkgs", - "rev": "a493e93b4a259cd9fea8073f89a7ed9b1c5a1da2", + "rev": "ca534a76c4afb2bdc07b681dbc11b453bab21af8", "type": "github" }, "original": { @@ -36,11 +36,11 @@ }, "nixpkgs-unstable": { "locked": { - "lastModified": 1760524057, - "narHash": "sha256-EVAqOteLBFmd7pKkb0+FIUyzTF61VKi7YmvP1tw4nEw=", + "lastModified": 1762111121, + "narHash": "sha256-4vhDuZ7OZaZmKKrnDpxLZZpGIJvAeMtK6FKLJYUtAdw=", "owner": "nixos", "repo": "nixpkgs", - "rev": "544961dfcce86422ba200ed9a0b00dd4b1486ec5", + "rev": "b3d51a0365f6695e7dd5cdf3e180604530ed33b4", "type": "github" }, "original": { @@ -63,11 +63,11 @@ ] }, "locked": { - "lastModified": 1759113590, - "narHash": "sha256-fgxP2RCN4cg0jYiMYoETYc7TZ2JjgyvJa2y9l8oSUFE=", + "lastModified": 1761781027, + "narHash": "sha256-YDvxPAm2WnxrznRqWwHLjryBGG5Ey1ATEJXrON+TWt8=", "owner": "pyproject-nix", "repo": "build-system-pkgs", - "rev": "dbfc0483b5952c6b86e36f8b3afeb9dde30ea4b5", + "rev": "795a980d25301e5133eca37adae37283ec3c8e66", "type": "github" }, "original": { @@ -83,11 +83,11 @@ ] }, "locked": { - "lastModified": 1760402624, - "narHash": "sha256-jF6UKLs2uGc2rtved8Vrt58oTWjTQoAssuYs/0578Z4=", + "lastModified": 1762427963, + "narHash": "sha256-CkPlAbIQ87wmjy5qHibfzk4DmMGBNqFer+lLfXjpP5M=", "owner": "nix-community", "repo": "pyproject.nix", - "rev": "84c4ea102127c77058ea1ed7be7300261fafc7d2", + "rev": "4540ea004e04fcd12dd2738d51383d10f956f7b9", "type": "github" }, "original": { @@ -131,11 +131,11 @@ ] }, "locked": { - "lastModified": 1760161183, - "narHash": "sha256-1USClOZthg+pGJp+p3ouVtTMO+ZY8Cd0+FbsNN/RpO8=", + "lastModified": 1761872265, + "narHash": "sha256-i25GRgp2vUOebY70L3NTAgkd+Pr1hnn5xM3qHxH0ONU=", "owner": "pyproject-nix", "repo": "uv2nix", - "rev": "b6ed0901aec29583532abe65117b18d86a49b617", + "rev": "74dfb62871be152ad3673b143b0cc56105a4f3c5", "type": "github" }, "original": { From 7bd5fba3e36a00852e43eaccfe20db57a13dd211 Mon Sep 17 00:00:00 2001 From: James McCorrie Date: Fri, 7 Nov 2025 17:04:33 +0000 Subject: [PATCH 3/3] refactor: migrate from Depoy.dump to JobSpec.model_dump Now we have a Pydantic model that represents the full data 
requirement of the scheduler, so we can use the builtin
`JobSpec.model_dump()` instead of the custom `Deploy.dump()`. At the
moment this model has to contain a couple of callback functions which
cannot be serialised, so these attributes are excluded.

This commit does change the format of the dumped "deployment" objects
file, so if that file is being used to check for breaking changes, then
reference hashes need to be regenerated from this commit onwards.

Signed-off-by: James McCorrie
---
 src/dvsim/flow/base.py  | 15 ++++++++++++---
 src/dvsim/job/deploy.py | 19 -------------------
 2 files changed, 12 insertions(+), 22 deletions(-)

diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py
index 6f2389ac..2a12980a 100644
--- a/src/dvsim/flow/base.py
+++ b/src/dvsim/flow/base.py
@@ -417,21 +417,30 @@ def deploy_objects(self) -> Sequence[CompletedJobStatus]:
             log.error("Nothing to run!")
             sys.exit(1)
 
+        jobs = [d.get_job_spec() for d in deploy]
+
         if os.environ.get("DVSIM_DEPLOY_DUMP", "true"):
             filename = f"deploy_{self.branch}_{self.timestamp}.json"
             (Path(self.scratch_root) / filename).write_text(
                 json.dumps(
                     # Sort on full name to ensure consistent ordering
                     sorted(
-                        [d.dump() for d in deploy],
-                        key=lambda d: d["full_name"],
+                        [
+                            j.model_dump(
+                                # callback functions can't be serialised
+                                exclude={"pre_launch", "post_finish"},
+                                mode="json",
+                            )
+                            for j in jobs
+                        ],
+                        key=lambda j: j["full_name"],
                     ),
                     indent=2,
                 ),
             )
 
         return Scheduler(
-            items=[d.get_job_spec() for d in deploy],
+            items=jobs,
             launcher_cls=get_launcher_cls(),
             interactive=self.interactive,
         ).run()
diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index f7a82bd0..c620ddbd 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -350,25 +350,6 @@ def callback(status: str) -> None:
 
     def get_timeout_mins(self) -> float | None:
         """Return the timeout in minutes."""
 
-    def dump(self) -> Mapping:
-        """Dump the deployment object to mapping object.
-
-        Returns:
-            Representation of a deployment object as a dict.
-
-        """
-        job_spec = self.get_job_spec()
-        return {
-            "full_name": job_spec.full_name,
-            "type": self.__class__.__name__,
-            "exports": job_spec.exports,
-            "interactive": job_spec.interactive,
-            "log_path": str(job_spec.log_path),
-            "timeout_mins": job_spec.timeout_mins,
-            "cmd": job_spec.cmd,
-            "gui": job_spec.gui,
-        }
-
 class CompileSim(Deploy):
     """Abstraction for building the simulation executable."""
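
As a footnote to the compatibility note in the final commit message: one way a
consumer of the deploy dump file could compare the same regression run on two
dvsim revisions is to hash a canonicalised form of the JSON list written by
`deploy_objects()`. This is only an illustrative sketch, not part of the
patches above; the helper name and file paths are invented for the example.

    import hashlib
    import json
    from pathlib import Path


    def dump_digest(path: Path) -> str:
        """Digest of a deploy dump, insensitive to key order and whitespace."""
        entries = json.loads(path.read_text())
        canonical = json.dumps(entries, sort_keys=True, separators=(",", ":"))
        return hashlib.sha256(canonical.encode()).hexdigest()


    # Compare the dump produced by the old dvsim revision against the dump
    # produced by this revision for the same regression (paths hypothetical).
    before = dump_digest(Path("scratch/deploy_master_old.json"))
    after = dump_digest(Path("scratch/deploy_master_new.json"))
    print("dump contents differ" if before != after else "dump contents match")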