
Commit 5e05165

RHOAIENG-32532: Add kueue integration and update tests
1 parent 538d345 commit 5e05165

22 files changed: +1129 −1131 lines changed

.github/workflows/e2e_tests.yaml

Lines changed: 25 additions & 14 deletions
@@ -5,13 +5,13 @@ on:
   pull_request:
     branches:
       - main
-      - 'release-*'
+      - "release-*"
       - ray-jobs-feature
     paths-ignore:
-      - 'docs/**'
-      - '**.adoc'
-      - '**.md'
-      - 'LICENSE'
+      - "docs/**"
+      - "**.adoc"
+      - "**.md"
+      - "LICENSE"
 
 concurrency:
   group: ${{ github.head_ref }}-${{ github.workflow }}
@@ -33,9 +33,9 @@ jobs:
       - name: Checkout common repo code
         uses: actions/checkout@v4
         with:
-          repository: 'project-codeflare/codeflare-common'
-          ref: 'main'
-          path: 'common'
+          repository: "project-codeflare/codeflare-common"
+          ref: "main"
+          path: "common"
 
       - name: Checkout CodeFlare operator repository
         uses: actions/checkout@v4
@@ -46,7 +46,7 @@ jobs:
       - name: Set Go
         uses: actions/setup-go@v5
         with:
-          go-version-file: './codeflare-operator/go.mod'
+          go-version-file: "./codeflare-operator/go.mod"
           cache-dependency-path: "./codeflare-operator/go.sum"
 
       - name: Set up gotestfmt
@@ -57,8 +57,8 @@ jobs:
       - name: Set up specific Python version
         uses: actions/setup-python@v5
         with:
-          python-version: '3.11'
-          cache: 'pip' # caching pip dependencies
+          python-version: "3.11"
+          cache: "pip" # caching pip dependencies
 
       - name: Setup NVidia GPU environment for KinD
         uses: ./common/github-actions/nvidia-gpu-setup
@@ -76,7 +76,7 @@ jobs:
         run: |
           cd codeflare-operator
           echo Setting up CodeFlare stack
-          make setup-e2e
+          make setup-e2e KUEUE_VERSION=v0.13.4 KUERAY_VERSION=v1.4.0
           echo Deploying CodeFlare operator
           make deploy -e IMG="${CODEFLARE_OPERATOR_IMG}" -e ENV="e2e"
           kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager
@@ -95,6 +95,10 @@ jobs:
           kubectl create clusterrolebinding sdk-user-namespace-creator --clusterrole=namespace-creator --user=sdk-user
           kubectl create clusterrole raycluster-creator --verb=get,list,create,delete,patch --resource=rayclusters
           kubectl create clusterrolebinding sdk-user-raycluster-creator --clusterrole=raycluster-creator --user=sdk-user
+          kubectl create clusterrole rayjob-creator --verb=get,list,create,delete,patch --resource=rayjobs
+          kubectl create clusterrolebinding sdk-user-rayjob-creator --clusterrole=rayjob-creator --user=sdk-user
+          kubectl create clusterrole rayjob-status-reader --verb=get,list,patch,update --resource=rayjobs/status
+          kubectl create clusterrolebinding sdk-user-rayjob-status-reader --clusterrole=rayjob-status-reader --user=sdk-user
           kubectl create clusterrole appwrapper-creator --verb=get,list,create,delete,patch --resource=appwrappers
           kubectl create clusterrolebinding sdk-user-appwrapper-creator --clusterrole=appwrapper-creator --user=sdk-user
           kubectl create clusterrole resourceflavor-creator --verb=get,list,create,delete --resource=resourceflavors
@@ -122,10 +126,16 @@ jobs:
           pip install poetry
           poetry install --with test,docs
           echo "Running e2e tests..."
-          poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
+          poetry run pytest -v -s ./tests/e2e/ -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
         env:
           GRPC_DNS_RESOLVER: "native"
 
+      - name: Run RayJob E2E tests
+        run: |
+          export CODEFLARE_TEST_OUTPUT_DIR=${{ env.TEMP_DIR }}
+          echo "CODEFLARE_TEST_OUTPUT_DIR=${CODEFLARE_TEST_OUTPUT_DIR}" >> $GITHUB_ENV
+          poetry run pytest -v -s ./tests/e2e/rayjob > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output_rayjob.log 2>&1
+
       - name: Switch to kind-cluster context to print logs
         if: always() && steps.deploy.outcome == 'success'
         run: kubectl config use-context kind-cluster
@@ -135,6 +145,7 @@ jobs:
         run: |
           echo "Printing Pytest output logs"
           cat ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log
+          cat ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output_rayjob.log
 
       - name: Print CodeFlare operator logs
         if: always() && steps.deploy.outcome == 'success'
@@ -146,7 +157,7 @@ jobs:
         if: always() && steps.deploy.outcome == 'success'
         run: |
           echo "Printing KubeRay operator logs"
-          kubectl logs -n ray-system --tail -1 -l app.kubernetes.io/name=kuberay | tee ${CODEFLARE_TEST_OUTPUT_DIR}/kuberay.log
+          kubectl logs -n default --tail -1 -l app.kubernetes.io/name=kuberay | tee ${CODEFLARE_TEST_OUTPUT_DIR}/kuberay.log
 
       - name: Export all KinD pod logs
         uses: ./common/github-actions/kind-export-logs

src/codeflare_sdk/common/utils/k8s_utils.py

Lines changed: 1 addition & 5 deletions
@@ -7,14 +7,10 @@
 from ..kubernetes_cluster import config_check, _kube_api_error_handling
 
 
-def get_current_namespace():
+def get_current_namespace():  # pragma: no cover
     """
     Retrieves the current Kubernetes namespace.
 
-    This function attempts to detect the current namespace by:
-    1. First checking if running inside a pod (reading from service account namespace file)
-    2. Falling back to reading from the current kubeconfig context
-
     Returns:
         str:
             The current namespace or None if not found.
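A minimal caller-side sketch (not part of this commit) of how the documented return value can be handled, assuming the module path shown in the file header and that the function returns None when no namespace can be detected:

    from codeflare_sdk.common.utils.k8s_utils import get_current_namespace

    # Fall back to an explicit namespace when auto-detection returns None.
    namespace = get_current_namespace() or "default"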

src/codeflare_sdk/ray/cluster/build_ray_cluster.py

Lines changed: 0 additions & 1 deletion
@@ -136,7 +136,6 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
         "enableIngress": False,
         "rayStartParams": {
             "dashboard-host": "0.0.0.0",
-            "dashboard-port": "8265",
             "block": "true",
             "num-gpus": str(head_gpu_count),
             "resources": head_resources,

src/codeflare_sdk/ray/cluster/cluster.py

Lines changed: 19 additions & 2 deletions
@@ -208,6 +208,10 @@ def apply(self, force=False):
         self._throw_for_no_raycluster()
         namespace = self.config.namespace
         name = self.config.name
+
+        # Regenerate resource_yaml to reflect any configuration changes
+        self.resource_yaml = self.create_resource()
+
         try:
             self.config_check()
             api_instance = client.CustomObjectsApi(get_api_client())
@@ -387,16 +391,25 @@ def is_dashboard_ready(self) -> bool:
             bool:
                 True if the dashboard is ready, False otherwise.
         """
+
+        dashboard_uri = self.cluster_dashboard_uri()
+        if dashboard_uri is None:
+            return False
+
         try:
             response = requests.get(
-                self.cluster_dashboard_uri(),
+                dashboard_uri,
                 headers=self._client_headers,
                 timeout=5,
                 verify=self._client_verify_tls,
             )
         except requests.exceptions.SSLError:  # pragma no cover
             # SSL exception occurs when oauth ingress has been created but cluster is not up
             return False
+        except Exception:  # pragma no cover
+            # Any other exception (connection errors, timeouts, etc.)
+            return False
+
         if response.status_code == 200:
             return True
         else:
@@ -504,6 +517,8 @@ def cluster_dashboard_uri(self) -> str:
             ):
                 protocol = "https" if route["spec"].get("tls") else "http"
                 return f"{protocol}://{route['spec']['host']}"
+            # No route found for this cluster
+            return "Dashboard not available yet, have you run cluster.up()?"
         else:
             try:
                 api_instance = client.NetworkingV1Api(get_api_client())
@@ -522,7 +537,8 @@ def cluster_dashboard_uri(self) -> str:
                     protocol = "http"
                 elif "route.openshift.io/termination" in annotations:
                     protocol = "https"
-            return f"{protocol}://{ingress.spec.rules[0].host}"
+                return f"{protocol}://{ingress.spec.rules[0].host}"
+
         return "Dashboard not available yet, have you run cluster.up()?"
 
     def list_jobs(self) -> List:
@@ -783,6 +799,7 @@ def remove_autogenerated_fields(resource):
                 del resource[key]
             else:
                 remove_autogenerated_fields(resource[key])
+
     elif isinstance(resource, list):
         for item in resource:
             remove_autogenerated_fields(item)
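With this change, is_dashboard_ready() returns False on any connection failure instead of raising, which makes simple polling safe. A hypothetical helper illustrating that usage (the wait_for_dashboard name and timings are assumptions, not part of this commit):

    import time

    def wait_for_dashboard(cluster, timeout_s=300, interval_s=5):
        # Poll until the Ray dashboard responds; relies on is_dashboard_ready()
        # returning False (rather than raising) while the route/ingress is still coming up.
        deadline = time.time() + timeout_s
        while time.time() < deadline:
            if cluster.is_dashboard_ready():
                return True
            time.sleep(interval_s)
        return False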

src/codeflare_sdk/ray/cluster/test_config.py

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-# Copyright 2022-2025 IBM, Red Hat
+# Copyright 2024 IBM, Red Hat
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

src/codeflare_sdk/ray/rayjobs/config.py

Lines changed: 3 additions & 14 deletions
@@ -131,10 +131,8 @@ class ManagedClusterConfig:
     accelerator_configs:
         A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names.
         Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings.
-    local_queue:
-        The name of the queue to use for the cluster.
     annotations:
-        A dictionary of annotations to apply to the cluster.
+        A dictionary of annotations to apply to the Job.
     volumes:
         A list of V1Volume objects to add to the Cluster
     volume_mounts:
@@ -161,7 +159,6 @@ class ManagedClusterConfig:
     accelerator_configs: Dict[str, str] = field(
         default_factory=lambda: DEFAULT_ACCELERATORS.copy()
     )
-    local_queue: Optional[str] = None
     annotations: Dict[str, str] = field(default_factory=dict)
     volumes: list[V1Volume] = field(default_factory=list)
     volume_mounts: list[V1VolumeMount] = field(default_factory=list)
@@ -248,7 +245,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
         """
         ray_cluster_spec = {
             "rayVersion": RAY_VERSION,
-            "enableInTreeAutoscaling": False,
             "headGroupSpec": self._build_head_group_spec(),
             "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
         }
@@ -288,7 +284,6 @@ def _build_head_ray_params(self) -> Dict[str, str]:
         """Build Ray start parameters for head node."""
         params = {
             "dashboard-host": "0.0.0.0",
-            "dashboard-port": "8265",
             "block": "true",
         }
 
@@ -346,12 +341,9 @@ def _build_head_container(self) -> V1Container:
                 self.head_accelerators,
             ),
             volume_mounts=self._generate_volume_mounts(),
+            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
         )
 
-        # Add environment variables if specified
-        if hasattr(self, "envs") and self.envs:
-            container.env = self._build_env_vars()
-
         return container
 
     def _build_worker_container(self) -> V1Container:
@@ -373,12 +365,9 @@ def _build_worker_container(self) -> V1Container:
                 self.worker_accelerators,
             ),
             volume_mounts=self._generate_volume_mounts(),
+            env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
        )
 
-        # Add environment variables if specified
-        if hasattr(self, "envs") and self.envs:
-            container.env = self._build_env_vars()
-
         return container
 
     def _build_resource_requirements(
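Both container builders now pass env directly to the V1Container constructor instead of mutating the container afterwards. A small standalone sketch of that pattern using the kubernetes client (the image tag and variable names are illustrative only, not taken from this commit):

    from kubernetes.client import V1Container, V1EnvVar

    envs = {"RAY_LOG_TO_STDERR": "1"}  # hypothetical env mapping
    container = V1Container(
        name="ray-head",
        image="rayproject/ray:2.9.0",  # placeholder image
        # None when no env vars are set, mirroring the conditional used above
        env=[V1EnvVar(name=k, value=v) for k, v in envs.items()] if envs else None,
    )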

src/codeflare_sdk/ray/rayjobs/rayjob.py

Lines changed: 26 additions & 19 deletions
@@ -22,6 +22,7 @@
 import re
 import ast
 from typing import Dict, Any, Optional, Tuple
+from codeflare_sdk.common.kueue.kueue import get_default_kueue_name
 from codeflare_sdk.common.utils.constants import MOUNT_PATH
 from kubernetes import client
 from ...common.kubernetes_cluster.auth import get_api_client
@@ -59,9 +60,9 @@ def __init__(
         cluster_config: Optional[ManagedClusterConfig] = None,
         namespace: Optional[str] = None,
         runtime_env: Optional[Dict[str, Any]] = None,
-        shutdown_after_job_finishes: Optional[bool] = None,
         ttl_seconds_after_finished: int = 0,
         active_deadline_seconds: Optional[int] = None,
+        local_queue: Optional[str] = None,
     ):
         """
         Initialize a RayJob instance.
@@ -73,12 +74,11 @@ def __init__(
             cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
             namespace: The Kubernetes namespace (auto-detected if not specified)
             runtime_env: Ray runtime environment configuration (optional)
-            shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional)
             ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
             active_deadline_seconds: Maximum time the job can run before being terminated (optional)
+            local_queue: The Kueue LocalQueue to submit the job to (optional)
 
         Note:
-            shutdown_after_job_finishes is automatically detected but can be overridden:
             - True if cluster_config is provided (new cluster will be cleaned up)
             - False if cluster_name is provided (existing cluster will not be shut down)
             - User can explicitly set this value to override auto-detection
@@ -108,17 +108,7 @@ def __init__(
         self.runtime_env = runtime_env
         self.ttl_seconds_after_finished = ttl_seconds_after_finished
         self.active_deadline_seconds = active_deadline_seconds
-
-        # Auto-set shutdown_after_job_finishes based on cluster_config presence
-        # If cluster_config is provided, we want to clean up the cluster after job finishes
-        # If using existing cluster, we don't want to shut it down
-        # User can override this behavior by explicitly setting shutdown_after_job_finishes
-        if shutdown_after_job_finishes is not None:
-            self.shutdown_after_job_finishes = shutdown_after_job_finishes
-        elif cluster_config is not None:
-            self.shutdown_after_job_finishes = True
-        else:
-            self.shutdown_after_job_finishes = False
+        self.local_queue = local_queue
 
         if namespace is None:
             detected_namespace = get_current_namespace()
@@ -177,10 +167,6 @@ def submit(self) -> str:
             if scripts:
                 self._handle_script_volumes_for_existing_cluster(scripts, result)
 
-            if self.shutdown_after_job_finishes:
-                logger.info(
-                    f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
-                )
             return self.name
         else:
             raise RuntimeError(f"Failed to submit RayJob {self.name}")
@@ -230,11 +216,32 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
             },
             "spec": {
                 "entrypoint": self.entrypoint,
-                "shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
                 "ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
+                "shutdownAfterJobFinishes": self._cluster_config is not None,
             },
         }
 
+        labels = {}
+        # If cluster_config is provided, use the local_queue from the cluster_config
+        if self._cluster_config is not None:
+            if self.local_queue:
+                labels["kueue.x-k8s.io/queue-name"] = self.local_queue
+            else:
+                default_queue = get_default_kueue_name(self.namespace)
+                if default_queue:
+                    labels["kueue.x-k8s.io/queue-name"] = default_queue
+                else:
+                    # No default queue found, use "default" as fallback
+                    labels["kueue.x-k8s.io/queue-name"] = "default"
+                    logger.warning(
+                        f"No default Kueue LocalQueue found in namespace '{self.namespace}'. "
+                        f"Using 'default' as the queue name. If a LocalQueue named 'default' "
+                        f"does not exist, the RayJob submission will fail. "
+                        f"To fix this, please explicitly specify the 'local_queue' parameter."
+                    )
+
+        rayjob_cr["metadata"]["labels"] = labels
+
         # Add active deadline if specified
         if self.active_deadline_seconds:
             rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds
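Taken together, the RayJob changes drop the explicit shutdown_after_job_finishes flag (shutdown is now derived from whether cluster_config is given) and route the job through Kueue via local_queue, falling back to the namespace's default LocalQueue. A hypothetical submission sketch under the new signature; the job-name and entrypoint keyword names are assumptions, since those parameters fall outside the hunks shown above:

    from codeflare_sdk.ray.rayjobs.rayjob import RayJob
    from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig

    job = RayJob(
        job_name="demo-job",                    # assumed parameter name for the job's name
        entrypoint="python train.py",           # assumed parameter name for the entrypoint
        cluster_config=ManagedClusterConfig(),  # new cluster -> shutdownAfterJobFinishes: true
        local_queue="team-a-queue",             # optional; otherwise the default LocalQueue is used
    )
    job.submit()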
