diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py index 721e585dd..37394c358 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py @@ -408,10 +408,10 @@ def _query_slurm_for_status_and_progress( ) statuses = [] for i, slurm_job_id in enumerate(slurm_job_ids): - slurm_status = slurm_jobs_status[slurm_job_id] + slurm_status = slurm_jobs_status[slurm_job_id][0] if slurm_job_id in latest_slurm_job_ids: latest_slurm_job_id = latest_slurm_job_ids[slurm_job_id] - slurm_status = latest_slurm_jobs_status[latest_slurm_job_id] + slurm_status = latest_slurm_jobs_status[latest_slurm_job_id][0] progress = progress_list[i] progress = progress if progress is not None else "unknown" execution_state = SlurmExecutor._map_slurm_state_to_execution_state( @@ -1009,8 +1009,121 @@ def _query_slurm_jobs_status( username: str, hostname: str, socket: str | None, -) -> Dict[str, str]: - """Query SLURM for job statuses using sacct command. +) -> Dict[str, tuple[str, str]]: + """Query SLURM for job statuses using squeue (for active jobs) and sacct (fallback). + + This function first tries squeue which is more accurate for currently running jobs, + then falls back to sacct for completed/historical jobs that squeue doesn't show. + It also finds follow-up jobs (from autoresume) that depend on our known jobs. + + Args: + slurm_job_ids: List of SLURM job IDs to query. + username: SSH username. + hostname: SSH hostname. + socket: control socket location or None + + Returns: + Dict mapping from slurm_job_id to tuple of status, current_job_id. + """ + if len(slurm_job_ids) == 0: + return {} + + # First, try squeue for active jobs (more accurate for running jobs) + squeue_statuses = _query_squeue_for_jobs(slurm_job_ids, username, hostname, socket) + + # For jobs not found in squeue, fall back to sacct + missing_jobs = [job_id for job_id in slurm_job_ids if job_id not in squeue_statuses] + sacct_statuses = {} + + if missing_jobs: + sacct_statuses = _query_sacct_for_jobs(missing_jobs, username, hostname, socket) + + # Combine results, preferring squeue data + combined_statuses = {**sacct_statuses, **squeue_statuses} + + return combined_statuses + + +def _query_squeue_for_jobs( + slurm_job_ids: List[str], + username: str, + hostname: str, + socket: str | None, +) -> Dict[str, tuple[str, str]]: + """Query SLURM for active job statuses using squeue command. + + This function finds: + 1. Jobs that directly match our known job IDs + 2. Follow-up jobs that depend on our known job IDs (from autoresume mechanism) + + For follow-up jobs, returns the status mapped to the original job ID, along with + the actual current SLURM job ID. + + Args: + slurm_job_ids: List of SLURM job IDs to query. + username: SSH username. + hostname: SSH hostname. + socket: control socket location or None + + Returns: + Dict mapping from original slurm_job_id to tuple of status, current_job_id. + """ + if len(slurm_job_ids) == 0: + return {} + + # Use squeue to get active jobs - more accurate than sacct for running jobs + squeue_command = "squeue -u {} -h -o '%i|%T|%E'".format(username) + + ssh_command = ["ssh"] + if socket is not None: + ssh_command.append(f"-S {socket}") + ssh_command.append(f"{username}@{hostname}") + ssh_command.append(squeue_command) + ssh_command = " ".join(ssh_command) + + completed_process = subprocess.run( + args=shlex.split(ssh_command), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + squeue_statuses = {} + dependent_jobs = [] + if completed_process.returncode == 0: + squeue_output = completed_process.stdout.decode("utf-8") + squeue_output_lines = squeue_output.strip().split("\n") + + for line in squeue_output_lines: + if not line.strip(): + continue + parts = line.split("|") + if len(parts) >= 3: + job_id = parts[0].strip() + status = parts[1].strip() + dependency = parts[2].strip() + # Extract base job ID (handle array jobs like 123456_0 -> 123456) + base_job_id = job_id.split("_")[0].split("[")[0] + if base_job_id in slurm_job_ids: + squeue_statuses[base_job_id] = status, base_job_id + elif dependency and dependency != "(null)": + dependent_jobs.append((base_job_id, status, dependency)) + + for dep_job_id, dep_status, dependency in dependent_jobs: + for known_job_id in slurm_job_ids: + if known_job_id in dependency and known_job_id not in squeue_statuses: + squeue_statuses[known_job_id] = dep_status, dep_job_id + break + + return squeue_statuses + + +def _query_sacct_for_jobs( + slurm_job_ids: List[str], + username: str, + hostname: str, + socket: str | None, +) -> Dict[str, tuple[str, str]]: + """Query SLURM for job statuses using sacct command (for completed/historical jobs). Args: slurm_job_ids: List of SLURM job IDs to query. @@ -1019,10 +1132,11 @@ def _query_slurm_jobs_status( socket: control socket location or None Returns: - Dict mapping from slurm_job_id to returned slurm status. + Dict mapping from slurm_job_id to tuple of status, job_id. """ if len(slurm_job_ids) == 0: return {} + sacct_command = "sacct -j {} --format='JobID,State%32' --noheader -P".format( ",".join(slurm_job_ids) ) @@ -1049,7 +1163,7 @@ def _query_slurm_jobs_status( slurm_jobs_status = {} for slurm_job_id in slurm_job_ids: slurm_job_status = _parse_slurm_job_status(slurm_job_id, sacct_output_lines) - slurm_jobs_status[slurm_job_id] = slurm_job_status + slurm_jobs_status[slurm_job_id] = slurm_job_status, slurm_job_id return slurm_jobs_status diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py index 26fd5b527..ac02fd831 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py @@ -1266,7 +1266,7 @@ def test_query_slurm_for_status_and_progress_basic(self): ) as mock_progress, ): mock_open.return_value = "/tmp/socket" - mock_query_status.return_value = {"123456789": "COMPLETED"} + mock_query_status.return_value = {"123456789": ("COMPLETED", "123456789")} mock_autoresume.return_value = {"123456789": ["123456789"]} mock_progress.return_value = [0.8] @@ -1311,8 +1311,10 @@ def test_query_slurm_for_status_and_progress_autoresumed(self): mock_open.return_value = "/tmp/socket" # Initial job was preempted, latest job is running mock_query_status.side_effect = [ - {"123456789": "PREEMPTED"}, # Original job status - {"123456790": "RUNNING"}, # Latest autoresumed job status + {"123456789": ("PREEMPTED", "123456789")}, # Original job status + { + "123456790": ("RUNNING", "123456790") + }, # Latest autoresumed job status ] # Autoresume shows there's a newer job ID mock_autoresume.return_value = {"123456789": ["123456789", "123456790"]} @@ -1357,7 +1359,7 @@ def test_query_slurm_for_status_and_progress_unknown_progress(self): ) as mock_progress, ): mock_open.return_value = "/tmp/socket" - mock_query_status.return_value = {"123456789": "RUNNING"} + mock_query_status.return_value = {"123456789": ("RUNNING", "123456789")} mock_autoresume.return_value = {"123456789": ["123456789"]} mock_progress.return_value = [None] # Unknown progress @@ -1717,14 +1719,27 @@ def test_query_slurm_jobs_status_success(self): ) def mock_subprocess_run(*args, **kwargs): - """Mock subprocess.run for sacct command.""" - # Mock sacct output - return Mock( - returncode=0, - stdout=b"123456789|COMPLETED\n123456790|RUNNING\n", - stderr=b"", + """Mock subprocess.run for squeue and sacct commands.""" + cmd_args = kwargs.get("args", []) + if not cmd_args: + return Mock(returncode=1, stdout=b"", stderr=b"") + + cmd_str = ( + " ".join(cmd_args) if isinstance(cmd_args, list) else str(cmd_args) ) + if "squeue" in cmd_str: + # Mock squeue with no active jobs (empty output) + return Mock(returncode=0, stdout=b"", stderr=b"") + elif "sacct" in cmd_str: + # Mock sacct output + return Mock( + returncode=0, + stdout=b"123456789|COMPLETED\n123456790|RUNNING\n", + stderr=b"", + ) + return Mock(returncode=1, stdout=b"", stderr=b"") + with patch("subprocess.run", side_effect=mock_subprocess_run): result = _query_slurm_jobs_status( slurm_job_ids=["123456789", "123456790"], @@ -1733,7 +1748,8 @@ def mock_subprocess_run(*args, **kwargs): socket="/tmp/socket", ) - assert result == {"123456789": "COMPLETED", "123456790": "RUNNING"} + assert result["123456789"] == ("COMPLETED", "123456789") + assert result["123456790"] == ("RUNNING", "123456790") def test_query_slurm_jobs_status_failure(self): """Test _query_slurm_jobs_status function with failed subprocess call.""" @@ -1756,6 +1772,137 @@ def mock_subprocess_run(*args, **kwargs): socket="/tmp/socket", ) + def test_query_squeue_for_jobs_success(self): + """Test _query_squeue_for_jobs function with successful subprocess call.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_squeue_for_jobs, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for squeue command.""" + # Mock squeue output with various job formats + return Mock( + returncode=0, + stdout=b"123456789|RUNNING|\n123456790_0|PENDING|(null)\n123456791[1-10]|PENDING|\n", + stderr=b"", + ) + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_squeue_for_jobs( + slurm_job_ids=["123456789", "123456790", "123456791"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + + assert result["123456789"] == ("RUNNING", "123456789") + assert result["123456790"] == ("PENDING", "123456790") + assert result["123456791"] == ("PENDING", "123456791") + + def test_query_squeue_for_jobs_finds_dependent_jobs(self): + """Test that _query_squeue_for_jobs finds follow-up jobs that depend on known jobs.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_squeue_for_jobs, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for squeue command with dependent jobs.""" + # Simulate: job 123456789 has finished (not in squeue), + # but job 123456790 is PENDING with dependency on 123456789 + return Mock( + returncode=0, + stdout=b"123456790|PENDING|afternotok:123456789\n123456791|RUNNING|(null)\n", + stderr=b"", + ) + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_squeue_for_jobs( + slurm_job_ids=["123456789", "123456791"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + assert result["123456789"] == ( + "PENDING", + "123456790", + ) # Should find 123456789's status via its dependent job 123456790 + assert result["123456791"] == ( + "RUNNING", + "123456791", + ) # Direct match for 123456791 + + def test_query_slurm_jobs_status_combined_approach(self): + """Test _query_slurm_jobs_status using combined squeue + sacct approach.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_slurm_jobs_status, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for both squeue and sacct commands.""" + # Get the command from kwargs['args'] since that's how subprocess.run is called + cmd_args = kwargs.get("args", []) + if not cmd_args: + return Mock(returncode=1, stdout=b"", stderr=b"") + + cmd_str = ( + " ".join(cmd_args) if isinstance(cmd_args, list) else str(cmd_args) + ) + + if "squeue" in cmd_str: + # Mock squeue showing only running jobs + return Mock( + returncode=0, + stdout=b"123456789|RUNNING|(null)\n", + stderr=b"", + ) + elif "sacct" in cmd_str: + # Mock sacct showing completed job that's not in squeue + return Mock( + returncode=0, + stdout=b"123456790|COMPLETED\n", + stderr=b"", + ) + return Mock(returncode=1, stdout=b"", stderr=b"") + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_slurm_jobs_status( + slurm_job_ids=["123456789", "123456790"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + + # Should get running job from squeue and completed job from sacct + assert result["123456789"] == ("RUNNING", "123456789") + assert result["123456790"] == ("COMPLETED", "123456790") + + def test_query_sacct_for_jobs_success(self): + """Test _query_sacct_for_jobs function with successful subprocess call.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_sacct_for_jobs, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for sacct command.""" + return Mock( + returncode=0, + stdout=b"123456789|COMPLETED\n123456790|FAILED\n", + stderr=b"", + ) + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_sacct_for_jobs( + slurm_job_ids=["123456789", "123456790"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + + assert result == { + "123456789": ("COMPLETED", "123456789"), + "123456790": ("FAILED", "123456790"), + } + def test_sbatch_remote_runsubs_success(self): """Test _sbatch_remote_runsubs function with successful subprocess call.""" from pathlib import Path