Skip to content

Commit 7fc5426

Browse files
authored
fix: DGXC streaming (#401)
1 parent d3be9ac commit 7fc5426

File tree

2 files changed

+14
-11
lines changed

2 files changed

+14
-11
lines changed

nemo_run/core/execution/dgxcloud.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
323323
ln -s {self.pvc_job_dir}/ /nemo_run
324324
cd /nemo_run/code
325325
mkdir -p {self.pvc_job_dir}/logs
326-
{" ".join(cmd)} 2>&1 | tee -a {self.pvc_job_dir}/logs/output-$HOSTNAME.log
326+
{" ".join(cmd)} 2>&1 | tee -a {self.pvc_job_dir}/log_$HOSTNAME.out {self.pvc_job_dir}/log-allranks_0.out
327327
"""
328328
with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f:
329329
f.write(launch_script)
@@ -394,8 +394,11 @@ def fetch_logs(
394394

395395
files = []
396396
while len(files) < self.nodes:
397-
files = list(glob.glob(f"{self.pvc_job_dir}/logs/output-*.log"))
398-
logger.info(f"Waiting for {self.nodes - len(files)} log files to be created...")
397+
files = list(glob.glob(f"{self.pvc_job_dir}/log_*.out"))
398+
files = [f for f in files if "log-allranks_0" not in f]
399+
logger.info(
400+
f"Waiting for {self.nodes + 1 - len(files)} log files to be created in {self.pvc_job_dir}..."
401+
)
399402
time.sleep(3)
400403

401404
cmd.extend(files)
@@ -410,7 +413,7 @@ def fetch_logs(
410413
for line in iter(proc.stdout.readline, ""):
411414
if (
412415
line
413-
and not line.rstrip("\n").endswith(".log <==")
416+
and not line.rstrip("\n").endswith(".out <==")
414417
and line.rstrip("\n") != ""
415418
):
416419
yield f"{line}"

test/core/execution/test_dgxcloud.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ def test_fetch_logs_streaming(self, mock_sleep, mock_popen, mock_glob):
110110

111111
# Mock log files
112112
mock_glob.return_value = [
113-
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
114-
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log",
113+
"/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out",
114+
"/workspace/nemo_run/experiments/exp1/task1/log_worker-1.out",
115115
]
116116

117117
# Mock process that yields log lines
@@ -162,7 +162,7 @@ def test_fetch_logs_non_streaming(self, mock_sleep, mock_popen, mock_glob):
162162

163163
# Mock log files
164164
mock_glob.return_value = [
165-
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
165+
"/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out",
166166
]
167167

168168
# Mock process that yields log lines
@@ -232,7 +232,7 @@ def test_fetch_logs_waits_for_running_status(self, mock_glob, mock_sleep):
232232

233233
with patch.object(executor, "status", side_effect=status_values):
234234
# Mock glob to prevent it from blocking
235-
mock_glob.return_value = ["/workspace/nemo_run/logs/output.log"]
235+
mock_glob.return_value = ["/workspace/nemo_run/logs/outputlog_"]
236236

237237
with patch("subprocess.Popen") as mock_popen:
238238
mock_process = MagicMock()
@@ -257,10 +257,10 @@ def test_fetch_logs_waits_for_log_files(self, mock_popen, mock_glob, mock_sleep)
257257
# Mock glob to return incomplete files first, then all files
258258
mock_glob.side_effect = [
259259
[], # No files yet
260-
["/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log"], # 1 of 2
260+
["/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out"], # 1 of 2
261261
[ # All 2 files
262-
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
263-
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log",
262+
"/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out",
263+
"/workspace/nemo_run/experiments/exp1/task1/log_worker-1.out",
264264
],
265265
]
266266

0 commit comments

Comments
 (0)