
Commit 6c03306

Use a low-resource pod to move data to Lepton
Running a low-resource pod to copy experiment data to the Lepton cluster is more reliable, and more broadly compatible with various cluster types, than the storage API.

Signed-Off-By: Robert Clark <[email protected]>
1 parent 2f012cf commit 6c03306
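The idea behind the change: instead of rsync'ing to a hidden storage deployment, the local job directory is packed into a gzipped tarball, base64-encoded, and embedded in a small "sh -c" command that a throwaway busybox pod decodes and extracts at the destination path. Below is a minimal standalone sketch of that round trip, assuming a POSIX host with tar, sh, and base64 available; the helper name build_copy_command is hypothetical (the committed method is copy_directory_data_command), and the pod-side step is simulated here with a local subprocess rather than a Lepton deployment.

import base64
import os
import subprocess
import tempfile


def build_copy_command(local_dir_path: str, dest_path: str) -> list:
    # Pack local_dir_path into a gzipped tarball, base64-encode it, and build an
    # "sh -c" command that recreates dest_path and unpacks the archive there.
    with tempfile.TemporaryDirectory() as temp_dir:
        tarball_path = os.path.join(temp_dir, "archive.tar.gz")
        subprocess.run(["tar", "-czf", tarball_path, "-C", local_dir_path, "."], check=True)
        with open(tarball_path, "rb") as f:
            encoded = base64.b64encode(f.read()).decode("utf-8")
    cmd = (
        f"rm -rf {dest_path} && mkdir -p {dest_path} && "
        f"echo {encoded} | base64 -d > {dest_path}/archive.tar.gz && "
        f"tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && "
        f"rm {dest_path}/archive.tar.gz"
    )
    return ["sh", "-c", cmd]


if __name__ == "__main__":
    # Local round-trip check: the "pod side" is simulated with a plain subprocess.
    with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
        with open(os.path.join(src, "experiment.yaml"), "w") as f:
            f.write("name: demo\n")
        subprocess.run(build_copy_command(src, os.path.join(dst, "job")), check=True)
        print(os.listdir(os.path.join(dst, "job")))  # expect: ['experiment.yaml']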

File tree

1 file changed: +47, -42 lines

nemo_run/core/execution/lepton.py

Lines changed: 47 additions & 42 deletions
@@ -1,19 +1,24 @@
+import base64
 import logging
 import os
 import re
 import subprocess
+import tempfile
 import time
 from dataclasses import dataclass, field
+from datetime import datetime
 from pathlib import Path
-from typing import Any, Optional, Set, Type
+from typing import Any, List, Optional, Set, Type

 from invoke.context import Context
 from leptonai.api.v1.client import APIClient
 from leptonai.api.v1.types.affinity import LeptonResourceAffinity
-from leptonai.api.v1.types.common import Metadata
+from leptonai.api.v1.types.common import LeptonVisibility, Metadata
 from leptonai.api.v1.types.dedicated_node_group import DedicatedNodeGroup
 from leptonai.api.v1.types.deployment import (EnvVar, LeptonContainer,
-                                              LeptonDeployment, Mount)
+                                              LeptonDeployment,
+                                              LeptonDeploymentUserSpec, Mount,
+                                              ResourceRequirement)
 from leptonai.api.v1.types.job import (LeptonJob, LeptonJobState,
                                        LeptonJobUserSpec)
 from leptonai.api.v1.types.replica import Replica
@@ -25,8 +30,6 @@

 logger = logging.getLogger(__name__)

-LEPTON_STORAGE_DEPLOYMENT = "storage-rsync-by-lepton"
-

 @dataclass(kw_only=True)
 class LeptonExecutor(Executor):
@@ -78,54 +81,56 @@ def _replica_public_ip(self, name: str, client: APIClient) -> str | None:
             raise RuntimeError(f"Unable to find status of pod {name}.")
         return replicas[0].status.public_ip

-    def _rsync_password(self, deployment: LeptonDeployment) -> str:
-        """
-        Find the rsync password on the cluster.
-        """
-        for env in deployment.spec.envs:
-            if env.name == "PASSWORD":
-                return env.value
-        raise RuntimeError("Unable to find rsync password on cluster.")
+    def copy_directory_data_command(self, local_dir_path: str, dest_path: str) -> List:
+        with tempfile.TemporaryDirectory() as temp_dir:
+            tarball_path = os.path.join(temp_dir, "archive.tar.gz")
+            subprocess.run(f"tar -czf {tarball_path} -C {local_dir_path} .", shell=True, check=True)
+            with open(tarball_path, "rb") as file:
+                file_data = file.read()
+            encoded_data = base64.b64encode(file_data).decode("utf-8")

+        # Delete and recreate the directory if it already exists; the command decodes the base64 data, saves it to a file, and extracts it inside the pod
+        cmd = f"rm -rf {dest_path} && mkdir -p {dest_path} && echo {encoded_data} | base64 -d > {dest_path}/archive.tar.gz && tar -xzf {dest_path}/archive.tar.gz -C {dest_path} && rm {dest_path}/archive.tar.gz"
+        full_command = ["sh", "-c", cmd]
+        return full_command

     def move_data(self, sleep: float = 10) -> None:
         """
-        Moves job directory into PVC and deletes the workload after completion.
-
-        Referenced from the LeptonAI Python SDK: https://github.com/leptonai/leptonai/blob/main/leptonai/cli/storage.py
+        Moves the job directory into remote storage and deletes the workload after completion.
         """
         client = APIClient()
-        client.storage.create_dir(additional_path=self.lepton_job_dir)
-
-        # Lepton comes with a hidden storage deployment specifically for rsync'ing
-        # data to storage with a deterministic name. This deployment can be used
-        # for transferring data to the storage.
-        storage_deployment = client.deployment.get(LEPTON_STORAGE_DEPLOYMENT)
-
-        if not storage_deployment:
-            raise RuntimeError("Rsync deployment not found on cluster. Ensure your cluster supports rsync.")
-        port = storage_deployment.spec.container.ports[0].host_port
-        ip = self._replica_public_ip(LEPTON_STORAGE_DEPLOYMENT, client)
-        workspace_id = client.get_workspace_id()
-
-        password = self._rsync_password(storage_deployment)
+        cmd = self.copy_directory_data_command(self.job_dir, self.lepton_job_dir)
+        node_group_id = self._node_group_id(client)
+        valid_node_ids = self._valid_node_ids(node_group_id, client)

-        command = (
-            f"rsync -a {self.job_dir}/ rsync://{workspace_id}@{ip}:{port}/volume{self.lepton_job_dir}"
+        spec = LeptonDeploymentUserSpec(
+            container=LeptonContainer(
+                image="busybox:1.37.0",  # Use a very low-resource container
+                command=cmd,
+            ),
+            mounts=[Mount(path=mount["path"], mount_path=mount["mount_path"]) for mount in self.mounts],
+        )
+        spec.resource_requirement = ResourceRequirement(
+            resource_shape="cpu.small",
+            affinity=LeptonResourceAffinity(
+                allowed_dedicated_node_groups=[node_group_id.metadata.id_],
+                allowed_nodes_in_node_group=valid_node_ids,
+            ),
+            min_replicas=1,
+            max_replicas=1,
         )
+        custom_name = f"data-mover-{int(datetime.now().timestamp())}"

-        process = subprocess.Popen(
-            command,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            env={"RSYNC_PASSWORD": password},
-            shell=True,
-            universal_newlines=True
+        deployment = LeptonDeployment(
+            metadata=Metadata(
+                id=custom_name,
+                name=custom_name,
+                visibility=LeptonVisibility("private"),
+            ),
+            spec=spec,
        )

-        for line in process.stdout:
-            print(line, end="")
-        process.wait()
+        client.deployment.create(deployment)

     def _node_group_id(self, client: APIClient) -> DedicatedNodeGroup:
         """

0 commit comments

Comments
 (0)