Skip to content

Commit 64469e6

Browse files
authored
Add user scoping for k8s backend and log level support for Ray APIs (#247)
* Add user scoping for k8s backend and log level support for Ray APIs

Signed-off-by: Hemil Desai <[email protected]>

* Better logs

Signed-off-by: Hemil Desai <[email protected]>

---------

Signed-off-by: Hemil Desai <[email protected]>
1 parent 65c0430 commit 64469e6

File tree

6 files changed

+223
-74
lines changed

6 files changed

+223
-74
lines changed

nemo_run/core/execution/kuberay.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ class KubeRayExecutor(Executor):
8080
image: str = "" # Will be set in __post_init__ if empty
8181
head_cpu: str = "1"
8282
head_memory: str = "2Gi"
83-
ray_start_params: dict[str, Any] = field(default_factory=dict)
83+
ray_head_start_params: dict[str, Any] = field(default_factory=dict)
84+
ray_worker_start_params: dict[str, Any] = field(default_factory=dict)
8485
worker_groups: list[KubeRayWorkerGroup] = field(default_factory=list)
8586
labels: dict[str, Any] = field(default_factory=dict)
8687
service_type: str = "ClusterIP"
@@ -122,7 +123,7 @@ def get_cluster_body(self, name: str) -> dict[str, Any]:
122123
memory_requests=self.head_memory,
123124
cpu_limits=self.head_cpu,
124125
memory_limits=self.head_memory,
125-
ray_start_params=self.ray_start_params,
126+
ray_start_params=self.ray_head_start_params,
126127
head_ports=self.head_ports,
127128
env_vars=self.env_vars,
128129
volumes=self.volumes,
@@ -144,7 +145,7 @@ def get_cluster_body(self, name: str) -> dict[str, Any]:
144145
replicas=worker_group.replicas,
145146
min_replicas=worker_group.min_replicas or worker_group.replicas,
146147
max_replicas=worker_group.max_replicas or worker_group.replicas,
147-
ray_start_params=self.ray_start_params,
148+
ray_start_params=self.ray_worker_start_params,
148149
volume_mounts=worker_group.volume_mounts,
149150
volumes=worker_group.volumes,
150151
labels=worker_group.labels,
@@ -459,13 +460,13 @@ def is_valid_label(name: str) -> bool:
459460

460461
def sync_workdir_via_pod(
461462
*,
462-
name: str,
463+
pod_name: str,
463464
namespace: str,
465+
user_workspace_path: str,
464466
workdir: str,
465467
core_v1_api: CoreV1Api,
466468
volumes: list[dict[str, object]],
467469
volume_mounts: list[dict[str, object]],
468-
workspace_path: str = "/workspace",
469470
image: str = "alpine:3.19",
470471
cleanup: bool = False,
471472
cleanup_timeout: int = 5,
@@ -477,9 +478,6 @@ def sync_workdir_via_pod(
477478
Requires that the *kubectl* binary is available in PATH and can access
478479
the same cluster context as the Kubernetes Python client.
479480
"""
480-
481-
pod_name = f"{name}-dm"
482-
483481
# Pod manifest
484482
pod_body = client.V1Pod(
485483
metadata=client.V1ObjectMeta(name=pod_name, namespace=namespace),
@@ -548,7 +546,7 @@ def sync_workdir_via_pod(
548546
"--",
549547
"mkdir",
550548
"-p",
551-
workspace_path,
549+
user_workspace_path,
552550
]
553551
)
554552

@@ -570,17 +568,17 @@ def sync_workdir_via_pod(
570568
f"kubectl exec -i -n {namespace} {pod_name}",
571569
"--", # Marks end-of-options for rsync – mandatory when the dest starts with "--:"
572570
f"{os.path.abspath(workdir).rstrip(os.sep)}/",
573-
f"--:{workspace_path.rstrip('/')}/",
571+
f"--:{user_workspace_path.rstrip('/')}/",
574572
]
575573
)
576574

577575
# Emit the full command for easier troubleshooting
578576
logger.debug("Running rsync command: %s", " ".join(rsync_cmd))
579577

580578
subprocess.check_call(rsync_cmd)
579+
logger.info(f"Workdir synced to PVC at {user_workspace_path} via data-mover pod.")
581580

582581
if cleanup:
583-
logger.info("Workdir synced to PVC via data-mover pod. Cleaning up…")
584582
core_v1_api.delete_namespaced_pod(
585583
name=pod_name, namespace=namespace, body=client.V1DeleteOptions()
586584
)

nemo_run/core/frontend/console/api.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,18 @@ def __repr__(self):
3333

3434

3535
def configure_logging(level: str):
36-
handlers = [RichHandler(console=CONSOLE)]
36+
handlers = [
37+
RichHandler(
38+
console=CONSOLE,
39+
show_time=True,
40+
show_level=True,
41+
show_path=True,
42+
)
43+
]
3744
if _is_jupyter():
3845
handlers = None
3946
logging.basicConfig(
40-
level=logging.getLevelName(level.upper()),
47+
level=level.upper(),
4148
format="%(message)s",
4249
datefmt="[%X]",
4350
handlers=handlers,

nemo_run/run/ray/cluster.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from nemo_run.core.execution.base import Executor
2020
from nemo_run.core.execution.slurm import SlurmExecutor
21+
from nemo_run.core.frontend.console.api import configure_logging
2122
from nemo_run.run.ray.slurm import SlurmRayCluster
2223

2324
# Import guard for Kubernetes dependencies
@@ -36,8 +37,10 @@
3637
class RayCluster:
3738
name: str
3839
executor: Executor
40+
log_level: str = "INFO"
3941

4042
def __post_init__(self):
43+
configure_logging(level=self.log_level)
4144
backend_map: dict[Type[Executor], Type] = {
4245
SlurmExecutor: SlurmRayCluster,
4346
}

nemo_run/run/ray/job.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
from nemo_run.core.execution.base import Executor
2020
from nemo_run.core.execution.slurm import SlurmExecutor
21+
from nemo_run.core.frontend.console.api import configure_logging
2122
from nemo_run.run.ray.slurm import SlurmRayJob
2223

2324
# Import guard for Kubernetes dependencies
@@ -39,8 +40,10 @@ class RayJob:
3940
name: str
4041
executor: Executor
4142
pre_ray_start_commands: Optional[list[str]] = None
43+
log_level: str = "INFO"
4244

4345
def __post_init__(self) -> None: # noqa: D401 – simple implementation
46+
configure_logging(level=self.log_level)
4447
backend_map: dict[Type[Executor], Type[Any]] = {
4548
SlurmExecutor: SlurmRayJob,
4649
}

0 commit comments

Comments
 (0)