Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions trailblazer/clients/slurm_cli_client/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import subprocess

from trailblazer.clients.slurm_cli_client.models import SqueueResult
from trailblazer.constants import CharacterFormat, FileFormat
from trailblazer.exc import EmptySqueueError
from trailblazer.io.controller import ReadStream
from trailblazer.clients.slurm_cli_client.models import SqueueResult


def cancel_slurm_job(slurm_id: int, analysis_host: str | None = None) -> None:
Expand All @@ -23,36 +23,44 @@ def get_slurm_queue(job_ids: list[int], analysis_host: str | None = None) -> Squ

def get_slurm_queue_output(job_ids: str, analysis_host: str | None = None) -> str:
    """Return sacct output for the given jobs as squeue-style CSV text.

    Runs ``sacct`` (through ``ssh`` when ``analysis_host`` is given) without a
    header row and prepends a fixed squeue-style header line.

    Args:
        job_ids: Job ids passed verbatim as the value of ``sacct --jobs``.
        analysis_host: Optional host to run the command on via ``ssh``; the
            command is run locally when this is falsy.

    Returns:
        The header line alone when sacct prints nothing, otherwise the header
        line followed by a newline and the stripped sacct output.

    Raises:
        subprocess.CalledProcessError: when the command exits non-zero.
    """
    sacct_commands: list[str] = [
        "sacct",
        "--jobs",
        job_ids,
        "--format",
        "JobID%10,JobName%50,State%12,Timelimit%12,Elapsed%12,Submit%20",
        "--parsable2",
        "--delimiter=,",
        "--noheader",
    ]
    if analysis_host:
        sacct_commands = ["ssh", analysis_host] + sacct_commands
        # Text mode already yields ``str``; calling ``.decode`` on that result
        # raised AttributeError, so the encoding is handed to check_output.
        output: str = (
            subprocess.check_output(
                sacct_commands,
                universal_newlines=True,
                encoding=CharacterFormat.UNICODE_TRANSFORMATION_FORMAT_8,
            )
            .strip()
            # NOTE(review): kept from the original remote branch; it rewrites the
            # literal text "//n" to "/n" - confirm whether it is still needed.
            .replace("//n", "/n")
        )
    else:
        output = (
            subprocess.check_output(sacct_commands)
            .decode(CharacterFormat.UNICODE_TRANSFORMATION_FORMAT_8)
            .strip()
        )

    # sacct is run with --noheader, so the header names used for the squeue
    # output are added here; the sacct "Submit" column sits under START_TIME.
    squeue_headers = "JOBID,NAME,STATE,TIME_LIMIT,TIME,START_TIME"
    return squeue_headers + "\n" + output if output else squeue_headers


def get_squeue_result(squeue_response: str) -> SqueueResult:
"""Return SqueueResult object from squeue response.
Raises:
TrailblazerError: when no entries were returned by squeue command.
"""
if not squeue_response:
if not squeue_response or squeue_response.count("\n") <= 1:
raise EmptySqueueError("No jobs found in SLURM registry")
squeue_response_content: list[dict] = ReadStream.get_content_from_stream(
file_format=FileFormat.CSV,
Expand Down
Loading