Merged

50 commits
e6523f4
Add first draft of resuming training from latest checkpoint
mina-parham Oct 22, 2025
50c58af
Ruff and add resume_from_checkpoint endpoint
mina-parham Oct 23, 2025
771be20
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Oct 28, 2025
976baea
Ruff
mina-parham Oct 28, 2025
841c0a7
Move resume checkpoint endpoint to remote.py
mina-parham Oct 28, 2025
cdd5f55
Add resume from checkpoint endpoint
mina-parham Oct 28, 2025
90677be
Ruff
mina-parham Oct 28, 2025
0f29b56
Ruff
mina-parham Oct 28, 2025
d0c472a
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Oct 29, 2025
42273eb
Fix the bug related to passing the checkpoint
mina-parham Oct 29, 2025
2f8844d
Use HTTPException with proper status codes (400/404/500) instead of …
mina-parham Oct 29, 2025
9e4a615
use parent job data and proper error handling
mina-parham Oct 30, 2025
e54f02f
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Oct 30, 2025
0cfa5f8
Ruff
mina-parham Oct 30, 2025
0909b8d
Merge branch 'add/resume-training-checkpoint' of https://github.com/t…
mina-parham Oct 30, 2025
eedada5
Debug
mina-parham Oct 30, 2025
cb59a9a
Debug
mina-parham Oct 30, 2025
4f19401
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Oct 30, 2025
d2dc3ad
Debug
mina-parham Oct 30, 2025
bc30c47
Merge branch 'add/resume-training-checkpoint' of https://github.com/t…
mina-parham Oct 30, 2025
276a028
Clean up resume_from_checkpoint debugging code
mina-parham Oct 30, 2025
139edb6
Fix checkpoint resume functionality for remote training jobs
mina-parham Oct 30, 2025
e59f0cb
Fix checkpoint resume: add --resume_from_checkpoint to python command
mina-parham Oct 30, 2025
e3813f1
Refactor resume_from_checkpoint to reuse launch logic
mina-parham Oct 31, 2025
b6c91f7
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Oct 31, 2025
3254e93
Ruff
mina-parham Oct 31, 2025
acc7215
Merge branch 'add/resume-training-checkpoint' of https://github.com/t…
mina-parham Oct 31, 2025
272e311
Fix security vulnerability
mina-parham Oct 31, 2025
4a5b703
Ruff
mina-parham Oct 31, 2025
58a9d8d
Potential fix for code scanning alert no. 513: Uncontrolled data used…
mina-parham Oct 31, 2025
d97f6f7
Merge launch_remote and resume_from_checkpoint functions
mina-parham Oct 31, 2025
120dbbf
Debug load checkpoint problem
mina-parham Oct 31, 2025
5b6f982
Remove redundant print and ruff
mina-parham Oct 31, 2025
5238ca9
Fix resume training validation for missing cluster_name
mina-parham Oct 31, 2025
0fb8b50
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Nov 3, 2025
df85fea
Pass checkpoint metadata via env vars
mina-parham Nov 3, 2025
c70bab4
Merge branch 'add/resume-training-checkpoint' of https://github.com/t…
mina-parham Nov 3, 2025
d545d82
Bump the version of sdk into 0.0.41
mina-parham Nov 3, 2025
b453102
Add logs for debugging
mina-parham Nov 3, 2025
321403e
Restore stream_logs from main
mina-parham Nov 3, 2025
8806ff8
Restore stream_logs from main
mina-parham Nov 3, 2025
e0246cf
Restore stream_logs from main
mina-parham Nov 3, 2025
b8d237d
Ruff
mina-parham Nov 3, 2025
bf7fed0
Remove prints
mina-parham Nov 3, 2025
a6d715b
Merge job_data and request_data
mina-parham Nov 3, 2025
85310c0
stores parent_job_id and resumed_from_checkpoint in new job's data
mina-parham Nov 3, 2025
51bda4f
Remove comments
mina-parham Nov 3, 2025
c100901
Merge branch 'main' into add/resume-training-checkpoint
mina-parham Nov 7, 2025
6a3694a
Merge branch 'main' into add/resume-training-checkpoint
deep1401 Nov 10, 2025
15472e2
merge conflict
mina-parham Nov 12, 2025
1 change: 1 addition & 0 deletions transformerlab/routers/experiment/jobs.py
@@ -696,3 +696,4 @@ async def get_artifacts(job_id: str, request: Request):
    artifacts.sort(key=lambda x: x["filename"], reverse=True)

    return {"artifacts": artifacts}

134 changes: 132 additions & 2 deletions transformerlab/routers/remote.py
@@ -1,9 +1,12 @@
import os
import httpx
from fastapi import APIRouter, Form, Request, File, UploadFile
import json
import re
from fastapi import APIRouter, Form, Request, File, UploadFile, HTTPException
from typing import Optional, List
from transformerlab.services import job_service
from transformerlab.services.job_service import job_update_status
from lab.dirs import get_workspace_dir, get_job_checkpoints_dir


router = APIRouter(prefix="/remote", tags=["remote"])
@@ -190,6 +193,22 @@
job_id, "cluster_name", response_data["cluster_name"], experimentId
)

# Save the launch_config for potential resume
launch_config = {
"command": command,
"setup": setup,
"cluster_name": cluster_name,
"cpus": cpus,
"memory": memory,
"disk_space": disk_space,
"accelerators": accelerators,
"num_nodes": num_nodes,
"uploaded_dir_path": uploaded_dir_path,
}
job_service.job_update_job_data_insert_key_value(
job_id, "launch_config", json.dumps(launch_config), experimentId
)

return {
"status": "success",
"data": response_data,
@@ -284,7 +303,6 @@
    Upload a directory to the remote Lattice orchestrator for later use in cluster launches.
    Files are stored locally first, then sent to orchestrator.
    """
    from lab.dirs import get_workspace_dir

    # Validate environment variables
    result = validate_gpu_orchestrator_env_vars()
@@ -559,3 +577,115 @@
    except Exception as e:
        print(f"Error checking remote job status: {str(e)}")
        return {"status": "error", "message": "Error checking remote job status"}


@router.post("/{job_id}/resume_from_checkpoint")
Member

Just a doubt: what is the difference between using this and launch/remote?
Maybe we just add extra params there and use that so we don't have duplicate routes? But if there is extra logic in here then please let me know.

Contributor Author
I've merged these two routes

async def resume_from_checkpoint(
    job_id: str,
    request: Request,
):
    """Resume training from a checkpoint by creating a new job with the checkpoint parameter"""
    try:
        body = await request.json()
        checkpoint_name = body.get("checkpoint_name")
        experimentId = body.get("experimentId")

        if not checkpoint_name:
            raise HTTPException(status_code=400, detail="checkpoint_name is required")
        if not experimentId:
            raise HTTPException(status_code=400, detail="experimentId is required")

        # Get the original job
        original_job = job_service.job_get(job_id)
        if not original_job:
            raise HTTPException(status_code=404, detail=f"Job {job_id} not found")

        original_job_data = original_job.get("job_data", {})

        # Get the launch_config from the original job
        launch_config_str = original_job_data.get("launch_config")
        if not launch_config_str:
            raise HTTPException(status_code=400, detail="launch_config not found in original job")

        # Parse the launch_config JSON string
        try:
            launch_config = json.loads(launch_config_str)
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid launch_config format")

        # Get the checkpoint directory path
        checkpoints_dir = get_job_checkpoints_dir(job_id)
        checkpoint_path = os.path.join(checkpoints_dir, checkpoint_name)

        if not os.path.exists(checkpoint_path):
            raise HTTPException(status_code=404, detail=f"Checkpoint {checkpoint_name} not found at {checkpoint_path}")

        # Modify the command to include the checkpoint parameter
        original_command = launch_config.get("command", "")
        if not original_command:
            raise HTTPException(status_code=400, detail="Original command not found in launch_config")

        # Parse the command to find the python execution part and add --resume_from_checkpoint
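        # Illustration only (assumed shape of a stored launch command): something like
        #   "pip install -r requirements.txt && python train.py --epochs 3"
        # becomes
        #   "pip install -r requirements.txt && python train.py --epochs 3 --resume_from_checkpoint <checkpoints_dir>/<checkpoint_name>"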
        command_parts = original_command.split(' && ')
        python_part_index = None
        for i, part in enumerate(command_parts):
            if part.strip().startswith('python '):
                python_part_index = i
                break

        if python_part_index is None:
            raise HTTPException(status_code=400, detail="Could not find python execution command in original command")

        # Modify the python part to include --resume_from_checkpoint
        python_command = command_parts[python_part_index].strip()

        # Check if --resume_from_checkpoint already exists
        if "--resume_from_checkpoint" in python_command:
            # Replace existing checkpoint parameter
            resume_python_command = re.sub(
                r'--resume_from_checkpoint\s+\S+',
                f'--resume_from_checkpoint {checkpoint_path}',
                python_command
            )
        else:
            # Add the checkpoint parameter
            resume_python_command = f"{python_command} --resume_from_checkpoint {checkpoint_path}"

        # Replace the python part in the command_parts
        command_parts[python_part_index] = resume_python_command

        # Rejoin the command parts
        resume_command = ' && '.join(command_parts)

        # Call launch_remote to create and launch the new job
        launch_result = await launch_remote(
            request=request,
            experimentId=experimentId,
            cluster_name=launch_config.get("cluster_name"),
            command=resume_command,
            task_name=f"Resume from {checkpoint_name}",
            cpus=launch_config.get("cpus"),
            memory=launch_config.get("memory"),
            disk_space=launch_config.get("disk_space"),
            accelerators=launch_config.get("accelerators"),
            num_nodes=launch_config.get("num_nodes"),
            setup=launch_config.get("setup"),
            uploaded_dir_path=launch_config.get("uploaded_dir_path"),
        )

        if launch_result.get("status") == "success":
            new_job_id = launch_result.get("job_id")
            # Update the new job's job_data to mark it as resumed
            job_service.job_update_job_data_insert_key_value(new_job_id, "resumed_from_checkpoint", checkpoint_name, experimentId)
            job_service.job_update_job_data_insert_key_value(new_job_id, "parent_job_id", job_id, experimentId)
            return {
                "status": "success",
                "job_id": new_job_id,
                "message": f"Training resumed from checkpoint {checkpoint_name}",
                "data": launch_result
            }
        else:
            raise HTTPException(status_code=500, detail=f"Failed to launch remote job: {launch_result.get('message')}")

    except HTTPException:
        raise
    except Exception as e:
        print(f"Error resuming training from checkpoint: {str(e)}")
        return {"status": "error", "message": "Error resuming training from checkpoint"}