Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
53c3711
feat: Stream DGXC logs
ko3n1g Nov 5, 2025
58164a4
revert
ko3n1g Nov 5, 2025
19a7b58
revert
ko3n1g Nov 5, 2025
8c2de94
remove
ko3n1g Nov 5, 2025
5c66e66
fix tests
ko3n1g Nov 5, 2025
2124bcb
remove
ko3n1g Nov 5, 2025
8dc9a1f
fix tests
ko3n1g Nov 5, 2025
b8ac3ff
fix
ko3n1g Nov 5, 2025
6e12793
fix
ko3n1g Nov 5, 2025
2fd985f
fix
ko3n1g Nov 5, 2025
7b1459f
add test for executor
ko3n1g Nov 10, 2025
ee6f42c
add test for scheduler
ko3n1g Nov 10, 2025
080136b
fix
ko3n1g Nov 10, 2025
ec56d2e
remove
ko3n1g Nov 10, 2025
6f1ab3a
fix
ko3n1g Nov 17, 2025
af7d7e5
add date
ko3n1g Nov 17, 2025
9895766
fix
ko3n1g Nov 17, 2025
9eeee10
fix
ko3n1g Nov 17, 2025
6dd7bf6
token
ko3n1g Nov 17, 2025
a6c0471
test
ko3n1g Nov 17, 2025
64155cf
fix
ko3n1g Nov 17, 2025
43b72cc
test
ko3n1g Nov 17, 2025
a48a002
test
ko3n1g Nov 17, 2025
8c98481
fix
ko3n1g Nov 17, 2025
ba6378d
newline
ko3n1g Nov 17, 2025
8352992
fix
ko3n1g Nov 17, 2025
39f335a
revert
ko3n1g Nov 17, 2025
4895bfe
revert
ko3n1g Nov 17, 2025
97a69dc
fix
ko3n1g Nov 17, 2025
f79d4c4
test
ko3n1g Nov 19, 2025
a2f0751
fix
ko3n1g Nov 19, 2025
8bf2e79
verify=False
ko3n1g Nov 19, 2025
464e301
fix
ko3n1g Nov 19, 2025
77fe9d2
decode_unicode=True
ko3n1g Nov 19, 2025
c8c158a
headers
ko3n1g Nov 19, 2025
d7d7070
fix
ko3n1g Nov 19, 2025
0559d4a
newline
ko3n1g Nov 19, 2025
0aedd40
fix
ko3n1g Nov 19, 2025
e1fe89f
fix
ko3n1g Nov 19, 2025
ea98c2b
more tests
ko3n1g Nov 19, 2025
0c6b7c9
disable flake8
ko3n1g Nov 19, 2025
7874fa7
remove unused import
ko3n1g Nov 19, 2025
18b51c2
test stream_url_async
ko3n1g Nov 19, 2025
6b7b308
cleanup
ko3n1g Nov 19, 2025
c5c4c99
use native typing
ko3n1g Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"python.testing.pytestArgs": [
"test"
],
"flake8.enabled": false,
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
}
97 changes: 90 additions & 7 deletions nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import json
import logging
import os
import queue
import subprocess
import tempfile
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Optional, Type
from typing import Any, Iterable, Optional

import requests
from invoke.context import Context
Expand Down Expand Up @@ -65,6 +67,7 @@ class DGXCloudExecutor(Executor):
"""

base_url: str
kube_apiserver_url: str
app_id: str
app_secret: str
project_name: str
Expand Down Expand Up @@ -359,6 +362,92 @@ def status(self, job_id: str) -> Optional[DGXCloudState]:
r_json = response.json()
return DGXCloudState(r_json["phase"])

def _stream_url_sync(self, url: str, headers: dict, q: queue.Queue):
    """Stream a single URL with ``requests`` and enqueue decoded lines.

    Each log line is enqueued as a ``(url, line)`` tuple with a trailing
    newline re-attached (``iter_lines`` strips it). A final ``(url, None)``
    sentinel is ALWAYS enqueued — on success or error — so the consumer can
    tell this stream has finished.

    Args:
        url: Pod-log URL to stream.
        headers: HTTP headers (auth) for the request.
        q: Queue shared with the consumer in ``fetch_logs``.
    """
    try:
        # NOTE(review): verify=False disables TLS certificate checking —
        # presumably needed for self-signed cluster certs; confirm intended.
        with requests.get(url, stream=True, headers=headers, verify=False) as response:
            # Fail fast on HTTP errors (e.g. 401/404) instead of silently
            # streaming the error body as if it were log output.
            response.raise_for_status()
            for line in response.iter_lines(decode_unicode=True):
                q.put((url, f"{line}\n"))
    except Exception as e:
        logger.error(f"Error streaming URL {url}: {e}")
    finally:
        # Sentinel: tells the consumer this URL is done.
        q.put((url, None))

def fetch_logs(
    self,
    job_id: str,
    stream: bool,
    stderr: Optional[bool] = None,
    stdout: Optional[bool] = None,
) -> Iterable[str]:
    """Yield log lines from every worker pod of a DGX Cloud workload.

    Args:
        job_id: DGX Cloud workload id.
        stream: If True, follow the logs live (``follow=true`` on the
            kube-apiserver log endpoint).
        stderr: Accepted for interface compatibility; pod logs are not
            split by stream, so this is ignored.
        stdout: Ignored, see ``stderr``.

    Yields:
        Newline-terminated log lines, interleaved across worker pods.
    """
    # States from which the job can never (re-)enter RUNNING.
    terminal_states = {
        DGXCloudState.DELETING,
        DGXCloudState.STOPPED,
        DGXCloudState.STOPPING,
        DGXCloudState.DEGRADED,
        DGXCloudState.FAILED,
        DGXCloudState.COMPLETED,
        DGXCloudState.TERMINATING,
    }

    token = self.get_auth_token()
    if not token:
        logger.error("Failed to retrieve auth token for fetch logs request.")
        yield ""
        return  # bug fix: previously fell through and queried with token=None

    response = requests.get(
        f"{self.base_url}/workloads", headers=self._default_headers(token=token)
    )
    workload_name = next(
        (
            workload["name"]
            for workload in response.json()["workloads"]
            if workload["id"] == job_id
        ),
        None,
    )
    if workload_name is None:
        logger.error(f"No workload found with id {job_id}")
        yield ""
        return  # bug fix: previously built pod URLs with workload_name=None

    urls = [
        f"{self.kube_apiserver_url}/api/v1/namespaces/runai-{self.project_name}/pods/{workload_name}-worker-{i}/log?container=pytorch"
        for i in range(self.nodes)
    ]

    if stream:
        urls = [url + "&follow=true" for url in urls]

    # Wait for the job to start, but bail out if it reaches a terminal state
    # first (previously this loop spun forever for a job that failed before
    # ever running). Pods of a finished job may still have retrievable logs,
    # so we proceed rather than return.
    while (state := self.status(job_id)) != DGXCloudState.RUNNING:
        if state in terminal_states:
            break
        logger.info("Waiting for job to start...")
        time.sleep(15)
    else:
        # Job is RUNNING: give the pods a moment before opening log streams.
        time.sleep(10)

    q: queue.Queue = queue.Queue()
    active_urls = set(urls)

    # One streaming thread per worker pod; each enqueues (url, line) tuples
    # and a final (url, None) sentinel (see _stream_url_sync).
    threads = [
        threading.Thread(
            target=self._stream_url_sync, args=(url, self._default_headers(token=token), q)
        )
        for url in urls
    ]
    for t in threads:
        t.start()

    # Drain the queue until every stream has finished or the job reaches a
    # terminal state.
    # NOTE(review): self.status() is an HTTP call per queue item — consider
    # rate-limiting the status polls if this becomes a hot path.
    while active_urls:
        url, item = q.get()
        if item is None or self.status(job_id) in terminal_states:
            active_urls.discard(url)
        else:
            yield item

    for t in threads:
        t.join()

def cancel(self, job_id: str):
# Retrieve the authentication token for the REST calls
token = self.get_auth_token()
Expand All @@ -385,12 +474,6 @@ def cancel(self, job_id: str):
response.text,
)

@classmethod
def logs(cls: Type["DGXCloudExecutor"], app_id: str, fallback_path: Optional[str]):
    """Warn that logs cannot be fetched for DGX Cloud jobs.

    DGX Cloud exposes job logs only through its cluster UI, so this hook
    merely emits a warning; *app_id* and *fallback_path* are unused.
    """
    logger.warning(
        "Logs not available for DGXCloudExecutor based jobs. Please visit the cluster UI to view the logs."
    )

def cleanup(self, handle: str): ...

def assign(
Expand Down
6 changes: 3 additions & 3 deletions nemo_run/run/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
from nemo_run.core.execution.base import LogSupportedExecutor
from nemo_run.core.frontend.console.api import CONSOLE
from nemo_run.run.torchx_backend.runner import Runner, get_runner
from nemo_run.run.torchx_backend.schedulers.api import (
REVERSE_EXECUTOR_MAPPING,
)
from nemo_run.run.torchx_backend.schedulers.api import REVERSE_EXECUTOR_MAPPING

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -60,6 +58,8 @@ def print_log_lines(
role_name,
replica_id,
regex,
None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

torchx's log-printing API takes 8 positional arguments, so prior to this change we were inadvertently passing should_tail and streams into the since and until parameters.

Since my code paths use streams, I ran into this issue.

None,
should_tail=should_tail,
streams=streams,
):
Expand Down
44 changes: 35 additions & 9 deletions nemo_run/run/torchx_backend/schedulers/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import shutil
import tempfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from typing import Any, Iterable, Optional

import fiddle as fdl
import fiddle._src.experimental.dataclasses as fdl_dc
Expand All @@ -29,15 +30,10 @@
DescribeAppResponse,
ListAppResponse,
Scheduler,
Stream,
split_lines,
)
from torchx.specs import (
AppDef,
AppState,
ReplicaStatus,
Role,
RoleStatus,
runopts,
)
from torchx.specs import AppDef, AppState, ReplicaStatus, Role, RoleStatus, runopts

from nemo_run.config import get_nemorun_home
from nemo_run.core.execution.base import Executor
Expand Down Expand Up @@ -189,6 +185,36 @@ def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
ui_url=f"{executor.base_url}/workloads/distributed/{job_id}",
)

def log_iter(
    self,
    app_id: str,
    role_name: str,
    k: int = 0,
    regex: Optional[str] = None,
    since: Optional[datetime] = None,
    until: Optional[datetime] = None,
    should_tail: bool = False,
    streams: Optional[Stream] = None,
) -> Iterable[str]:
    """Return an iterator over log lines for the given app.

    Delegates to ``DGXCloudExecutor.fetch_logs``; ``should_tail`` maps to
    live streaming. ``role_name``, ``k``, ``regex``, ``since``, ``until``
    and ``streams`` are accepted for torchx interface compatibility but
    are not used by the DGX Cloud backend.
    """
    stored_data = _get_job_dirs()
    job_info = stored_data.get(app_id)
    if not job_info:
        # bug fix: previously job_info.get(...) raised AttributeError for
        # an unknown app_id instead of returning an empty log stream.
        return [""]

    # app_id is expected to be "<scheduler>___<session>___<job_id>" —
    # presumably produced by this backend's submit path; TODO confirm.
    _, _, job_id = app_id.split("___")
    executor: Optional[DGXCloudExecutor] = job_info.get("executor", None)  # type: ignore
    if not executor:
        return [""]

    # fetch_logs is a generator function, so it always returns an iterable
    # of already-line-split strings; the old isinstance(logs, str) /
    # split_lines branch was dead code and has been removed.
    return executor.fetch_logs(
        job_id=job_id,
        stream=should_tail,
    )  # type: ignore

def _cancel_existing(self, app_id: str) -> None:
"""
Cancels the job by calling the DGXExecutor's cancel method.
Expand Down
Loading
Loading