diff --git a/src/dvsim/flow/base.py b/src/dvsim/flow/base.py
index a54bed3..a84ae67 100644
--- a/src/dvsim/flow/base.py
+++ b/src/dvsim/flow/base.py
@@ -11,15 +11,14 @@
 from abc import ABC, abstractmethod
 from collections.abc import Mapping, Sequence
 from pathlib import Path
-from typing import ClassVar
+from typing import TYPE_CHECKING, ClassVar
 
 import hjson
 
 from dvsim.flow.hjson import set_target_attribute
-from dvsim.job.deploy import Deploy
 from dvsim.launcher.factory import get_launcher_cls
 from dvsim.logging import log
-from dvsim.scheduler import Scheduler
+from dvsim.scheduler import CompletedJobStatus, Scheduler
 from dvsim.utils import (
     find_and_substitute_wildcards,
     md_results_to_html,
@@ -28,6 +27,11 @@
     subst_wildcards,
 )
 
+if TYPE_CHECKING:
+    from dvsim.job.deploy import Deploy
+
+__all__ = ("FlowCfg",)
+
 
 # Interface class for extensions.
 class FlowCfg(ABC):
@@ -399,7 +403,7 @@ def create_deploy_objects(self) -> None:
         for item in self.cfgs:
             item._create_deploy_objects()
 
-    def deploy_objects(self) -> Mapping[Deploy, str]:
+    def deploy_objects(self) -> Mapping[str, CompletedJobStatus]:
         """Public facing API for deploying all available objects.
 
         Runs each job and returns a map from item to status.
@@ -432,7 +436,7 @@ def deploy_objects(self) -> Mapping[Deploy, str]:
         ).run()
 
     @abstractmethod
-    def _gen_results(self, results: Mapping[Deploy, str]) -> str:
+    def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str:
         """Generate flow results.
 
         The function is called after the flow has completed. It collates
@@ -441,13 +445,16 @@ def _gen_results(self, results: Mapping[Deploy, str]) -> str:
         prints the full list of failures for debug / triage to the final
         report, which is in markdown format.
 
-        results should be a dictionary mapping deployed item to result.
+        Args:
+            results: dictionary mapping deployed item names to job status.
+
         """
 
-    def gen_results(self, results: Mapping[Deploy, str]) -> None:
+    def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None:
         """Public facing API for _gen_results().
 
-        results should be a dictionary mapping deployed item to result.
+        Args:
+            results: dictionary mapping deployed item names to job status.
 
         """
         for item in self.cfgs:
diff --git a/src/dvsim/flow/factory.py b/src/dvsim/flow/factory.py
index 844462a..11b88a2 100644
--- a/src/dvsim/flow/factory.py
+++ b/src/dvsim/flow/factory.py
@@ -5,6 +5,7 @@
 import pathlib
 import sys
 
+from dvsim.flow.base import FlowCfg
 from dvsim.flow.cdc import CdcCfg
 from dvsim.flow.formal import FormalCfg
 from dvsim.flow.hjson import load_hjson
@@ -105,7 +106,7 @@ def _make_child_cfg(path, args, initial_values):
     return cls(path, hjson_data, args, None)
 
 
-def make_cfg(path, args, proj_root):
+def make_cfg(path, args, proj_root) -> FlowCfg:
     """Make a flow config by loading the config file at path.
 
     args is the arguments passed to the dvsim.py tool and proj_root is the top
diff --git a/src/dvsim/flow/sim.py b/src/dvsim/flow/sim.py
index 0cd2697..e492a9e 100644
--- a/src/dvsim/flow/sim.py
+++ b/src/dvsim/flow/sim.py
@@ -887,7 +887,7 @@ def create_bucket_report(buckets):
 
         # Append coverage results if coverage was enabled.
         if self.cov_report_deploy is not None:
-            report_status = results[self.cov_report_deploy.qual_name]
+            report_status = results[self.cov_report_deploy.full_name]
             if report_status == "P":
                 results_str += "\n## Coverage Results\n"
                 # Link the dashboard page using "cov_report_page" value.
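The two signature changes above ripple through every consumer: results are now keyed by the deployed item's full_name string, and each value is a CompletedJobStatus rather than a bare status letter. A minimal, self-contained sketch of consuming such a mapping follows; the CompletedJobStatus stand-in and the summarize() helper are illustrative, and only the key scheme and the .status/.fail_msg fields come from this patch:

    from collections.abc import Mapping
    from dataclasses import dataclass


    @dataclass
    class CompletedJobStatus:  # stand-in mirroring the class in dvsim.scheduler
        status: str            # 'P', 'F' or 'K' once a job has completed
        fail_msg: str | None = None


    def summarize(results: Mapping[str, CompletedJobStatus]) -> str:
        """Collate pass/fail counts from a full_name-keyed results mapping."""
        failed = {n: j for n, j in results.items() if j.status in ("F", "K")}
        lines = [f"{len(results) - len(failed)} passed, {len(failed)} failed/killed"]
        for name, job in failed.items():
            # fail_msg carries the failure captured by the launcher, for triage.
            lines.append(f"  {name}: {job.fail_msg}")
        return "\n".join(lines)

The sim.py coverage lookup switches from qual_name to full_name for the same reason: the mapping returned by the scheduler is keyed by full_name throughout.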
diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py
index 5e1c509..c570b7b 100644
--- a/src/dvsim/job/deploy.py
+++ b/src/dvsim/job/deploy.py
@@ -70,6 +70,7 @@ def __init__(self, sim_cfg: "SimCfg") -> None:
         # Cross ref the whole cfg object for ease.
         self.sim_cfg = sim_cfg
         self.flow = sim_cfg.name
+        self._variant_suffix = f"_{self.sim_cfg.variant}" if self.sim_cfg.variant else ""
 
         # A list of jobs on which this job depends.
         self.dependencies = []
@@ -166,7 +167,7 @@ def _set_attrs(self) -> None:
 
         # Full name disambiguates across multiple cfg being run (example:
         # 'aes:default', 'uart:default' builds.
-        self.full_name = self.sim_cfg.name + ":" + self.qual_name
+        self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
 
         # Job name is used to group the job by cfg and target. The scratch path
         # directory name is assumed to be uniquified, in case there are more
@@ -574,7 +575,7 @@ def _set_attrs(self) -> None:
         self.test = self.name
         self.build_mode = self.test_obj.build_mode.name
         self.qual_name = self.run_dir_name + "." + str(self.seed)
-        self.full_name = self.sim_cfg.name + ":" + self.qual_name
+        self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
         self.job_name += f"_{self.build_mode}"
         if self.sim_cfg.cov:
             self.output_dirs += [self.cov_db_dir]
@@ -679,7 +680,7 @@ def _define_attrs(self) -> None:
     def _set_attrs(self) -> None:
         super()._set_attrs()
         self.qual_name = self.target
-        self.full_name = self.sim_cfg.name + ":" + self.qual_name
+        self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
         self.input_dirs += [self.cov_merge_db_dir]
 
         # Reuse the build_fail_patterns set in the HJson.
@@ -734,7 +735,7 @@ def _define_attrs(self) -> None:
     def _set_attrs(self) -> None:
         super()._set_attrs()
         self.qual_name = self.target
-        self.full_name = self.sim_cfg.name + ":" + self.qual_name
+        self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
 
         # For merging coverage db, the precise output dir is set in the HJson.
         self.odir = self.cov_merge_db_dir
@@ -768,7 +769,7 @@ def _define_attrs(self) -> None:
     def _set_attrs(self) -> None:
         super()._set_attrs()
         self.qual_name = self.target
-        self.full_name = self.sim_cfg.name + ":" + self.qual_name
+        self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
 
         # Keep track of coverage results, once the job is finished.
         self.cov_total = ""
@@ -819,5 +820,5 @@ def _define_attrs(self) -> None:
     def _set_attrs(self) -> None:
         super()._set_attrs()
         self.qual_name = self.target
-        self.full_name = self.sim_cfg.name + ":" + self.qual_name
+        self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
         self.input_dirs += [self.cov_merge_db_dir]
diff --git a/src/dvsim/launcher/fake.py b/src/dvsim/launcher/fake.py
index 7ad0317..8bd3844 100644
--- a/src/dvsim/launcher/fake.py
+++ b/src/dvsim/launcher/fake.py
@@ -4,6 +4,7 @@
 
 """Fake Launcher that returns random results."""
 
+from collections.abc import Mapping
 from random import choice, random
 from typing import TYPE_CHECKING
 
@@ -12,6 +13,8 @@
 if TYPE_CHECKING:
     from dvsim.job.deploy import CovReport, Deploy, RunTest
 
+__all__ = ("FakeLauncher",)
+
 
 def _run_test_handler(deploy: "RunTest") -> str:
     """Handle a RunTest deploy job."""
@@ -70,3 +73,24 @@ def kill(self) -> None:
             "K",
             ErrorMessage(line_number=None, message="Job killed!", context=[]),
         )
+
+    @staticmethod
+    def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
+        """Prepare the workspace based on the chosen launcher's needs.
+
+        This is done once for the entire duration of the flow run.
+
+        Args:
+            project: the name of the project.
+            repo_top: the path to the repository.
+            args: command line args passed to dvsim.
+
+        """
+
+    @staticmethod
+    def prepare_workspace_for_cfg(cfg: Mapping) -> None:
+        """Prepare the workspace for a cfg.
+
+        This is invoked once for each cfg.
+        'cfg' is the flow configuration object.
+        """
diff --git a/src/dvsim/scheduler.py b/src/dvsim/scheduler.py
index 85a6470..dc5bf0d 100644
--- a/src/dvsim/scheduler.py
+++ b/src/dvsim/scheduler.py
@@ -56,11 +56,16 @@ def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]:
     is already the last item on the list, it loops back to the start, thus
     implementing a circular list.
 
-    arr is a subscriptable list.
-    index is the index of the last item returned.
+    Args:
+        arr: subscriptable list.
+        index: index of the last item returned.
+
+    Returns:
+        (item, index) if successful.
+
+    Raises:
+        IndexError if arr is empty.
 
-    Returns (item, index) if successful.
-    Raises IndexError if arr is empty.
     """
     index += 1
     try:
@@ -95,7 +100,7 @@ def __init__(
         # they wait until slots are available for them to be dispatched.
         # When all items (in all cfgs) of a target are done, it is removed from
         # this dictionary.
-        self._scheduled: MutableMapping[str, MutableMapping[FlowCfg, MutableSequence[Deploy]]] = {}
+        self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[Deploy]]] = {}
         self.add_to_scheduled(items)
 
         # Print status periodically using an external status printer.
@@ -143,20 +148,15 @@ def __init__(
             msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
             self.status_printer.init_target(target=target, msg=msg)
 
-        # A map from the Deploy objects tracked by this class to their
+        # A map from the Deploy object names tracked by this class to their
         # current status. This status is 'Q', 'D', 'P', 'F' or 'K',
         # corresponding to membership in the dicts above. This is not
        # per-target.
-        self.item_to_status: MutableMapping[Deploy, str] = {}
-
-        # TODO: Why is the deployment object asked about which launcher to use when
-        # the launcher class is explicitly passed. Either each deployment can have it's
-        # own distinct Launcher class type or all deployments must have the same
-        # Launcher class? Both can't be true.
+        self.item_status: MutableMapping[str, str] = {}
 
         # Create the launcher instance for all items.
         self._launchers: Mapping[str, Launcher] = {
-            item.qual_name: launcher_cls(item) for item in self.items
+            item.full_name: launcher_cls(item) for item in self.items
         }
 
         # The chosen launcher class. This allows us to access launcher
@@ -229,11 +229,11 @@ def on_signal(signal_received: int, _: FrameType | None) -> None:
 
         # We got to the end without anything exploding. Return the results.
         return {
-            d.qual_name: CompletedJobStatus(
+            name: CompletedJobStatus(
                 status=status,
-                fail_msg=self._launchers[d.qual_name].fail_msg,
+                fail_msg=self._launchers[name].fail_msg,
             )
-            for d, status in self.item_to_status.items()
+            for name, status in self.item_status.items()
         }
 
     def add_to_scheduled(self, items: Sequence[Deploy]) -> None:
@@ -271,9 +271,9 @@ def _enqueue_successors(self, item: Deploy | None = None) -> None:
         them to _queued.
""" for next_item in self._get_successors(item): - assert next_item not in self.item_to_status + assert next_item.full_name not in self.item_status assert next_item not in self._queued[next_item.target] - self.item_to_status[next_item] = "Q" + self.item_status[next_item.full_name] = "Q" self._queued[next_item.target].append(next_item) self._unschedule_item(next_item) @@ -356,11 +356,11 @@ def _ok_to_enqueue(self, item: Deploy) -> bool: continue # Has the dep even been enqueued? - if dep not in self.item_to_status: + if dep.full_name not in self.item_status: return False # Has the dep completed? - if self.item_to_status[dep] not in ["P", "F", "K"]: + if self.item_status[dep.full_name] not in ["P", "F", "K"]: return False return True @@ -379,7 +379,7 @@ def _ok_to_run(self, item: Deploy) -> bool: if dep not in self.items: continue - dep_status = self.item_to_status[dep] + dep_status = self.item_status[dep.full_name] if dep_status not in ["P", "F", "K"]: raise ValueError("Status must be one of P, F, or K") @@ -421,7 +421,7 @@ def _poll(self, hms: str) -> bool: self._running[target], self.last_item_polled_idx[target], ) - status = self._launchers[item.qual_name].poll() + status = self._launchers[item.full_name].poll() level = log.VERBOSE if status not in ["D", "P", "F", "E", "K"]: @@ -445,7 +445,7 @@ def _poll(self, hms: str) -> bool: self._running[target].pop(self.last_item_polled_idx[target]) self.last_item_polled_idx[target] -= 1 - self.item_to_status[item] = status + self.item_status[item.full_name] = status log.log( level, @@ -532,7 +532,7 @@ def _dispatch(self, hms: str) -> None: for item in to_dispatch: try: - self._launchers[item.qual_name].launch() + self._launchers[item.full_name].launch() except LauncherError: log.exception("Error launching %s", item) @@ -552,7 +552,7 @@ def _dispatch(self, hms: str) -> None: continue self._running[target].append(item) - self.item_to_status[item] = "D" + self.item_status[item.full_name] = "D" def _kill(self) -> None: """Kill any running items and cancel any that are waiting.""" @@ -616,7 +616,7 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None: Supplied item may be in _scheduled list or the _queued list. From either, we move it straight to _killed. """ - self.item_to_status[item] = "K" + self.item_status[item.full_name] = "K" self._killed[item.target].add(item) if item in self._queued[item.target]: self._queued[item.target].remove(item) @@ -628,8 +628,8 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None: def _kill_item(self, item: Deploy) -> None: """Kill a running item and cancel all of its successors.""" - self._launchers[item.qual_name].kill() - self.item_to_status[item] = "K" + self._launchers[item.full_name].kill() + self.item_status[item.full_name] = "K" self._killed[item.target].add(item) self._running[item.target].remove(item) self._cancel_successors(item) diff --git a/src/dvsim/sim_results.py b/src/dvsim/sim_results.py index 946a9dc..7b4765f 100644 --- a/src/dvsim/sim_results.py +++ b/src/dvsim/sim_results.py @@ -85,7 +85,7 @@ def __init__(self, items, results) -> None: def _add_item(self, item, results: Mapping[str, CompletedJobStatus]) -> None: """Recursively add a single item to the table of results.""" - job_status = results[item.qual_name] + job_status = results[item.full_name] if job_status.status in ["F", "K"]: bucket = self._bucketize(job_status.fail_msg.message) self.buckets[bucket].append(