13 changes: 9 additions & 4 deletions src/dvsim/job/deploy.py
@@ -306,7 +306,7 @@ def get_log_path(self) -> str:
def get_timeout_mins(self) -> float | None:
"""Return the timeout in minutes."""

def extract_info_from_log(self, log_text: list) -> None:
def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None:
"""Extract information pertaining to the job from its log.

This method parses the log text after the job has completed, for the
@@ -326,7 +326,7 @@ def extract_info_from_log(self, log_text: list) -> None:
self.job_runtime.set(time, unit)
except RuntimeError as e:
log.warning(f"{self.full_name}: {e} Using dvsim-maintained job_runtime instead.")
self.job_runtime.set(self.launcher.job_runtime_secs, "s")
self.job_runtime.set(job_runtime_secs, "s")

def model_dump(self) -> Mapping:
"""Dump the deployment object to mapping object.
@@ -632,12 +632,17 @@ def get_timeout_mins(self):
"""
return self.run_timeout_mins if self.run_timeout_mins is not None else 60

def extract_info_from_log(self, log_text: list) -> None:
def extract_info_from_log(self, job_runtime_secs: int, log_text: list) -> None:
"""Extract the time the design was simulated for, from the log."""
super().extract_info_from_log(log_text)
super().extract_info_from_log(
job_runtime_secs=job_runtime_secs,
log_text=log_text,
)

try:
time, unit = get_simulated_time(log_text, self.sim_cfg.tool)
self.simulated_time.set(time, unit)

except RuntimeError as e:
log.debug(f"{self.full_name}: {e}")

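The signature change above decouples Deploy from the launcher's internals: instead of the deploy object reading self.launcher.job_runtime_secs as a fallback, the launcher now hands its own measured runtime to extract_info_from_log(). A minimal sketch of the new call pattern (mirroring the call-site change in src/dvsim/launcher/base.py below; the standalone deploy/launcher variables here are illustrative only, not part of the diff):

# Sketch: a launcher-side caller supplies the dvsim-maintained runtime
# together with the log text it has already read from disk.
with open(deploy.get_log_path(), encoding="UTF-8", errors="surrogateescape") as f:
    lines = f.readlines()
deploy.extract_info_from_log(
    job_runtime_secs=launcher.job_runtime_secs,  # fallback if the log reports no runtime
    log_text=lines,
)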
89 changes: 46 additions & 43 deletions src/dvsim/launcher/base.py
@@ -90,6 +90,48 @@ class Launcher(ABC):
context=[],
)

def __init__(self, deploy: "Deploy") -> None:
"""Initialise launcher.

Args:
deploy: deployment object that will be launched.

"""
cfg = deploy.sim_cfg

# One-time preparation of the workspace.
if not Launcher.workspace_prepared:
# TODO: CLI args should be processed far earlier than this
self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args)
Launcher.workspace_prepared = True

# One-time preparation of the workspace, specific to the cfg.
if cfg not in Launcher.workspace_prepared_for_cfg:
self.prepare_workspace_for_cfg(cfg)
Launcher.workspace_prepared_for_cfg.add(cfg)

# Store the deploy object handle.
self.deploy = deploy

# Status of the job. This is primarily determined by the
# _check_status() method, but eventually updated by the _post_finish()
# method, in case any of the cleanup tasks fails. This value is finally
# returned to the Scheduler by the poll() method.
self.status = None

# Return status of the process running the job.
self.exit_code = None

# Flag to indicate whether to 'overwrite' if odir already exists,
# or to backup the existing one and create a new one.
# For builds, we want to overwrite existing to leverage the tools'
# incremental / partition compile features. For runs, we may want to
# create a new one.
self.renew_odir = False

# The actual job runtime computed by dvsim, in seconds.
self.job_runtime_secs = 0

@staticmethod
def set_pyvenv(project: str) -> None:
"""Activate a python virtualenv if available.
@@ -147,48 +189,6 @@ def __str__(self) -> str:
"""Get a string representation."""
return self.deploy.full_name + ":launcher"

def __init__(self, deploy: "Deploy") -> None:
"""Initialise launcher.

Args:
deploy: deployment object that will be launched.

"""
cfg = deploy.sim_cfg

# One-time preparation of the workspace.
if not Launcher.workspace_prepared:
# TODO: CLI args should be processed far earlier than this
self.prepare_workspace(cfg.project, cfg.proj_root, cfg.args)
Launcher.workspace_prepared = True

# One-time preparation of the workspace, specific to the cfg.
if cfg not in Launcher.workspace_prepared_for_cfg:
self.prepare_workspace_for_cfg(cfg)
Launcher.workspace_prepared_for_cfg.add(cfg)

# Store the deploy object handle.
self.deploy = deploy

# Status of the job. This is primarily determined by the
# _check_status() method, but eventually updated by the _post_finish()
# method, in case any of the cleanup tasks fails. This value is finally
# returned to the Scheduler by the poll() method.
self.status = None

# Return status of the process running the job.
self.exit_code = None

# Flag to indicate whether to 'overwrite' if odir already exists,
# or to backup the existing one and create a new one.
# For builds, we want to overwrite existing to leverage the tools'
# incremental / partition compile features. For runs, we may want to
# create a new one.
self.renew_odir = False

# The actual job runtime computed by dvsim, in seconds.
self.job_runtime_secs = 0

def _make_odir(self) -> None:
"""Create the output directory."""
# If renew_odir flag is True - then move it.
@@ -318,7 +318,10 @@ def _find_patterns(patterns: Sequence[str], line: str) -> Sequence[str] | None:
# Since the log file is already opened and read to assess the job's
# status, use this opportunity to also extract other pieces of
# information.
self.deploy.extract_info_from_log(lines)
self.deploy.extract_info_from_log(
job_runtime_secs=self.job_runtime_secs,
log_text=lines,
)

if chk_failed or chk_passed:
for cnt, line in enumerate(lines):
22 changes: 22 additions & 0 deletions src/dvsim/launcher/local.py
@@ -8,6 +8,7 @@
import os
import shlex
import subprocess
from collections.abc import Mapping
from pathlib import Path
from typing import TYPE_CHECKING

@@ -181,3 +182,24 @@ def _close_job_log_file(self) -> None:
"""Close the file descriptors associated with the process."""
if self._log_file:
self._log_file.close()

@staticmethod
def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
"""Prepare the workspace based on the chosen launcher's needs.

This is done once for the entire duration of the flow run.

Args:
project: the name of the project.
repo_top: the path to the repository.
args: command line args passed to dvsim.

"""

@staticmethod
def prepare_workspace_for_cfg(cfg: Mapping) -> None:
"""Prepare the workspace for a cfg.

This is invoked once for each cfg.
'cfg' is the flow configuration object.
"""
24 changes: 24 additions & 0 deletions src/dvsim/launcher/nc.py
@@ -2,11 +2,14 @@
# Licensed under the Apache License, Version 2.0, see LICENSE for details.
# SPDX-License-Identifier: Apache-2.0

"""Altair nc job launcher."""

import datetime
import os
import pathlib
import subprocess
import sys
from collections.abc import Mapping

from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
from dvsim.logging import log
@@ -261,3 +264,24 @@ def _close_process(self) -> None:
assert self.process
if self.process.stdout:
self.process.stdout.close()

@staticmethod
def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
"""Prepare the workspace based on the chosen launcher's needs.

This is done once for the entire duration of the flow run.

Args:
project: the name of the project.
repo_top: the path to the repository.
args: command line args passed to dvsim.

"""

@staticmethod
def prepare_workspace_for_cfg(cfg: Mapping) -> None:
"""Prepare the workspace for a cfg.

This is invoked once for each cfg.
'cfg' is the flow configuration object.
"""
32 changes: 29 additions & 3 deletions src/dvsim/launcher/sge/launcher.py
@@ -8,6 +8,7 @@
import pathlib
import shlex
import subprocess
from collections.abc import Mapping
from subprocess import PIPE, Popen

from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
@@ -46,7 +47,9 @@ def _do_launch(self) -> None:
self._dump_env_vars(exports)

try:
f = open(self.deploy.get_log_path(), "w", encoding="UTF-8", errors="surrogateescape")
f = pathlib.Path(self.deploy.get_log_path()).open(
"w", encoding="UTF-8", errors="surrogateescape"
)
f.write(f"[Executing]:\n{self.deploy.cmd}\n\n")
f.flush()
# ---------- prepare SGE job struct -----
@@ -103,10 +106,12 @@ def poll(self):
# -------------------------------------
# copy SGE job results to log file
if pathlib.Path(self.deploy.get_log_path() + ".sge").exists():
file1 = open(self.deploy.get_log_path() + ".sge", errors="replace")
file1 = pathlib.Path(self.deploy.get_log_path() + ".sge").open(errors="replace")
lines = file1.readlines()
file1.close()
f = open(self.deploy.get_log_path(), "a", encoding="UTF-8", errors="surrogateescape")
f = pathlib.Path(self.deploy.get_log_path()).open(
"a", encoding="UTF-8", errors="surrogateescape"
)
f.writelines(lines)
f.flush()
pathlib.Path(self.deploy.get_log_path() + ".sge").unlink()
@@ -160,3 +165,24 @@ def _close_process(self) -> None:
assert self.process
if self.process.stdout:
self.process.stdout.close()

@staticmethod
def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
"""Prepare the workspace based on the chosen launcher's needs.

This is done once for the entire duration of the flow run.

Args:
project: the name of the project.
repo_top: the path to the repository.
args: command line args passed to dvsim.

"""

@staticmethod
def prepare_workspace_for_cfg(cfg: Mapping) -> None:
"""Prepare the workspace for a cfg.

This is invoked once for each cfg.
'cfg' is the flow configuration object.
"""
28 changes: 25 additions & 3 deletions src/dvsim/launcher/slurm.py
@@ -7,6 +7,7 @@
import shlex
import shutil
import subprocess
from collections.abc import Mapping

from dvsim.launcher.base import ErrorMessage, Launcher, LauncherError
from dvsim.logging import log
@@ -59,7 +60,7 @@ def _do_launch(self) -> None:
)

try:
with open(self.slurm_log_file, "w") as out_file:
with pathlib.Path(self.slurm_log_file).open("w") as out_file:
out_file.write(f"[Executing]:\n{self.deploy.cmd}\n\n")
out_file.flush()

@@ -99,9 +100,9 @@ def poll(self):
# Copy slurm job results to log file
if pathlib.Path(self.slurm_log_file).exists():
try:
with open(self.slurm_log_file) as slurm_file:
with pathlib.Path(self.slurm_log_file).open() as slurm_file:
try:
with open(self.deploy.get_log_path(), "a") as out_file:
with pathlib.Path(self.deploy.get_log_path()).open("a") as out_file:
shutil.copyfileobj(slurm_file, out_file)
except OSError as e:
msg = f"File Error: {e} when handling {self.deploy.get_log_path()}"
@@ -146,3 +147,24 @@ def _close_process(self) -> None:
assert self.process
if self.process.stdout:
self.process.stdout.close()

@staticmethod
def prepare_workspace(project: str, repo_top: str, args: Mapping) -> None:
"""Prepare the workspace based on the chosen launcher's needs.

This is done once for the entire duration for the flow run.

Args:
project: the name of the project.
repo_top: the path to the repository.
args: command line args passed to dvsim.

"""

@staticmethod
def prepare_workspace_for_cfg(cfg: Mapping) -> None:
"""Prepare the workspace for a cfg.

This is invoked once for each cfg.
'cfg' is the flow configuration object.
"""