Skip to content

Commit fe4ef3f

Browse files
committed
Use a low-resource pod to move data to Lepton
Running a low-resource pod to copy experiment data to the Lepton cluster is more reliable, and compatible with a broader range of cluster types, than the storage API. Signed-off-by: Robert Clark <[email protected]>
1 parent 7d3d685 commit fe4ef3f

File tree

1 file changed

+60
-27
lines changed

1 file changed

+60
-27
lines changed

nemo_run/core/execution/lepton.py

Lines changed: 60 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
1+
import base64
12
import logging
23
import os
34
import re
45
import subprocess
6+
import tempfile
57
import time
68
from dataclasses import dataclass, field
9+
from datetime import datetime
710
from pathlib import Path
8-
from typing import Any, Optional, Set, Type
11+
from typing import Any, List, Optional, Set, Type
912

1013
from invoke.context import Context
1114
from leptonai.api.v1.client import APIClient
1215
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
13-
from leptonai.api.v1.types.common import Metadata
16+
from leptonai.api.v1.types.common import LeptonVisibility, Metadata
1417
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
15-
from leptonai.api.v1.types.deployment import EnvVar, LeptonContainer, Mount
18+
from leptonai.api.v1.types.deployment import (
19+
EnvVar,
20+
LeptonContainer,
21+
LeptonDeployment,
22+
LeptonDeploymentUserSpec,
23+
Mount,
24+
ResourceRequirement,
25+
)
1626
from leptonai.api.v1.types.job import LeptonJob, LeptonJobState, LeptonJobUserSpec
1727
from leptonai.api.v1.types.replica import Replica
1828

@@ -62,29 +72,56 @@ def stop_job(self, job_id: str):
6272
client.job.update(job_id, spec={"spec": {"stopped": True}})
6373
logger.info(f"Job {job_id} stopped successfully.")
6474

75+
def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> List[str]:
    """Build a shell command that recreates ``local_dir_path`` at ``dest_path`` in a pod.

    The local directory is tar-gzipped, base64-encoded, and embedded directly in
    the returned command string, so the data travels inside the container spec
    and no shared filesystem or storage API is required.

    Args:
        local_dir_path: Local directory whose contents should be copied.
        dest_path: Absolute path inside the pod where the directory is recreated.
            Any existing content at this path is removed first.

    Returns:
        An ``["sh", "-c", <script>]`` argv list suitable as a container command.
    """
    import shlex  # local import: only needed for quoting the remote path

    with tempfile.TemporaryDirectory() as temp_dir:
        tarball_path = os.path.join(temp_dir, "archive.tar.gz")
        # List-form argv (no shell=True): paths containing spaces or shell
        # metacharacters can neither break the command nor inject into it.
        subprocess.run(
            ["tar", "-czf", tarball_path, "-C", local_dir_path, "."],
            check=True,
        )
        with open(tarball_path, "rb") as file:
            encoded_data = base64.b64encode(file.read()).decode("utf-8")

    # NOTE(review): the whole archive rides inside the command string; very large
    # job directories may exceed pod-spec / argv size limits — confirm acceptable.
    dest = shlex.quote(dest_path)
    # Remove any stale copy, recreate the directory, then decode the embedded
    # base64 payload and unpack it inside the pod.
    cmd = (
        f"rm -rf {dest} && mkdir -p {dest} "
        f"&& echo {encoded_data} | base64 -d > {dest}/archive.tar.gz "
        f"&& tar -xzf {dest}/archive.tar.gz -C {dest} "
        f"&& rm {dest}/archive.tar.gz"
    )
    return ["sh", "-c", cmd]
6588
def move_data(self, sleep: float = 10) -> None:
    """Copy the local job directory to the cluster via a low-resource pod.

    Packages ``self.job_dir`` into the command of a tiny busybox deployment
    (see ``copy_directory_data_command``) that unpacks it at
    ``self.lepton_job_dir`` on the cluster's mounted storage. This method only
    creates the deployment; it does not wait for it to finish or delete it.

    Args:
        sleep: Unused; retained for backward compatibility with callers of the
            previous storage-API implementation.
    """
    client = APIClient()
    cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir)
    node_group = self._node_group_id(client)
    valid_node_ids = self._valid_node_ids(node_group, client)

    spec = LeptonDeploymentUserSpec(
        container=LeptonContainer(
            image="busybox:1.37.0",  # minimal image: only sh/tar/base64 are needed
            command=cmd,
        ),
        mounts=[Mount(path=m["path"], mount_path=m["mount_path"]) for m in self.mounts],
    )
    # Pin the mover pod to the same node group/nodes as the job so it writes to
    # the same mounted storage the job will later read from.
    spec.resource_requirement = ResourceRequirement(
        resource_shape="cpu.small",
        affinity=LeptonResourceAffinity(
            allowed_dedicated_node_groups=[node_group.metadata.id_],
            allowed_nodes_in_node_group=valid_node_ids,
        ),
        min_replicas=1,
        max_replicas=1,
    )

    # Timestamp suffix keeps repeated runs from colliding on the deployment
    # name; two calls within the same second would still collide.
    custom_name = f"data-mover-{int(datetime.now().timestamp())}"
    deployment = LeptonDeployment(
        metadata=Metadata(
            id=custom_name,
            name=custom_name,
            visibility=LeptonVisibility("private"),
        ),
        spec=spec,
    )
    client.deployment.create(deployment)
88125

89126
def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
90127
"""
@@ -121,11 +158,7 @@ def create_lepton_job(self, name: str):
121158

122159
envs = [EnvVar(name=key, value=value) for key, value in self.env_vars.items()]
123160

124-
cmd = [
125-
"/bin/bash",
126-
"-c",
127-
f"chmod +x {self.lepton_job_dir}/launch_script.sh && bash {self.lepton_job_dir}/launch_script.sh"
128-
]
161+
cmd = ["/bin/bash", "-c", f"bash {self.lepton_job_dir}/launch_script.sh"]
129162

130163
# Get ID of requested node group
131164
node_group_id = self._node_group_id(client)

0 commit comments

Comments
 (0)