Skip to content

Commit 093ff4c

Browse files
committed
Address CodeRabbit follow-up issues in backends and k8s smoke scripts
- fix backend executor/fallback validation and run_job_and_wait cleanup
- harden declarative/slurm/kubernetes behavior and NCCL warning handling
- make single/multi-node smoke scripts CI-safe with automatic cleanup
- tighten kubectl manifest-validation fallback rules

Signed-off-by: Emanuel Scoullos <escoullos@nvidia.com>
1 parent b766b9a commit 093ff4c

File tree

12 files changed

+137
-72
lines changed

12 files changed

+137
-72
lines changed

nemo_skills/pipeline/backends/factory.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,12 @@ def get_backend(
106106

107107
# Normalize executor name
108108
executor = executor.lower()
109+
primary_config = dict(cluster_config)
110+
primary_config["executor"] = executor
109111

110112
# Try primary backend
111113
try:
112-
backend = BackendFactory._create_backend(executor, cluster_config)
114+
backend = BackendFactory._create_backend(executor, primary_config)
113115

114116
# Health check
115117
if backend.health_check():
@@ -124,9 +126,12 @@ def get_backend(
124126
# Try fallback if configured
125127
fallback_executor = cluster_config.get("fallback_executor")
126128
if fallback and fallback_executor:
129+
fallback_executor = fallback_executor.lower()
127130
LOG.info(f"Attempting fallback to {fallback_executor} backend")
128131
try:
129-
fallback_backend = BackendFactory._create_backend(fallback_executor, cluster_config)
132+
fallback_config = dict(cluster_config)
133+
fallback_config["executor"] = fallback_executor
134+
fallback_backend = BackendFactory._create_backend(fallback_executor, fallback_config)
130135
if fallback_backend.health_check():
131136
LOG.info(f"Successfully initialized fallback {fallback_executor} backend")
132137
return fallback_backend

nemo_skills/pipeline/backends/integration.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,12 @@ def run_job_and_wait(
295295
backend = get_backend(cluster_config)
296296
handle = backend.submit_job(spec)
297297
LOG.info(f"Submitted job {handle.job_id} to {backend.name} backend")
298-
299-
status = backend.wait_for_completion(handle, timeout=timeout)
300-
LOG.info(f"Job {handle.job_id} finished with status: {status.value}")
301-
302-
return status
298+
try:
299+
status = backend.wait_for_completion(handle, timeout=timeout)
300+
LOG.info(f"Job {handle.job_id} finished with status: {status.value}")
301+
return status
302+
finally:
303+
backend.cleanup(handle)
303304

304305

305306
def is_kubernetes_cluster(cluster_config: Dict) -> bool:

nemo_skills/pipeline/backends/kubernetes.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,11 @@ def __init__(self, cluster_config: Dict):
8888
self.config = cluster_config
8989

9090
# Validate config
91-
if cluster_config.get("executor") != "kubernetes":
91+
try:
92+
executor = cluster_config["executor"]
93+
except KeyError as exc:
94+
raise ValueError("KubernetesBackend requires executor='kubernetes' in config") from exc
95+
if executor != "kubernetes":
9296
raise ValueError("KubernetesBackend requires executor='kubernetes' in config")
9397

9498
self.namespace = cluster_config.get("namespace", "default")
@@ -499,7 +503,11 @@ def _inject_rdma_resources(self, containers: list):
499503
continue
500504
except (TypeError, ValueError):
501505
# Keep behavior permissive if a custom quantity format appears.
502-
pass
506+
LOG.warning(
507+
"Unable to parse GPU quantity for container '%s' (value=%r); continuing RDMA resource injection",
508+
container.name,
509+
gpu_count,
510+
)
503511

504512
limits[resource_name] = resource_count
505513
requests[resource_name] = resource_count

nemo_skills/pipeline/backends/local.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,15 @@ class LocalBackend(ComputeBackend):
9494

9595
def __init__(self, cluster_config: Dict):
9696
self.config = cluster_config
97-
self.use_docker = cluster_config.get("executor") == "local"
97+
try:
98+
executor = cluster_config["executor"]
99+
except KeyError as exc:
100+
raise ValueError("LocalBackend requires executor='local' or 'none' in config") from exc
101+
102+
if executor not in {"local", "none"}:
103+
raise ValueError("LocalBackend requires executor='local' or 'none' in config")
104+
105+
self.use_docker = executor == "local"
98106
self._jobs: Dict[str, LocalJob] = {}
99107

100108
@property

nemo_skills/pipeline/backends/slurm.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,15 @@ def submit_job(self, spec: JobSpec) -> JobHandle:
127127

128128
# Submit using existing infrastructure
129129
with get_exp(spec.name, self.config) as exp:
130+
requested_gpus = main_container.resources.gpus if main_container.resources.gpus is not None else None
131+
130132
task = add_task(
131133
exp=exp,
132134
cmd=cmd,
133135
task_name=spec.name,
134136
cluster_config=self.config,
135137
container=container_image,
136-
num_gpus=main_container.resources.gpus or None,
138+
num_gpus=requested_gpus,
137139
num_nodes=1,
138140
heterogeneous=is_heterogeneous,
139141
)

nemo_skills/pipeline/utils/declarative.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ def _sanitize_k8s_name(name: str, max_length: int = 63) -> tuple[str, bool]:
257257
if not name:
258258
name = "job"
259259

260-
was_modified = name != original.lower() or original != original.lower()
260+
was_modified = name != original
261261
return name, was_modified
262262

263263

@@ -537,9 +537,10 @@ def _run_nemo_run(
537537
LOG.info(f"Job '{job_name}' depends on task handle '{dep}' (from reused experiment)")
538538
elif isinstance(dep, dict):
539539
# Dict dependency = internal job reference (by job spec object)
540-
dep_name = dep.get("name")
541-
if not dep_name:
542-
raise ValueError(f"Job dependency must have a 'name' field: {dep}")
540+
try:
541+
dep_name = dep["name"]
542+
except KeyError as exc:
543+
raise ValueError(f"Job dependency must have a 'name' field: {dep}") from exc
543544
if dep_name in job_name_to_handle:
544545
internal_deps.append(job_name_to_handle[dep_name])
545546
LOG.info(
@@ -695,9 +696,10 @@ def _run_kubernetes(self, dry_run: bool = False, log_dir: Optional[str] = None,
695696
LOG.warning(f"External dependency '{dep}' not supported on Kubernetes, skipping")
696697
elif isinstance(dep, dict):
697698
# Dict dependency = internal job reference (same as _run_nemo_run)
698-
dep_name = dep.get("name")
699-
if not dep_name:
700-
raise ValueError(f"Job dependency must have a 'name' field: {dep}")
699+
try:
700+
dep_name = dep["name"]
701+
except KeyError as exc:
702+
raise ValueError(f"Job dependency must have a 'name' field: {dep}") from exc
701703
if dep_name in job_name_to_handle:
702704
dependency_handles.append(job_name_to_handle[dep_name])
703705
LOG.info(f"Job '{original_job_name}' depends on internal job '{dep_name}'")
@@ -744,8 +746,8 @@ def _run_kubernetes(self, dry_run: bool = False, log_dir: Optional[str] = None,
744746
LOG.info(f"Waiting for job '{job_name}' to complete (sequential mode)...")
745747
status = backend.wait_for_completion(handle)
746748
LOG.info(f"Job '{job_name}' completed with status: {status.value}")
747-
if status == JobStatus.FAILED:
748-
raise RuntimeError(f"Job '{job_name}' failed, aborting pipeline")
749+
if status != JobStatus.SUCCEEDED:
750+
raise RuntimeError(f"Job '{job_name}' did not succeed (status={status.value}), aborting pipeline")
749751

750752
if dry_run:
751753
LOG.info("Dry run complete. No jobs were submitted.")
@@ -808,15 +810,12 @@ def _convert_groups_to_job_spec(
808810
# Prepare the command (evaluates lazy commands)
809811
script, exec_config = self._prepare_command(command, self.cluster_config)
810812

811-
# Get the command string
812-
if callable(script.inline):
813-
cmd_result = script.inline()
814-
if isinstance(cmd_result, tuple):
815-
cmd_str, _ = cmd_result
816-
else:
817-
cmd_str = cmd_result
818-
else:
819-
cmd_str = script.inline
813+
# _prepare_command() resolves lazy callables; inline is expected to be a string now.
814+
cmd_str = script.inline
815+
if not isinstance(cmd_str, str):
816+
raise TypeError(
817+
f"Command '{command.name}' must resolve to a string inline command, got {type(cmd_str).__name__}"
818+
)
820819

821820
# Resolve container image
822821
container_image = self._resolve_container(exec_config, command, self.cluster_config)
@@ -843,7 +842,15 @@ def _convert_groups_to_job_spec(
843842
# Get ports from script if available
844843
ports = []
845844
if hasattr(script, "port"):
846-
ports = [script.port]
845+
script_port = script.port
846+
if isinstance(script_port, int) and 1 <= script_port <= 65535:
847+
ports = [script_port]
848+
elif script_port is not None:
849+
LOG.warning(
850+
"Ignoring invalid port value %r on command '%s'; expected int in [1, 65535]",
851+
script_port,
852+
command.name,
853+
)
847854

848855
# Create container spec
849856
container = ContainerSpec(
@@ -926,7 +933,11 @@ def _print_dry_run_job(self, job_name: str, spec: JobSpec):
926933
LOG.info(f" - {container.name}")
927934
LOG.info(f" Image: {container.image}")
928935
LOG.info(f" GPUs: {container.resources.gpus}")
929-
LOG.info(f" Command: {' '.join(container.command[:50])}...")
936+
command_text = " ".join(container.command)
937+
max_chars = 200
938+
if len(command_text) > max_chars:
939+
command_text = f"{command_text[:max_chars]}..."
940+
LOG.info(f" Command: {command_text}")
930941
if spec.dependencies:
931942
LOG.info(f"Dependencies: {spec.dependencies}")
932943
LOG.info(f"Timeout: {spec.timeout_seconds}s")

scripts/k8s-tests/check_nccl_logs.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,10 @@ def parse_nccl_logs(log_text: str) -> NCCLCheckResult:
121121
if rank_match:
122122
result.ranks_seen.add(int(rank_match.group(1)))
123123

124-
# Errors
125-
if "NCCL WARN" in line or "NCCL ERROR" in line:
124+
# NCCL WARN lines are useful signal but not hard failures by themselves.
125+
if "NCCL WARN" in line:
126+
result.warnings.append(line.strip())
127+
if "NCCL ERROR" in line:
126128
result.errors.append(line.strip())
127129

128130
return result
@@ -189,9 +191,15 @@ def validate_result(
189191
messages.append(f"FAIL: World size {result.world_size} != expected {expected_world}")
190192
passed = False
191193

194+
# Check for NCCL warnings
195+
if result.warnings:
196+
messages.append(f"WARN: {len(result.warnings)} NCCL warning(s):")
197+
for warn in result.warnings[:5]:
198+
messages.append(f" - {warn}")
199+
192200
# Check for NCCL errors
193201
if result.errors:
194-
messages.append(f"FAIL: {len(result.errors)} NCCL error(s)/warning(s):")
202+
messages.append(f"FAIL: {len(result.errors)} NCCL error(s):")
195203
for err in result.errors[:5]:
196204
messages.append(f" - {err}")
197205
passed = False

scripts/k8s-tests/pipeline_smoke_test.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -92,13 +92,7 @@
9292
MULTI_NODE_TRAIN_CMD = """
9393
export NCCL_DEBUG=INFO
9494
export NCCL_DEBUG_SUBSYS=INIT,NET
95-
torchrun \\
96-
--nproc_per_node={gpus} \\
97-
--nnodes={nodes} \\
98-
--node_rank=${{NODE_RANK:-0}} \\
99-
--master_addr=${{MASTER_ADDR:-localhost}} \\
100-
--master_port=${{MASTER_PORT:-29500}} \\
101-
-c "
95+
cat > /tmp/multinode_smoke_train.py << 'PYEOF'
10296
import os, torch, torch.distributed as dist
10397
from torch.nn.parallel import DistributedDataParallel as DDP
10498
import torch.nn as nn
@@ -108,7 +102,7 @@
108102
local_rank = int(os.environ.get('LOCAL_RANK', 0))
109103
device = torch.device(f'cuda:{{local_rank}}')
110104
torch.cuda.set_device(device)
111-
print(f'[Rank {{rank}}] Node {{os.environ.get(\"NODE_RANK\",\"?\")}} GPU: {{torch.cuda.get_device_name(device)}}')
105+
print(f'[Rank {{rank}}] Node {{os.environ.get("NODE_RANK","?")}} GPU: {{torch.cuda.get_device_name(device)}}')
112106
113107
model = DDP(nn.Linear(64, 64).to(device), device_ids=[local_rank])
114108
x = torch.randn(16, 64, device=device)
@@ -125,7 +119,14 @@
125119
print(f' World size: {{dist.get_world_size()}}')
126120
print(f' GPU: {{torch.cuda.get_device_name(0)}}')
127121
dist.destroy_process_group()
128-
"
122+
PYEOF
123+
torchrun \\
124+
--nproc_per_node={gpus} \\
125+
--nnodes={nodes} \\
126+
--node_rank=${{NODE_RANK:-0}} \\
127+
--master_addr=${{MASTER_ADDR:-localhost}} \\
128+
--master_port=${{MASTER_PORT:-29500}} \\
129+
/tmp/multinode_smoke_train.py
129130
"""
130131

131132

@@ -243,11 +244,8 @@ def main():
243244

244245
if status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
245246
print(f"\n--- Logs from {name} ---")
246-
try:
247-
for line in backend.get_logs(handle):
248-
print(line, end="" if line.endswith("\n") else "\n")
249-
except Exception as e:
250-
print(f"Failed to get logs: {e}")
247+
for line in backend.get_logs(handle):
248+
print(line, end="" if line.endswith("\n") else "\n")
251249

252250
if status == JobStatus.FAILED:
253251
print(f"\nERROR: Job '{name}' failed")

scripts/k8s-tests/run_sft_k8s_real.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,8 @@ def main():
171171

172172
if status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
173173
print("\n--- Logs ---")
174-
try:
175-
for line in backend.get_logs(handle):
176-
print(line, end="" if line.endswith("\n") else "\n")
177-
except Exception as e:
178-
print(f"Log error: {e}")
174+
for line in backend.get_logs(handle):
175+
print(line, end="" if line.endswith("\n") else "\n")
179176

180177
# Cleanup
181178
backend.cleanup(handle)

scripts/k8s-tests/smoke_test_multi_node.sh

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ SERVICE_NAME="${JOB_NAME}-workers"
2828
MASTER_PORT=29500
2929
TIMEOUT_SECONDS=900 # 15 minutes
3030
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
31+
CLEANUP_DONE=0
32+
33+
cleanup_resources() {
34+
if [[ "$CLEANUP_DONE" -eq 1 ]]; then
35+
return
36+
fi
37+
CLEANUP_DONE=1
38+
39+
echo ""
40+
echo "Cleaning up Kubernetes resources..."
41+
kubectl delete job "$JOB_NAME" -n "$NAMESPACE" --ignore-not-found >/dev/null 2>&1 || true
42+
kubectl delete service "$SERVICE_NAME" -n "$NAMESPACE" --ignore-not-found >/dev/null 2>&1 || true
43+
}
44+
45+
trap cleanup_resources EXIT INT TERM
3146

3247
while [[ $# -gt 0 ]]; do
3348
case $1 in
@@ -267,12 +282,7 @@ if [ -n "$FIRST_POD" ]; then
267282
--expected-gpus-per-node "$NUM_GPUS" || true
268283
fi
269284

270-
# Cleanup
271-
echo ""
272-
read -p "Delete job and service? [y/N] " -n 1 -r
273-
echo
274-
if [[ $REPLY =~ ^[Yy]$ ]]; then
275-
kubectl delete job "$JOB_NAME" -n "$NAMESPACE" --ignore-not-found
276-
kubectl delete service "$SERVICE_NAME" -n "$NAMESPACE" --ignore-not-found
277-
echo "Resources deleted."
285+
if [[ "$JOB_STATUS" != "succeeded" ]]; then
286+
echo "Smoke test failed."
287+
exit 1
278288
fi

0 commit comments

Comments (0)