diff --git a/.github/workflows/e2e_tests.yaml b/.github/workflows/e2e_tests.yaml index d66e4b34..e3bbf3ae 100644 --- a/.github/workflows/e2e_tests.yaml +++ b/.github/workflows/e2e_tests.yaml @@ -5,13 +5,13 @@ on: pull_request: branches: - main - - 'release-*' + - "release-*" - ray-jobs-feature paths-ignore: - - 'docs/**' - - '**.adoc' - - '**.md' - - 'LICENSE' + - "docs/**" + - "**.adoc" + - "**.md" + - "LICENSE" concurrency: group: ${{ github.head_ref }}-${{ github.workflow }} @@ -33,9 +33,9 @@ jobs: - name: Checkout common repo code uses: actions/checkout@v4 with: - repository: 'project-codeflare/codeflare-common' - ref: 'main' - path: 'common' + repository: "project-codeflare/codeflare-common" + ref: "main" + path: "common" - name: Checkout CodeFlare operator repository uses: actions/checkout@v4 @@ -46,7 +46,7 @@ jobs: - name: Set Go uses: actions/setup-go@v5 with: - go-version-file: './codeflare-operator/go.mod' + go-version-file: "./codeflare-operator/go.mod" cache-dependency-path: "./codeflare-operator/go.sum" - name: Set up gotestfmt @@ -57,8 +57,8 @@ jobs: - name: Set up specific Python version uses: actions/setup-python@v5 with: - python-version: '3.11' - cache: 'pip' # caching pip dependencies + python-version: "3.11" + cache: "pip" # caching pip dependencies - name: Setup NVidia GPU environment for KinD uses: ./common/github-actions/nvidia-gpu-setup @@ -76,7 +76,7 @@ jobs: run: | cd codeflare-operator echo Setting up CodeFlare stack - make setup-e2e + make setup-e2e KUEUE_VERSION=v0.13.4 KUBERAY_VERSION=v1.4.0 echo Deploying CodeFlare operator make deploy -e IMG="${CODEFLARE_OPERATOR_IMG}" -e ENV="e2e" kubectl wait --timeout=120s --for=condition=Available=true deployment -n openshift-operators codeflare-operator-manager @@ -122,10 +122,16 @@ jobs: pip install poetry poetry install --with test,docs echo "Running e2e tests..."
- poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 + poetry run pytest -v -s ./tests/e2e/ -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1 env: GRPC_DNS_RESOLVER: "native" + - name: Run RayJob E2E tests + run: | + export CODEFLARE_TEST_OUTPUT_DIR=${{ env.TEMP_DIR }} + echo "CODEFLARE_TEST_OUTPUT_DIR=${CODEFLARE_TEST_OUTPUT_DIR}" >> $GITHUB_ENV + poetry run pytest -v -s ./tests/e2e/rayjob > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output_rayjob.log 2>&1 + - name: Switch to kind-cluster context to print logs if: always() && steps.deploy.outcome == 'success' run: kubectl config use-context kind-cluster @@ -135,6 +141,7 @@ jobs: run: | echo "Printing Pytest output logs" cat ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log + cat ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output_rayjob.log - name: Print CodeFlare operator logs if: always() && steps.deploy.outcome == 'success' @@ -146,7 +153,7 @@ jobs: if: always() && steps.deploy.outcome == 'success' run: | echo "Printing KubeRay operator logs" - kubectl logs -n ray-system --tail -1 -l app.kubernetes.io/name=kuberay | tee ${CODEFLARE_TEST_OUTPUT_DIR}/kuberay.log + kubectl logs -n default --tail -1 -l app.kubernetes.io/name=kuberay | tee ${CODEFLARE_TEST_OUTPUT_DIR}/kuberay.log - name: Export all KinD pod logs uses: ./common/github-actions/kind-export-logs diff --git a/src/codeflare_sdk/common/utils/k8s_utils.py b/src/codeflare_sdk/common/utils/k8s_utils.py index 57eccf2d..e2e03a5d 100644 --- a/src/codeflare_sdk/common/utils/k8s_utils.py +++ b/src/codeflare_sdk/common/utils/k8s_utils.py @@ -7,14 +7,10 @@ from ..kubernetes_cluster import config_check, _kube_api_error_handling -def get_current_namespace(): +def get_current_namespace(): # pragma: no cover """ Retrieves the current Kubernetes namespace. - This function attempts to detect the current namespace by: - 1. First checking if running inside a pod (reading from service account namespace file) - 2. Falling back to reading from the current kubeconfig context - Returns: str: The current namespace or None if not found. diff --git a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py index 9d7ed10a..b2b147c4 100644 --- a/src/codeflare_sdk/ray/cluster/build_ray_cluster.py +++ b/src/codeflare_sdk/ray/cluster/build_ray_cluster.py @@ -136,7 +136,6 @@ def build_ray_cluster(cluster: "codeflare_sdk.ray.cluster.Cluster"): "enableIngress": False, "rayStartParams": { "dashboard-host": "0.0.0.0", - "dashboard-port": "8265", "block": "true", "num-gpus": str(head_gpu_count), "resources": head_resources, diff --git a/src/codeflare_sdk/ray/cluster/cluster.py b/src/codeflare_sdk/ray/cluster/cluster.py index 5c378efd..ce078716 100644 --- a/src/codeflare_sdk/ray/cluster/cluster.py +++ b/src/codeflare_sdk/ray/cluster/cluster.py @@ -208,6 +208,10 @@ def apply(self, force=False): self._throw_for_no_raycluster() namespace = self.config.namespace name = self.config.name + + # Regenerate resource_yaml to reflect any configuration changes + self.resource_yaml = self.create_resource() + try: self.config_check() api_instance = client.CustomObjectsApi(get_api_client()) @@ -387,9 +391,14 @@ def is_dashboard_ready(self) -> bool: bool: True if the dashboard is ready, False otherwise. 
""" + + dashboard_uri = self.cluster_dashboard_uri() + if dashboard_uri is None: + return False + try: response = requests.get( - self.cluster_dashboard_uri(), + dashboard_uri, headers=self._client_headers, timeout=5, verify=self._client_verify_tls, @@ -397,6 +406,10 @@ def is_dashboard_ready(self) -> bool: except requests.exceptions.SSLError: # pragma no cover # SSL exception occurs when oauth ingress has been created but cluster is not up return False + except Exception: # pragma no cover + # Any other exception (connection errors, timeouts, etc.) + return False + if response.status_code == 200: return True else: @@ -504,6 +517,8 @@ def cluster_dashboard_uri(self) -> str: ): protocol = "https" if route["spec"].get("tls") else "http" return f"{protocol}://{route['spec']['host']}" + # No route found for this cluster + return "Dashboard not available yet, have you run cluster.up()?" else: try: api_instance = client.NetworkingV1Api(get_api_client()) @@ -522,7 +537,8 @@ def cluster_dashboard_uri(self) -> str: protocol = "http" elif "route.openshift.io/termination" in annotations: protocol = "https" - return f"{protocol}://{ingress.spec.rules[0].host}" + return f"{protocol}://{ingress.spec.rules[0].host}" + return "Dashboard not available yet, have you run cluster.up()?" def list_jobs(self) -> List: @@ -783,6 +799,7 @@ def remove_autogenerated_fields(resource): del resource[key] else: remove_autogenerated_fields(resource[key]) + elif isinstance(resource, list): for item in resource: remove_autogenerated_fields(item) diff --git a/src/codeflare_sdk/ray/cluster/test_config.py b/src/codeflare_sdk/ray/cluster/test_config.py index 9f880df7..e405bc5b 100644 --- a/src/codeflare_sdk/ray/cluster/test_config.py +++ b/src/codeflare_sdk/ray/cluster/test_config.py @@ -1,4 +1,4 @@ -# Copyright 2022-2025 IBM, Red Hat +# Copyright 2024 IBM, Red Hat # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/src/codeflare_sdk/ray/rayjobs/config.py b/src/codeflare_sdk/ray/rayjobs/config.py index a46e9a62..916189dd 100644 --- a/src/codeflare_sdk/ray/rayjobs/config.py +++ b/src/codeflare_sdk/ray/rayjobs/config.py @@ -131,10 +131,8 @@ class ManagedClusterConfig: accelerator_configs: A dictionary of custom resource mappings to map extended resource requests to RayCluster resource names. Defaults to DEFAULT_ACCELERATORS but can be overridden with custom mappings. - local_queue: - The name of the queue to use for the cluster. annotations: - A dictionary of annotations to apply to the cluster. + A dictionary of annotations to apply to the Job. 
volumes: A list of V1Volume objects to add to the Cluster volume_mounts: @@ -161,7 +159,6 @@ class ManagedClusterConfig: accelerator_configs: Dict[str, str] = field( default_factory=lambda: DEFAULT_ACCELERATORS.copy() ) - local_queue: Optional[str] = None annotations: Dict[str, str] = field(default_factory=dict) volumes: list[V1Volume] = field(default_factory=list) volume_mounts: list[V1VolumeMount] = field(default_factory=list) @@ -248,7 +245,6 @@ def build_ray_cluster_spec(self, cluster_name: str) -> Dict[str, Any]: """ ray_cluster_spec = { "rayVersion": RAY_VERSION, - "enableInTreeAutoscaling": False, "headGroupSpec": self._build_head_group_spec(), "workerGroupSpecs": [self._build_worker_group_spec(cluster_name)], } @@ -288,7 +284,6 @@ def _build_head_ray_params(self) -> Dict[str, str]: """Build Ray start parameters for head node.""" params = { "dashboard-host": "0.0.0.0", - "dashboard-port": "8265", "block": "true", } @@ -346,12 +341,9 @@ def _build_head_container(self) -> V1Container: self.head_accelerators, ), volume_mounts=self._generate_volume_mounts(), + env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None, ) - # Add environment variables if specified - if hasattr(self, "envs") and self.envs: - container.env = self._build_env_vars() - return container def _build_worker_container(self) -> V1Container: @@ -373,12 +365,9 @@ def _build_worker_container(self) -> V1Container: self.worker_accelerators, ), volume_mounts=self._generate_volume_mounts(), + env=self._build_env_vars() if hasattr(self, "envs") and self.envs else None, ) - # Add environment variables if specified - if hasattr(self, "envs") and self.envs: - container.env = self._build_env_vars() - return container def _build_resource_requirements( diff --git a/src/codeflare_sdk/ray/rayjobs/rayjob.py b/src/codeflare_sdk/ray/rayjobs/rayjob.py index 49ccafcb..fd1c3a60 100644 --- a/src/codeflare_sdk/ray/rayjobs/rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/rayjob.py @@ -22,6 +22,7 @@ import re import ast from typing import Dict, Any, Optional, Tuple +from codeflare_sdk.common.kueue.kueue import get_default_kueue_name from codeflare_sdk.common.utils.constants import MOUNT_PATH from kubernetes import client from ...common.kubernetes_cluster.auth import get_api_client @@ -59,9 +60,9 @@ def __init__( cluster_config: Optional[ManagedClusterConfig] = None, namespace: Optional[str] = None, runtime_env: Optional[Dict[str, Any]] = None, - shutdown_after_job_finishes: Optional[bool] = None, ttl_seconds_after_finished: int = 0, active_deadline_seconds: Optional[int] = None, + local_queue: Optional[str] = None, ): """ Initialize a RayJob instance. 
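With the signature above, queue selection moves from the cluster config onto the RayJob itself. A minimal usage sketch of the updated constructor (the queue name "team-a-queue", namespace "demo-ns", and script name are illustrative placeholders, not part of this change):

    from codeflare_sdk.ray.rayjobs.rayjob import RayJob
    from codeflare_sdk.ray.rayjobs.config import ManagedClusterConfig

    # Submit against a new, SDK-managed cluster; the names below are placeholders.
    job = RayJob(
        job_name="demo-job",
        entrypoint="python train.py",
        cluster_config=ManagedClusterConfig(num_workers=2),
        namespace="demo-ns",
        local_queue="team-a-queue",  # optional; omit to use the namespace's default LocalQueue
        ttl_seconds_after_finished=300,
    )
    job.submit()

Because cluster_config is supplied, the generated CR sets shutdownAfterJobFinishes to true and carries the kueue.x-k8s.io/queue-name label, per the queue-selection logic in _build_rayjob_cr below.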
@@ -73,12 +74,11 @@ def __init__( cluster_config: Configuration for creating a new cluster (optional if cluster_name provided) namespace: The Kubernetes namespace (auto-detected if not specified) runtime_env: Ray runtime environment configuration (optional) - shutdown_after_job_finishes: Whether to shut down cluster after job finishes (optional) ttl_seconds_after_finished: Seconds to wait before cleanup after job finishes (default: 0) active_deadline_seconds: Maximum time the job can run before being terminated (optional) + local_queue: The Kueue LocalQueue to submit the job to (optional) Note: - shutdown_after_job_finishes is automatically detected but can be overridden: + shutdownAfterJobFinishes is set automatically on the generated RayJob CR: - True if cluster_config is provided (new cluster will be cleaned up) - False if cluster_name is provided (existing cluster will not be shut down) - - User can explicitly set this value to override auto-detection @@ -108,17 +108,7 @@ def __init__( self.runtime_env = runtime_env self.ttl_seconds_after_finished = ttl_seconds_after_finished self.active_deadline_seconds = active_deadline_seconds - - # Auto-set shutdown_after_job_finishes based on cluster_config presence - # If cluster_config is provided, we want to clean up the cluster after job finishes - # If using existing cluster, we don't want to shut it down - # User can override this behavior by explicitly setting shutdown_after_job_finishes - if shutdown_after_job_finishes is not None: - self.shutdown_after_job_finishes = shutdown_after_job_finishes - elif cluster_config is not None: - self.shutdown_after_job_finishes = True - else: - self.shutdown_after_job_finishes = False + self.local_queue = local_queue if namespace is None: detected_namespace = get_current_namespace() @@ -177,10 +167,6 @@ def submit(self) -> str: if scripts: self._handle_script_volumes_for_existing_cluster(scripts, result) - if self.shutdown_after_job_finishes: - logger.info( - f"Cluster will be automatically cleaned up {self.ttl_seconds_after_finished}s after job completion" - ) return self.name else: raise RuntimeError(f"Failed to submit RayJob {self.name}") @@ -230,11 +216,32 @@ def _build_rayjob_cr(self) -> Dict[str, Any]: }, "spec": { "entrypoint": self.entrypoint, - "shutdownAfterJobFinishes": self.shutdown_after_job_finishes, "ttlSecondsAfterFinished": self.ttl_seconds_after_finished, + "shutdownAfterJobFinishes": self._cluster_config is not None, }, } + labels = {} + # Label for Kueue only when creating a new cluster: use the explicit local_queue, else the namespace default queue + if self._cluster_config is not None: + if self.local_queue: + labels["kueue.x-k8s.io/queue-name"] = self.local_queue + else: + default_queue = get_default_kueue_name(self.namespace) + if default_queue: + labels["kueue.x-k8s.io/queue-name"] = default_queue + else: + # No default queue found, use "default" as fallback + labels["kueue.x-k8s.io/queue-name"] = "default" + logger.warning( + f"No default Kueue LocalQueue found in namespace '{self.namespace}'. " + f"Using 'default' as the queue name. If a LocalQueue named 'default' " + f"does not exist, the RayJob submission will fail. " + f"To fix this, please explicitly specify the 'local_queue' parameter."
+ ) + + rayjob_cr["metadata"]["labels"] = labels + # Add active deadline if specified if self.active_deadline_seconds: rayjob_cr["spec"]["activeDeadlineSeconds"] = self.active_deadline_seconds diff --git a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py index 54ad61dd..829265d6 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_rayjob.py +++ b/src/codeflare_sdk/ray/rayjobs/test_rayjob.py @@ -29,23 +29,52 @@ ) -def test_rayjob_submit_success(mocker): - """Test successful RayJob submission.""" - # Mock kubernetes config loading +# Global test setup that runs automatically for ALL tests +@pytest.fixture(autouse=True) +def auto_mock_setup(mocker): + """Automatically mock common dependencies for all tests.""" mocker.patch("kubernetes.config.load_kube_config") - # Mock the RayjobApi class entirely - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance + # Always mock get_default_kueue_name to prevent K8s API calls + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_default_kueue_name", + return_value="default-queue", + ) + + mock_get_ns = mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", + return_value="test-namespace", + ) + + mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mock_rayjob_instance = MagicMock() + mock_rayjob_api.return_value = mock_rayjob_instance - # Mock the RayClusterApi class - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_cluster_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_cluster_instance = MagicMock() + mock_cluster_api.return_value = mock_cluster_instance + + mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") + mock_k8s_instance = MagicMock() + mock_k8s_api.return_value = mock_k8s_instance + + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + + # Return the mocked instances so tests can configure them as needed + return { + "rayjob_api": mock_rayjob_instance, + "cluster_api": mock_cluster_instance, + "k8s_api": mock_k8s_instance, + "get_current_namespace": mock_get_ns, + } + + +def test_rayjob_submit_success(auto_mock_setup): + """Test successful RayJob submission.""" + mock_api_instance = auto_mock_setup["rayjob_api"] - # Configure the mock to return success when submit is called mock_api_instance.submit.return_value = {"metadata": {"name": "test-rayjob"}} - # Create RayJob instance rayjob = RayJob( job_name="test-rayjob", cluster_name="test-ray-cluster", @@ -54,20 +83,15 @@ def test_rayjob_submit_success(mocker): runtime_env={"pip": ["requests"]}, ) - # Submit the job job_id = rayjob.submit() - # Assertions assert job_id == "test-rayjob" - # Verify the API was called with correct parameters mock_api_instance.submit_job.assert_called_once() call_args = mock_api_instance.submit_job.call_args - # Check the namespace parameter assert call_args.kwargs["k8s_namespace"] == "test-namespace" - # Check the job custom resource job_cr = call_args.kwargs["job"] assert job_cr["metadata"]["name"] == "test-rayjob" assert job_cr["metadata"]["namespace"] == "test-namespace" @@ -76,23 +100,12 @@ def test_rayjob_submit_success(mocker): assert job_cr["spec"]["runtimeEnvYAML"] == "{'pip': ['requests']}" -def test_rayjob_submit_failure(mocker): +def test_rayjob_submit_failure(auto_mock_setup): """Test RayJob submission failure.""" - # Mock kubernetes config loading - 
mocker.patch("kubernetes.config.load_kube_config") + mock_api_instance = auto_mock_setup["rayjob_api"] - # Mock the RayjobApi class entirely - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance - - # Mock the RayClusterApi class - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - # Configure the mock to return failure (False/None) when submit_job is called mock_api_instance.submit_job.return_value = None - # Create a RayJob instance rayjob = RayJob( job_name="test-rayjob", cluster_name="test-ray-cluster", @@ -101,19 +114,12 @@ def test_rayjob_submit_failure(mocker): runtime_env={"pip": ["numpy"]}, ) - # Test that RuntimeError is raised on failure with pytest.raises(RuntimeError, match="Failed to submit RayJob test-rayjob"): rayjob.submit() -def test_rayjob_init_validation_both_provided(mocker): +def test_rayjob_init_validation_both_provided(auto_mock_setup): """Test that providing both cluster_name and cluster_config raises error.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - cluster_config = ClusterConfiguration(name="test-cluster", namespace="test") with pytest.raises( @@ -128,14 +134,8 @@ def test_rayjob_init_validation_both_provided(mocker): ) -def test_rayjob_init_validation_neither_provided(mocker): +def test_rayjob_init_validation_neither_provided(auto_mock_setup): """Test that providing neither cluster_name nor cluster_config raises error.""" - # Mock kubernetes config loading (though this should fail before reaching it) - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely (though this should fail before reaching it) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - with pytest.raises( ValueError, match="โŒ Configuration Error: You must provide either 'cluster_name'", @@ -143,14 +143,8 @@ def test_rayjob_init_validation_neither_provided(mocker): RayJob(job_name="test-job", entrypoint="python script.py") -def test_rayjob_init_with_cluster_config(mocker): +def test_rayjob_init_with_cluster_config(auto_mock_setup): """Test RayJob initialization with cluster configuration for auto-creation.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - cluster_config = ClusterConfiguration( name="auto-cluster", namespace="test-namespace", num_workers=2 ) @@ -168,14 +162,8 @@ def test_rayjob_init_with_cluster_config(mocker): assert rayjob._cluster_name is None -def test_rayjob_cluster_name_generation(mocker): +def test_rayjob_cluster_name_generation(auto_mock_setup): """Test that cluster names are generated when config has empty name.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - cluster_config = ClusterConfiguration( name="", # Empty name should trigger generation namespace="test-namespace", @@ -192,14 +180,8 @@ def test_rayjob_cluster_name_generation(mocker): assert rayjob.cluster_name == "my-job-cluster" -def test_rayjob_cluster_config_namespace_none(mocker): +def test_rayjob_cluster_config_namespace_none(auto_mock_setup): """Test that cluster config namespace is set when None.""" - # 
Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - cluster_config = ClusterConfiguration( name="test-cluster", namespace=None, # This should be set to job namespace @@ -216,14 +198,8 @@ def test_rayjob_cluster_config_namespace_none(mocker): assert rayjob.namespace == "job-namespace" -def test_rayjob_with_active_deadline_seconds(mocker): +def test_rayjob_with_active_deadline_seconds(auto_mock_setup): """Test RayJob CR generation with active deadline seconds.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - rayjob = RayJob( job_name="test-job", cluster_name="test-cluster", @@ -237,15 +213,8 @@ def test_rayjob_with_active_deadline_seconds(mocker): assert rayjob_cr["spec"]["activeDeadlineSeconds"] == 30 -def test_build_ray_cluster_spec_no_config_error(mocker): +def test_build_ray_cluster_spec_no_config_error(auto_mock_setup): """Test _build_ray_cluster_spec raises error when no cluster config.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Create RayJob with cluster_name (no cluster_config) rayjob = RayJob( job_name="test-job", cluster_name="existing-cluster", @@ -253,22 +222,14 @@ def test_build_ray_cluster_spec_no_config_error(mocker): namespace="test-namespace", ) - # Since we removed _build_ray_cluster_spec method, this test is no longer applicable - # The method is now called internally by _build_rayjob_cr when needed - # We can test this by calling _build_rayjob_cr instead rayjob_cr = rayjob._build_rayjob_cr() - # Should use clusterSelector for existing cluster assert rayjob_cr["spec"]["clusterSelector"]["ray.io/cluster"] == "existing-cluster" assert "rayClusterSpec" not in rayjob_cr["spec"] -def test_build_ray_cluster_spec(mocker): +def test_build_ray_cluster_spec(mocker, auto_mock_setup): """Test _build_ray_cluster_spec method.""" - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") mock_ray_cluster = { "apiVersion": "ray.io/v1", @@ -280,10 +241,7 @@ def test_build_ray_cluster_spec(mocker): "workerGroupSpecs": [{"replicas": 2}], }, } - # Use ManagedClusterConfig which has the build_ray_cluster_spec method cluster_config = ManagedClusterConfig(num_workers=2) - - # Mock the method that will be called mocker.patch.object( cluster_config, "build_ray_cluster_spec", return_value=mock_ray_cluster["spec"] ) @@ -295,24 +253,16 @@ def test_build_ray_cluster_spec(mocker): namespace="test-namespace", ) - # Test the integration through _build_rayjob_cr rayjob_cr = rayjob._build_rayjob_cr() - # Should have rayClusterSpec assert "rayClusterSpec" in rayjob_cr["spec"] - - # Verify build_ray_cluster_spec was called on the cluster config cluster_config.build_ray_cluster_spec.assert_called_once_with( cluster_name="test-job-cluster" ) -def test_build_rayjob_cr_with_existing_cluster(mocker): +def test_build_rayjob_cr_with_existing_cluster(auto_mock_setup): """Test _build_rayjob_cr method with existing cluster.""" - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") rayjob = RayJob( 
job_name="test-job", @@ -324,30 +274,20 @@ def test_build_rayjob_cr_with_existing_cluster(mocker): rayjob_cr = rayjob._build_rayjob_cr() - # Check basic structure assert rayjob_cr["apiVersion"] == "ray.io/v1" assert rayjob_cr["kind"] == "RayJob" assert rayjob_cr["metadata"]["name"] == "test-job" - - # Check lifecycle parameters spec = rayjob_cr["spec"] assert spec["entrypoint"] == "python main.py" - # shutdownAfterJobFinishes should be False when using existing cluster (auto-set) assert spec["shutdownAfterJobFinishes"] is False assert spec["ttlSecondsAfterFinished"] == 300 - # Should use clusterSelector for existing cluster assert spec["clusterSelector"]["ray.io/cluster"] == "existing-cluster" assert "rayClusterSpec" not in spec -def test_build_rayjob_cr_with_auto_cluster(mocker): +def test_build_rayjob_cr_with_auto_cluster(mocker, auto_mock_setup): """Test _build_rayjob_cr method with auto-created cluster.""" - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_ray_cluster = { "apiVersion": "ray.io/v1", "kind": "RayCluster", @@ -360,7 +300,6 @@ def test_build_rayjob_cr_with_auto_cluster(mocker): } cluster_config = ManagedClusterConfig(num_workers=2) - # Mock the method that will be called mocker.patch.object( cluster_config, "build_ray_cluster_spec", return_value=mock_ray_cluster["spec"] ) @@ -373,17 +312,12 @@ def test_build_rayjob_cr_with_auto_cluster(mocker): ) rayjob_cr = rayjob._build_rayjob_cr() - - # Should use rayClusterSpec for auto-created cluster assert rayjob_cr["spec"]["rayClusterSpec"] == mock_ray_cluster["spec"] assert "clusterSelector" not in rayjob_cr["spec"] -def test_submit_validation_no_entrypoint(mocker): +def test_submit_validation_no_entrypoint(auto_mock_setup): """Test that submit() raises error when entrypoint is None.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - rayjob = RayJob( job_name="test-job", cluster_name="test-cluster", @@ -397,9 +331,9 @@ def test_submit_validation_no_entrypoint(mocker): rayjob.submit() -def test_submit_with_auto_cluster(mocker): +def test_submit_with_auto_cluster(mocker, auto_mock_setup): """Test successful submission with auto-created cluster.""" - mocker.patch("kubernetes.config.load_kube_config") + mock_api_instance = auto_mock_setup["rayjob_api"] mock_ray_cluster = { "apiVersion": "ray.io/v1", @@ -410,16 +344,9 @@ def test_submit_with_auto_cluster(mocker): "workerGroupSpecs": [{"replicas": 1}], }, } - # Mock the RayjobApi - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance mock_api_instance.submit_job.return_value = True - # Use ManagedClusterConfig and mock its build_ray_cluster_spec method cluster_config = ManagedClusterConfig(num_workers=1) - - # Mock the method that will be called mocker.patch.object( cluster_config, "build_ray_cluster_spec", return_value=mock_ray_cluster["spec"] ) @@ -435,7 +362,6 @@ def test_submit_with_auto_cluster(mocker): assert result == "test-job" - # Verify the correct RayJob CR was submitted mock_api_instance.submit_job.assert_called_once() call_args = mock_api_instance.submit_job.call_args @@ -444,15 +370,9 @@ def test_submit_with_auto_cluster(mocker): assert job_cr["spec"]["rayClusterSpec"] == mock_ray_cluster["spec"] -def test_namespace_auto_detection_success(mocker): +def 
test_namespace_auto_detection_success(auto_mock_setup): """Test successful namespace auto-detection.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="detected-ns", - ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + auto_mock_setup["get_current_namespace"].return_value = "detected-ns" rayjob = RayJob( job_name="test-job", entrypoint="python script.py", cluster_name="test-cluster" @@ -461,14 +381,9 @@ def test_namespace_auto_detection_success(mocker): assert rayjob.namespace == "detected-ns" -def test_namespace_auto_detection_fallback(mocker): +def test_namespace_auto_detection_fallback(auto_mock_setup): """Test that namespace auto-detection failure raises an error.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", return_value=None - ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + auto_mock_setup["get_current_namespace"].return_value = None with pytest.raises(ValueError, match="Could not auto-detect Kubernetes namespace"): RayJob( @@ -478,15 +393,9 @@ def test_namespace_auto_detection_fallback(mocker): ) -def test_namespace_explicit_override(mocker): +def test_namespace_explicit_override(auto_mock_setup): """Test that explicit namespace overrides auto-detection.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="detected-ns", - ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + auto_mock_setup["get_current_namespace"].return_value = "detected-ns" rayjob = RayJob( job_name="test-job", @@ -498,46 +407,8 @@ def test_namespace_explicit_override(mocker): assert rayjob.namespace == "explicit-ns" -def test_shutdown_behavior_with_cluster_config(mocker): - """Test that shutdown_after_job_finishes is True when cluster_config is provided.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - cluster_config = ManagedClusterConfig() - - rayjob = RayJob( - job_name="test-job", - entrypoint="python script.py", - cluster_config=cluster_config, - namespace="test-namespace", - ) - - assert rayjob.shutdown_after_job_finishes is True - - -def test_shutdown_behavior_with_existing_cluster(mocker): - """Test that shutdown_after_job_finishes is False when using existing cluster.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - rayjob = RayJob( - job_name="test-job", - entrypoint="python script.py", - cluster_name="existing-cluster", - namespace="test-namespace", - ) - - assert rayjob.shutdown_after_job_finishes is False - - -def test_rayjob_with_rayjob_cluster_config(mocker): +def test_rayjob_with_rayjob_cluster_config(auto_mock_setup): """Test RayJob with the new ManagedClusterConfig.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - cluster_config = ManagedClusterConfig( num_workers=2, 
head_cpu_requests="500m", @@ -555,13 +426,8 @@ def test_rayjob_with_rayjob_cluster_config(mocker): assert rayjob.cluster_name == "test-job-cluster" # Generated from job name -def test_rayjob_cluster_config_validation(mocker): +def test_rayjob_cluster_config_validation(auto_mock_setup): """Test validation of ManagedClusterConfig parameters.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - # Test with minimal valid config cluster_config = ManagedClusterConfig() rayjob = RayJob( @@ -574,32 +440,20 @@ def test_rayjob_cluster_config_validation(mocker): assert rayjob._cluster_config is not None -def test_rayjob_missing_entrypoint_validation(mocker): +def test_rayjob_missing_entrypoint_validation(auto_mock_setup): """Test that RayJob requires entrypoint for submission.""" - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Should raise an error during construction with pytest.raises( TypeError, match="missing 1 required positional argument: 'entrypoint'" ): RayJob( job_name="test-job", cluster_name="test-cluster", - # No entrypoint provided ) -def test_build_ray_cluster_spec_integration(mocker): +def test_build_ray_cluster_spec_integration(mocker, auto_mock_setup): """Test integration with the new build_ray_cluster_spec method.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi class entirely - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - cluster_config = ManagedClusterConfig() - - # Mock the build_ray_cluster_spec method on the cluster config mock_spec = {"spec": "test-spec"} mocker.patch.object( cluster_config, "build_ray_cluster_spec", return_value=mock_spec @@ -612,25 +466,17 @@ def test_build_ray_cluster_spec_integration(mocker): namespace="test-namespace", ) - # Build the RayJob CR rayjob_cr = rayjob._build_rayjob_cr() - # Verify the method was called correctly cluster_config.build_ray_cluster_spec.assert_called_once_with( cluster_name="test-job-cluster" ) - - # Verify the spec is included in the RayJob CR assert "rayClusterSpec" in rayjob_cr["spec"] assert rayjob_cr["spec"]["rayClusterSpec"] == mock_spec -def test_rayjob_with_runtime_env(mocker): +def test_rayjob_with_runtime_env(auto_mock_setup): """Test RayJob with runtime environment configuration.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - runtime_env = {"pip": ["numpy", "pandas"]} rayjob = RayJob( @@ -643,16 +489,12 @@ def test_rayjob_with_runtime_env(mocker): assert rayjob.runtime_env == runtime_env - # Verify runtime env is included in the CR rayjob_cr = rayjob._build_rayjob_cr() assert rayjob_cr["spec"]["runtimeEnvYAML"] == str(runtime_env) -def test_rayjob_with_active_deadline_and_ttl(mocker): +def test_rayjob_with_active_deadline_and_ttl(auto_mock_setup): """Test RayJob with both active deadline and TTL settings.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") rayjob = RayJob( job_name="test-job", @@ -666,17 +508,13 @@ def test_rayjob_with_active_deadline_and_ttl(mocker): assert rayjob.active_deadline_seconds == 300 assert rayjob.ttl_seconds_after_finished == 600 - # Verify both are included in the CR rayjob_cr = 
rayjob._build_rayjob_cr() assert rayjob_cr["spec"]["activeDeadlineSeconds"] == 300 assert rayjob_cr["spec"]["ttlSecondsAfterFinished"] == 600 -def test_rayjob_cluster_name_generation_with_config(mocker): +def test_rayjob_cluster_name_generation_with_config(auto_mock_setup): """Test cluster name generation when using cluster_config.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") cluster_config = ManagedClusterConfig() @@ -688,20 +526,11 @@ def test_rayjob_cluster_name_generation_with_config(mocker): ) assert rayjob.cluster_name == "my-job-cluster" - # Note: cluster_config.name is not set in RayJob (it's only for resource config) - # The cluster name is generated independently for the RayJob -def test_rayjob_namespace_propagation_to_cluster_config(mocker): +def test_rayjob_namespace_propagation_to_cluster_config(auto_mock_setup): """Test that job namespace is propagated to cluster config when None.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="detected-ns", - ) + auto_mock_setup["get_current_namespace"].return_value = "detected-ns" cluster_config = ManagedClusterConfig() @@ -714,9 +543,8 @@ def test_rayjob_namespace_propagation_to_cluster_config(mocker): assert rayjob.namespace == "detected-ns" -def test_rayjob_error_handling_invalid_cluster_config(mocker): +def test_rayjob_error_handling_invalid_cluster_config(auto_mock_setup): """Test error handling with invalid cluster configuration.""" - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") with pytest.raises(ValueError): RayJob( @@ -725,13 +553,8 @@ def test_rayjob_error_handling_invalid_cluster_config(mocker): ) -def test_rayjob_constructor_parameter_validation(mocker): +def test_rayjob_constructor_parameter_validation(auto_mock_setup): """Test constructor parameter validation.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - # Test with valid parameters rayjob = RayJob( job_name="test-job", entrypoint="python script.py", @@ -751,9 +574,8 @@ def test_rayjob_constructor_parameter_validation(mocker): assert rayjob.active_deadline_seconds == 600 -def test_build_ray_cluster_spec_function(mocker): +def test_build_ray_cluster_spec_function(): """Test the build_ray_cluster_spec method directly.""" - # Create a test cluster config cluster_config = ManagedClusterConfig( num_workers=2, head_cpu_requests="500m", @@ -762,23 +584,16 @@ def test_build_ray_cluster_spec_function(mocker): worker_memory_requests="256Mi", ) - # Build the spec using the method on the cluster config spec = cluster_config.build_ray_cluster_spec("test-cluster") - - # Verify basic structure assert "rayVersion" in spec - assert "enableInTreeAutoscaling" in spec assert "headGroupSpec" in spec assert "workerGroupSpecs" in spec - # Verify head group spec head_spec = spec["headGroupSpec"] assert head_spec["serviceType"] == "ClusterIP" assert head_spec["enableIngress"] is False assert "rayStartParams" in head_spec assert "template" in head_spec - - # Verify worker group spec worker_specs = spec["workerGroupSpecs"] assert len(worker_specs) == 1 worker_spec = worker_specs[0] @@ -788,24 
+603,19 @@ def test_build_ray_cluster_spec_function(mocker): assert worker_spec["groupName"] == "worker-group-test-cluster" -def test_build_ray_cluster_spec_with_accelerators(mocker): +def test_build_ray_cluster_spec_with_accelerators(): """Test build_ray_cluster_spec with GPU accelerators.""" - # Create a test cluster config with GPU accelerators cluster_config = ManagedClusterConfig( head_accelerators={"nvidia.com/gpu": 1}, worker_accelerators={"nvidia.com/gpu": 2}, ) - # Build the spec using the method on the cluster config spec = cluster_config.build_ray_cluster_spec("test-cluster") - - # Verify head group has GPU parameters head_spec = spec["headGroupSpec"] head_params = head_spec["rayStartParams"] assert "num-gpus" in head_params assert head_params["num-gpus"] == "1" - # Verify worker group has GPU parameters worker_specs = spec["workerGroupSpecs"] worker_spec = worker_specs[0] worker_params = worker_spec["rayStartParams"] @@ -813,43 +623,32 @@ def test_build_ray_cluster_spec_with_accelerators(mocker): assert worker_params["num-gpus"] == "2" -def test_build_ray_cluster_spec_with_custom_volumes(mocker): +def test_build_ray_cluster_spec_with_custom_volumes(): """Test build_ray_cluster_spec with custom volumes and volume mounts.""" - # Create custom volumes and volume mounts custom_volume = V1Volume(name="custom-data", empty_dir={}) custom_volume_mount = V1VolumeMount(name="custom-data", mount_path="/data") - - # Create a test cluster config with custom volumes cluster_config = ManagedClusterConfig( volumes=[custom_volume], volume_mounts=[custom_volume_mount], ) - # Build the spec using the method on the cluster config spec = cluster_config.build_ray_cluster_spec("test-cluster") - - # Verify custom volumes are included head_spec = spec["headGroupSpec"] - head_pod_spec = head_spec["template"].spec # Access the spec attribute - # Note: We can't easily check DEFAULT_VOLUMES length since they're now part of the class + head_pod_spec = head_spec["template"].spec assert len(head_pod_spec.volumes) > 0 - # Verify custom volume mounts are included - head_container = head_pod_spec.containers[0] # Access the containers attribute - # Note: We can't easily check DEFAULT_VOLUME_MOUNTS length since they're now part of the class + head_container = head_pod_spec.containers[0] assert len(head_container.volume_mounts) > 0 -def test_build_ray_cluster_spec_with_environment_variables(mocker): +def test_build_ray_cluster_spec_with_environment_variables(): """Test build_ray_cluster_spec with environment variables.""" - # Create a test cluster config with environment variables cluster_config = ManagedClusterConfig( envs={"CUDA_VISIBLE_DEVICES": "0", "RAY_DISABLE_IMPORT_WARNING": "1"}, ) spec = cluster_config.build_ray_cluster_spec("test-cluster") - # Verify environment variables are included in head container head_spec = spec["headGroupSpec"] head_pod_spec = head_spec["template"].spec head_container = head_pod_spec.containers[0] @@ -857,8 +656,6 @@ def test_build_ray_cluster_spec_with_environment_variables(mocker): env_vars = {env.name: env.value for env in head_container.env} assert env_vars["CUDA_VISIBLE_DEVICES"] == "0" assert env_vars["RAY_DISABLE_IMPORT_WARNING"] == "1" - - # Verify environment variables are included in worker container worker_specs = spec["workerGroupSpecs"] worker_spec = worker_specs[0] worker_pod_spec = worker_spec["template"].spec @@ -870,9 +667,8 @@ def test_build_ray_cluster_spec_with_environment_variables(mocker): assert worker_env_vars["RAY_DISABLE_IMPORT_WARNING"] == "1" -def 
test_build_ray_cluster_spec_with_tolerations(mocker): +def test_build_ray_cluster_spec_with_tolerations(): """Test build_ray_cluster_spec with tolerations.""" - # Create test tolerations head_toleration = V1Toleration( key="node-role.kubernetes.io/master", operator="Exists", effect="NoSchedule" ) @@ -880,42 +676,36 @@ def test_build_ray_cluster_spec_with_tolerations(mocker): key="nvidia.com/gpu", operator="Exists", effect="NoSchedule" ) - # Create a test cluster config with tolerations cluster_config = ManagedClusterConfig( head_tolerations=[head_toleration], worker_tolerations=[worker_toleration], ) spec = cluster_config.build_ray_cluster_spec("test-cluster") - - # Verify head tolerations head_spec = spec["headGroupSpec"] - head_pod_spec = head_spec["template"].spec # Access the spec attribute + head_pod_spec = head_spec["template"].spec assert hasattr(head_pod_spec, "tolerations") assert len(head_pod_spec.tolerations) == 1 assert head_pod_spec.tolerations[0].key == "node-role.kubernetes.io/master" - # Verify worker tolerations worker_specs = spec["workerGroupSpecs"] worker_spec = worker_specs[0] - worker_pod_spec = worker_spec["template"].spec # Access the spec attribute + worker_pod_spec = worker_spec["template"].spec assert hasattr(worker_pod_spec, "tolerations") assert len(worker_pod_spec.tolerations) == 1 assert worker_pod_spec.tolerations[0].key == "nvidia.com/gpu" -def test_build_ray_cluster_spec_with_image_pull_secrets(mocker): +def test_build_ray_cluster_spec_with_image_pull_secrets(): """Test build_ray_cluster_spec with image pull secrets.""" - # Create a test cluster config with image pull secrets cluster_config = ManagedClusterConfig( image_pull_secrets=["my-registry-secret", "another-secret"] ) spec = cluster_config.build_ray_cluster_spec("test-cluster") - # Verify image pull secrets are included in head pod head_spec = spec["headGroupSpec"] - head_pod_spec = head_spec["template"].spec # Access the spec attribute + head_pod_spec = head_spec["template"].spec assert hasattr(head_pod_spec, "image_pull_secrets") head_secrets = head_pod_spec.image_pull_secrets @@ -923,7 +713,6 @@ def test_build_ray_cluster_spec_with_image_pull_secrets(mocker): assert head_secrets[0].name == "my-registry-secret" assert head_secrets[1].name == "another-secret" - # Verify image pull secrets are included in worker pod worker_specs = spec["workerGroupSpecs"] worker_spec = worker_specs[0] worker_pod_spec = worker_spec["template"].spec @@ -935,58 +724,12 @@ def test_build_ray_cluster_spec_with_image_pull_secrets(mocker): assert worker_secrets[1].name == "another-secret" -def test_rayjob_user_override_shutdown_behavior(mocker): - """Test that user can override the auto-detected shutdown behavior.""" - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - - # Test 1: User overrides shutdown to True even when using existing cluster - rayjob_existing_override = RayJob( - job_name="test-job", - entrypoint="python script.py", - cluster_name="existing-cluster", - shutdown_after_job_finishes=True, # User override - namespace="test-namespace", # Explicitly specify namespace - ) - - assert rayjob_existing_override.shutdown_after_job_finishes is True - - # Test 2: User overrides shutdown to False even when creating new cluster - cluster_config = ManagedClusterConfig() - - rayjob_new_override = RayJob( - job_name="test-job", - entrypoint="python script.py", - cluster_config=cluster_config, - 
shutdown_after_job_finishes=False, # User override - namespace="test-namespace", # Explicitly specify namespace - ) - - assert rayjob_new_override.shutdown_after_job_finishes is False - - # Test 3: User override takes precedence over auto-detection - rayjob_override_priority = RayJob( - job_name="test-job", - entrypoint="python script.py", - cluster_config=cluster_config, - shutdown_after_job_finishes=True, # Should override auto-detection - namespace="test-namespace", # Explicitly specify namespace - ) - - assert rayjob_override_priority.shutdown_after_job_finishes is True - - class TestRayVersionValidation: """Test Ray version validation in RayJob.""" - def test_submit_with_cluster_config_compatible_image_passes(self, mocker): + def test_submit_with_cluster_config_compatible_image_passes(self, auto_mock_setup): """Test that submission passes with compatible cluster_config image.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance + mock_api_instance = auto_mock_setup["rayjob_api"] mock_api_instance.submit_job.return_value = True cluster_config = ManagedClusterConfig(image=f"ray:{RAY_VERSION}") @@ -998,17 +741,12 @@ def test_submit_with_cluster_config_compatible_image_passes(self, mocker): entrypoint="python script.py", ) - # Should not raise any validation errors result = rayjob.submit() assert result == "test-job" - def test_submit_with_cluster_config_incompatible_image_fails(self, mocker): + def test_submit_with_cluster_config_incompatible_image_fails(self, auto_mock_setup): """Test that submission fails with incompatible cluster_config image.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance cluster_config = ManagedClusterConfig(image="ray:2.8.0") # Different version @@ -1019,19 +757,14 @@ def test_submit_with_cluster_config_incompatible_image_fails(self, mocker): entrypoint="python script.py", ) - # Should raise ValueError for version mismatch with pytest.raises( ValueError, match="Cluster config image: Ray version mismatch detected" ): rayjob.submit() - def test_validate_ray_version_compatibility_method(self, mocker): + def test_validate_ray_version_compatibility_method(self, auto_mock_setup): """Test the _validate_ray_version_compatibility method directly.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance rayjob = RayJob( job_name="test-job", @@ -1040,34 +773,24 @@ def test_validate_ray_version_compatibility_method(self, mocker): entrypoint="python script.py", ) - # Test with no cluster_config (should not raise) - rayjob._validate_ray_version_compatibility() # Should not raise - - # Test with compatible cluster_config version + rayjob._validate_ray_version_compatibility() rayjob._cluster_config = ManagedClusterConfig(image=f"ray:{RAY_VERSION}") - rayjob._validate_ray_version_compatibility() # Should not raise - - # Test with incompatible cluster_config version + 
rayjob._validate_ray_version_compatibility() rayjob._cluster_config = ManagedClusterConfig(image="ray:2.8.0") with pytest.raises( ValueError, match="Cluster config image: Ray version mismatch detected" ): rayjob._validate_ray_version_compatibility() - # Test with unknown cluster_config version (should warn but not fail) rayjob._cluster_config = ManagedClusterConfig(image="custom-image:latest") with pytest.warns( UserWarning, match="Cluster config image: Cannot determine Ray version" ): rayjob._validate_ray_version_compatibility() - def test_validate_cluster_config_image_method(self, mocker): + def test_validate_cluster_config_image_method(self, auto_mock_setup): """Test the _validate_cluster_config_image method directly.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance rayjob = RayJob( job_name="test-job", @@ -1076,34 +799,23 @@ def test_validate_cluster_config_image_method(self, mocker): entrypoint="python script.py", ) - # Test with no image (should not raise) - rayjob._validate_cluster_config_image() # Should not raise - - # Test with compatible image + rayjob._validate_cluster_config_image() rayjob._cluster_config.image = f"ray:{RAY_VERSION}" - rayjob._validate_cluster_config_image() # Should not raise - - # Test with incompatible image + rayjob._validate_cluster_config_image() rayjob._cluster_config.image = "ray:2.8.0" with pytest.raises( ValueError, match="Cluster config image: Ray version mismatch detected" ): rayjob._validate_cluster_config_image() - # Test with unknown image (should warn but not fail) rayjob._cluster_config.image = "custom-image:latest" with pytest.warns( UserWarning, match="Cluster config image: Cannot determine Ray version" ): rayjob._validate_cluster_config_image() - def test_validate_cluster_config_image_edge_cases(self, mocker): + def test_validate_cluster_config_image_edge_cases(self, auto_mock_setup): """Test edge cases in _validate_cluster_config_image method.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance rayjob = RayJob( job_name="test-job", @@ -1112,31 +824,22 @@ def test_validate_cluster_config_image_edge_cases(self, mocker): entrypoint="python script.py", ) - # Test with None image (should not raise) rayjob._cluster_config.image = None - rayjob._validate_cluster_config_image() # Should not raise - - # Test with empty string image (should not raise) + rayjob._validate_cluster_config_image() rayjob._cluster_config.image = "" - rayjob._validate_cluster_config_image() # Should not raise - - # Test with non-string image (should log warning and skip) + rayjob._validate_cluster_config_image() rayjob._cluster_config.image = 123 - rayjob._validate_cluster_config_image() # Should log warning and not raise + rayjob._validate_cluster_config_image() - # Test with cluster config that has no image attribute class MockClusterConfig: pass rayjob._cluster_config = MockClusterConfig() - rayjob._validate_cluster_config_image() # Should not raise + rayjob._validate_cluster_config_image() -def test_extract_script_files_from_entrypoint_single_script(mocker, tmp_path): +def 
test_extract_script_files_from_entrypoint_single_script(auto_mock_setup, tmp_path): """Test extracting a single script file from entrypoint.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") # Create a test script test_script = tmp_path / "test_script.py" @@ -1164,11 +867,8 @@ def test_extract_script_files_from_entrypoint_single_script(mocker, tmp_path): os.chdir(original_cwd) -def test_extract_script_files_with_dependencies(mocker, tmp_path): +def test_extract_script_files_with_dependencies(auto_mock_setup, tmp_path): """Test extracting script files with local dependencies.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") # Create main script and dependency main_script = tmp_path / "main.py" @@ -1223,7 +923,6 @@ def calculate(x): assert "helper.py" in scripts assert "utils.py" in scripts - # Verify content assert "import helper" in scripts["main.py"] assert "def do_something" in scripts["helper.py"] assert "def calculate" in scripts["utils.py"] @@ -1232,11 +931,8 @@ def calculate(x): os.chdir(original_cwd) -def test_extract_script_files_no_local_scripts(mocker): +def test_extract_script_files_no_local_scripts(auto_mock_setup): """Test entrypoint with no local script files.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") rayjob = RayJob( job_name="test-job", @@ -1250,11 +946,8 @@ def test_extract_script_files_no_local_scripts(mocker): assert scripts is None -def test_extract_script_files_nonexistent_script(mocker): +def test_extract_script_files_nonexistent_script(auto_mock_setup): """Test entrypoint referencing non-existent script.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") rayjob = RayJob( job_name="test-job", @@ -1309,7 +1002,6 @@ def test_add_script_volumes(): config.add_script_volumes(configmap_name="test-scripts") - # Should have added one volume and one mount assert len(config.volumes) == 1 assert len(config.volume_mounts) == 1 @@ -1331,21 +1023,13 @@ def test_add_script_volumes_duplicate_prevention(): config.add_script_volumes(configmap_name="test-scripts") config.add_script_volumes(configmap_name="test-scripts") - # Should still have only one of each assert len(config.volumes) == 1 assert len(config.volume_mounts) == 1 -def test_create_configmap_from_spec(mocker): +def test_create_configmap_from_spec(auto_mock_setup): """Test creating ConfigMap via Kubernetes API.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_api_instance = MagicMock() - mock_k8s_api.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + mock_api_instance = auto_mock_setup["k8s_api"] rayjob = RayJob( job_name="test-job", @@ -1367,16 +1051,9 @@ def test_create_configmap_from_spec(mocker): mock_api_instance.create_namespaced_config_map.assert_called_once() -def test_create_configmap_already_exists(mocker): +def test_create_configmap_already_exists(auto_mock_setup): """Test creating ConfigMap when it already exists (409 conflict).""" - 
mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_api_instance = MagicMock() - mock_k8s_api.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + mock_api_instance = auto_mock_setup["k8s_api"] mock_api_instance.create_namespaced_config_map.side_effect = ApiException( status=409 @@ -1403,19 +1080,9 @@ def test_create_configmap_already_exists(mocker): mock_api_instance.replace_namespaced_config_map.assert_called_once() -def test_create_configmap_with_owner_reference_basic(mocker, caplog): +def test_create_configmap_with_owner_reference_basic(mocker, auto_mock_setup, caplog): """Test creating ConfigMap with owner reference from valid RayJob result.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Mock Kubernetes API - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_api_instance = MagicMock() - mock_k8s_api.return_value = mock_api_instance - - # Mock get_api_client - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + mock_api_instance = auto_mock_setup["k8s_api"] # Mock client.V1ObjectMeta and V1ConfigMap mock_v1_metadata = mocker.patch("kubernetes.client.V1ObjectMeta") @@ -1465,21 +1132,15 @@ def test_create_configmap_with_owner_reference_basic(mocker, caplog): in caplog.text ) - # Verify owner_references was set on metadata assert mock_metadata_instance.owner_references is not None mock_api_instance.create_namespaced_config_map.assert_called_once() -def test_create_configmap_without_owner_reference_no_uid(mocker, caplog): +def test_create_configmap_without_owner_reference_no_uid( + mocker, auto_mock_setup, caplog +): """Test creating ConfigMap without owner reference when RayJob has no UID.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_api_instance = MagicMock() - mock_k8s_api.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + mock_api_instance = auto_mock_setup["k8s_api"] mock_v1_metadata = mocker.patch("kubernetes.client.V1ObjectMeta") mock_metadata_instance = MagicMock() @@ -1523,19 +1184,9 @@ def test_create_configmap_without_owner_reference_no_uid(mocker, caplog): mock_api_instance.create_namespaced_config_map.assert_called_once() -def test_create_configmap_with_invalid_rayjob_result(mocker, caplog): +def test_create_configmap_with_invalid_rayjob_result(auto_mock_setup, caplog): """Test creating ConfigMap with None or invalid rayjob_result.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Mock Kubernetes API - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_api_instance = MagicMock() - mock_k8s_api.return_value = mock_api_instance - - # Mock get_api_client - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + mock_api_instance = auto_mock_setup["k8s_api"] rayjob = RayJob( job_name="test-job", @@ -1567,10 +1218,9 @@ def test_create_configmap_with_invalid_rayjob_result(mocker, caplog): assert "No valid RayJob result with UID found" in caplog.text -def 
test_handle_script_volumes_for_new_cluster(mocker, tmp_path): +def test_handle_script_volumes_for_new_cluster(mocker, auto_mock_setup, tmp_path): """Test handling script volumes for new cluster creation.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + # auto_mock_setup handles kubernetes and API mocking mock_create = mocker.patch.object(RayJob, "_create_configmap_from_spec") mock_create.return_value = "test-job-scripts" @@ -1603,10 +1253,8 @@ def test_handle_script_volumes_for_new_cluster(mocker, tmp_path): os.chdir(original_cwd) -def test_ast_parsing_import_detection(mocker, tmp_path): +def test_ast_parsing_import_detection(auto_mock_setup, tmp_path): """Test AST parsing correctly detects import statements.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") main_script = tmp_path / "main.py" main_script.write_text( @@ -1656,13 +1304,11 @@ def func2(): pass os.chdir(original_cwd) -def test_script_handling_timing_after_rayjob_submission(mocker, tmp_path): +def test_script_handling_timing_after_rayjob_submission( + mocker, auto_mock_setup, tmp_path +): """Test that script handling happens after RayJob is submitted (not before).""" - mocker.patch("kubernetes.config.load_kube_config") - - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance + mock_api_instance = auto_mock_setup["rayjob_api"] submit_result = { "metadata": { @@ -1677,7 +1323,7 @@ def test_script_handling_timing_after_rayjob_submission(mocker, tmp_path): RayJob, "_handle_script_volumes_for_new_cluster" ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + # RayClusterApi is already mocked by auto_mock_setup test_script = tmp_path / "test.py" test_script.write_text("print('test')") @@ -1721,22 +1367,12 @@ def track_handle_scripts(*args, **kwargs): mock_handle_new.assert_called_with({"test.py": "print('test')"}, submit_result) -def test_rayjob_submit_with_scripts_new_cluster(mocker, tmp_path): +def test_rayjob_submit_with_scripts_new_cluster(auto_mock_setup, tmp_path): """Test RayJob submission with script detection for new cluster.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance + mock_api_instance = auto_mock_setup["rayjob_api"] mock_api_instance.submit_job.return_value = True - # Mock ConfigMap creation - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_k8s_instance = MagicMock() - mock_k8s_api.return_value = mock_k8s_instance - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") + mock_k8s_instance = auto_mock_setup["k8s_api"] # Create test script test_script = tmp_path / "test.py" @@ -1760,24 +1396,18 @@ def test_rayjob_submit_with_scripts_new_cluster(mocker, tmp_path): assert result == "test-job" - # Verify ConfigMap was created mock_k8s_instance.create_namespaced_config_map.assert_called_once() - # Verify volumes were added assert len(cluster_config.volumes) == 1 assert len(cluster_config.volume_mounts) == 1 - - # Verify entrypoint was updated assert f"{MOUNT_PATH}/test.py" in rayjob.entrypoint finally: os.chdir(original_cwd) -def test_process_script_and_imports_io_error(mocker, 
tmp_path): +def test_process_script_and_imports_io_error(mocker, auto_mock_setup, tmp_path): """Test _process_script_and_imports handles IO errors gracefully.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") rayjob = RayJob( job_name="test-job", @@ -1793,19 +1423,13 @@ def test_process_script_and_imports_io_error(mocker, tmp_path): mocker.patch("os.path.isfile", return_value=True) mocker.patch("builtins.open", side_effect=IOError("Permission denied")) - # Should handle the error gracefully and not crash rayjob._process_script_and_imports("test.py", scripts, MOUNT_PATH, processed_files) - - # Should add to processed_files but not to scripts (due to error) assert "test.py" in processed_files assert len(scripts) == 0 -def test_process_script_and_imports_container_path_skip(mocker): +def test_process_script_and_imports_container_path_skip(auto_mock_setup): """Test that scripts already in container paths are skipped.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - rayjob = RayJob( job_name="test-job", cluster_name="existing-cluster", @@ -1821,16 +1445,12 @@ def test_process_script_and_imports_container_path_skip(mocker): f"{MOUNT_PATH}/test.py", scripts, MOUNT_PATH, processed_files ) - # Should skip processing assert len(scripts) == 0 assert len(processed_files) == 0 -def test_process_script_and_imports_already_processed(mocker, tmp_path): +def test_process_script_and_imports_already_processed(auto_mock_setup, tmp_path): """Test that already processed scripts are skipped (infinite loop prevention).""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - rayjob = RayJob( job_name="test-job", cluster_name="existing-cluster", @@ -1841,22 +1461,18 @@ def test_process_script_and_imports_already_processed(mocker, tmp_path): scripts = {} processed_files = {"test.py"} # Already processed - # Should return early without processing rayjob._process_script_and_imports("test.py", scripts, MOUNT_PATH, processed_files) assert len(scripts) == 0 assert processed_files == {"test.py"} -def test_submit_with_scripts_owner_reference_integration(mocker, tmp_path, caplog): +def test_submit_with_scripts_owner_reference_integration( + mocker, auto_mock_setup, tmp_path, caplog +): """Integration test for submit() with local scripts to verify end-to-end owner reference flow.""" - # Mock kubernetes config loading - mocker.patch("kubernetes.config.load_kube_config") - - # Mock the RayjobApi - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_k8s_instance = auto_mock_setup["k8s_api"] # RayJob submission returns result with UID submit_result = { @@ -1868,12 +1484,6 @@ def test_submit_with_scripts_owner_reference_integration(mocker, tmp_path, caplo } mock_api_instance.submit_job.return_value = submit_result - # Mock Kubernetes ConfigMap API - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_k8s_instance = MagicMock() - mock_k8s_api.return_value = mock_k8s_instance - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") - # Capture the ConfigMap that gets created created_configmap = None @@ -1910,10 +1520,7 @@ def capture_configmap(namespace, body): assert result == "test-job" - # Verify 
RayJob was submitted first mock_api_instance.submit_job.assert_called_once() - - # Verify ConfigMap was created with owner reference mock_k8s_instance.create_namespaced_config_map.assert_called_once() assert created_configmap is not None @@ -1941,11 +1548,8 @@ def capture_configmap(namespace, body): == "rayjob-scripts" ) - # Verify scripts were included assert "main.py" in created_configmap.data assert "helper.py" in created_configmap.data - - # Verify log message assert ( "Adding owner reference to ConfigMap 'test-job-scripts' with RayJob UID: unique-rayjob-uid-12345" in caplog.text @@ -1955,11 +1559,8 @@ def capture_configmap(namespace, body): os.chdir(original_cwd) -def test_find_local_imports_syntax_error(mocker): +def test_find_local_imports_syntax_error(mocker, auto_mock_setup): """Test _find_local_imports handles syntax errors gracefully.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - rayjob = RayJob( job_name="test-job", cluster_name="existing-cluster", @@ -1972,29 +1573,19 @@ def test_find_local_imports_syntax_error(mocker): mock_callback = mocker.Mock() - # Should handle syntax error gracefully rayjob._find_local_imports(invalid_script_content, "test.py", mock_callback) - - # Callback should not be called due to syntax error mock_callback.assert_not_called() -def test_create_configmap_api_error_non_409(mocker): +def test_create_configmap_api_error_non_409(auto_mock_setup): """Test _create_configmap_from_spec handles non-409 API errors.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Mock Kubernetes API with 500 error - mock_k8s_api = mocker.patch("kubernetes.client.CoreV1Api") - mock_api_instance = mocker.Mock() - mock_k8s_api.return_value = mock_api_instance + mock_api_instance = auto_mock_setup["k8s_api"] + # Configure to raise 500 error mock_api_instance.create_namespaced_config_map.side_effect = ApiException( status=500 ) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client") - rayjob = RayJob( job_name="test-job", cluster_name="existing-cluster", @@ -2009,23 +1600,15 @@ def test_create_configmap_api_error_non_409(mocker): "data": {"test.py": "print('test')"}, } - # Should raise RuntimeError for non-409 API errors with pytest.raises(RuntimeError, match="Failed to create ConfigMap"): rayjob._create_configmap_from_spec(configmap_spec) -def test_update_existing_cluster_get_cluster_error(mocker): +def test_update_existing_cluster_get_cluster_error(mocker, auto_mock_setup): """Test _update_existing_cluster_for_scripts handles get cluster errors.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Mock RayClusterApi with error - mock_cluster_api_class = mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi" - ) - mock_cluster_api_instance = mocker.Mock() - mock_cluster_api_class.return_value = mock_cluster_api_instance + mock_cluster_api_instance = auto_mock_setup["cluster_api"] + # Configure it to raise an error mock_cluster_api_instance.get_ray_cluster.side_effect = ApiException(status=404) config_builder = ManagedClusterConfig() @@ -2037,22 +1620,13 @@ def test_update_existing_cluster_get_cluster_error(mocker): namespace="test-namespace", ) - # Should raise RuntimeError when getting cluster fails with pytest.raises(RuntimeError, match="Failed to get RayCluster"): 
rayjob._update_existing_cluster_for_scripts("test-scripts", config_builder) -def test_update_existing_cluster_patch_error(mocker): +def test_update_existing_cluster_patch_error(mocker, auto_mock_setup): """Test _update_existing_cluster_for_scripts handles patch errors.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_rayjob_api = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - - # Mock RayClusterApi - mock_cluster_api_class = mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi" - ) - mock_cluster_api_instance = mocker.Mock() - mock_cluster_api_class.return_value = mock_cluster_api_instance + mock_cluster_api_instance = auto_mock_setup["cluster_api"] # Mock successful get but failed patch mock_cluster_api_instance.get_ray_cluster.return_value = { @@ -2083,16 +1657,12 @@ def test_update_existing_cluster_patch_error(mocker): namespace="test-namespace", ) - # Should raise RuntimeError when patching fails with pytest.raises(RuntimeError, match="Failed to update RayCluster"): rayjob._update_existing_cluster_for_scripts("test-scripts", config_builder) -def test_extract_script_files_empty_entrypoint(mocker): +def test_extract_script_files_empty_entrypoint(auto_mock_setup): """Test script extraction with empty entrypoint.""" - mocker.patch("kubernetes.config.load_kube_config") - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - rayjob = RayJob( job_name="test-job", cluster_name="existing-cluster", @@ -2116,10 +1686,7 @@ def test_add_script_volumes_existing_volume_skip(): ) config.volumes.append(existing_volume) - # Should skip adding duplicate volume config.add_script_volumes(configmap_name="new-scripts") - - # Should still have only one volume assert len(config.volumes) == 1 assert len(config.volume_mounts) == 0 # Mount not added due to volume skip @@ -2132,23 +1699,14 @@ def test_add_script_volumes_existing_mount_skip(): existing_mount = V1VolumeMount(name="ray-job-scripts", mount_path="/existing/path") config.volume_mounts.append(existing_mount) - # Should skip adding duplicate mount config.add_script_volumes(configmap_name="new-scripts") - - # Should still have only one mount and no volume added assert len(config.volumes) == 0 # Volume not added due to mount skip assert len(config.volume_mounts) == 1 -def test_rayjob_stop_success(mocker, caplog): +def test_rayjob_stop_success(auto_mock_setup, caplog): """Test successful RayJob stop operation.""" - mocker.patch("kubernetes.config.load_kube_config") - - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_api_instance = auto_mock_setup["rayjob_api"] mock_api_instance.suspend_job.return_value = { "metadata": {"name": "test-rayjob"}, @@ -2175,15 +1733,9 @@ def test_rayjob_stop_success(mocker, caplog): assert "Successfully stopped the RayJob test-rayjob" in caplog.text -def test_rayjob_stop_failure(mocker): +def test_rayjob_stop_failure(auto_mock_setup): """Test RayJob stop operation when API call fails.""" - mocker.patch("kubernetes.config.load_kube_config") - - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_api_instance = auto_mock_setup["rayjob_api"] mock_api_instance.suspend_job.return_value = None @@ -2202,15 +1754,9 
@@ def test_rayjob_stop_failure(mocker): ) -def test_rayjob_resubmit_success(mocker): +def test_rayjob_resubmit_success(auto_mock_setup): """Test successful RayJob resubmit operation.""" - mocker.patch("kubernetes.config.load_kube_config") - - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_api_instance = auto_mock_setup["rayjob_api"] mock_api_instance.resubmit_job.return_value = { "metadata": {"name": "test-rayjob"}, @@ -2233,15 +1779,9 @@ def test_rayjob_resubmit_success(mocker): ) -def test_rayjob_resubmit_failure(mocker): +def test_rayjob_resubmit_failure(auto_mock_setup): """Test RayJob resubmit operation when API call fails.""" - mocker.patch("kubernetes.config.load_kube_config") - - mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") - mock_api_instance = MagicMock() - mock_api_class.return_value = mock_api_instance - - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_api_instance = auto_mock_setup["rayjob_api"] mock_api_instance.resubmit_job.return_value = None @@ -2260,18 +1800,9 @@ def test_rayjob_resubmit_failure(mocker): ) -def test_rayjob_delete_success(mocker): +def test_rayjob_delete_success(auto_mock_setup): """Test successful RayJob deletion.""" - # Mock the API - mocker.patch("kubernetes.config.load_kube_config") - mock_api_instance = mocker.MagicMock() - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) + mock_api_instance = auto_mock_setup["rayjob_api"] rayjob = RayJob( job_name="test-rayjob", @@ -2289,17 +1820,9 @@ def test_rayjob_delete_success(mocker): ) -def test_rayjob_delete_failure(mocker): +def test_rayjob_delete_failure(auto_mock_setup): """Test failed RayJob deletion.""" - mock_api_instance = mocker.MagicMock() - mocker.patch("kubernetes.config.load_kube_config") - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) + mock_api_instance = auto_mock_setup["rayjob_api"] rayjob = RayJob( job_name="test-rayjob", @@ -2317,13 +1840,8 @@ def test_rayjob_delete_failure(mocker): ) -def test_rayjob_init_both_none_error(mocker): +def test_rayjob_init_both_none_error(auto_mock_setup): """Test RayJob initialization error when both cluster_name and cluster_config are None.""" - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) - with pytest.raises( ValueError, match="Configuration Error: You must provide either 'cluster_name' .* or 'cluster_config'", @@ -2336,13 +1854,8 @@ def test_rayjob_init_both_none_error(mocker): ) -def test_rayjob_init_missing_cluster_name_with_no_config(mocker): +def test_rayjob_init_missing_cluster_name_with_no_config(auto_mock_setup): """Test RayJob initialization error when cluster_name is None without cluster_config.""" - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) - with pytest.raises( ValueError, match="Configuration Error: a 'cluster_name' is required when not providing 'cluster_config'", @@ -2353,7 +1866,7 @@ def test_rayjob_init_missing_cluster_name_with_no_config(mocker): 
rayjob.runtime_env = None rayjob.ttl_seconds_after_finished = 0 rayjob.active_deadline_seconds = None - rayjob.shutdown_after_job_finishes = False + rayjob.shutdown_after_job_finishes = True rayjob.namespace = "test-namespace" rayjob._cluster_name = None rayjob._cluster_config = None @@ -2363,26 +1876,11 @@ def test_rayjob_init_missing_cluster_name_with_no_config(mocker): ) -def test_handle_script_volumes_for_existing_cluster_direct_call(mocker): +def test_handle_script_volumes_for_existing_cluster_direct_call(auto_mock_setup): """Test _handle_script_volumes_for_existing_cluster method directly.""" - # Mock APIs - mock_api_instance = mocker.MagicMock() - mock_cluster_api = mocker.MagicMock() - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi", return_value=mock_cluster_api - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) - - # Mock the Kubernetes API for ConfigMap creation - mock_k8s_api = mocker.MagicMock() - mocker.patch("kubernetes.client.CoreV1Api", return_value=mock_k8s_api) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client", return_value=None) + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_cluster_api = auto_mock_setup["cluster_api"] + mock_k8s_api = auto_mock_setup["k8s_api"] # Mock existing cluster mock_cluster = { @@ -2423,24 +1921,11 @@ def test_handle_script_volumes_for_existing_cluster_direct_call(mocker): ) -def test_handle_script_volumes_for_existing_cluster_no_volumes_init(mocker): +def test_handle_script_volumes_for_existing_cluster_no_volumes_init(auto_mock_setup): """Test _handle_script_volumes_for_existing_cluster when volumes/mounts don't exist initially.""" - mock_api_instance = mocker.MagicMock() - mock_cluster_api = mocker.MagicMock() - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi", return_value=mock_cluster_api - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) - - mock_k8s_api = mocker.MagicMock() - mocker.patch("kubernetes.client.CoreV1Api", return_value=mock_k8s_api) - mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.get_api_client", return_value=None) + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_cluster_api = auto_mock_setup["cluster_api"] + mock_k8s_api = auto_mock_setup["k8s_api"] # Mock existing cluster WITHOUT volumes/volumeMounts (to test initialization) mock_cluster = { @@ -2482,20 +1967,10 @@ def test_handle_script_volumes_for_existing_cluster_no_volumes_init(mocker): assert len(worker_spec["containers"][0]["volumeMounts"]) == 1 -def test_update_existing_cluster_for_scripts_api_errors(mocker): +def test_update_existing_cluster_for_scripts_api_errors(mocker, auto_mock_setup): """Test _update_existing_cluster_for_scripts error handling.""" - mock_api_instance = mocker.MagicMock() - mock_cluster_api = mocker.MagicMock() - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayjobApi", return_value=mock_api_instance - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi", return_value=mock_cluster_api - ) - mocker.patch( - "codeflare_sdk.ray.rayjobs.rayjob.get_current_namespace", - return_value="test-namespace", - ) + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_cluster_api = auto_mock_setup["cluster_api"] # Mock config builder 
mock_config_builder = mocker.MagicMock() @@ -2529,3 +2004,193 @@ def test_update_existing_cluster_for_scripts_api_errors(mocker): rayjob._update_existing_cluster_for_scripts( "test-configmap", mock_config_builder ) + + +def test_rayjob_kueue_label_no_default_queue(auto_mock_setup, mocker, caplog): + """Test RayJob falls back to 'default' queue when no default queue exists.""" + mocker.patch( + "codeflare_sdk.ray.rayjobs.rayjob.get_default_kueue_name", + return_value=None, + ) + + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}} + + cluster_config = ManagedClusterConfig() + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python script.py", + ) + + with caplog.at_level("WARNING"): + rayjob.submit() + + # Verify the submitted job has the fallback label + call_args = mock_api_instance.submit_job.call_args + submitted_job = call_args.kwargs["job"] + assert submitted_job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"] == "default" + + # Verify warning was logged + assert "No default Kueue LocalQueue found" in caplog.text + + +def test_rayjob_kueue_explicit_local_queue(auto_mock_setup): + """Test RayJob uses explicitly specified local queue.""" + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}} + + cluster_config = ManagedClusterConfig() + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python script.py", + local_queue="custom-queue", + ) + + rayjob.submit() + + # Verify the submitted job has the explicit queue label + call_args = mock_api_instance.submit_job.call_args + submitted_job = call_args.kwargs["job"] + assert ( + submitted_job["metadata"]["labels"]["kueue.x-k8s.io/queue-name"] + == "custom-queue" + ) + + +def test_rayjob_no_kueue_label_for_existing_cluster(auto_mock_setup): + """Test RayJob doesn't add Kueue label for existing clusters.""" + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}} + + # Using existing cluster (no cluster_config) + rayjob = RayJob( + job_name="test-job", + cluster_name="existing-cluster", + entrypoint="python script.py", + ) + + rayjob.submit() + + # Verify no Kueue label was added + call_args = mock_api_instance.submit_job.call_args + submitted_job = call_args.kwargs["job"] + assert "kueue.x-k8s.io/queue-name" not in submitted_job["metadata"]["labels"] + + +def test_rayjob_with_ttl_and_deadline(auto_mock_setup): + """Test RayJob with TTL and active deadline seconds.""" + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_api_instance.submit_job.return_value = {"metadata": {"name": "test-job"}} + + cluster_config = ManagedClusterConfig() + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python script.py", + ttl_seconds_after_finished=300, + active_deadline_seconds=600, + ) + + rayjob.submit() + + # Verify TTL and deadline were set + call_args = mock_api_instance.submit_job.call_args + submitted_job = call_args.kwargs["job"] + assert submitted_job["spec"]["ttlSecondsAfterFinished"] == 300 + assert submitted_job["spec"]["activeDeadlineSeconds"] == 600 + + +def test_rayjob_shutdown_after_job_finishes(auto_mock_setup): + """Test RayJob sets shutdownAfterJobFinishes correctly.""" + mock_api_instance = auto_mock_setup["rayjob_api"] + mock_api_instance.submit_job.return_value = {"metadata": {"name": 
"test-job"}} + + # Test with managed cluster (should shutdown) + cluster_config = ManagedClusterConfig() + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python script.py", + ) + + rayjob.submit() + + call_args = mock_api_instance.submit_job.call_args + submitted_job = call_args.kwargs["job"] + assert submitted_job["spec"]["shutdownAfterJobFinishes"] is True + + # Test with existing cluster (should not shutdown) + rayjob2 = RayJob( + job_name="test-job2", + cluster_name="existing-cluster", + entrypoint="python script.py", + ) + + rayjob2.submit() + + call_args2 = mock_api_instance.submit_job.call_args + submitted_job2 = call_args2.kwargs["job"] + assert submitted_job2["spec"]["shutdownAfterJobFinishes"] is False + + +def test_rayjob_stop_delete_resubmit_logging(auto_mock_setup, caplog): + """Test logging for stop, delete, and resubmit operations.""" + mock_api_instance = auto_mock_setup["rayjob_api"] + + # Test stop with logging + mock_api_instance.suspend_job.return_value = { + "metadata": {"name": "test-rayjob"}, + "spec": {"suspend": True}, + } + + rayjob = RayJob( + job_name="test-rayjob", + cluster_name="test-cluster", + namespace="test-namespace", + entrypoint="python script.py", + ) + + with caplog.at_level("INFO"): + result = rayjob.stop() + + assert result is True + assert "Successfully stopped the RayJob test-rayjob" in caplog.text + + # Test delete with logging + caplog.clear() + mock_api_instance.delete_job.return_value = True + + with caplog.at_level("INFO"): + result = rayjob.delete() + + assert result is True + assert "Successfully deleted the RayJob test-rayjob" in caplog.text + + # Test resubmit with logging + caplog.clear() + mock_api_instance.resubmit_job.return_value = { + "metadata": {"name": "test-rayjob"}, + "spec": {"suspend": False}, + } + + with caplog.at_level("INFO"): + result = rayjob.resubmit() + + assert result is True + assert "Successfully resubmitted the RayJob test-rayjob" in caplog.text + + +def test_rayjob_initialization_logging(auto_mock_setup, caplog): + """Test RayJob initialization logging.""" + with caplog.at_level("INFO"): + cluster_config = ManagedClusterConfig() + rayjob = RayJob( + job_name="test-job", + cluster_config=cluster_config, + entrypoint="python script.py", + ) + + assert "Creating new cluster: test-job-cluster" in caplog.text + assert "Initialized RayJob: test-job in namespace: test-namespace" in caplog.text diff --git a/src/codeflare_sdk/ray/rayjobs/test_status.py b/src/codeflare_sdk/ray/rayjobs/test_status.py index f3ed7ef8..2f2b9957 100644 --- a/src/codeflare_sdk/ray/rayjobs/test_status.py +++ b/src/codeflare_sdk/ray/rayjobs/test_status.py @@ -302,3 +302,72 @@ def test_rayjob_status_print_job_found(mocker): assert status == CodeflareRayJobStatus.RUNNING assert ready == False + + +def test_rayjob_status_all_deployment_states(mocker): + """Test RayJob status method with all deployment states.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_api_instance = mock_api_class.return_value + + rayjob = RayJob( + job_name="test-job", + cluster_name="test-cluster", + namespace="test-ns", + entrypoint="python test.py", + ) + + # Test mapping of deployment statuses to CodeflareRayJobStatus + test_cases = [ + # (deployment_status_str, expected CodeflareRayJobStatus, expected ready) + ("Complete", CodeflareRayJobStatus.COMPLETE, True), + ("Running", 
CodeflareRayJobStatus.RUNNING, False), + ("Failed", CodeflareRayJobStatus.FAILED, False), + ("Suspended", CodeflareRayJobStatus.SUSPENDED, False), + ] + + for deployment_status_str, expected_status, expected_ready in test_cases: + mock_api_instance.get_job_status.return_value = { + "jobId": "test-job-abc123", + "jobDeploymentStatus": deployment_status_str, + "startTime": "2025-07-28T11:37:07Z", + "failed": 0, + "succeeded": 0, + "rayClusterName": "test-cluster", + } + status, ready = rayjob.status(print_to_console=False) + assert status == expected_status, f"Failed for {deployment_status_str}" + assert ( + ready == expected_ready + ), f"Failed ready check for {deployment_status_str}" + + +def test_rayjob_status_with_end_time(mocker): + """Test RayJob status with end time field.""" + mocker.patch("kubernetes.config.load_kube_config") + mock_api_class = mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayjobApi") + mocker.patch("codeflare_sdk.ray.rayjobs.rayjob.RayClusterApi") + mock_api_instance = mock_api_class.return_value + + rayjob = RayJob( + job_name="test-job", + cluster_name="test-cluster", + namespace="test-ns", + entrypoint="python test.py", + ) + + # Test with end time field + mock_api_instance.get_job_status.return_value = { + "jobId": "test-job-abc123", + "jobDeploymentStatus": "Complete", + "startTime": "2025-07-28T11:37:07Z", + "endTime": "2025-07-28T11:47:07Z", + "failed": 0, + "succeeded": 1, + "rayClusterName": "test-cluster", + } + + status, ready = rayjob.status(print_to_console=False) + assert status == CodeflareRayJobStatus.COMPLETE + assert ready == True diff --git a/tests/e2e/cluster_apply_kind_test.py b/tests/e2e/cluster_apply_kind_test.py index 398bf73b..e023e92d 100644 --- a/tests/e2e/cluster_apply_kind_test.py +++ b/tests/e2e/cluster_apply_kind_test.py @@ -1,6 +1,8 @@ from codeflare_sdk import Cluster, ClusterConfiguration import pytest +import time from kubernetes import client +from codeflare_sdk.common.utils import constants from support import ( initialize_kubernetes_client, @@ -38,6 +40,7 @@ def test_cluster_apply(self): worker_cpu_limits="1", worker_memory_requests="1Gi", worker_memory_limits="2Gi", + image=f"rayproject/ray:{constants.RAY_VERSION}", write_to_file=True, verify_tls=False, ) @@ -47,9 +50,9 @@ def test_cluster_apply(self): cluster.apply() # Wait for the cluster to be ready - cluster.wait_ready() - status = cluster.status() - assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" + cluster.wait_ready(dashboard_check=False) + status, ready = cluster.status() + assert ready, f"Cluster {cluster_name} is not ready: {status}" # Verify the cluster is created ray_cluster = get_ray_cluster(cluster_name, namespace) @@ -58,7 +61,7 @@ def test_cluster_apply(self): ray_cluster["spec"]["workerGroupSpecs"][0]["replicas"] == 1 ), "Initial worker count does not match" - # Update configuration with 3 workers + # Update configuration with 2 workers updated_config = ClusterConfiguration( name=cluster_name, namespace=namespace, @@ -71,6 +74,7 @@ def test_cluster_apply(self): worker_cpu_limits="1", worker_memory_requests="1Gi", worker_memory_limits="2Gi", + image=f"rayproject/ray:{constants.RAY_VERSION}", write_to_file=True, verify_tls=False, ) @@ -79,12 +83,15 @@ def test_cluster_apply(self): cluster.config = updated_config cluster.apply() + # Give Kubernetes a moment to process the update + time.sleep(5) + # Wait for the updated cluster to be ready - cluster.wait_ready() - updated_status = cluster.status() - assert updated_status[ - "ready" - ], 
f"Cluster {cluster_name} is not ready after update: {updated_status}" + cluster.wait_ready(dashboard_check=False) + updated_status, updated_ready = cluster.status() + assert ( + updated_ready + ), f"Cluster {cluster_name} is not ready after update: {updated_status}" # Verify the cluster is updated updated_ray_cluster = get_ray_cluster(cluster_name, namespace) @@ -94,63 +101,19 @@ def test_cluster_apply(self): # Clean up cluster.down() - ray_cluster = get_ray_cluster(cluster_name, namespace) - assert ray_cluster is None, "Cluster was not deleted successfully" - def test_apply_invalid_update(self): - self.setup_method() - create_namespace(self) + # Wait for deletion to complete (finalizers may delay deletion) + max_wait = 30 # seconds + wait_interval = 2 + elapsed = 0 - cluster_name = "test-cluster-apply-invalid" - namespace = self.namespace + while elapsed < max_wait: + ray_cluster = get_ray_cluster(cluster_name, namespace) + if ray_cluster is None: + break + time.sleep(wait_interval) + elapsed += wait_interval - # Initial configuration - initial_config = ClusterConfiguration( - name=cluster_name, - namespace=namespace, - num_workers=1, - head_cpu_requests="500m", - head_cpu_limits="1", - head_memory_requests="1Gi", - head_memory_limits="2Gi", - worker_cpu_requests="500m", - worker_cpu_limits="1", - worker_memory_requests="1Gi", - worker_memory_limits="2Gi", - write_to_file=True, - verify_tls=False, - ) - - # Create the cluster - cluster = Cluster(initial_config) - cluster.apply() - - # Wait for the cluster to be ready - cluster.wait_ready() - status = cluster.status() - assert status["ready"], f"Cluster {cluster_name} is not ready: {status}" - - # Update with an invalid configuration (e.g., immutable field change) - invalid_config = ClusterConfiguration( - name=cluster_name, - namespace=namespace, - num_workers=2, - head_cpu_requests="1", - head_cpu_limits="2", # Changing CPU limits (immutable) - head_memory_requests="1Gi", - head_memory_limits="2Gi", - worker_cpu_requests="500m", - worker_cpu_limits="1", - worker_memory_requests="1Gi", - worker_memory_limits="2Gi", - write_to_file=True, - verify_tls=False, - ) - - # Try to apply the invalid configuration and expect failure - cluster.config = invalid_config - with pytest.raises(RuntimeError, match="Immutable fields detected"): - cluster.apply() - - # Clean up - cluster.down() + assert ( + ray_cluster is None + ), f"Cluster was not deleted successfully after {max_wait}s" diff --git a/tests/e2e/rayjob/existing_cluster_oauth_test.py b/tests/e2e/rayjob/existing_cluster_oauth_test.py deleted file mode 100644 index 5face339..00000000 --- a/tests/e2e/rayjob/existing_cluster_oauth_test.py +++ /dev/null @@ -1,139 +0,0 @@ -import pytest -import sys -import os -from time import sleep - -# Add the parent directory to the path to import support -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from support import * - -from codeflare_sdk import ( - Cluster, - ClusterConfiguration, - TokenAuthentication, -) -from codeflare_sdk.ray.rayjobs import RayJob -from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus - -# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on OpenShift - - -@pytest.mark.openshift -class TestRayJobExistingClusterOauth: - def setup_method(self): - initialize_kubernetes_client(self) - - def teardown_method(self): - delete_namespace(self) - delete_kueue_resources(self) - - def test_rayjob_against_existing_cluster_oauth(self): - self.setup_method() - 
create_namespace(self)
-        create_kueue_resources(self)
-        self.run_rayjob_against_existing_cluster_oauth()
-
-    def run_rayjob_against_existing_cluster_oauth(self):
-        ray_image = get_ray_image()
-
-        auth = TokenAuthentication(
-            token=run_oc_command(["whoami", "--show-token=true"]),
-            server=run_oc_command(["whoami", "--show-server=true"]),
-            skip_tls=True,
-        )
-        auth.login()
-
-        cluster_name = "existing-cluster"
-
-        cluster = Cluster(
-            ClusterConfiguration(
-                name=cluster_name,
-                namespace=self.namespace,
-                num_workers=1,
-                head_cpu_requests="500m",
-                head_cpu_limits="500m",
-                worker_cpu_requests=1,
-                worker_cpu_limits=1,
-                worker_memory_requests=1,
-                worker_memory_limits=4,
-                image=ray_image,
-                write_to_file=True,
-                verify_tls=False,
-            )
-        )
-
-        cluster.apply()
-        cluster.status()
-        cluster.wait_ready()
-        cluster.status()
-        cluster.details()
-
-        print(f"Ray cluster '{cluster_name}' is ready!")
-
-        job_name = "existing-cluster-rayjob"
-
-        rayjob = RayJob(
-            job_name=job_name,
-            cluster_name=cluster_name,
-            namespace=self.namespace,
-            entrypoint="python -c \"import ray; ray.init(); print('Hello from RayJob!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
-            runtime_env={
-                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
-                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
-            },
-            shutdown_after_job_finishes=False,
-        )
-
-        # Submit the job
-        print(
-            f"Submitting RayJob '{job_name}' against existing cluster '{cluster_name}'"
-        )
-        submission_result = rayjob.submit()
-        assert (
-            submission_result == job_name
-        ), f"Job submission failed, expected {job_name}, got {submission_result}"
-        print(f"Successfully submitted RayJob '{job_name}'!")
-
-        # Monitor the job status until completion
-        self.monitor_rayjob_completion(rayjob)
-
-        # Cleanup - manually tear down the cluster since job won't do it
-        print("🧹 Cleaning up Ray cluster")
-        cluster.down()
-
-    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
-        """
-        Monitor a RayJob until it completes or fails.
-        Args:
-            rayjob: The RayJob instance to monitor
-            timeout: Maximum time to wait in seconds (default: 5 minutes)
-        """
-        print(f"Monitoring RayJob '{rayjob.name}' status...")
-
-        elapsed_time = 0
-        check_interval = 10  # Check every 10 seconds
-
-        while elapsed_time < timeout:
-            status, ready = rayjob.status(print_to_console=True)
-
-            # Check if job has completed (either successfully or failed)
-            if status == CodeflareRayJobStatus.COMPLETE:
-                print(f"RayJob '{rayjob.name}' completed successfully!")
-                return
-            elif status == CodeflareRayJobStatus.FAILED:
-                raise AssertionError(f"RayJob '{rayjob.name}' failed!")
-            elif status == CodeflareRayJobStatus.RUNNING:
-                print(f"RayJob '{rayjob.name}' is still running...")
-            elif status == CodeflareRayJobStatus.UNKNOWN:
-                print(f"RayJob '{rayjob.name}' status is unknown")
-
-            # Wait before next check
-            sleep(check_interval)
-            elapsed_time += check_interval
-
-        # If we reach here, the job has timed out
-        final_status, _ = rayjob.status(print_to_console=True)
-        raise TimeoutError(
-            f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. 
" - f"Final status: {final_status}" - ) diff --git a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py deleted file mode 100644 index 7db71441..00000000 --- a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py +++ /dev/null @@ -1,148 +0,0 @@ -import pytest -import sys -import os -from time import sleep - -sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) -from support import * - -from codeflare_sdk import RayJob, ManagedClusterConfig -from codeflare_sdk.ray.rayjobs.status import ( - CodeflareRayJobStatus, - RayJobDeploymentStatus, -) -import kubernetes.client.rest -from python_client.kuberay_job_api import RayjobApi -from python_client.kuberay_cluster_api import RayClusterApi - - -@pytest.mark.openshift -class TestRayJobLifecycledClusterOauth: - """Test RayJob with auto-created cluster lifecycle management on OpenShift.""" - - def setup_method(self): - initialize_kubernetes_client(self) - - def teardown_method(self): - delete_namespace(self) - - def test_rayjob_with_lifecycled_cluster_oauth(self): - """ - Test RayJob submission with embedded cluster configuration, including: - 1. Job submission with auto-cluster creation - 2. Job suspension (stop) and verification - 3. Job resumption (resubmit) and verification - 4. Job completion monitoring - 5. Automatic cluster cleanup after job deletion - """ - self.setup_method() - create_namespace(self) - ray_image = get_ray_image() - self.job_api = RayjobApi() - job_name = "lifecycled-job" - - cluster_config = ManagedClusterConfig( - head_cpu_requests="500m", - head_cpu_limits="500m", - head_memory_requests=1, - head_memory_limits=4, - num_workers=1, - worker_cpu_requests="500m", - worker_cpu_limits="500m", - worker_memory_requests=1, - worker_memory_limits=4, - image=ray_image, - ) - - rayjob = RayJob( - job_name=job_name, - namespace=self.namespace, - cluster_config=cluster_config, - entrypoint="python -c \"import ray; ray.init(); print('RayJob completed successfully')\"", - runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")}, - shutdown_after_job_finishes=True, - ) - - try: - # 1. Submit and wait for job to reach running state - assert rayjob.submit() == job_name - assert self.job_api.wait_until_job_running( - name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300 - ), "Job did not reach running state" - - # 2. Stop (suspend) the job and - assert rayjob.stop(), "Job stop failed" - job_cr = self.job_api.get_job( - name=rayjob.name, k8s_namespace=rayjob.namespace - ) - assert job_cr["spec"]["suspend"] is True, "Job suspend not set to true" - - assert self._wait_for_job_status( - rayjob, "Suspended", timeout=30 - ), "Job did not reach Suspended state" - - # 3. Test Job Resubmission - assert rayjob.resubmit(), "Job resubmit failed" - job_cr = self.job_api.get_job( - name=rayjob.name, k8s_namespace=rayjob.namespace - ) - assert job_cr["spec"]["suspend"] is False, "Job suspend not set to false" - - assert self.job_api.wait_until_job_finished( - name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300 - ), "Job did not complete" - - finally: - # 4. 
Delete the job and cleanup - assert rayjob.delete() - self.verify_cluster_cleanup(rayjob) - - def _wait_for_job_status( - self, - rayjob: RayJob, - expected_status: str, - timeout: int = 30, - ) -> bool: - """Wait for a job to reach a specific deployment status.""" - elapsed_time = 0 - check_interval = 2 - - while elapsed_time < timeout: - status = self.job_api.get_job_status( - name=rayjob.name, k8s_namespace=rayjob.namespace - ) - if status and status.get("jobDeploymentStatus") == expected_status: - return True - - sleep(check_interval) - elapsed_time += check_interval - - return False - - def verify_cluster_cleanup(self, rayjob: RayJob, timeout: int = 60): - """Verify that the cluster created by the RayJob has been cleaned up.""" - elapsed_time = 0 - check_interval = 5 - cluster_api = RayClusterApi() - - while elapsed_time < timeout: - try: - cluster_info = cluster_api.get_ray_cluster( - name=rayjob.cluster_name, k8s_namespace=rayjob.namespace - ) - # Cluster doesn't exist - if cluster_info is None: - return - - sleep(check_interval) - elapsed_time += check_interval - - except kubernetes.client.rest.ApiException as e: - if e.status == 404: - return - else: - raise e - - raise TimeoutError( - f"Cluster '{rayjob.cluster_name}' was not cleaned up within {timeout} seconds" - ) diff --git a/tests/e2e/rayjob/ray_version_validation_oauth_test.py b/tests/e2e/rayjob/ray_version_validation_oauth_test.py index 68c69aee..794d739a 100644 --- a/tests/e2e/rayjob/ray_version_validation_oauth_test.py +++ b/tests/e2e/rayjob/ray_version_validation_oauth_test.py @@ -7,15 +7,11 @@ from support import * from codeflare_sdk import ( - TokenAuthentication, RayJob, ManagedClusterConfig, ) -# This test validates Ray version compatibility checking for RayJob with cluster lifecycling scenarios - -@pytest.mark.openshift class TestRayJobRayVersionValidationOauth: def setup_method(self): initialize_kubernetes_client(self) @@ -50,12 +46,6 @@ def test_rayjob_lifecycled_cluster_incompatible_ray_version_oauth(self): def run_rayjob_lifecycled_cluster_incompatible_version(self): """Test Ray version validation with cluster lifecycling using incompatible image.""" - auth = TokenAuthentication( - token=run_oc_command(["whoami", "--show-token=true"]), - server=run_oc_command(["whoami", "--show-server=true"]), - skip_tls=True, - ) - auth.login() job_name = "incompatible-lifecycle-rayjob" @@ -76,7 +66,6 @@ def run_rayjob_lifecycled_cluster_incompatible_version(self): cluster_config=cluster_config, namespace=self.namespace, entrypoint="python -c 'print(\"This should not run due to version mismatch\")'", - shutdown_after_job_finishes=True, ttl_seconds_after_finished=30, ) @@ -101,12 +90,6 @@ def test_rayjob_lifecycled_cluster_unknown_ray_version_oauth(self): def run_rayjob_lifecycled_cluster_unknown_version(self): """Test Ray version validation with unknown image (should warn but not fail).""" - auth = TokenAuthentication( - token=run_oc_command(["whoami", "--show-token=true"]), - server=run_oc_command(["whoami", "--show-server=true"]), - skip_tls=True, - ) - auth.login() job_name = "unknown-version-rayjob" @@ -125,7 +108,6 @@ def run_rayjob_lifecycled_cluster_unknown_version(self): cluster_config=cluster_config, namespace=self.namespace, entrypoint="python -c 'print(\"Testing unknown Ray version scenario\")'", - shutdown_after_job_finishes=True, ttl_seconds_after_finished=30, ) diff --git a/tests/e2e/rayjob/rayjob_existing_cluster_test.py b/tests/e2e/rayjob/rayjob_existing_cluster_test.py new file mode 100644 index 
00000000..b62ea1ef --- /dev/null +++ b/tests/e2e/rayjob/rayjob_existing_cluster_test.py @@ -0,0 +1,111 @@ +import pytest +import sys +import os +from time import sleep + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from support import * + +from codeflare_sdk import ( + Cluster, + ClusterConfiguration, +) +from codeflare_sdk import RayJob, TokenAuthentication +from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus +from python_client.kuberay_job_api import RayjobApi + + +class TestRayJobExistingCluster: + """Test RayJob against existing Kueue-managed clusters.""" + + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + delete_kueue_resources(self) + + def test_existing_kueue_cluster(self): + """Test RayJob against Kueue-managed RayCluster.""" + self.setup_method() + create_namespace(self) + create_kueue_resources(self) + + cluster_name = "kueue-cluster" + + if is_openshift(): + auth = TokenAuthentication( + token=run_oc_command(["whoami", "--show-token=true"]), + server=run_oc_command(["whoami", "--show-server=true"]), + skip_tls=True, + ) + auth.login() + + cluster = Cluster( + ClusterConfiguration( + name=cluster_name, + namespace=self.namespace, + num_workers=1, + head_cpu_requests="500m", + head_cpu_limits="500m", + worker_cpu_requests=1, + worker_cpu_limits=1, + worker_memory_requests=1, + worker_memory_limits=4, + image=get_ray_image(), + local_queue=self.local_queues[0], + write_to_file=True, + verify_tls=False, + ) + ) + + cluster.apply() + sleep(20) + + # RayJob with explicit local_queue + rayjob_explicit = RayJob( + job_name="job-explicit-queue", + cluster_name=cluster_name, + namespace=self.namespace, + entrypoint="python -c \"import ray; ray.init(); print('Job with explicit queue')\"", + runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")}, + local_queue=self.local_queues[0], + ) + + # RayJob using default queue + rayjob_default = RayJob( + job_name="job-default-queue", + cluster_name=cluster_name, + namespace=self.namespace, + entrypoint="python -c \"import ray; ray.init(); print('Job with default queue')\"", + runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")}, + ) + + try: + # Test RayJob with explicit queue + assert rayjob_explicit.submit() == "job-explicit-queue" + self._wait_completion(rayjob_explicit) + + # Test RayJob with default queue + assert rayjob_default.submit() == "job-default-queue" + self._wait_completion(rayjob_default) + finally: + rayjob_explicit.delete() + rayjob_default.delete() + cluster.down() + + def _wait_completion(self, rayjob: RayJob, timeout: int = 600): + """Wait for RayJob completion.""" + elapsed = 0 + interval = 10 + + while elapsed < timeout: + status, _ = rayjob.status(print_to_console=False) + if status == CodeflareRayJobStatus.COMPLETE: + return + elif status == CodeflareRayJobStatus.FAILED: + raise AssertionError(f"RayJob '{rayjob.name}' failed") + sleep(interval) + elapsed += interval + + raise TimeoutError(f"RayJob '{rayjob.name}' timeout after {timeout}s") diff --git a/tests/e2e/rayjob/rayjob_lifecycled_cluster_test.py b/tests/e2e/rayjob/rayjob_lifecycled_cluster_test.py new file mode 100644 index 00000000..ad489922 --- /dev/null +++ b/tests/e2e/rayjob/rayjob_lifecycled_cluster_test.py @@ -0,0 +1,166 @@ +import pytest +import sys +import os +from time import sleep + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from support import * + +from codeflare_sdk import RayJob, 
ManagedClusterConfig +import kubernetes.client.rest +from python_client.kuberay_job_api import RayjobApi +from python_client.kuberay_cluster_api import RayClusterApi + + +class TestRayJobLifecycledCluster: + """Test RayJob with auto-created cluster lifecycle management.""" + + def setup_method(self): + initialize_kubernetes_client(self) + + def teardown_method(self): + delete_namespace(self) + delete_kueue_resources(self) + + def test_lifecycled_kueue_managed(self): + """Test RayJob with Kueue-managed lifecycled cluster.""" + self.setup_method() + create_namespace(self) + create_kueue_resources(self) + + self.job_api = RayjobApi() + cluster_api = RayClusterApi() + job_name = "kueue-lifecycled" + + # Get platform-appropriate resource configurations + resources = get_platform_appropriate_resources() + + cluster_config = ManagedClusterConfig( + head_cpu_requests=resources["head_cpu_requests"], + head_cpu_limits=resources["head_cpu_limits"], + head_memory_requests=resources["head_memory_requests"], + head_memory_limits=resources["head_memory_limits"], + num_workers=1, + worker_cpu_requests=resources["worker_cpu_requests"], + worker_cpu_limits=resources["worker_cpu_limits"], + worker_memory_requests=resources["worker_memory_requests"], + worker_memory_limits=resources["worker_memory_limits"], + image=get_ray_image(), + ) + + rayjob = RayJob( + job_name=job_name, + namespace=self.namespace, + cluster_config=cluster_config, + entrypoint="python -c \"import ray; ray.init(); print('Kueue job done')\"", + runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")}, + local_queue=self.local_queues[0], + ) + + try: + assert rayjob.submit() == job_name + assert self.job_api.wait_until_job_running( + name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=600 + ) + + assert self.job_api.wait_until_job_finished( + name=rayjob.name, k8s_namespace=rayjob.namespace, timeout=300 + ) + finally: + try: + rayjob.delete() + except Exception: + pass # Job might already be deleted + verify_rayjob_cluster_cleanup(cluster_api, rayjob.name, rayjob.namespace) + + def test_lifecycled_kueue_resource_queueing(self): + """Test Kueue resource queueing with lifecycled clusters.""" + self.setup_method() + create_namespace(self) + create_limited_kueue_resources(self) + + self.job_api = RayjobApi() + cluster_api = RayClusterApi() + + # Get platform-appropriate resource configurations + resources = get_platform_appropriate_resources() + + cluster_config = ManagedClusterConfig( + head_cpu_requests=resources["head_cpu_requests"], + head_cpu_limits=resources["head_cpu_limits"], + head_memory_requests=resources["head_memory_requests"], + head_memory_limits=resources["head_memory_limits"], + num_workers=0, + image=get_ray_image(), + ) + + job1 = None + job2 = None + try: + job1 = RayJob( + job_name="holder", + namespace=self.namespace, + cluster_config=cluster_config, + entrypoint='python -c "import ray; import time; ray.init(); time.sleep(15)"', + runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")}, + local_queue=self.local_queues[0], + ) + assert job1.submit() == "holder" + assert self.job_api.wait_until_job_running( + name=job1.name, k8s_namespace=job1.namespace, timeout=60 + ) + + job2 = RayJob( + job_name="waiter", + namespace=self.namespace, + cluster_config=cluster_config, + entrypoint='python -c "import ray; ray.init()"', + runtime_env={"env_vars": get_setup_env_variables(ACCELERATOR="cpu")}, + local_queue=self.local_queues[0], + ) + assert job2.submit() == "waiter" + + # Wait for Kueue to process 
the job
+            sleep(5)
+            job2_cr = self.job_api.get_job(name=job2.name, k8s_namespace=job2.namespace)
+
+            # For RayJobs with managed clusters, check if Kueue is holding resources
+            job2_status = job2_cr.get("status", {})
+            ray_cluster_name = job2_status.get("rayClusterName", "")
+
+            # If RayCluster is not created yet, it means Kueue is holding the job
+            if not ray_cluster_name:
+                # No RayCluster yet: Kueue is still holding the job, which is the expected queued state
+                job_is_queued = True
+            else:
+                # Check RayCluster resources - if all are 0, it's queued
+                ray_cluster_status = job2_status.get("rayClusterStatus", {})
+                desired_cpu = ray_cluster_status.get("desiredCPU", "0")
+                desired_memory = ray_cluster_status.get("desiredMemory", "0")
+
+                # Kueue creates the RayCluster but with 0 resources when queued
+                job_is_queued = desired_cpu == "0" and desired_memory == "0"
+
+            assert job_is_queued, "Job2 should be queued by Kueue while Job1 is running"
+
+            assert self.job_api.wait_until_job_finished(
+                name=job1.name, k8s_namespace=job1.namespace, timeout=60
+            )
+
+            assert wait_for_kueue_admission(
+                self, self.job_api, job2.name, job2.namespace, timeout=30
+            )
+
+            assert self.job_api.wait_until_job_finished(
+                name=job2.name, k8s_namespace=job2.namespace, timeout=60
+            )
+        finally:
+            for job in [job1, job2]:
+                if job:
+                    try:
+                        job.delete()
+                        verify_rayjob_cluster_cleanup(
+                            cluster_api, job.name, job.namespace
+                        )
+                    except Exception:
+                        pass  # Job might already be deleted
diff --git a/tests/e2e/support.py b/tests/e2e/support.py
index 165a680b..0ceffd1c 100644
--- a/tests/e2e/support.py
+++ b/tests/e2e/support.py
@@ -2,6 +2,7 @@
 import random
 import string
 import subprocess
+from time import sleep
 from codeflare_sdk import get_cluster
 from kubernetes import client, config
 from codeflare_sdk.common.kubernetes_cluster.kube_api_helpers import (
@@ -26,8 +27,82 @@ def get_ray_cluster(cluster_name, namespace):
         raise
 
 
+def is_openshift():
+    """Detect if running on OpenShift by checking for OpenShift-specific API resources."""
+    try:
+        api = client.ApiClient()
+        discovery = client.ApisApi(api)
+        # Check for OpenShift-specific API group
+        groups = discovery.get_api_versions().groups
+        for group in groups:
+            if group.name == "image.openshift.io":
+                return True
+        return False
+    except Exception:
+        # If we can't determine, assume it's not OpenShift
+        return False
+
+
 def get_ray_image():
-    return os.getenv("RAY_IMAGE", constants.CUDA_RUNTIME_IMAGE)
+    """
+    Get appropriate Ray image based on platform (OpenShift vs Kind/vanilla K8s).
+
+    The tests marked with @pytest.mark.openshift can run on both OpenShift and Kind clusters
+    with Kueue installed. This function automatically selects the appropriate image:
+    - OpenShift: Uses the CUDA runtime image (quay.io/modh/ray:...)
+    - Kind/K8s: Uses the standard Ray image (rayproject/ray:VERSION)
+
+    You can override this behavior by setting the RAY_IMAGE environment variable.
+    """
+    # Allow explicit override via environment variable
+    if "RAY_IMAGE" in os.environ:
+        return os.environ["RAY_IMAGE"]
+
+    # Auto-detect platform and return appropriate image
+    if is_openshift():
+        return constants.CUDA_RUNTIME_IMAGE
+    else:
+        # Use standard Ray image for Kind/vanilla K8s
+        return f"rayproject/ray:{constants.RAY_VERSION}"
+
+
+def get_platform_appropriate_resources():
+    """
+    Get appropriate resource configurations based on platform.
+
+    OpenShift with MODH images requires more memory than Kind with standard Ray images.
+ + Returns: + dict: Resource configurations with keys: + - head_cpu_requests, head_cpu_limits + - head_memory_requests, head_memory_limits + - worker_cpu_requests, worker_cpu_limits + - worker_memory_requests, worker_memory_limits + """ + if is_openshift(): + # MODH runtime images require more memory + return { + "head_cpu_requests": "1", + "head_cpu_limits": "1.5", + "head_memory_requests": 7, + "head_memory_limits": 8, + "worker_cpu_requests": "1", + "worker_cpu_limits": "1", + "worker_memory_requests": 5, + "worker_memory_limits": 6, + } + else: + # Standard Ray images require less memory + return { + "head_cpu_requests": "1", + "head_cpu_limits": "1.5", + "head_memory_requests": 3, + "head_memory_limits": 4, + "worker_cpu_requests": "1", + "worker_cpu_limits": "1", + "worker_memory_requests": 2, + "worker_memory_limits": 3, + } def get_setup_env_variables(**kwargs): @@ -143,6 +218,17 @@ def run_oc_command(args): return None +def run_kubectl_command(args): + try: + result = subprocess.run( + ["kubectl"] + args, capture_output=True, text=True, check=True + ) + return result.stdout.strip() + except subprocess.CalledProcessError as e: + print(f"Error executing 'kubectl {' '.join(args)}': {e}") + return None + + def create_cluster_queue(self, cluster_queue, flavor): cluster_queue_json = { "apiVersion": "kueue.x-k8s.io/v1beta1", @@ -297,7 +383,6 @@ def create_kueue_resources( def delete_kueue_resources(self): - # Delete if given cluster-queue exists for cq in self.cluster_queues: try: self.custom_api.delete_cluster_custom_object( @@ -405,3 +490,234 @@ def assert_get_cluster_and_jobsubmit( assert job_list[0].submission_id == submission_id cluster.down() + + +def wait_for_kueue_admission(self, job_api, job_name, namespace, timeout=120): + print(f"Waiting for Kueue admission of job '{job_name}'...") + elapsed_time = 0 + check_interval = 5 + + while elapsed_time < timeout: + try: + job_cr = job_api.get_job(name=job_name, k8s_namespace=namespace) + + # Check if the job is no longer suspended + is_suspended = job_cr.get("spec", {}).get("suspend", False) + + if not is_suspended: + print(f"โœ“ Job '{job_name}' admitted by Kueue (no longer suspended)") + return True + + # Debug: Check workload status every 10 seconds + if elapsed_time % 10 == 0: + workload = get_kueue_workload_for_job(self, job_name, namespace) + if workload: + conditions = workload.get("status", {}).get("conditions", []) + print(f" DEBUG: Workload conditions for '{job_name}':") + for condition in conditions: + print( + f" - {condition.get('type')}: {condition.get('status')} - {condition.get('reason', '')} - {condition.get('message', '')}" + ) + + # Optional: Check status conditions for more details + conditions = job_cr.get("status", {}).get("conditions", []) + for condition in conditions: + if ( + condition.get("type") == "Suspended" + and condition.get("status") == "False" + ): + print( + f"โœ“ Job '{job_name}' admitted by Kueue (Suspended=False condition)" + ) + return True + + except Exception as e: + print(f"Error checking job status: {e}") + + sleep(check_interval) + elapsed_time += check_interval + + print(f"โœ— Timeout waiting for Kueue admission of job '{job_name}'") + return False + + +def create_limited_kueue_resources(self): + print("Creating limited Kueue resources for preemption testing...") + + # Create a resource flavor with default (no special labels/tolerations) + resource_flavor = f"limited-flavor-{random_choice()}" + create_resource_flavor( + self, resource_flavor, default=True, with_labels=False, 
+
+
+def create_limited_kueue_resources(self):
+    print("Creating limited Kueue resources for preemption testing...")
+
+    # Create a resource flavor with defaults (no special labels/tolerations)
+    resource_flavor = f"limited-flavor-{random_choice()}"
+    create_resource_flavor(
+        self, resource_flavor, default=True, with_labels=False, with_tolerations=False
+    )
+    self.resource_flavors = [resource_flavor]
+
+    # Create a cluster queue with very limited resources.
+    # Adjust the quota based on platform - OpenShift needs more memory.
+    if is_openshift():
+        # MODH images need more memory, so use a higher quota that still admits only one job
+        cpu_quota = 3
+        memory_quota = "8Gi"  # Reduced from 15Gi so the second job stays suspended (one job needs 7Gi)
+    else:
+        # Standard Ray images need less memory
+        cpu_quota = 3
+        memory_quota = "4Gi"
+
+    cluster_queue_name = f"limited-cq-{random_choice()}"
+    cluster_queue_json = {
+        "apiVersion": "kueue.x-k8s.io/v1beta1",
+        "kind": "ClusterQueue",
+        "metadata": {"name": cluster_queue_name},
+        "spec": {
+            "namespaceSelector": {},
+            "resourceGroups": [
+                {
+                    "coveredResources": ["cpu", "memory"],
+                    "flavors": [
+                        {
+                            "name": resource_flavor,
+                            "resources": [
+                                {
+                                    "name": "cpu",
+                                    "nominalQuota": cpu_quota,
+                                },
+                                {
+                                    "name": "memory",
+                                    "nominalQuota": memory_quota,
+                                },
+                            ],
+                        }
+                    ],
+                }
+            ],
+        },
+    }
+
+    try:
+        self.custom_api.create_cluster_custom_object(
+            group="kueue.x-k8s.io",
+            plural="clusterqueues",
+            version="v1beta1",
+            body=cluster_queue_json,
+        )
+        print(f"✓ Created limited ClusterQueue: {cluster_queue_name}")
+    except Exception as e:
+        print(f"Error creating limited ClusterQueue: {e}")
+        raise
+
+    self.cluster_queues = [cluster_queue_name]
+
+    # Create a local queue
+    local_queue_name = f"limited-lq-{random_choice()}"
+    create_local_queue(self, cluster_queue_name, local_queue_name, is_default=True)
+    self.local_queues = [local_queue_name]
+
+    print("✓ Limited Kueue resources created successfully")
+
+
+def get_kueue_workload_for_job(self, job_name, namespace):
+    try:
+        # List all workloads in the namespace
+        workloads = self.custom_api.list_namespaced_custom_object(
+            group="kueue.x-k8s.io",
+            version="v1beta1",
+            plural="workloads",
+            namespace=namespace,
+        )
+
+        # Find the workload with a matching RayJob owner reference
+        for workload in workloads.get("items", []):
+            owner_refs = workload.get("metadata", {}).get("ownerReferences", [])
+
+            for owner_ref in owner_refs:
+                if (
+                    owner_ref.get("kind") == "RayJob"
+                    and owner_ref.get("name") == job_name
+                ):
+                    workload_name = workload.get("metadata", {}).get("name")
+                    print(
+                        f"✓ Found Kueue workload '{workload_name}' for RayJob '{job_name}'"
+                    )
+                    return workload
+
+        print(f"✗ No Kueue workload found for RayJob '{job_name}'")
+        return None
+
+    except Exception as e:
+        print(f"Error getting Kueue workload for job '{job_name}': {e}")
+        return None
+
+
+def wait_for_job_status(
+    job_api, rayjob_name: str, namespace: str, expected_status: str, timeout: int = 30
+) -> bool:
+    """
+    Wait for a RayJob to reach a specific deployment status.
+
+    Args:
+        job_api: RayjobApi instance
+        rayjob_name: Name of the RayJob
+        namespace: Namespace of the RayJob
+        expected_status: Expected jobDeploymentStatus value
+        timeout: Maximum time to wait in seconds
+
+    Returns:
+        bool: True if the status was reached, False on timeout
+    """
+    elapsed_time = 0
+    check_interval = 2
+
+    while elapsed_time < timeout:
+        status = job_api.get_job_status(name=rayjob_name, k8s_namespace=namespace)
+        if status and status.get("jobDeploymentStatus") == expected_status:
+            return True
+
+        sleep(check_interval)
+        elapsed_time += check_interval
+
+    return False
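
Together with verify_rayjob_cluster_cleanup below, these helpers are meant to bracket a RayJob's lifecycle in a test. A hedged sketch of the intended call pattern (self.job_api, self.cluster_api, and the job identifiers are placeholders; "Running" is one of KubeRay's jobDeploymentStatus values):

    # Hypothetical assertions inside a test method:
    assert wait_for_job_status(
        self.job_api, "demo-rayjob", "default", expected_status="Running", timeout=60
    )
    assert self.job_api.wait_until_job_finished(
        name="demo-rayjob", k8s_namespace="default", timeout=120
    )
    # After deletion, the managed RayCluster should disappear as well.
    verify_rayjob_cluster_cleanup(self.cluster_api, "demo-rayjob", "default")
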
+
+
+def verify_rayjob_cluster_cleanup(
+    cluster_api, rayjob_name: str, namespace: str, timeout: int = 60
+):
+    """
+    Verify that the RayCluster created by a RayJob has been cleaned up.
+
+    Handles KubeRay's automatic suffix addition to cluster names.
+
+    Args:
+        cluster_api: RayClusterApi instance
+        rayjob_name: Name of the RayJob
+        namespace: Namespace to check
+        timeout: Maximum time to wait in seconds
+
+    Raises:
+        TimeoutError: If the cluster is not cleaned up within the timeout
+    """
+    elapsed_time = 0
+    check_interval = 5
+
+    while elapsed_time < timeout:
+        # List all RayClusters in the namespace
+        clusters = cluster_api.list_ray_clusters(
+            k8s_namespace=namespace, async_req=False
+        )
+
+        # Check if any cluster exists that starts with our job name
+        found = False
+        for cluster in clusters.get("items", []):
+            cluster_name = cluster.get("metadata", {}).get("name", "")
+            # KubeRay creates clusters with the pattern: {job_name}-raycluster-{suffix}
+            if cluster_name.startswith(f"{rayjob_name}-raycluster"):
+                found = True
+                break
+
+        if not found:
+            # No cluster found, cleanup successful
+            return
+
+        sleep(check_interval)
+        elapsed_time += check_interval
+
+    raise TimeoutError(
+        f"RayCluster for job '{rayjob_name}' was not cleaned up within {timeout} seconds"
+    )
diff --git a/tests/test_cluster_yamls/appwrapper/unit-test-all-params.yaml b/tests/test_cluster_yamls/appwrapper/unit-test-all-params.yaml
index 3d710bfc..fe07e331 100644
--- a/tests/test_cluster_yamls/appwrapper/unit-test-all-params.yaml
+++ b/tests/test_cluster_yamls/appwrapper/unit-test-all-params.yaml
@@ -39,7 +39,6 @@ spec:
       rayStartParams:
         block: 'true'
         dashboard-host: 0.0.0.0
-        dashboard-port: '8265'
         num-gpus: '1'
         resources: '"{\"TPU\": 2}"'
       serviceType: ClusterIP
diff --git a/tests/test_cluster_yamls/kueue/aw_kueue.yaml b/tests/test_cluster_yamls/kueue/aw_kueue.yaml
index 7f72d25b..92e5078d 100644
--- a/tests/test_cluster_yamls/kueue/aw_kueue.yaml
+++ b/tests/test_cluster_yamls/kueue/aw_kueue.yaml
@@ -33,7 +33,6 @@ spec:
       rayStartParams:
         block: 'true'
         dashboard-host: 0.0.0.0
-        dashboard-port: '8265'
         num-gpus: '0'
         resources: '"{}"'
       serviceType: ClusterIP
diff --git a/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml b/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml
index 7a5a62ba..04331aed 100644
--- a/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml
+++ b/tests/test_cluster_yamls/kueue/ray_cluster_kueue.yaml
@@ -33,7 +33,6 @@ spec:
      rayStartParams:
        block: 'true'
        dashboard-host: 0.0.0.0
-       dashboard-port: '8265'
        num-gpus: '0'
        resources: '"{}"'
      serviceType: ClusterIP
diff --git a/tests/test_cluster_yamls/ray/default-appwrapper.yaml b/tests/test_cluster_yamls/ray/default-appwrapper.yaml
index 734f3d33..1041f3b5 100644
--- a/tests/test_cluster_yamls/ray/default-appwrapper.yaml
+++ b/tests/test_cluster_yamls/ray/default-appwrapper.yaml
@@ -31,7 +31,6 @@ spec:
       rayStartParams:
         block: 'true'
         dashboard-host: 0.0.0.0
-        dashboard-port: '8265'
         num-gpus: '0'
         resources: '"{}"'
       serviceType: ClusterIP
diff --git a/tests/test_cluster_yamls/ray/default-ray-cluster.yaml b/tests/test_cluster_yamls/ray/default-ray-cluster.yaml
index cc5f2ada..213b22cf 100644
--- a/tests/test_cluster_yamls/ray/default-ray-cluster.yaml
+++ b/tests/test_cluster_yamls/ray/default-ray-cluster.yaml
@@ -23,7 +23,6 @@ spec:
      rayStartParams:
        block: 'true'
        dashboard-host: 0.0.0.0
-       dashboard-port: '8265'
        num-gpus: '0'
        resources: '"{}"'
      serviceType: ClusterIP
diff --git a/tests/test_cluster_yamls/ray/unit-test-all-params.yaml b/tests/test_cluster_yamls/ray/unit-test-all-params.yaml
index 213a082a..7c7d82d6 100644
--- a/tests/test_cluster_yamls/ray/unit-test-all-params.yaml
+++ b/tests/test_cluster_yamls/ray/unit-test-all-params.yaml
@@ -30,7 +30,6 @@ spec:
      rayStartParams:
        block: 'true'
        dashboard-host: 0.0.0.0
-       dashboard-port: '8265'
        num-gpus: '1'
        resources: '"{\"TPU\": 2}"'
      serviceType: ClusterIP
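
All of the fixture edits above drop the pinned dashboard-port entry; Ray serves the dashboard on port 8265 by default, so the rendered behavior is unchanged. A quick hedged check against one of the updated fixtures (uses PyYAML; the spec.headGroupSpec.rayStartParams path assumes a plain RayCluster manifest):

    import yaml

    # Confirm the port is no longer pinned in the regenerated fixture;
    # Ray falls back to its default dashboard port (8265) when unset.
    with open("tests/test_cluster_yamls/ray/default-ray-cluster.yaml") as f:
        manifest = yaml.safe_load(f)

    params = manifest["spec"]["headGroupSpec"]["rayStartParams"]
    assert "dashboard-port" not in params
    assert params["dashboard-host"] == "0.0.0.0"
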