Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions .github/workflows/e2e_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ on:
pull_request:
branches:
- main
- 'release-*'
- "release-*"
- ray-jobs-feature
paths-ignore:
- 'docs/**'
- '**.adoc'
- '**.md'
- 'LICENSE'
- "docs/**"
- "**.adoc"
- "**.md"
- "LICENSE"

concurrency:
group: ${{ github.head_ref }}-${{ github.workflow }}
Expand All @@ -33,9 +33,9 @@ jobs:
- name: Checkout common repo code
uses: actions/checkout@v4
with:
repository: 'project-codeflare/codeflare-common'
ref: 'main'
path: 'common'
repository: "project-codeflare/codeflare-common"
ref: "main"
path: "common"

- name: Checkout CodeFlare operator repository
uses: actions/checkout@v4
Expand All @@ -46,7 +46,7 @@ jobs:
- name: Set Go
uses: actions/setup-go@v5
with:
go-version-file: './codeflare-operator/go.mod'
go-version-file: "./codeflare-operator/go.mod"
cache-dependency-path: "./codeflare-operator/go.sum"

- name: Set up gotestfmt
Expand All @@ -57,8 +57,8 @@ jobs:
- name: Set up specific Python version
uses: actions/setup-python@v5
with:
python-version: '3.11'
cache: 'pip' # caching pip dependencies
python-version: "3.11"
cache: "pip" # caching pip dependencies

- name: Setup NVidia GPU environment for KinD
uses: ./common/github-actions/nvidia-gpu-setup
Expand All @@ -76,7 +76,7 @@ jobs:
run: |
cd codeflare-operator
echo Setting up CodeFlare stack
make setup-e2e
make setup-e2e KUEUE_VERSION=v0.13.4 KUERAY_VERSION=v1.4.0
echo Deploying CodeFlare operator
make deploy -e IMG="${CODEFLARE_OPERATOR_IMG}" -e ENV="e2e"
kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager
Expand Down Expand Up @@ -122,10 +122,16 @@ jobs:
pip install poetry
poetry install --with test,docs
echo "Running e2e tests..."
poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
poetry run pytest -v -s ./tests/e2e/ -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
env:
GRPC_DNS_RESOLVER: "native"

- name: Run RayJob E2E tests
run: |
export CODEFLARE_TEST_OUTPUT_DIR=${{ env.TEMP_DIR }}
echo "CODEFLARE_TEST_OUTPUT_DIR=${CODEFLARE_TEST_OUTPUT_DIR}" >> $GITHUB_ENV
poetry run pytest -v -s ./tests/e2e/rayjob > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output_rayjob.log 2>&1

- name: Switch to kind-cluster context to print logs
if: always() && steps.deploy.outcome == 'success'
run: kubectl config use-context kind-cluster
Expand All @@ -135,6 +141,7 @@ jobs:
run: |
echo "Printing Pytest output logs"
cat ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log
cat ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output_rayjob.log

- name: Print CodeFlare operator logs
if: always() && steps.deploy.outcome == 'success'
Expand All @@ -146,7 +153,7 @@ jobs:
if: always() && steps.deploy.outcome == 'success'
run: |
echo "Printing KubeRay operator logs"
kubectl logs -n ray-system --tail -1 -l app.kubernetes.io/name=kuberay | tee ${CODEFLARE_TEST_OUTPUT_DIR}/kuberay.log
kubectl logs -n default --tail -1 -l app.kubernetes.io/name=kuberay | tee ${CODEFLARE_TEST_OUTPUT_DIR}/kuberay.log

- name: Export all KinD pod logs
uses: ./common/github-actions/kind-export-logs
Expand Down
6 changes: 1 addition & 5 deletions src/codeflare_sdk/common/utils/k8s_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,10 @@
from ..kubernetes_cluster import config_check, _kube_api_error_handling


def get_current_namespace():
def get_current_namespace(): # pragma: no cover
"""
Retrieves the current Kubernetes namespace.
This function attempts to detect the current namespace by:
1. First checking if running inside a pod (reading from service account namespace file)
2. Falling back to reading from the current kubeconfig context
Returns:
str:
The current namespace or None if not found.
Expand Down
1 change: 0 additions & 1 deletion src/codeflare_sdk/ray/cluster/build_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"):
"enableIngress": False,
"rayStartParams": {
"dashboard-host": "0.0.0.0",
"dashboard-port": "8265",
"block": "true",
"num-gpus": str(head_gpu_count),
"resources": head_resources,
Expand Down
21 changes: 19 additions & 2 deletions src/codeflare_sdk/ray/cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ def apply(self, force=False):
self._throw_for_no_raycluster()
namespace = self.config.namespace
name = self.config.name

# Regenerate resource_yaml to reflect any configuration changes
self.resource_yaml = self.create_resource()

try:
self.config_check()
api_instance = client.CustomObjectsApi(get_api_client())
Expand Down Expand Up @@ -387,16 +391,25 @@ def is_dashboard_ready(self) -> bool:
bool:
True if the dashboard is ready, False otherwise.
"""

dashboard_uri = self.cluster_dashboard_uri()
if dashboard_uri is None:
return False

try:
response = requests.get(
self.cluster_dashboard_uri(),
dashboard_uri,
headers=self._client_headers,
timeout=5,
verify=self._client_verify_tls,
)
except requests.exceptions.SSLError: # pragma no cover
# SSL exception occurs when oauth ingress has been created but cluster is not up
return False
except Exception: # pragma no cover
# Any other exception (connection errors, timeouts, etc.)
return False

if response.status_code == 200:
return True
else:
Expand Down Expand Up @@ -504,6 +517,8 @@ def cluster_dashboard_uri(self) -> str:
):
protocol = "https" if route["spec"].get("tls") else "http"
return f"{protocol}://{route['spec']['host']}"
# No route found for this cluster
return "Dashboard not available yet, have you run cluster.up()?"
else:
try:
api_instance = client.NetworkingV1Api(get_api_client())
Expand All @@ -522,7 +537,8 @@ def cluster_dashboard_uri(self) -> str:
protocol = "http"
elif "route.openshift.io/termination" in annotations:
protocol = "https"
return f"{protocol}://{ingress.spec.rules[0].host}"
return f"{protocol}://{ingress.spec.rules[0].host}"

return "Dashboard not available yet, have you run cluster.up()?"

def list_jobs(self) -> List:
Expand Down Expand Up @@ -783,6 +799,7 @@ def remove_autogenerated_fields(resource):
del resource[key]
else:
remove_autogenerated_fields(resource[key])

elif isinstance(resource, list):
for item in resource:
remove_autogenerated_fields(item)
Expand Down
2 changes: 1 addition & 1 deletion src/codeflare_sdk/ray/cluster/test_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022-2025 IBM, Red Hat
# Copyright 2024 IBM, Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
17 changes: 3 additions & 14 deletions src/codeflare_sdk/ray/rayjobs/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,8 @@ class ManagedClusterConfig:
accelerator_configs:
A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names.
Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings.
local_queue:
The name of the queue to use for the cluster.
annotations:
A dictionary of annotations to apply to the cluster.
A dictionary of annotations to apply to the Job.
volumes:
A list of V1Volume objects to add to the Cluster
volume_mounts:
Expand All @@ -161,7 +159,6 @@ class ManagedClusterConfig:
accelerator_configs: Dict[str, str] = field(
default_factory=lambda: DEFAULT_ACCELERATORS.copy()
)
local_queue: Optional[str] = None
annotations: Dict[str, str] = field(default_factory=dict)
volumes: list[V1Volume] = field(default_factory=list)
volume_mounts: list[V1VolumeMount] = field(default_factory=list)
Expand Down Expand Up @@ -248,7 +245,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]:
"""
ray_cluster_spec = {
"rayVersion": RAY_VERSION,
"enableInTreeAutoscaling": False,
"headGroupSpec": self._build_head_group_spec(),
"workerGroupSpecs": [self._build_worker_group_spec(cluster_name)],
}
Expand Down Expand Up @@ -288,7 +284,6 @@ def _build_head_ray_params(self) -> Dict[str, str]:
"""Build Ray start parameters for head node."""
params = {
"dashboard-host": "0.0.0.0",
"dashboard-port": "8265",
"block": "true",
}

Expand Down Expand Up @@ -346,12 +341,9 @@ def _build_head_container(self) -> V1Container:
self.head_accelerators,
),
volume_mounts=self._generate_volume_mounts(),
env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
)

# Add environment variables if specified
if hasattr(self, "envs") and self.envs:
container.env = self._build_env_vars()

return container

def _build_worker_container(self) -> V1Container:
Expand All @@ -373,12 +365,9 @@ def _build_worker_container(self) -> V1Container:
self.worker_accelerators,
),
volume_mounts=self._generate_volume_mounts(),
env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None,
)

# Add environment variables if specified
if hasattr(self, "envs") and self.envs:
container.env = self._build_env_vars()

return container

def _build_resource_requirements(
Expand Down
45 changes: 26 additions & 19 deletions src/codeflare_sdk/ray/rayjobs/rayjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import re
import ast
from typing import Dict, Any, Optional, Tuple
from codeflare_sdk.common.kueue.kueue import get_default_kueue_name
from codeflare_sdk.common.utils.constants import MOUNT_PATH
from kubernetes import client
from ...common.kubernetes_cluster.auth import get_api_client
Expand Down Expand Up @@ -59,9 +60,9 @@ def __init__(
cluster_config: Optional[ManagedClusterConfig] = None,
namespace: Optional[str] = None,
runtime_env: Optional[Dict[str, Any]] = None,
shutdown_after_job_finishes: Optional[bool] = None,
ttl_seconds_after_finished: int = 0,
active_deadline_seconds: Optional[int] = None,
local_queue: Optional[str] = None,
):
"""
Initialize a RayJob instance.
Expand All @@ -73,12 +74,11 @@ def __init__(
cluster_config: Configuration for creating a new cluster (optional if cluster_name provided)
namespace: The Kubernetes namespace (auto-detected if not specified)
runtime_env: Ray runtime environment configuration (optional)
shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional)
ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0)
active_deadline_seconds: Maximum time the job can run before being terminated (optional)
local_queue: The Kueue LocalQueue to submit the job to (optional)

Note:
shutdown_after_job_finishes is automatically detected but can be overridden:
- True if cluster_config is provided (new cluster will be cleaned up)
- False if cluster_name is provided (existing cluster will not be shut down)
- User can explicitly set this value to override auto-detection
Expand Down Expand Up @@ -108,17 +108,7 @@ def __init__(
self.runtime_env = runtime_env
self.ttl_seconds_after_finished = ttl_seconds_after_finished
self.active_deadline_seconds = active_deadline_seconds

# Auto-set shutdown_after_job_finishes based on cluster_config presence
# If cluster_config is provided, we want to clean up the cluster after job finishes
# If using existing cluster, we don't want to shut it down
# User can override this behavior by explicitly setting shutdown_after_job_finishes
if shutdown_after_job_finishes is not None:
self.shutdown_after_job_finishes = shutdown_after_job_finishes
elif cluster_config is not None:
self.shutdown_after_job_finishes = True
else:
self.shutdown_after_job_finishes = False
self.local_queue = local_queue

if namespace is None:
detected_namespace = get_current_namespace()
Expand Down Expand Up @@ -177,10 +167,6 @@ def submit(self) -> str:
if scripts:
self._handle_script_volumes_for_existing_cluster(scripts, result)

if self.shutdown_after_job_finishes:
logger.info(
f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion"
)
return self.name
else:
raise RuntimeError(f"Failed to submit RayJob {self.name}")
Expand Down Expand Up @@ -230,11 +216,32 @@ def _build_rayjob_cr(self) -> Dict[str, Any]:
},
"spec": {
"entrypoint": self.entrypoint,
"shutdownAfterJobFinishes": self.shutdown_after_job_finishes,
"ttlSecondsAfterFinished": self.ttl_seconds_after_finished,
"shutdownAfterJobFinishes": self._cluster_config is not None,
},
}

labels = {}
# If cluster_config is provided, use the local_queue from the cluster_config
if self._cluster_config is not None:
if self.local_queue:
labels["kueue.x-k8s.io/queue-name"] = self.local_queue
else:
default_queue = get_default_kueue_name(self.namespace)
if default_queue:
labels["kueue.x-k8s.io/queue-name"] = default_queue
else:
# No default queue found, use "default" as fallback
labels["kueue.x-k8s.io/queue-name"] = "default"
logger.warning(
f"No default Kueue LocalQueue found in namespace '{self.namespace}'. "
f"Using 'default' as the queue name. If a LocalQueue named 'default' "
f"does not exist, the RayJob submission will fail. "
f"To fix this, please explicitly specify the 'local_queue' parameter."
)

rayjob_cr["metadata"]["labels"] = labels

# Add active deadline if specified
if self.active_deadline_seconds:
rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds
Expand Down
Loading
Loading