From 41b661ee317c4b1414e9e9803427dd3a5df1fbd6 Mon Sep 17 00:00:00 2001 From: Adam Rajfer Date: Fri, 28 Nov 2025 18:54:05 +0000 Subject: [PATCH 1/2] Use squeue for real-time active job status Signed-off-by: Adam Rajfer --- .../executors/slurm/executor.py | 98 ++++++++++++++++++- .../tests/unit_tests/test_slurm_executor.py | 97 ++++++++++++++++++ 2 files changed, 194 insertions(+), 1 deletion(-) diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py index f0114384f..b4262cbc0 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py @@ -985,7 +985,102 @@ def _query_slurm_jobs_status( hostname: str, socket: str | None, ) -> Dict[str, str]: - """Query SLURM for job statuses using sacct command. + """Query SLURM for job statuses using squeue (for active jobs) and sacct (fallback). + + This function first tries squeue which is more accurate for currently running jobs, + then falls back to sacct for completed/historical jobs that squeue doesn't show. + + Args: + slurm_job_ids: List of SLURM job IDs to query. + username: SSH username. + hostname: SSH hostname. + socket: control socket location or None + + Returns: + Dict mapping from slurm_job_id to returned slurm status. + """ + if len(slurm_job_ids) == 0: + return {} + + # First, try squeue for active jobs (more accurate for running jobs) + squeue_statuses = _query_squeue_for_jobs(slurm_job_ids, username, hostname, socket) + + # For jobs not found in squeue, fall back to sacct + missing_jobs = [job_id for job_id in slurm_job_ids if job_id not in squeue_statuses] + sacct_statuses = {} + + if missing_jobs: + sacct_statuses = _query_sacct_for_jobs(missing_jobs, username, hostname, socket) + + # Combine results, preferring squeue data + combined_statuses = {**sacct_statuses, **squeue_statuses} + + return combined_statuses + + +def _query_squeue_for_jobs( + slurm_job_ids: List[str], + username: str, + hostname: str, + socket: str | None, +) -> Dict[str, str]: + """Query SLURM for active job statuses using squeue command. + + Args: + slurm_job_ids: List of SLURM job IDs to query. + username: SSH username. + hostname: SSH hostname. + socket: control socket location or None + + Returns: + Dict mapping from slurm_job_id to returned slurm status for active jobs only. 
+ """ + if len(slurm_job_ids) == 0: + return {} + + # Use squeue to get active jobs - more accurate than sacct for running jobs + squeue_command = "squeue -u {} -h -o '%i %T'".format(username) + + ssh_command = ["ssh"] + if socket is not None: + ssh_command.append(f"-S {socket}") + ssh_command.append(f"{username}@{hostname}") + ssh_command.append(squeue_command) + ssh_command = " ".join(ssh_command) + + completed_process = subprocess.run( + args=shlex.split(ssh_command), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + squeue_statuses = {} + if completed_process.returncode == 0: + squeue_output = completed_process.stdout.decode("utf-8") + squeue_output_lines = squeue_output.strip().split("\n") + + for line in squeue_output_lines: + if not line.strip(): + continue + parts = line.split() + if len(parts) >= 2: + job_id = parts[0] + status = parts[1] + # Extract base job ID (handle array jobs like 123456_0 -> 123456) + base_job_id = job_id.split("_")[0].split("[")[0] + if base_job_id in slurm_job_ids: + squeue_statuses[base_job_id] = status + + return squeue_statuses + + +def _query_sacct_for_jobs( + slurm_job_ids: List[str], + username: str, + hostname: str, + socket: str | None, +) -> Dict[str, str]: + """Query SLURM for job statuses using sacct command (for completed/historical jobs). Args: slurm_job_ids: List of SLURM job IDs to query. @@ -998,6 +1093,7 @@ def _query_slurm_jobs_status( """ if len(slurm_job_ids) == 0: return {} + sacct_command = "sacct -j {} --format='JobID,State%32' --noheader -P".format( ",".join(slurm_job_ids) ) diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py index 2ace90ddf..6500ff10d 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py @@ -1733,6 +1733,103 @@ def mock_subprocess_run(*args, **kwargs): socket="/tmp/socket", ) + def test_query_squeue_for_jobs_success(self): + """Test _query_squeue_for_jobs function with successful subprocess call.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_squeue_for_jobs, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for squeue command.""" + # Mock squeue output with various job formats + return Mock( + returncode=0, + stdout=b"123456789 RUNNING\n123456790_0 PENDING\n123456791[1-10] PENDING\n", + stderr=b"", + ) + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_squeue_for_jobs( + slurm_job_ids=["123456789", "123456790", "123456791"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + + assert result == { + "123456789": "RUNNING", + "123456790": "PENDING", + "123456791": "PENDING", + } + + def test_query_slurm_jobs_status_combined_approach(self): + """Test _query_slurm_jobs_status using combined squeue + sacct approach.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_slurm_jobs_status, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for both squeue and sacct commands.""" + # Get the command from kwargs['args'] since that's how subprocess.run is called + cmd_args = kwargs.get("args", []) + if not cmd_args: + return Mock(returncode=1, stdout=b"", stderr=b"") + + cmd_str = ( + " ".join(cmd_args) if isinstance(cmd_args, list) else str(cmd_args) + ) + + if "squeue" in cmd_str: + # Mock squeue showing only running jobs + return 
Mock( + returncode=0, + stdout=b"123456789 RUNNING\n", + stderr=b"", + ) + elif "sacct" in cmd_str: + # Mock sacct showing completed job that's not in squeue + return Mock( + returncode=0, + stdout=b"123456790|COMPLETED\n", + stderr=b"", + ) + return Mock(returncode=1, stdout=b"", stderr=b"") + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_slurm_jobs_status( + slurm_job_ids=["123456789", "123456790"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + + # Should get running job from squeue and completed job from sacct + assert result == {"123456789": "RUNNING", "123456790": "COMPLETED"} + + def test_query_sacct_for_jobs_success(self): + """Test _query_sacct_for_jobs function with successful subprocess call.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_sacct_for_jobs, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for sacct command.""" + return Mock( + returncode=0, + stdout=b"123456789|COMPLETED\n123456790|FAILED\n", + stderr=b"", + ) + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_sacct_for_jobs( + slurm_job_ids=["123456789", "123456790"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + + assert result == {"123456789": "COMPLETED", "123456790": "FAILED"} + def test_sbatch_remote_runsubs_success(self): """Test _sbatch_remote_runsubs function with successful subprocess call.""" from pathlib import Path From e405cdb2afd951f41089b4412f6360ca988e3083 Mon Sep 17 00:00:00 2001 From: Adam Rajfer Date: Mon, 1 Dec 2025 16:18:36 +0000 Subject: [PATCH 2/2] Detect follow-up jobs via dependency parsing Signed-off-by: Adam Rajfer --- .../executors/slurm/executor.py | 48 ++++++---- .../tests/unit_tests/test_slurm_executor.py | 90 ++++++++++++++----- 2 files changed, 103 insertions(+), 35 deletions(-) diff --git a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py index b4262cbc0..541cbac5a 100644 --- a/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py +++ b/packages/nemo-evaluator-launcher/src/nemo_evaluator_launcher/executors/slurm/executor.py @@ -388,10 +388,10 @@ def _query_slurm_for_status_and_progress( ) statuses = [] for i, slurm_job_id in enumerate(slurm_job_ids): - slurm_status = slurm_jobs_status[slurm_job_id] + slurm_status = slurm_jobs_status[slurm_job_id][0] if slurm_job_id in latest_slurm_job_ids: latest_slurm_job_id = latest_slurm_job_ids[slurm_job_id] - slurm_status = latest_slurm_jobs_status[latest_slurm_job_id] + slurm_status = latest_slurm_jobs_status[latest_slurm_job_id][0] progress = progress_list[i] progress = progress if progress is not None else "unknown" execution_state = SlurmExecutor._map_slurm_state_to_execution_state( @@ -984,11 +984,12 @@ def _query_slurm_jobs_status( username: str, hostname: str, socket: str | None, -) -> Dict[str, str]: +) -> Dict[str, tuple[str, str]]: """Query SLURM for job statuses using squeue (for active jobs) and sacct (fallback). This function first tries squeue which is more accurate for currently running jobs, then falls back to sacct for completed/historical jobs that squeue doesn't show. + It also finds follow-up jobs (from autoresume) that depend on our known jobs. Args: slurm_job_ids: List of SLURM job IDs to query. 
@@ -997,7 +998,7 @@ def _query_slurm_jobs_status( socket: control socket location or None Returns: - Dict mapping from slurm_job_id to returned slurm status. + Dict mapping from slurm_job_id to tuple of status, current_job_id. """ if len(slurm_job_ids) == 0: return {} @@ -1023,9 +1024,16 @@ def _query_squeue_for_jobs( username: str, hostname: str, socket: str | None, -) -> Dict[str, str]: +) -> Dict[str, tuple[str, str]]: """Query SLURM for active job statuses using squeue command. + This function finds: + 1. Jobs that directly match our known job IDs + 2. Follow-up jobs that depend on our known job IDs (from autoresume mechanism) + + For follow-up jobs, returns the status mapped to the original job ID, along with + the actual current SLURM job ID. + Args: slurm_job_ids: List of SLURM job IDs to query. username: SSH username. @@ -1033,13 +1041,13 @@ def _query_squeue_for_jobs( socket: control socket location or None Returns: - Dict mapping from slurm_job_id to returned slurm status for active jobs only. + Dict mapping from original slurm_job_id to tuple of status, current_job_id. """ if len(slurm_job_ids) == 0: return {} # Use squeue to get active jobs - more accurate than sacct for running jobs - squeue_command = "squeue -u {} -h -o '%i %T'".format(username) + squeue_command = "squeue -u {} -h -o '%i|%T|%E'".format(username) ssh_command = ["ssh"] if socket is not None: @@ -1055,6 +1063,7 @@ def _query_squeue_for_jobs( ) squeue_statuses = {} + dependent_jobs = [] if completed_process.returncode == 0: squeue_output = completed_process.stdout.decode("utf-8") squeue_output_lines = squeue_output.strip().split("\n") @@ -1062,14 +1071,23 @@ def _query_squeue_for_jobs( for line in squeue_output_lines: if not line.strip(): continue - parts = line.split() - if len(parts) >= 2: - job_id = parts[0] - status = parts[1] + parts = line.split("|") + if len(parts) >= 3: + job_id = parts[0].strip() + status = parts[1].strip() + dependency = parts[2].strip() # Extract base job ID (handle array jobs like 123456_0 -> 123456) base_job_id = job_id.split("_")[0].split("[")[0] if base_job_id in slurm_job_ids: - squeue_statuses[base_job_id] = status + squeue_statuses[base_job_id] = status, base_job_id + elif dependency and dependency != "(null)": + dependent_jobs.append((base_job_id, status, dependency)) + + for dep_job_id, dep_status, dependency in dependent_jobs: + for known_job_id in slurm_job_ids: + if known_job_id in dependency and known_job_id not in squeue_statuses: + squeue_statuses[known_job_id] = dep_status, dep_job_id + break return squeue_statuses @@ -1079,7 +1097,7 @@ def _query_sacct_for_jobs( username: str, hostname: str, socket: str | None, -) -> Dict[str, str]: +) -> Dict[str, tuple[str, str]]: """Query SLURM for job statuses using sacct command (for completed/historical jobs). Args: @@ -1089,7 +1107,7 @@ def _query_sacct_for_jobs( socket: control socket location or None Returns: - Dict mapping from slurm_job_id to returned slurm status. + Dict mapping from slurm_job_id to tuple of status, job_id. 
""" if len(slurm_job_ids) == 0: return {} @@ -1120,7 +1138,7 @@ def _query_sacct_for_jobs( slurm_jobs_status = {} for slurm_job_id in slurm_job_ids: slurm_job_status = _parse_slurm_job_status(slurm_job_id, sacct_output_lines) - slurm_jobs_status[slurm_job_id] = slurm_job_status + slurm_jobs_status[slurm_job_id] = slurm_job_status, slurm_job_id return slurm_jobs_status diff --git a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py index 6500ff10d..47f7cf599 100644 --- a/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py +++ b/packages/nemo-evaluator-launcher/tests/unit_tests/test_slurm_executor.py @@ -1255,7 +1255,7 @@ def test_query_slurm_for_status_and_progress_basic(self): ) as mock_progress, ): mock_open.return_value = "/tmp/socket" - mock_query_status.return_value = {"123456789": "COMPLETED"} + mock_query_status.return_value = {"123456789": ("COMPLETED", "123456789")} mock_autoresume.return_value = {"123456789": ["123456789"]} mock_progress.return_value = [0.8] @@ -1300,8 +1300,10 @@ def test_query_slurm_for_status_and_progress_autoresumed(self): mock_open.return_value = "/tmp/socket" # Initial job was preempted, latest job is running mock_query_status.side_effect = [ - {"123456789": "PREEMPTED"}, # Original job status - {"123456790": "RUNNING"}, # Latest autoresumed job status + {"123456789": ("PREEMPTED", "123456789")}, # Original job status + { + "123456790": ("RUNNING", "123456790") + }, # Latest autoresumed job status ] # Autoresume shows there's a newer job ID mock_autoresume.return_value = {"123456789": ["123456789", "123456790"]} @@ -1346,7 +1348,7 @@ def test_query_slurm_for_status_and_progress_unknown_progress(self): ) as mock_progress, ): mock_open.return_value = "/tmp/socket" - mock_query_status.return_value = {"123456789": "RUNNING"} + mock_query_status.return_value = {"123456789": ("RUNNING", "123456789")} mock_autoresume.return_value = {"123456789": ["123456789"]} mock_progress.return_value = [None] # Unknown progress @@ -1694,14 +1696,27 @@ def test_query_slurm_jobs_status_success(self): ) def mock_subprocess_run(*args, **kwargs): - """Mock subprocess.run for sacct command.""" - # Mock sacct output - return Mock( - returncode=0, - stdout=b"123456789|COMPLETED\n123456790|RUNNING\n", - stderr=b"", + """Mock subprocess.run for squeue and sacct commands.""" + cmd_args = kwargs.get("args", []) + if not cmd_args: + return Mock(returncode=1, stdout=b"", stderr=b"") + + cmd_str = ( + " ".join(cmd_args) if isinstance(cmd_args, list) else str(cmd_args) ) + if "squeue" in cmd_str: + # Mock squeue with no active jobs (empty output) + return Mock(returncode=0, stdout=b"", stderr=b"") + elif "sacct" in cmd_str: + # Mock sacct output + return Mock( + returncode=0, + stdout=b"123456789|COMPLETED\n123456790|RUNNING\n", + stderr=b"", + ) + return Mock(returncode=1, stdout=b"", stderr=b"") + with patch("subprocess.run", side_effect=mock_subprocess_run): result = _query_slurm_jobs_status( slurm_job_ids=["123456789", "123456790"], @@ -1710,7 +1725,8 @@ def mock_subprocess_run(*args, **kwargs): socket="/tmp/socket", ) - assert result == {"123456789": "COMPLETED", "123456790": "RUNNING"} + assert result["123456789"] == ("COMPLETED", "123456789") + assert result["123456790"] == ("RUNNING", "123456790") def test_query_slurm_jobs_status_failure(self): """Test _query_slurm_jobs_status function with failed subprocess call.""" @@ -1744,7 +1760,7 @@ def mock_subprocess_run(*args, 
**kwargs): # Mock squeue output with various job formats return Mock( returncode=0, - stdout=b"123456789 RUNNING\n123456790_0 PENDING\n123456791[1-10] PENDING\n", + stdout=b"123456789|RUNNING|\n123456790_0|PENDING|(null)\n123456791[1-10]|PENDING|\n", stderr=b"", ) @@ -1756,11 +1772,41 @@ def mock_subprocess_run(*args, **kwargs): socket="/tmp/socket", ) - assert result == { - "123456789": "RUNNING", - "123456790": "PENDING", - "123456791": "PENDING", - } + assert result["123456789"] == ("RUNNING", "123456789") + assert result["123456790"] == ("PENDING", "123456790") + assert result["123456791"] == ("PENDING", "123456791") + + def test_query_squeue_for_jobs_finds_dependent_jobs(self): + """Test that _query_squeue_for_jobs finds follow-up jobs that depend on known jobs.""" + from nemo_evaluator_launcher.executors.slurm.executor import ( + _query_squeue_for_jobs, + ) + + def mock_subprocess_run(*args, **kwargs): + """Mock subprocess.run for squeue command with dependent jobs.""" + # Simulate: job 123456789 has finished (not in squeue), + # but job 123456790 is PENDING with dependency on 123456789 + return Mock( + returncode=0, + stdout=b"123456790|PENDING|afternotok:123456789\n123456791|RUNNING|(null)\n", + stderr=b"", + ) + + with patch("subprocess.run", side_effect=mock_subprocess_run): + result = _query_squeue_for_jobs( + slurm_job_ids=["123456789", "123456791"], + username="testuser", + hostname="slurm.example.com", + socket="/tmp/socket", + ) + assert result["123456789"] == ( + "PENDING", + "123456790", + ) # Should find 123456789's status via its dependent job 123456790 + assert result["123456791"] == ( + "RUNNING", + "123456791", + ) # Direct match for 123456791 def test_query_slurm_jobs_status_combined_approach(self): """Test _query_slurm_jobs_status using combined squeue + sacct approach.""" @@ -1783,7 +1829,7 @@ def mock_subprocess_run(*args, **kwargs): # Mock squeue showing only running jobs return Mock( returncode=0, - stdout=b"123456789 RUNNING\n", + stdout=b"123456789|RUNNING|(null)\n", stderr=b"", ) elif "sacct" in cmd_str: @@ -1804,7 +1850,8 @@ def mock_subprocess_run(*args, **kwargs): ) # Should get running job from squeue and completed job from sacct - assert result == {"123456789": "RUNNING", "123456790": "COMPLETED"} + assert result["123456789"] == ("RUNNING", "123456789") + assert result["123456790"] == ("COMPLETED", "123456790") def test_query_sacct_for_jobs_success(self): """Test _query_sacct_for_jobs function with successful subprocess call.""" @@ -1828,7 +1875,10 @@ def mock_subprocess_run(*args, **kwargs): socket="/tmp/socket", ) - assert result == {"123456789": "COMPLETED", "123456790": "FAILED"} + assert result == { + "123456789": ("COMPLETED", "123456789"), + "123456790": ("FAILED", "123456790"), + } def test_sbatch_remote_runsubs_success(self): """Test _sbatch_remote_runsubs function with successful subprocess call."""
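
For reference, the squeue parsing and dependency-matching idea introduced in the second patch can be exercised in isolation. The sketch below assumes the "squeue -h -o '%i|%T|%E'" output format used above; the sample lines and job IDs are hypothetical, and the helper is a simplified illustration of the approach rather than the launcher's actual function.

# Minimal sketch of the squeue parsing and dependency-matching idea, assuming
# pipe-delimited `squeue -h -o '%i|%T|%E'` output; sample data is hypothetical.
from typing import Dict, List, Tuple


def parse_squeue_output(
    output: str, known_job_ids: List[str]
) -> Dict[str, Tuple[str, str]]:
    """Map each known job ID to (status, current_job_id), following dependencies."""
    statuses: Dict[str, Tuple[str, str]] = {}
    dependent_jobs: List[Tuple[str, str, str]] = []

    for line in output.strip().splitlines():
        if not line.strip():
            continue
        parts = line.split("|")
        if len(parts) < 3:
            continue
        job_id, status, dependency = (p.strip() for p in parts[:3])
        # Normalize array jobs: "123456_0" or "123456[1-10]" -> "123456".
        base_job_id = job_id.split("_")[0].split("[")[0]
        if base_job_id in known_job_ids:
            # Direct match: the job we submitted is still visible in squeue.
            statuses[base_job_id] = (status, base_job_id)
        elif dependency and dependency != "(null)":
            # Unknown job with a dependency: candidate follow-up (autoresume) job.
            dependent_jobs.append((base_job_id, status, dependency))

    # Attribute follow-up jobs back to the known job they depend on.
    for dep_job_id, dep_status, dependency in dependent_jobs:
        for known_job_id in known_job_ids:
            if known_job_id in dependency and known_job_id not in statuses:
                statuses[known_job_id] = (dep_status, dep_job_id)
                break
    return statuses


if __name__ == "__main__":
    sample = "123456790|PENDING|afternotok:123456789\n123456791|RUNNING|(null)\n"
    print(parse_squeue_output(sample, ["123456789", "123456791"]))
    # {'123456791': ('RUNNING', '123456791'), '123456789': ('PENDING', '123456790')}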