@@ -408,10 +408,10 @@ def _query_slurm_for_status_and_progress(
)
statuses = []
for i, slurm_job_id in enumerate(slurm_job_ids):
slurm_status = slurm_jobs_status[slurm_job_id]
slurm_status = slurm_jobs_status[slurm_job_id][0]
if slurm_job_id in latest_slurm_job_ids:
latest_slurm_job_id = latest_slurm_job_ids[slurm_job_id]
slurm_status = latest_slurm_jobs_status[latest_slurm_job_id]
slurm_status = latest_slurm_jobs_status[latest_slurm_job_id][0]
progress = progress_list[i]
progress = progress if progress is not None else "unknown"
execution_state = SlurmExecutor._map_slurm_state_to_execution_state(
@@ -1009,8 +1009,121 @@ def _query_slurm_jobs_status(
username: str,
hostname: str,
socket: str | None,
) -> Dict[str, str]:
"""Query SLURM for job statuses using sacct command.
) -> Dict[str, tuple[str, str]]:
"""Query SLURM for job statuses using squeue (for active jobs) and sacct (fallback).

This function first tries squeue which is more accurate for currently running jobs,
then falls back to sacct for completed/historical jobs that squeue doesn't show.
It also finds follow-up jobs (from autoresume) that depend on our known jobs.

Args:
slurm_job_ids: List of SLURM job IDs to query.
Contributor (review comment):

I suspect that most of the bugs are coming from the fact that we cannot reliably tell the SLURM job ID for a specific job. We are trying to read this from a file, but there can be race conditions and manual restarts that make the file go out of sync with reality.

For the concrete case we discussed offline, will this fix the status?

Contributor (reply):

I don't think we should try to handle cases where a user does something manually, e.g. restarts the job.

Also, I think the file with job IDs is the closest thing to the truth that we can get. If we tried to get the information from all of a user's jobs, we'd open a new can of worms - most folks run different things, not only evaluations, and it's hard to predict what corner cases we'd hit.

username: SSH username.
hostname: SSH hostname.
socket: control socket location or None

Returns:
Dict mapping from slurm_job_id to a tuple of (status, current_job_id).
"""
if len(slurm_job_ids) == 0:
return {}

# First, try squeue for active jobs (more accurate for running jobs)
squeue_statuses = _query_squeue_for_jobs(slurm_job_ids, username, hostname, socket)

# For jobs not found in squeue, fall back to sacct
missing_jobs = [job_id for job_id in slurm_job_ids if job_id not in squeue_statuses]
sacct_statuses = {}

if missing_jobs:
sacct_statuses = _query_sacct_for_jobs(missing_jobs, username, hostname, socket)

# Combine results, preferring squeue data
combined_statuses = {**sacct_statuses, **squeue_statuses}

return combined_statuses
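
As an aside (not part of the diff), here is a minimal sketch of how the two result dictionaries merge, using hypothetical job IDs; the tuple shape follows the docstring above, and squeue entries take precedence over sacct entries:

```python
# Minimal sketch (hypothetical job IDs): merging squeue and sacct results.
# squeue still tracks the original job via an autoresumed follow-up job,
# while sacct reports a finished job that squeue no longer shows.
squeue_statuses = {"123456789": ("RUNNING", "123456790")}
sacct_statuses = {"123456788": ("COMPLETED", "123456788")}

# squeue data wins on conflict, mirroring {**sacct_statuses, **squeue_statuses} above.
combined = {**sacct_statuses, **squeue_statuses}
assert combined == {
    "123456789": ("RUNNING", "123456790"),
    "123456788": ("COMPLETED", "123456788"),
}
```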


def _query_squeue_for_jobs(
slurm_job_ids: List[str],
username: str,
hostname: str,
socket: str | None,
) -> Dict[str, tuple[str, str]]:
"""Query SLURM for active job statuses using squeue command.

This function finds:
1. Jobs that directly match our known job IDs
2. Follow-up jobs that depend on our known job IDs (from autoresume mechanism)

For follow-up jobs, returns the status mapped to the original job ID, along with
the actual current SLURM job ID.

Args:
slurm_job_ids: List of SLURM job IDs to query.
username: SSH username.
hostname: SSH hostname.
socket: control socket location or None

Returns:
Dict mapping from the original slurm_job_id to a tuple of (status, current_job_id).
"""
if len(slurm_job_ids) == 0:
return {}

# Use squeue to get active jobs - more accurate than sacct for running jobs
squeue_command = "squeue -u {} -h -o '%i|%T|%E'".format(username)

ssh_command = ["ssh"]
if socket is not None:
ssh_command.append(f"-S {socket}")
ssh_command.append(f"{username}@{hostname}")
ssh_command.append(squeue_command)
ssh_command = " ".join(ssh_command)

completed_process = subprocess.run(
args=shlex.split(ssh_command),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)

squeue_statuses = {}
dependent_jobs = []
if completed_process.returncode == 0:
squeue_output = completed_process.stdout.decode("utf-8")
squeue_output_lines = squeue_output.strip().split("\n")

for line in squeue_output_lines:
if not line.strip():
continue
parts = line.split("|")
if len(parts) >= 3:
job_id = parts[0].strip()
status = parts[1].strip()
dependency = parts[2].strip()
# Extract base job ID (handle array jobs like 123456_0 -> 123456)
base_job_id = job_id.split("_")[0].split("[")[0]
if base_job_id in slurm_job_ids:
squeue_statuses[base_job_id] = status, base_job_id
elif dependency and dependency != "(null)":
dependent_jobs.append((base_job_id, status, dependency))

for dep_job_id, dep_status, dependency in dependent_jobs:
for known_job_id in slurm_job_ids:
if known_job_id in dependency and known_job_id not in squeue_statuses:
squeue_statuses[known_job_id] = dep_status, dep_job_id
break

return squeue_statuses
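
For illustration (hypothetical job IDs, not part of the diff), this is roughly how the dependency matching above maps an autoresumed follow-up job back to the original job ID, given the `squeue -h -o '%i|%T|%E'` line format:

```python
# Sketch of the follow-up-job matching, using the squeue -h -o '%i|%T|%E' line format.
# Hypothetical output line: the original job 123456789 is gone from squeue, but its
# autoresume follow-up 123456790 is pending with an afternotok dependency on it.
known_job_ids = ["123456789"]
line = "123456790|PENDING|afternotok:123456789"

job_id, status, dependency = (part.strip() for part in line.split("|"))
statuses = {}
for known in known_job_ids:
    if known in dependency and known not in statuses:
        # Report the follow-up job's status under the original job ID.
        statuses[known] = (status, job_id)

assert statuses == {"123456789": ("PENDING", "123456790")}
```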


def _query_sacct_for_jobs(
slurm_job_ids: List[str],
username: str,
hostname: str,
socket: str | None,
) -> Dict[str, tuple[str, str]]:
"""Query SLURM for job statuses using sacct command (for completed/historical jobs).

Args:
slurm_job_ids: List of SLURM job IDs to query.
@@ -1019,10 +1132,11 @@ def _query_slurm_jobs_status(
socket: control socket location or None

Returns:
Dict mapping from slurm_job_id to returned slurm status.
Dict mapping from slurm_job_id to a tuple of (status, job_id).
"""
if len(slurm_job_ids) == 0:
return {}

sacct_command = "sacct -j {} --format='JobID,State%32' --noheader -P".format(
",".join(slurm_job_ids)
)
@@ -1049,7 +1163,7 @@ def _query_slurm_jobs_status(
slurm_jobs_status = {}
for slurm_job_id in slurm_job_ids:
slurm_job_status = _parse_slurm_job_status(slurm_job_id, sacct_output_lines)
slurm_jobs_status[slurm_job_id] = slurm_job_status
slurm_jobs_status[slurm_job_id] = slurm_job_status, slurm_job_id
return slurm_jobs_status


@@ -1266,7 +1266,7 @@ def test_query_slurm_for_status_and_progress_basic(self):
) as mock_progress,
):
mock_open.return_value = "/tmp/socket"
mock_query_status.return_value = {"123456789": "COMPLETED"}
mock_query_status.return_value = {"123456789": ("COMPLETED", "123456789")}
mock_autoresume.return_value = {"123456789": ["123456789"]}
mock_progress.return_value = [0.8]

@@ -1311,8 +1311,10 @@ def test_query_slurm_for_status_and_progress_autoresumed(self):
mock_open.return_value = "/tmp/socket"
# Initial job was preempted, latest job is running
mock_query_status.side_effect = [
{"123456789": "PREEMPTED"}, # Original job status
{"123456790": "RUNNING"}, # Latest autoresumed job status
{"123456789": ("PREEMPTED", "123456789")}, # Original job status
{
"123456790": ("RUNNING", "123456790")
}, # Latest autoresumed job status
]
# Autoresume shows there's a newer job ID
mock_autoresume.return_value = {"123456789": ["123456789", "123456790"]}
@@ -1357,7 +1359,7 @@ def test_query_slurm_for_status_and_progress_unknown_progress(self):
) as mock_progress,
):
mock_open.return_value = "/tmp/socket"
mock_query_status.return_value = {"123456789": "RUNNING"}
mock_query_status.return_value = {"123456789": ("RUNNING", "123456789")}
mock_autoresume.return_value = {"123456789": ["123456789"]}
mock_progress.return_value = [None] # Unknown progress

@@ -1717,14 +1719,27 @@ def test_query_slurm_jobs_status_success(self):
)

def mock_subprocess_run(*args, **kwargs):
"""Mock subprocess.run for sacct command."""
# Mock sacct output
return Mock(
returncode=0,
stdout=b"123456789|COMPLETED\n123456790|RUNNING\n",
stderr=b"",
"""Mock subprocess.run for squeue and sacct commands."""
cmd_args = kwargs.get("args", [])
if not cmd_args:
return Mock(returncode=1, stdout=b"", stderr=b"")

cmd_str = (
" ".join(cmd_args) if isinstance(cmd_args, list) else str(cmd_args)
)

if "squeue" in cmd_str:
# Mock squeue with no active jobs (empty output)
return Mock(returncode=0, stdout=b"", stderr=b"")
elif "sacct" in cmd_str:
# Mock sacct output
return Mock(
returncode=0,
stdout=b"123456789|COMPLETED\n123456790|RUNNING\n",
stderr=b"",
)
return Mock(returncode=1, stdout=b"", stderr=b"")

with patch("subprocess.run", side_effect=mock_subprocess_run):
result = _query_slurm_jobs_status(
slurm_job_ids=["123456789", "123456790"],
@@ -1733,7 +1748,8 @@ def mock_subprocess_run(*args, **kwargs):
socket="/tmp/socket",
)

assert result == {"123456789": "COMPLETED", "123456790": "RUNNING"}
assert result["123456789"] == ("COMPLETED", "123456789")
assert result["123456790"] == ("RUNNING", "123456790")

def test_query_slurm_jobs_status_failure(self):
"""Test _query_slurm_jobs_status function with failed subprocess call."""
@@ -1756,6 +1772,137 @@ def mock_subprocess_run(*args, **kwargs):
socket="/tmp/socket",
)

def test_query_squeue_for_jobs_success(self):
"""Test _query_squeue_for_jobs function with successful subprocess call."""
from nemo_evaluator_launcher.executors.slurm.executor import (
_query_squeue_for_jobs,
)

def mock_subprocess_run(*args, **kwargs):
"""Mock subprocess.run for squeue command."""
# Mock squeue output with various job formats
return Mock(
returncode=0,
stdout=b"123456789|RUNNING|\n123456790_0|PENDING|(null)\n123456791[1-10]|PENDING|\n",
stderr=b"",
)

with patch("subprocess.run", side_effect=mock_subprocess_run):
result = _query_squeue_for_jobs(
slurm_job_ids=["123456789", "123456790", "123456791"],
username="testuser",
hostname="slurm.example.com",
socket="/tmp/socket",
)

assert result["123456789"] == ("RUNNING", "123456789")
assert result["123456790"] == ("PENDING", "123456790")
assert result["123456791"] == ("PENDING", "123456791")

def test_query_squeue_for_jobs_finds_dependent_jobs(self):
"""Test that _query_squeue_for_jobs finds follow-up jobs that depend on known jobs."""
from nemo_evaluator_launcher.executors.slurm.executor import (
_query_squeue_for_jobs,
)

def mock_subprocess_run(*args, **kwargs):
"""Mock subprocess.run for squeue command with dependent jobs."""
# Simulate: job 123456789 has finished (not in squeue),
# but job 123456790 is PENDING with dependency on 123456789
return Mock(
returncode=0,
stdout=b"123456790|PENDING|afternotok:123456789\n123456791|RUNNING|(null)\n",
stderr=b"",
)

with patch("subprocess.run", side_effect=mock_subprocess_run):
result = _query_squeue_for_jobs(
slurm_job_ids=["123456789", "123456791"],
username="testuser",
hostname="slurm.example.com",
socket="/tmp/socket",
)
assert result["123456789"] == (
"PENDING",
"123456790",
) # Should find 123456789's status via its dependent job 123456790
assert result["123456791"] == (
"RUNNING",
"123456791",
) # Direct match for 123456791

def test_query_slurm_jobs_status_combined_approach(self):
"""Test _query_slurm_jobs_status using combined squeue + sacct approach."""
from nemo_evaluator_launcher.executors.slurm.executor import (
_query_slurm_jobs_status,
)

def mock_subprocess_run(*args, **kwargs):
"""Mock subprocess.run for both squeue and sacct commands."""
# Get the command from kwargs['args'] since that's how subprocess.run is called
cmd_args = kwargs.get("args", [])
if not cmd_args:
return Mock(returncode=1, stdout=b"", stderr=b"")

cmd_str = (
" ".join(cmd_args) if isinstance(cmd_args, list) else str(cmd_args)
)

if "squeue" in cmd_str:
# Mock squeue showing only running jobs
return Mock(
returncode=0,
stdout=b"123456789|RUNNING|(null)\n",
stderr=b"",
)
elif "sacct" in cmd_str:
# Mock sacct showing completed job that's not in squeue
return Mock(
returncode=0,
stdout=b"123456790|COMPLETED\n",
stderr=b"",
)
return Mock(returncode=1, stdout=b"", stderr=b"")

with patch("subprocess.run", side_effect=mock_subprocess_run):
result = _query_slurm_jobs_status(
slurm_job_ids=["123456789", "123456790"],
username="testuser",
hostname="slurm.example.com",
socket="/tmp/socket",
)

# Should get running job from squeue and completed job from sacct
assert result["123456789"] == ("RUNNING", "123456789")
assert result["123456790"] == ("COMPLETED", "123456790")

def test_query_sacct_for_jobs_success(self):
"""Test _query_sacct_for_jobs function with successful subprocess call."""
from nemo_evaluator_launcher.executors.slurm.executor import (
_query_sacct_for_jobs,
)

def mock_subprocess_run(*args, **kwargs):
"""Mock subprocess.run for sacct command."""
return Mock(
returncode=0,
stdout=b"123456789|COMPLETED\n123456790|FAILED\n",
stderr=b"",
)

with patch("subprocess.run", side_effect=mock_subprocess_run):
result = _query_sacct_for_jobs(
slurm_job_ids=["123456789", "123456790"],
username="testuser",
hostname="slurm.example.com",
socket="/tmp/socket",
)

assert result == {
"123456789": ("COMPLETED", "123456789"),
"123456790": ("FAILED", "123456790"),
}

def test_sbatch_remote_runsubs_success(self):
"""Test _sbatch_remote_runsubs function with successful subprocess call."""
from pathlib import Path