diff --git a/src/dvsim/job/deploy.py b/src/dvsim/job/deploy.py index c570b7ba..53944224 100644 --- a/src/dvsim/job/deploy.py +++ b/src/dvsim/job/deploy.py @@ -306,7 +306,7 @@ def get_log_path(self) -> str: def get_timeout_mins(self) -> float | None: """Return the timeout in minutes.""" - def extract_info_from_log(self, log_text: list) -> None: + def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: """Extract information pertaining to the job from its log. This method parses the log text after the job has completed, for the @@ -326,7 +326,7 @@ def extract_info_from_log(self, log_text: list) -> None: self.job_runtime.set(time, unit) except RuntimeError as e: log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.") - self.job_runtime.set(self.launcher.job_runtime_secs, "s") + self.job_runtime.set(job_runtime_secs, "s") def model_dump(self) -> Mapping: """Dump the deployment object to mapping object. @@ -632,12 +632,17 @@ def get_timeout_mins(self): """ return self.run_timeout_mins if self.run_timeout_mins is not None else 60 - def extract_info_from_log(self, log_text: list) -> None: + def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None: """Extract the time the design was simulated for, from the log.""" - super().extract_info_from_log(log_text) + super().extract_info_from_log( + job_runtime_secs=job_runtime_secs, + log_text=log_text, + ) + try: time, unit = get_simulated_time(log_text, self.sim_cfg.tool) self.simulated_time.set(time, unit) + except RuntimeError as e: log.debug(f"{self.full_name}: {e}") diff --git a/src/dvsim/launcher/base.py b/src/dvsim/launcher/base.py index e19a8225..f5bc73c7 100644 --- a/src/dvsim/launcher/base.py +++ b/src/dvsim/launcher/base.py @@ -90,6 +90,48 @@ class Launcher(ABC): context=[], ) + def __init__(self, deploy: "Deploy") -> None: + """Initialise launcher. + + Args: + deploy: deployment object that will be launched. 
+ + """ + cfg = deploy.sim_cfg + + # One-time preparation of the workspace. + if not Launcher.workspace_prepared: + # TODO: CLI args should be processed far earlier than this + self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args) + Launcher.workspace_prepared = True + + # One-time preparation of the workspace, specific to the cfg. + if cfg not in Launcher.workspace_prepared_for_cfg: + self.prepare_workspace_for_cfg(cfg) + Launcher.workspace_prepared_for_cfg.add(cfg) + + # Store the deploy object handle. + self.deploy = deploy + + # Status of the job. This is primarily determined by the + # _check_status() method, but eventually updated by the _post_finish() + # method, in case any of the cleanup tasks fails. This value is finally + # returned to the Scheduler by the poll() method. + self.status = None + + # Return status of the process running the job. + self.exit_code = None + + # Flag to indicate whether to 'overwrite' if odir already exists, + # or to backup the existing one and create a new one. + # For builds, we want to overwrite existing to leverage the tools' + # incremental / partition compile features. For runs, we may want to + # create a new one. + self.renew_odir = False + + # The actual job runtime computed by dvsim, in seconds. + self.job_runtime_secs = 0 + @staticmethod def set_pyvenv(project: str) -> None: """Activate a python virtualenv if available. @@ -147,48 +189,6 @@ def __str__(self) -> str: """Get a string representation.""" return self.deploy.full_name + ":launcher" - def __init__(self, deploy: "Deploy") -> None: - """Initialise launcher. - - Args: - deploy: deployment object that will be launched. - - """ - cfg = deploy.sim_cfg - - # One-time preparation of the workspace. - if not Launcher.workspace_prepared: - # TODO: CLI args should be processed far earlier than this - self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args) - Launcher.workspace_prepared = True - - # One-time preparation of the workspace, specific to the cfg. 
- if cfg not in Launcher.workspace_prepared_for_cfg: - self.prepare_workspace_for_cfg(cfg) - Launcher.workspace_prepared_for_cfg.add(cfg) - - # Store the deploy object handle. - self.deploy = deploy - - # Status of the job. This is primarily determined by the - # _check_status() method, but eventually updated by the _post_finish() - # method, in case any of the cleanup tasks fails. This value is finally - # returned to the Scheduler by the poll() method. - self.status = None - - # Return status of the process running the job. - self.exit_code = None - - # Flag to indicate whether to 'overwrite' if odir already exists, - # or to backup the existing one and create a new one. - # For builds, we want to overwrite existing to leverage the tools' - # incremental / partition compile features. For runs, we may want to - # create a new one. - self.renew_odir = False - - # The actual job runtime computed by dvsim, in seconds. - self.job_runtime_secs = 0 - def _make_odir(self) -> None: """Create the output directory.""" # If renew_odir flag is True - then move it. @@ -318,7 +318,10 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None: # Since the log file is already opened and read to assess the job's # status, use this opportunity to also extract other pieces of # information. 
- self.deploy.extract_info_from_log(lines) + self.deploy.extract_info_from_log( + job_runtime_secs=self.job_runtime_secs, + log_text=lines, + ) if chk_failed or chk_passed: for cnt, line in enumerate(lines): diff --git a/src/dvsim/launcher/local.py b/src/dvsim/launcher/local.py index eecc3ce6..8ea688ce 100644 --- a/src/dvsim/launcher/local.py +++ b/src/dvsim/launcher/local.py @@ -8,6 +8,7 @@ import os import shlex import subprocess +from collections.abc import Mapping from pathlib import Path from typing import TYPE_CHECKING @@ -181,3 +182,24 @@ def _close_job_log_file(self) -> None: """Close the file descriptors associated with the process.""" if self._log_file: self._log_file.close() + + @staticmethod + def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + """Prepare the workspace based on the chosen launcher's needs. + + This is done once for the entire duration for the flow run. + + Args: + project: the name of the project. + repo_top: the path to the repository. + args: command line args passed to dvsim. + + """ + + @staticmethod + def prepare_workspace_for_cfg(cfg: Mapping) -> None: + """Prepare the workspace for a cfg. + + This is invoked once for each cfg. + 'cfg' is the flow configuration object. + """ diff --git a/src/dvsim/launcher/nc.py b/src/dvsim/launcher/nc.py index ea361551..2a5af9f7 100644 --- a/src/dvsim/launcher/nc.py +++ b/src/dvsim/launcher/nc.py @@ -2,11 +2,14 @@ # Licensed under the Apache License, Version 2.0, see LICENSE for details. 
# SPDX-License-Identifier: Apache-2.0 +"""Altair nc job launcher.""" + import datetime import os import pathlib import subprocess import sys +from collections.abc import Mapping from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log @@ -261,3 +264,24 @@ def _close_process(self) -> None: assert self.process if self.process.stdout: self.process.stdout.close() + + @staticmethod + def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + """Prepare the workspace based on the chosen launcher's needs. + + This is done once for the entire duration for the flow run. + + Args: + project: the name of the project. + repo_top: the path to the repository. + args: command line args passed to dvsim. + + """ + + @staticmethod + def prepare_workspace_for_cfg(cfg: Mapping) -> None: + """Prepare the workspace for a cfg. + + This is invoked once for each cfg. + 'cfg' is the flow configuration object. + """ diff --git a/src/dvsim/launcher/sge/launcher.py b/src/dvsim/launcher/sge/launcher.py index d8eb435d..0d759762 100644 --- a/src/dvsim/launcher/sge/launcher.py +++ b/src/dvsim/launcher/sge/launcher.py @@ -8,6 +8,7 @@ import pathlib import shlex import subprocess +from collections.abc import Mapping from subprocess import PIPE, Popen from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError @@ -46,7 +47,9 @@ def _do_launch(self) -> None: self._dump_env_vars(exports) try: - f = open(self.deploy.get_log_path(), "w", encoding="UTF-8", errors="surrogateescape") + f = pathlib.Path(self.deploy.get_log_path()).open( + "w", encoding="UTF-8", errors="surrogateescape" + ) f.write(f"[Executing]:\n{self.deploy.cmd}\n\n") f.flush() # ---------- prepare SGE job struct ----- @@ -103,10 +106,12 @@ def poll(self): # ------------------------------------- # copy SGE jobb results to log file if pathlib.Path(self.deploy.get_log_path() + ".sge").exists(): - file1 = open(self.deploy.get_log_path() + ".sge", errors="replace") + file1 
= pathlib.Path(self.deploy.get_log_path() + ".sge").open(errors="replace") lines = file1.readlines() file1.close() - f = open(self.deploy.get_log_path(), "a", encoding="UTF-8", errors="surrogateescape") + f = pathlib.Path(self.deploy.get_log_path()).open( + "a", encoding="UTF-8", errors="surrogateescape" + ) f.writelines(lines) f.flush() pathlib.Path(self.deploy.get_log_path() + ".sge").unlink() @@ -160,3 +165,24 @@ def _close_process(self) -> None: assert self.process if self.process.stdout: self.process.stdout.close() + + @staticmethod + def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + """Prepare the workspace based on the chosen launcher's needs. + + This is done once for the entire duration for the flow run. + + Args: + project: the name of the project. + repo_top: the path to the repository. + args: command line args passed to dvsim. + + """ + + @staticmethod + def prepare_workspace_for_cfg(cfg: Mapping) -> None: + """Prepare the workspace for a cfg. + + This is invoked once for each cfg. + 'cfg' is the flow configuration object. 
+ """ diff --git a/src/dvsim/launcher/slurm.py b/src/dvsim/launcher/slurm.py index de7aa8d7..e631a556 100644 --- a/src/dvsim/launcher/slurm.py +++ b/src/dvsim/launcher/slurm.py @@ -7,6 +7,7 @@ import shlex import shutil import subprocess +from collections.abc import Mapping from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError from dvsim.logging import log @@ -59,7 +60,7 @@ def _do_launch(self) -> None: ) try: - with open(self.slurm_log_file, "w") as out_file: + with pathlib.Path(self.slurm_log_file).open("w") as out_file: out_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n") out_file.flush() @@ -99,9 +100,9 @@ def poll(self): # Copy slurm job results to log file if pathlib.Path(self.slurm_log_file).exists(): try: - with open(self.slurm_log_file) as slurm_file: + with pathlib.Path(self.slurm_log_file).open() as slurm_file: try: - with open(self.deploy.get_log_path(), "a") as out_file: + with pathlib.Path(self.deploy.get_log_path()).open("a") as out_file: shutil.copyfileobj(slurm_file, out_file) except OSError as e: msg = f"File Error: {e} when handling {self.deploy.get_log_path()}" @@ -146,3 +147,24 @@ def _close_process(self) -> None: assert self.process if self.process.stdout: self.process.stdout.close() + + @staticmethod + def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None: + """Prepare the workspace based on the chosen launcher's needs. + + This is done once for the entire duration for the flow run. + + Args: + project: the name of the project. + repo_top: the path to the repository. + args: command line args passed to dvsim. + + """ + + @staticmethod + def prepare_workspace_for_cfg(cfg: Mapping) -> None: + """Prepare the workspace for a cfg. + + This is invoked once for each cfg. + 'cfg' is the flow configuration object. 
+ """ diff --git a/src/dvsim/sim_utils.py b/src/dvsim/sim_utils.py index 6df457c1..1c4f3149 100644 --- a/src/dvsim/sim_utils.py +++ b/src/dvsim/sim_utils.py @@ -2,26 +2,34 @@ # Licensed under the Apache License, Version 2.0, see LICENSE for details. # SPDX-License-Identifier: Apache-2.0 -"""This script provides common DV simulation specific utilities.""" +"""Common DV simulation specific utilities.""" import re from collections import OrderedDict +from pathlib import Path -# Capture the summary results as a list of lists. -# The text coverage report is passed as input to the function, in addition to -# the tool used. The tool returns a 2D list if the coverage report file was read -# and the coverage was extracted successfully. It returns a tuple of: -# List of metrics and values -# Final coverage total -# -# Raises the appropriate exception if the coverage summary extraction fails. -def get_cov_summary_table(cov_report_txt, tool): - with open(cov_report_txt) as f: +def get_cov_summary_table(cov_report_txt: Path, tool: str): + """Capture the summary results as a list of lists. + + The text coverage report is passed as input to the function, in addition to + the tool used. The tool returns a 2D list if the coverage report file was read + and the coverage was extracted successfully. + + Returns: + tuple of, List of metrics and values, and final coverage total + + Raises: + the appropriate exception if the coverage summary extraction fails. + + """ + with Path(cov_report_txt).open() as f: if tool == "xcelium": return xcelium_cov_summary_table(f) + if tool == "vcs": return vcs_cov_summary_table(f) + msg = f"{tool} is unsupported for cov extraction." raise NotImplementedError(msg) @@ -57,6 +65,7 @@ def xcelium_cov_summary_table(buf): items[metrics[i]]["total"] += int(m.group(2)) items["Score"]["covered"] += int(m.group(1)) items["Score"]["total"] += int(m.group(2)) + # Capture the percentages and the aggregate. 
values = [] cov_total = None @@ -69,6 +78,7 @@ def xcelium_cov_summary_table(buf): values.append(value) if metric == "Score": cov_total = value + return [items.keys(), values], cov_total # If we reached here, then we were unable to extract the coverage. @@ -102,17 +112,23 @@ def vcs_cov_summary_table(buf): def get_job_runtime(log_text: list, tool: str) -> tuple[float, str]: - """Returns the job runtime (wall clock time) along with its units. + """Return the job runtime (wall clock time) along with its units. EDA tools indicate how long the job ran in terms of CPU time in the log file. This method invokes the tool specific method which parses the log text and returns the runtime as a floating point value followed by its units as a tuple. - `log_text` is the job's log file contents as a list of lines. - `tool` is the EDA tool used to run the job. - Returns the runtime, units as a tuple. - Raises NotImplementedError exception if the EDA tool is not supported. + Args: + log_text: is the job's log file contents as a list of lines. + tool: is the EDA tool used to run the job. + + Returns: + the runtime, units as a tuple. + + Raises: + NotImplementedError: exception if the EDA tool is not supported. + """ if tool == "xcelium": return xcelium_job_runtime(log_text) @@ -123,15 +139,22 @@ def get_job_runtime(log_text: list, tool: str) -> tuple[float, str]: def vcs_job_runtime(log_text: list) -> tuple[float, str]: - """Returns the VCS job runtime (wall clock time) along with its units. + """Return the VCS job runtime (wall clock time) along with its units. Search pattern example: CPU time: 22.170 seconds to compile + .518 seconds to elab + 1.901 \ seconds to link CPU Time: 0.610 seconds; Data structure size: 1.6Mb - Returns the runtime, units as a tuple. - Raises RuntimeError exception if the search pattern is not found. + Args: + log_text: is the job's log file contents as a list of lines. + + Returns: + the runtime, units as a tuple. 
+
+    Raises:
+        RuntimeError: exception if the search pattern is not found.
+
     """
     pattern = r"^CPU [tT]ime:\s*(\d+\.?\d*?)\s*(seconds|minutes|hours).*$"
     for line in reversed(log_text):
@@ -143,14 +166,21 @@ def vcs_job_runtime(log_text: list) -> tuple[float, str]:
 
 
 def xcelium_job_runtime(log_text: list) -> tuple[float, str]:
-    """Returns the Xcelium job runtime (wall clock time) along with its units.
+    """Return the Xcelium job runtime (wall clock time) along with its units.
 
     Search pattern example:
     TOOL: xrun(64) 21.09-s006: Exiting on Aug 01, 2022 at 00:21:18 PDT  \
     (total: 00:00:05)
 
-    Returns the runtime, units as a tuple.
-    Raises RuntimeError exception if the search pattern is not found.
+    Args:
+        log_text: is the job's log file contents as a list of lines.
+
+    Returns:
+        the runtime, units as a tuple.
+
+    Raises:
+        RuntimeError: exception if the search pattern is not found.
+
     """
     pattern = r"^TOOL:\s*xrun.*: Exiting on .*\(total:\s*(\d+):(\d+):(\d+)\)\s*$"
     for line in reversed(log_text):
@@ -163,17 +193,23 @@ def xcelium_job_runtime(log_text: list) -> tuple[float, str]:
 
 
 def get_simulated_time(log_text: list, tool: str) -> tuple[float, str]:
-    """Returns the simulated time along with its units.
+    """Return the simulated time along with its units.
 
     EDA tools indicate how long the design was simulated for in the log file.
     This method invokes the tool specific method which parses the log text and
     returns the simulated time as a floating point value followed by its units
     (typically, pico|nano|micro|milliseconds) as a tuple.
 
-    `log_text` is the job's log file contents as a list of lines.
-    `tool` is the EDA tool used to run the job.
-    Returns the simulated, units as a tuple.
-    Raises NotImplementedError exception if the EDA tool is not supported.
+    Args:
+        log_text: is the job's log file contents as a list of lines.
+        tool: is the EDA tool used to run the job.
+
+    Returns:
+        the simulated time, units as a tuple.
+ + Raises: + NotImplementedError: exception if the EDA tool is not supported. + """ if tool == "xcelium": return xcelium_simulated_time(log_text) @@ -184,13 +220,20 @@ def get_simulated_time(log_text: list, tool: str) -> tuple[float, str]: def xcelium_simulated_time(log_text: list) -> tuple[float, str]: - """Returns the Xcelium simulated time along with its units. + """Return the Xcelium simulated time along with its units. Search pattern example: Simulation complete via $finish(2) at time 11724965 PS + 13 - Returns the simulated time, units as a tuple. - Raises RuntimeError exception if the search pattern is not found. + Args: + log_text: is the job's log file contents as a list of lines. + + Returns: + the simulated time, units as a tuple. + + Raises: + RuntimeError: exception if the search pattern is not found. + """ pattern = r"^Simulation complete .* at time (\d+\.?\d*?)\s*(.?[sS]).*$" for line in reversed(log_text): @@ -202,22 +245,32 @@ def xcelium_simulated_time(log_text: list) -> tuple[float, str]: def vcs_simulated_time(log_text: list) -> tuple[float, str]: - """Returns the VCS simulated time along with its units. + """Return the VCS simulated time along with its units. Search pattern example: V C S S i m u l a t i o n R e p o r t Time: 12241752 ps - Returns the simulated time, units as a tuple. - Raises RuntimeError exception if the search pattern is not found. + Args: + log_text: is the job's log file contents as a list of lines. + + Returns: + the simulated time, units as a tuple. + + Raises: + RuntimeError: exception if the search pattern is not found. 
+ """ pattern = r"^Time:\s*(\d+\.?\d*?)\s*(.?[sS])\s*$" next_line = "" + for line in reversed(log_text): - if "V C S S i m u l a t i o n R e p o r t" in line: - m = re.search(pattern, next_line) - if m: - return float(m.group(1)), m.group(2).lower() + if "V C S S i m u l a t i o n R e p o r t" in line and ( + m := re.search(pattern, next_line) + ): + return float(m.group(1)), m.group(2).lower() + next_line = line + msg = "Simulated time not found in the log." raise RuntimeError(msg)