Skip to content

Commit 2f012cf

Browse files
committed
Upload files with rsync
Uploading files with rsync allows the files to retain their existing permissions. This means files that are executable locally will also be executable on the remote storage. Signed-Off-By: Robert Clark <[email protected]>
1 parent 39b3770 commit 2f012cf

File tree

1 file changed

+59
-19
lines changed

1 file changed

+59
-19
lines changed

nemo_run/core/execution/lepton.py

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
from leptonai.api.v1.types.affinity import LeptonResourceAffinity
1313
from leptonai.api.v1.types.common import Metadata
1414
from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
15-
from leptonai.api.v1.types.deployment import EnvVar, LeptonContainer, Mount
15+
from leptonai.api.v1.types.deployment import (EnvVar, LeptonContainer,
16+
LeptonDeployment, Mount)
1617
from leptonai.api.v1.types.job import (LeptonJob, LeptonJobState,
1718
LeptonJobUserSpec)
1819
from leptonai.api.v1.types.replica import Replica
@@ -24,6 +25,8 @@
2425

2526
logger = logging.getLogger(__name__)
2627

28+
LEPTON_STORAGE_DEPLOYMENT = "storage-rsync-by-lepton"
29+
2730

2831
@dataclass(kw_only=True)
2932
class LeptonExecutor(Executor):
@@ -63,29 +66,66 @@ def stop_job(self, job_id: str):
6366
client.job.update(job_id, spec={"spec": {"stopped": True}})
6467
logger.info(f"Job {job_id} stopped successfully.")
6568

69+
def _replica_public_ip(self, name: str, client: APIClient) -> str | None:
70+
"""
71+
Find the public IP address for a given replica.
72+
"""
73+
replicas = client.deployment.get_replicas(name)
74+
75+
if len(replicas) != 1:
76+
raise RuntimeError(f"Pod {name} has more than one replica. This is not supported.")
77+
if not replicas[0].status:
78+
raise RuntimeError(f"Unable to find status of pod {name}.")
79+
return replicas[0].status.public_ip
80+
81+
def _rsync_password(self, deployment: LeptonDeployment) -> str:
82+
"""
83+
Find the rsync password on the cluster.
84+
"""
85+
for env in deployment.spec.envs:
86+
if env.name == "PASSWORD":
87+
return env.value
88+
raise RuntimeError("Unable to find rsync password on cluster.")
89+
90+
6691
def move_data(self, sleep: float = 10) -> None:
6792
"""
68-
Moves job directory into PVC and deletes the workload after completion
93+
Moves job directory into PVC and deletes the workload after completion.
94+
95+
Referenced from the LeptonAI Python SDK: https://github.com/leptonai/leptonai/blob/main/leptonai/cli/storage.py
6996
"""
7097
client = APIClient()
7198
client.storage.create_dir(additional_path=self.lepton_job_dir)
7299

73-
# Create all sub-directories in the directory tree
74-
# Then, copy all files to the storage
75-
for root, dirs, files in os.walk(self.job_dir):
76-
# Create the sub-directories
77-
for dir in dirs:
78-
abs_path = os.path.join(root, dir)
79-
relative_path = os.path.join(self.lepton_job_dir, abs_path.replace(self.job_dir, "").lstrip("/"))
80-
client.storage.create_dir(additional_path=relative_path)
81-
# Copy the files in each sub-directory to the remote filesystem
82-
for file in files:
83-
abs_path = os.path.join(root, file)
84-
relative_path = os.path.join(self.lepton_job_dir, abs_path.replace(self.job_dir, "").lstrip("/"))
85-
client.storage.create_file(
86-
local_path=abs_path,
87-
remote_path=relative_path
88-
)
100+
# Lepton comes with a hidden storage deplyment specifically for rsync'ing
101+
# data to storage with a deterministic name. This deployment can be used
102+
# for transferring data to the storage.
103+
storage_deployment = client.deployment.get(LEPTON_STORAGE_DEPLOYMENT)
104+
105+
if not storage_deployment:
106+
raise RuntimeError("Rsync deployment not found on cluster. Ensure your cluster supports rsync.")
107+
port = storage_deployment.spec.container.ports[0].host_port
108+
ip = self._replica_public_ip(LEPTON_STORAGE_DEPLOYMENT, client)
109+
workspace_id = client.get_workspace_id()
110+
111+
password = self._rsync_password(storage_deployment)
112+
113+
command = (
114+
f"rsync -a {self.job_dir}/ rsync://{workspace_id}@{ip}:{port}/volume{self.lepton_job_dir}"
115+
)
116+
117+
process = subprocess.Popen(
118+
command,
119+
stdout=subprocess.PIPE,
120+
stderr=subprocess.STDOUT,
121+
env={"RSYNC_PASSWORD": password},
122+
shell=True,
123+
universal_newlines=True
124+
)
125+
126+
for line in process.stdout:
127+
print(line, end="")
128+
process.wait()
89129

90130
def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
91131
"""
@@ -127,7 +167,7 @@ def create_lepton_job(self, name: str):
127167
cmd = [
128168
"/bin/bash",
129169
"-c",
130-
f"chmod +x {self.lepton_job_dir}/launch_script.sh && bash {self.lepton_job_dir}/launch_script.sh"
170+
f"bash {self.lepton_job_dir}/launch_script.sh"
131171
]
132172

133173
# Get ID of requested node group

0 commit comments

Comments
 (0)