Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions src/dvsim/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,14 @@
from abc import ABC, abstractmethod
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import ClassVar
from typing import TYPE_CHECKING, ClassVar

import hjson

from dvsim.flow.hjson import set_target_attribute
from dvsim.job.deploy import Deploy
from dvsim.launcher.factory import get_launcher_cls
from dvsim.logging import log
from dvsim.scheduler import Scheduler
from dvsim.scheduler import CompletedJobStatus, Scheduler
from dvsim.utils import (
find_and_substitute_wildcards,
md_results_to_html,
Expand All @@ -28,6 +27,11 @@
subst_wildcards,
)

if TYPE_CHECKING:
from dvsim.job.deploy import Deploy

__all__ = ("FlowCfg",)


# Interface class for extensions.
class FlowCfg(ABC):
Expand Down Expand Up @@ -399,7 +403,7 @@ def create_deploy_objects(self) -> None:
for item in self.cfgs:
item._create_deploy_objects()

def deploy_objects(self) -> Mapping[Deploy, str]:
def deploy_objects(self) -> Mapping[str, CompletedJobStatus]:
"""Public facing API for deploying all available objects.

Runs each job and returns a map from item to status.
Expand Down Expand Up @@ -432,7 +436,7 @@ def deploy_objects(self) -> Mapping[Deploy, str]:
).run()

@abstractmethod
def _gen_results(self, results: Mapping[Deploy, str]) -> str:
def _gen_results(self, results: Mapping[str, CompletedJobStatus]) -> str:
"""Generate flow results.

The function is called after the flow has completed. It collates
Expand All @@ -441,13 +445,16 @@ def _gen_results(self, results: Mapping[Deploy, str]) -> str:
prints the full list of failures for debug / triage to the final
report, which is in markdown format.

results should be a dictionary mapping deployed item to result.
Results:
dictionary mapping deployed item names to job status.

"""

def gen_results(self, results: Mapping[Deploy, str]) -> None:
def gen_results(self, results: Mapping[str, CompletedJobStatus]) -> None:
"""Public facing API for _gen_results().

results should be a dictionary mapping deployed item to result.
Args:
results: dictionary mapping deployed item names to job status.

"""
for item in self.cfgs:
Expand Down
3 changes: 2 additions & 1 deletion src/dvsim/flow/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import pathlib
import sys

from dvsim.flow.base import FlowCfg
from dvsim.flow.cdc import CdcCfg
from dvsim.flow.formal import FormalCfg
from dvsim.flow.hjson import load_hjson
Expand Down Expand Up @@ -105,7 +106,7 @@ def _make_child_cfg(path, args, initial_values):
return cls(path, hjson_data, args, None)


def make_cfg(path, args, proj_root):
def make_cfg(path, args, proj_root) -> FlowCfg:
"""Make a flow config by loading the config file at path.

args is the arguments passed to the dvsim.py tool and proj_root is the top
Expand Down
2 changes: 1 addition & 1 deletion src/dvsim/flow/sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ def create_bucket_report(buckets):

# Append coverage results if coverage was enabled.
if self.cov_report_deploy is not None:
report_status = results[self.cov_report_deploy.qual_name]
report_status = results[self.cov_report_deploy.full_name]
if report_status == "P":
results_str += "\n## Coverage Results\n"
# Link the dashboard page using "cov_report_page" value.
Expand Down
13 changes: 7 additions & 6 deletions src/dvsim/job/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def __init__(self, sim_cfg: "SimCfg") -> None:
# Cross ref the whole cfg object for ease.
self.sim_cfg = sim_cfg
self.flow = sim_cfg.name
self._variant_suffix = f"_{self.sim_cfg.variant}" if self.sim_cfg.variant else ""

# A list of jobs on which this job depends.
self.dependencies = []
Expand Down Expand Up @@ -166,7 +167,7 @@ def _set_attrs(self) -> None:

# Full name disambiguates across multiple cfg being run (example:
# 'aes:default', 'uart:default' builds.
self.full_name = self.sim_cfg.name + ":" + self.qual_name
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"

# Job name is used to group the job by cfg and target. The scratch path
# directory name is assumed to be uniquified, in case there are more
Expand Down Expand Up @@ -574,7 +575,7 @@ def _set_attrs(self) -> None:
self.test = self.name
self.build_mode = self.test_obj.build_mode.name
self.qual_name = self.run_dir_name + "." + str(self.seed)
self.full_name = self.sim_cfg.name + ":" + self.qual_name
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
self.job_name += f"_{self.build_mode}"
if self.sim_cfg.cov:
self.output_dirs += [self.cov_db_dir]
Expand Down Expand Up @@ -679,7 +680,7 @@ def _define_attrs(self) -> None:
def _set_attrs(self) -> None:
super()._set_attrs()
self.qual_name = self.target
self.full_name = self.sim_cfg.name + ":" + self.qual_name
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
self.input_dirs += [self.cov_merge_db_dir]

# Reuse the build_fail_patterns set in the HJson.
Expand Down Expand Up @@ -734,7 +735,7 @@ def _define_attrs(self) -> None:
def _set_attrs(self) -> None:
super()._set_attrs()
self.qual_name = self.target
self.full_name = self.sim_cfg.name + ":" + self.qual_name
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"

# For merging coverage db, the precise output dir is set in the HJson.
self.odir = self.cov_merge_db_dir
Expand Down Expand Up @@ -768,7 +769,7 @@ def _define_attrs(self) -> None:
def _set_attrs(self) -> None:
super()._set_attrs()
self.qual_name = self.target
self.full_name = self.sim_cfg.name + ":" + self.qual_name
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"

# Keep track of coverage results, once the job is finished.
self.cov_total = ""
Expand Down Expand Up @@ -819,5 +820,5 @@ def _define_attrs(self) -> None:
def _set_attrs(self) -> None:
super()._set_attrs()
self.qual_name = self.target
self.full_name = self.sim_cfg.name + ":" + self.qual_name
self.full_name = f"{self.sim_cfg.name}{self._variant_suffix}:{self.qual_name}"
self.input_dirs += [self.cov_merge_db_dir]
24 changes: 24 additions & 0 deletions src/dvsim/launcher/fake.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

"""Fake Launcher that returns random results."""

from collections.abc import Mapping
from random import choice, random
from typing import TYPE_CHECKING

Expand All @@ -12,6 +13,8 @@
if TYPE_CHECKING:
from dvsim.job.deploy import CovReport, Deploy, RunTest

__all__ = ("FakeLauncher",)


def _run_test_handler(deploy: "RunTest") -> str:
"""Handle a RunTest deploy job."""
Expand Down Expand Up @@ -70,3 +73,24 @@ def kill(self) -> None:
"K",
ErrorMessage(line_number=None, message="Job killed!", context=[]),
)

@staticmethod
def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
"""Prepare the workspace based on the chosen launcher's needs.

This is done once for the entire duration for the flow run.

Args:
project: the name of the project.
repo_top: the path to the repository.
args: command line args passed to dvsim.

"""

@staticmethod
def prepare_workspace_for_cfg(cfg: Mapping) -> None:
"""Prepare the workspace for a cfg.

This is invoked once for each cfg.
'cfg' is the flow configuration object.
"""
56 changes: 28 additions & 28 deletions src/dvsim/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ def get_next_item(arr: Sequence, index: int) -> tuple[Any, int]:
is already the last item on the list, it loops back to the start, thus
implementing a circular list.

arr is a subscriptable list.
index is the index of the last item returned.
Args:
arr: subscriptable list.
index: index of the last item returned.

Returns:
(item, index) if successful.

Raises:
IndexError if arr is empty.

Returns (item, index) if successful.
Raises IndexError if arr is empty.
"""
index += 1
try:
Expand Down Expand Up @@ -95,7 +100,7 @@ def __init__(
# they wait until slots are available for them to be dispatched.
# When all items (in all cfgs) of a target are done, it is removed from
# this dictionary.
self._scheduled: MutableMapping[str, MutableMapping[FlowCfg, MutableSequence[Deploy]]] = {}
self._scheduled: MutableMapping[str, MutableMapping[str, MutableSequence[Deploy]]] = {}
self.add_to_scheduled(items)

# Print status periodically using an external status printer.
Expand Down Expand Up @@ -143,20 +148,15 @@ def __init__(
msg = self.msg_fmt.format(0, 0, 0, 0, 0, self._total[target])
self.status_printer.init_target(target=target, msg=msg)

# A map from the Deploy objects tracked by this class to their
# A map from the Deploy object names tracked by this class to their
# current status. This status is 'Q', 'D', 'P', 'F' or 'K',
# corresponding to membership in the dicts above. This is not
# per-target.
self.item_to_status: MutableMapping[Deploy, str] = {}

# TODO: Why is the deployment object asked about which launcher to use when
# the launcher class is explicitly passed. Either each deployment can have it's
# own distinct Launcher class type or all deployments must have the same
# Launcher class? Both can't be true.
self.item_status: MutableMapping[str, str] = {}

# Create the launcher instance for all items.
self._launchers: Mapping[str, Launcher] = {
item.qual_name: launcher_cls(item) for item in self.items
item.full_name: launcher_cls(item) for item in self.items
}

# The chosen launcher class. This allows us to access launcher
Expand Down Expand Up @@ -229,11 +229,11 @@ def on_signal(signal_received: int, _: FrameType | None) -> None:

# We got to the end without anything exploding. Return the results.
return {
d.qual_name: CompletedJobStatus(
name: CompletedJobStatus(
status=status,
fail_msg=self._launchers[d.qual_name].fail_msg,
fail_msg=self._launchers[name].fail_msg,
)
for d, status in self.item_to_status.items()
for name, status in self.item_status.items()
}

def add_to_scheduled(self, items: Sequence[Deploy]) -> None:
Expand Down Expand Up @@ -271,9 +271,9 @@ def _enqueue_successors(self, item: Deploy | None = None) -> None:
them to _queued.
"""
for next_item in self._get_successors(item):
assert next_item not in self.item_to_status
assert next_item.full_name not in self.item_status
assert next_item not in self._queued[next_item.target]
self.item_to_status[next_item] = "Q"
self.item_status[next_item.full_name] = "Q"
self._queued[next_item.target].append(next_item)
self._unschedule_item(next_item)

Expand Down Expand Up @@ -356,11 +356,11 @@ def _ok_to_enqueue(self, item: Deploy) -> bool:
continue

# Has the dep even been enqueued?
if dep not in self.item_to_status:
if dep.full_name not in self.item_status:
return False

# Has the dep completed?
if self.item_to_status[dep] not in ["P", "F", "K"]:
if self.item_status[dep.full_name] not in ["P", "F", "K"]:
return False

return True
Expand All @@ -379,7 +379,7 @@ def _ok_to_run(self, item: Deploy) -> bool:
if dep not in self.items:
continue

dep_status = self.item_to_status[dep]
dep_status = self.item_status[dep.full_name]
if dep_status not in ["P", "F", "K"]:
raise ValueError("Status must be one of P, F, or K")

Expand Down Expand Up @@ -421,7 +421,7 @@ def _poll(self, hms: str) -> bool:
self._running[target],
self.last_item_polled_idx[target],
)
status = self._launchers[item.qual_name].poll()
status = self._launchers[item.full_name].poll()
level = log.VERBOSE

if status not in ["D", "P", "F", "E", "K"]:
Expand All @@ -445,7 +445,7 @@ def _poll(self, hms: str) -> bool:

self._running[target].pop(self.last_item_polled_idx[target])
self.last_item_polled_idx[target] -= 1
self.item_to_status[item] = status
self.item_status[item.full_name] = status

log.log(
level,
Expand Down Expand Up @@ -532,7 +532,7 @@ def _dispatch(self, hms: str) -> None:

for item in to_dispatch:
try:
self._launchers[item.qual_name].launch()
self._launchers[item.full_name].launch()

except LauncherError:
log.exception("Error launching %s", item)
Expand All @@ -552,7 +552,7 @@ def _dispatch(self, hms: str) -> None:
continue

self._running[target].append(item)
self.item_to_status[item] = "D"
self.item_status[item.full_name] = "D"

def _kill(self) -> None:
"""Kill any running items and cancel any that are waiting."""
Expand Down Expand Up @@ -616,7 +616,7 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:
Supplied item may be in _scheduled list or the _queued list. From
either, we move it straight to _killed.
"""
self.item_to_status[item] = "K"
self.item_status[item.full_name] = "K"
self._killed[item.target].add(item)
if item in self._queued[item.target]:
self._queued[item.target].remove(item)
Expand All @@ -628,8 +628,8 @@ def _cancel_item(self, item: Deploy, *, cancel_successors: bool = True) -> None:

def _kill_item(self, item: Deploy) -> None:
"""Kill a running item and cancel all of its successors."""
self._launchers[item.qual_name].kill()
self.item_to_status[item] = "K"
self._launchers[item.full_name].kill()
self.item_status[item.full_name] = "K"
self._killed[item.target].add(item)
self._running[item.target].remove(item)
self._cancel_successors(item)
2 changes: 1 addition & 1 deletion src/dvsim/sim_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def __init__(self, items, results) -> None:

def _add_item(self, item, results: Mapping[str, CompletedJobStatus]) -> None:
"""Recursively add a single item to the table of results."""
job_status = results[item.qual_name]
job_status = results[item.full_name]
if job_status.status in ["F", "K"]:
bucket = self._bucketize(job_status.fail_msg.message)
self.buckets[bucket].append(
Expand Down
Loading