From c021869e4d74384068504c213231054eba6f5590 Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 11:51:58 -0500 Subject: [PATCH 1/7] Add checkpoint resume env vars to remote instances --- src/lattice/routes/instances/routes.py | 8 ++++++++ src/lattice/routes/instances/utils.py | 21 +++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/lattice/routes/instances/routes.py b/src/lattice/routes/instances/routes.py index 541949b..21b6fbb 100644 --- a/src/lattice/routes/instances/routes.py +++ b/src/lattice/routes/instances/routes.py @@ -191,6 +191,8 @@ async def launch_instance( experiment_id: Optional[str] = Form(None), job_name: Optional[str] = Form(None), tlab_job_id: Optional[str] = Form(None), + tlab_parent_job_id: Optional[str] = Form(None), + tlab_checkpoint_name: Optional[str] = Form(None), yaml_file: Optional[UploadFile] = File(None), user: dict = Depends(get_user_or_api_key), db: Session = Depends(get_db), @@ -437,6 +439,12 @@ async def launch_instance( # Set _TFL_JOB_ID environment variable if tlab_job_id is provided if tlab_job_id: hook_env_vars["_TFL_JOB_ID"] = tlab_job_id + + # Set checkpoint-related environment variables for training resume + if tlab_parent_job_id: + hook_env_vars["_TFL_PARENT_JOB_ID"] = tlab_parent_job_id + if tlab_checkpoint_name: + hook_env_vars["_TFL_CHECKPOINT_NAME"] = tlab_checkpoint_name # Pre-calculate requested GPU count and preserve selected RunPod option for pricing # (RunPod mapping below may clear 'accelerators') diff --git a/src/lattice/routes/instances/utils.py b/src/lattice/routes/instances/utils.py index 737873a..5ee8a5d 100644 --- a/src/lattice/routes/instances/utils.py +++ b/src/lattice/routes/instances/utils.py @@ -11,6 +11,7 @@ from fastapi import HTTPException from routes.clouds.azure.utils import az_get_current_config from routes.jobs.utils import get_cluster_job_queue, save_cluster_jobs +from routes.node_pools.utils import is_ssh_cluster from utils.cluster_utils import ( 
get_cluster_platform_info as get_cluster_platform_info_util, ) @@ -1054,6 +1055,26 @@ def down_cluster_with_skypilot( request_id = sky.down(cluster_name=cluster_name, credentials=credentials) + # For SSH clusters, also clean up any lingering Kubernetes processes + if platform_info and platform_info.get("platform"): + platform = platform_info["platform"] + if platform != cluster_name and is_ssh_cluster(platform): + try: + print(f"Cleaning up Kubernetes processes for SSH cluster: {platform}") + # Kill any k3s or kubernetes processes that might be left running + import subprocess + try: + # Kill k3s processes + subprocess.run(["pkill", "-f", "k3s"], check=False, capture_output=True) + # Kill any processes listening on common Kubernetes ports + subprocess.run(["pkill", "-f", "kube-apiserver"], check=False, capture_output=True) + subprocess.run(["pkill", "-f", "etcd"], check=False, capture_output=True) + print("Cleaned up Kubernetes processes") + except Exception as e: + print(f"Warning: Failed to kill Kubernetes processes: {e}") + except Exception as e: + print(f"Warning: Failed to clean up SSH infrastructure {platform}: {e}") + # Store the request in the database if user info is provided if user_id and organization_id: try: From ad1fef97f0dbcb7f2f2169543913f20566439bbd Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 12:34:25 -0500 Subject: [PATCH 2/7] Remove redundant part --- src/lattice/routes/instances/utils.py | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/src/lattice/routes/instances/utils.py b/src/lattice/routes/instances/utils.py index 5ee8a5d..737873a 100644 --- a/src/lattice/routes/instances/utils.py +++ b/src/lattice/routes/instances/utils.py @@ -11,7 +11,6 @@ from fastapi import HTTPException from routes.clouds.azure.utils import az_get_current_config from routes.jobs.utils import get_cluster_job_queue, save_cluster_jobs -from routes.node_pools.utils import is_ssh_cluster from utils.cluster_utils import ( 
get_cluster_platform_info as get_cluster_platform_info_util, ) @@ -1055,26 +1054,6 @@ def down_cluster_with_skypilot( request_id = sky.down(cluster_name=cluster_name, credentials=credentials) - # For SSH clusters, also clean up any lingering Kubernetes processes - if platform_info and platform_info.get("platform"): - platform = platform_info["platform"] - if platform != cluster_name and is_ssh_cluster(platform): - try: - print(f"Cleaning up Kubernetes processes for SSH cluster: {platform}") - # Kill any k3s or kubernetes processes that might be left running - import subprocess - try: - # Kill k3s processes - subprocess.run(["pkill", "-f", "k3s"], check=False, capture_output=True) - # Kill any processes listening on common Kubernetes ports - subprocess.run(["pkill", "-f", "kube-apiserver"], check=False, capture_output=True) - subprocess.run(["pkill", "-f", "etcd"], check=False, capture_output=True) - print("Cleaned up Kubernetes processes") - except Exception as e: - print(f"Warning: Failed to kill Kubernetes processes: {e}") - except Exception as e: - print(f"Warning: Failed to clean up SSH infrastructure {platform}: {e}") - # Store the request in the database if user info is provided if user_id and organization_id: try: From ccca19fdb1bfccdc729a2e468e027e77337a59d5 Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 15:47:52 -0500 Subject: [PATCH 3/7] Add logs for debugging --- frontend/src/components/pages/MyNodes/Jobs.tsx | 4 +++- src/lattice/routes/instances/utils.py | 12 ++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/frontend/src/components/pages/MyNodes/Jobs.tsx b/frontend/src/components/pages/MyNodes/Jobs.tsx index df3b97b..efff128 100644 --- a/frontend/src/components/pages/MyNodes/Jobs.tsx +++ b/frontend/src/components/pages/MyNodes/Jobs.tsx @@ -164,7 +164,9 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { clusterWithJobs.jobsError = `Failed to fetch jobs: ${response.statusText}`; } } catch (err) { - 
clusterWithJobs.jobsError = "Failed to fetch jobs"; + const errorMessage = err instanceof Error ? err.message : "Failed to fetch jobs"; + clusterWithJobs.jobsError = errorMessage; + console.error(`Error fetching jobs for cluster ${cluster.cluster_name}:`, err); } finally { clusterWithJobs.jobsLoading = false; } diff --git a/src/lattice/routes/instances/utils.py b/src/lattice/routes/instances/utils.py index 737873a..aed8029 100644 --- a/src/lattice/routes/instances/utils.py +++ b/src/lattice/routes/instances/utils.py @@ -321,6 +321,13 @@ def launch_cluster_with_skypilot( envs = env_vars.copy() else: envs.update(env_vars) + + # Log environment variables for debugging (especially checkpoint resume vars) + if envs: + checkpoint_env_keys = ["_TFL_PARENT_JOB_ID", "_TFL_CHECKPOINT_NAME", "_TFL_JOB_ID"] + checkpoint_envs = {k: v for k, v in envs.items() if k in checkpoint_env_keys} + if checkpoint_envs: + print(f"[SkyPilot] Checkpoint-related env vars: {checkpoint_envs}") # Use job_name if provided, otherwise use default name = job_name if job_name else "lattice-task-setup" @@ -335,6 +342,11 @@ def launch_cluster_with_skypilot( except Exception: effective_num_nodes = 1 + # Log command details for debugging + print(f"[SkyPilot] Creating task with name: {name}") + print(f"[SkyPilot] Command length: {len(command)} chars") + print(f"[SkyPilot] Command preview: {command[:200]}..." 
if len(command) > 200 else f"[SkyPilot] Command: {command}") + task = sky.Task( name=name, run=command, From aa5769cf0b52010c745fdb2dd265e69de8fe889c Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 16:01:47 -0500 Subject: [PATCH 4/7] Remove redundant log --- .../src/components/pages/MyNodes/Jobs.tsx | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/frontend/src/components/pages/MyNodes/Jobs.tsx b/frontend/src/components/pages/MyNodes/Jobs.tsx index dd8cb43..df3b97b 100644 --- a/frontend/src/components/pages/MyNodes/Jobs.tsx +++ b/frontend/src/components/pages/MyNodes/Jobs.tsx @@ -107,7 +107,7 @@ interface JobsProps { const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { const [clustersWithJobs, setClustersWithJobs] = useState( - [], + [] ); const [pastJobClusters, setPastJobClusters] = useState([]); const [pastJobsLoading, setPastJobsLoading] = useState(false); @@ -154,7 +154,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { try { const response = await apiFetch( buildApiUrl(`jobs/${cluster.cluster_name}`), - { credentials: "include" }, + { credentials: "include" } ); if (response.ok) { @@ -164,9 +164,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { clusterWithJobs.jobsError = `Failed to fetch jobs: ${response.statusText}`; } } catch (err) { - const errorMessage = err instanceof Error ? 
err.message : "Failed to fetch jobs"; - clusterWithJobs.jobsError = errorMessage; - console.error(`Error fetching jobs for cluster ${cluster.cluster_name}:`, err); + clusterWithJobs.jobsError = "Failed to fetch jobs"; } finally { clusterWithJobs.jobsLoading = false; } @@ -210,7 +208,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { try { const response = await apiFetch( buildApiUrl(`jobs/past-jobs/${clusterName}/${jobId}/logs`), - { credentials: "include" }, + { credentials: "include" } ); if (response.ok) { @@ -240,7 +238,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { try { const response = await apiFetch( buildApiUrl(`jobs/${cluster.cluster_name}`), - { credentials: "include" }, + { credentials: "include" } ); if (response.ok) { @@ -256,11 +254,11 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { } catch (err) { console.error( `Error monitoring jobs for cluster ${cluster.cluster_name}:`, - err, + err ); } return cluster; - }), + }) ); setClustersWithJobs(updatedClustersWithJobs); @@ -280,9 +278,9 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { // Create EventSource for Server-Sent Events const eventSource = new EventSource( buildApiUrl( - `jobs/${clusterName}/${jobId}/logs/stream?tail=1000&follow=true`, + `jobs/${clusterName}/${jobId}/logs/stream?tail=1000&follow=true` ), - { withCredentials: true }, + { withCredentials: true } ); eventSource.onopen = () => { @@ -341,7 +339,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { { method: "POST", credentials: "include", - }, + } ); if (!response.ok) { @@ -359,7 +357,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { jobs: cluster.jobs.map((job) => job.job_id === jobId ? 
{ ...job, status: "JobStatus.CANCELLED" } - : job, + : job ), }; } @@ -772,7 +770,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { onClick={() => fetchPastJobLogs( pastCluster.cluster_name, - job.job_id, + job.job_id ) } > From 76e90b4b4e0b66f7022d9380b1931a43158129e3 Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 16:03:30 -0500 Subject: [PATCH 5/7] Remove redundant logs --- src/lattice/routes/instances/utils.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/lattice/routes/instances/utils.py b/src/lattice/routes/instances/utils.py index 18d1be0..204b70c 100644 --- a/src/lattice/routes/instances/utils.py +++ b/src/lattice/routes/instances/utils.py @@ -322,13 +322,6 @@ def launch_cluster_with_skypilot( envs = env_vars.copy() else: envs.update(env_vars) - - # Log environment variables for debugging (especially checkpoint resume vars) - if envs: - checkpoint_env_keys = ["_TFL_PARENT_JOB_ID", "_TFL_CHECKPOINT_NAME", "_TFL_JOB_ID"] - checkpoint_envs = {k: v for k, v in envs.items() if k in checkpoint_env_keys} - if checkpoint_envs: - print(f"[SkyPilot] Checkpoint-related env vars: {checkpoint_envs}") # Use job_name if provided, otherwise use default name = job_name if job_name else "lattice-task-setup" @@ -343,10 +336,6 @@ def launch_cluster_with_skypilot( except Exception: effective_num_nodes = 1 - # Log command details for debugging - print(f"[SkyPilot] Creating task with name: {name}") - print(f"[SkyPilot] Command length: {len(command)} chars") - print(f"[SkyPilot] Command preview: {command[:200]}..." 
if len(command) > 200 else f"[SkyPilot] Command: {command}") task = sky.Task( name=name, From 40e893df63f6fdfe4182822413c3d8db1dad743d Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 16:05:52 -0500 Subject: [PATCH 6/7] Prettier --- .../src/components/pages/MyNodes/Jobs.tsx | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/frontend/src/components/pages/MyNodes/Jobs.tsx b/frontend/src/components/pages/MyNodes/Jobs.tsx index df3b97b..2e723e3 100644 --- a/frontend/src/components/pages/MyNodes/Jobs.tsx +++ b/frontend/src/components/pages/MyNodes/Jobs.tsx @@ -107,7 +107,7 @@ interface JobsProps { const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { const [clustersWithJobs, setClustersWithJobs] = useState( - [] + [], ); const [pastJobClusters, setPastJobClusters] = useState([]); const [pastJobsLoading, setPastJobsLoading] = useState(false); @@ -154,7 +154,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { try { const response = await apiFetch( buildApiUrl(`jobs/${cluster.cluster_name}`), - { credentials: "include" } + { credentials: "include" }, ); if (response.ok) { @@ -208,7 +208,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { try { const response = await apiFetch( buildApiUrl(`jobs/past-jobs/${clusterName}/${jobId}/logs`), - { credentials: "include" } + { credentials: "include" }, ); if (response.ok) { @@ -238,7 +238,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { try { const response = await apiFetch( buildApiUrl(`jobs/${cluster.cluster_name}`), - { credentials: "include" } + { credentials: "include" }, ); if (response.ok) { @@ -254,11 +254,11 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { } catch (err) { console.error( `Error monitoring jobs for cluster ${cluster.cluster_name}:`, - err + err, ); } return cluster; - }) + }), ); setClustersWithJobs(updatedClustersWithJobs); @@ -278,9 +278,9 @@ const Jobs: React.FC = ({ skypilotLoading, 
myClusters }) => { // Create EventSource for Server-Sent Events const eventSource = new EventSource( buildApiUrl( - `jobs/${clusterName}/${jobId}/logs/stream?tail=1000&follow=true` + `jobs/${clusterName}/${jobId}/logs/stream?tail=1000&follow=true`, ), - { withCredentials: true } + { withCredentials: true }, ); eventSource.onopen = () => { @@ -339,7 +339,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { { method: "POST", credentials: "include", - } + }, ); if (!response.ok) { @@ -357,7 +357,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { jobs: cluster.jobs.map((job) => job.job_id === jobId ? { ...job, status: "JobStatus.CANCELLED" } - : job + : job, ), }; } @@ -770,7 +770,7 @@ const Jobs: React.FC = ({ skypilotLoading, myClusters }) => { onClick={() => fetchPastJobLogs( pastCluster.cluster_name, - job.job_id + job.job_id, ) } > From 520b2e59cf29b679d8d3e14b32f6337dfa2b8b34 Mon Sep 17 00:00:00 2001 From: Mina Parham Date: Mon, 3 Nov 2025 18:26:12 -0500 Subject: [PATCH 7/7] stores parent_job_id and resumed_from_checkpoint in new job's data --- src/lattice/routes/instances/routes.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/src/lattice/routes/instances/routes.py b/src/lattice/routes/instances/routes.py index eaad903..65d9c0d 100644 --- a/src/lattice/routes/instances/routes.py +++ b/src/lattice/routes/instances/routes.py @@ -191,8 +191,6 @@ async def launch_instance( experiment_id: Optional[str] = Form(None), job_name: Optional[str] = Form(None), tlab_job_id: Optional[str] = Form(None), - tlab_parent_job_id: Optional[str] = Form(None), - tlab_checkpoint_name: Optional[str] = Form(None), disabled_mandatory_mounts: Optional[bool] = Form(False), yaml_file: Optional[UploadFile] = File(None), user: dict = Depends(get_user_or_api_key), @@ -443,11 +441,8 @@ async def launch_instance( if tlab_job_id: hook_env_vars["_TFL_JOB_ID"] = tlab_job_id - # Set checkpoint-related environment variables for training resume 
- if tlab_parent_job_id: - hook_env_vars["_TFL_PARENT_JOB_ID"] = tlab_parent_job_id - if tlab_checkpoint_name: - hook_env_vars["_TFL_CHECKPOINT_NAME"] = tlab_checkpoint_name + # Note: Checkpoint resume info (parent_job_id, checkpoint_name) is now stored in job_data + # and accessed directly by the SDK via lab.get_checkpoint_to_resume() # Pre-calculate requested GPU count and preserve selected RunPod option for pricing # (RunPod mapping below may clear 'accelerators')