diff --git a/tests/e2e/rayjob/__init__.py b/tests/e2e/rayjob/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/tests/e2e/rayjob/existing_cluster_oauth_test.py b/tests/e2e/rayjob/existing_cluster_oauth_test.py
new file mode 100644
index 00000000..5face339
--- /dev/null
+++ b/tests/e2e/rayjob/existing_cluster_oauth_test.py
@@ -0,0 +1,139 @@
+import pytest
+import sys
+import os
+from time import sleep
+
+# Add the parent directory to the path to import support
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+from support import *
+
+from codeflare_sdk import (
+    Cluster,
+    ClusterConfiguration,
+    TokenAuthentication,
+)
+from codeflare_sdk.ray.rayjobs import RayJob
+from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus
+
+# This test creates a Ray Cluster and then submits a RayJob against the existing cluster on OpenShift
+
+
+@pytest.mark.openshift
+class TestRayJobExistingClusterOauth:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    def test_rayjob_against_existing_cluster_oauth(self):
+        self.setup_method()
+        create_namespace(self)
+        create_kueue_resources(self)
+        self.run_rayjob_against_existing_cluster_oauth()
+
+    def run_rayjob_against_existing_cluster_oauth(self):
+        ray_image = get_ray_image()
+
+        auth = TokenAuthentication(
+            token=run_oc_command(["whoami", "--show-token=true"]),
+            server=run_oc_command(["whoami", "--show-server=true"]),
+            skip_tls=True,
+        )
+        auth.login()
+
+        cluster_name = "existing-cluster"
+
+        cluster = Cluster(
+            ClusterConfiguration(
+                name=cluster_name,
+                namespace=self.namespace,
+                num_workers=1,
+                head_cpu_requests="500m",
+                head_cpu_limits="500m",
+                worker_cpu_requests=1,
+                worker_cpu_limits=1,
+                worker_memory_requests=1,
+                worker_memory_limits=4,
+                image=ray_image,
+                write_to_file=True,
+                verify_tls=False,
+            )
+        )
+
+        cluster.apply()
+        cluster.status()
+        cluster.wait_ready()
+        cluster.status()
+        cluster.details()
+
+        print(f"Ray cluster '{cluster_name}' is ready!")
+
+        job_name = "existing-cluster-rayjob"
+
+        rayjob = RayJob(
+            job_name=job_name,
+            cluster_name=cluster_name,
+            namespace=self.namespace,
+            entrypoint="python -c \"import ray; ray.init(); print('Hello from RayJob!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
+            runtime_env={
+                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
+                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
+            },
+            shutdown_after_job_finishes=False,
+        )
+
+        # Submit the job
+        print(
+            f"Submitting RayJob '{job_name}' against existing cluster '{cluster_name}'"
+        )
+        submission_result = rayjob.submit()
+        assert (
+            submission_result == job_name
+        ), f"Job submission failed, expected {job_name}, got {submission_result}"
+        print(f"Successfully submitted RayJob '{job_name}'!")
+
+        # Monitor the job status until completion
+        self.monitor_rayjob_completion(rayjob)
+
+        # Cleanup - manually tear down the cluster since the job won't do it
+        print("🧹 Cleaning up Ray cluster")
+        cluster.down()
+
+    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
+        """
+        Monitor a RayJob until it completes or fails.
+        Args:
+            rayjob: The RayJob instance to monitor
+            timeout: Maximum time to wait in seconds (default: 15 minutes)
+        """
+        print(f"Monitoring RayJob '{rayjob.name}' status...")
+
+        elapsed_time = 0
+        check_interval = 10  # Check every 10 seconds
+
+        while elapsed_time < timeout:
+            status, ready = rayjob.status(print_to_console=True)
+
+            # Check if job has completed (either successfully or failed)
+            if status == CodeflareRayJobStatus.COMPLETE:
+                print(f"RayJob '{rayjob.name}' completed successfully!")
+                return
+            elif status == CodeflareRayJobStatus.FAILED:
+                raise AssertionError(f"RayJob '{rayjob.name}' failed!")
+            elif status == CodeflareRayJobStatus.RUNNING:
+                print(f"RayJob '{rayjob.name}' is still running...")
+            elif status == CodeflareRayJobStatus.UNKNOWN:
+                print(f"RayJob '{rayjob.name}' status is unknown")
+
+            # Wait before next check
+            sleep(check_interval)
+            elapsed_time += check_interval
+
+        # If we reach here, the job has timed out
+        final_status, _ = rayjob.status(print_to_console=True)
+        raise TimeoutError(
+            f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
+            f"Final status: {final_status}"
+        )
diff --git a/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py
new file mode 100644
index 00000000..54186de3
--- /dev/null
+++ b/tests/e2e/rayjob/lifecycled_cluster_oauth_test.py
@@ -0,0 +1,170 @@
+import pytest
+import sys
+import os
+from time import sleep
+
+# Add the parent directory to the path to import support
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+from support import *
+
+from codeflare_sdk import (
+    TokenAuthentication,
+    RayJob,
+    ManagedClusterConfig,
+)
+from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus
+
+# This test creates a RayJob that will create and lifecycle its own cluster on OpenShift
+
+
+@pytest.mark.openshift
+class TestRayJobLifecycledClusterOauth:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    def test_rayjob_with_lifecycled_cluster_oauth(self):
+        self.setup_method()
+        create_namespace(self)
+        create_kueue_resources(self)
+        self.run_rayjob_with_lifecycled_cluster_oauth()
+
+    def run_rayjob_with_lifecycled_cluster_oauth(self):
+        ray_image = get_ray_image()
+
+        auth = TokenAuthentication(
+            token=run_oc_command(["whoami", "--show-token=true"]),
+            server=run_oc_command(["whoami", "--show-server=true"]),
+            skip_tls=True,
+        )
+        auth.login()
+
+        job_name = "lifecycled-cluster-rayjob"
+
+        # Create cluster configuration for auto-creation
+        cluster_config = ManagedClusterConfig(
+            head_cpu_requests="500m",
+            head_cpu_limits="500m",
+            head_memory_requests=1,
+            head_memory_limits=4,
+            num_workers=1,
+            worker_cpu_requests="500m",
+            worker_cpu_limits="500m",
+            worker_memory_requests=1,
+            worker_memory_limits=4,
+            image=ray_image,
+        )
+
+        # Create RayJob with embedded cluster - will auto-create and manage cluster lifecycle
+        rayjob = RayJob(
+            job_name=job_name,
+            cluster_config=cluster_config,  # This triggers auto-cluster creation
+            namespace=self.namespace,
+            entrypoint="python -c \"import ray; ray.init(); print('Hello from auto-created cluster!'); print(f'Ray version: {ray.__version__}'); import time; time.sleep(30); print('RayJob completed successfully!')\"",
+            runtime_env={
+                "pip": ["torch", "pytorch-lightning", "torchmetrics", "torchvision"],
+                "env_vars": get_setup_env_variables(ACCELERATOR="cpu"),
+            },
+            shutdown_after_job_finishes=True,  # Auto-cleanup cluster after job finishes
+            ttl_seconds_after_finished=30,  # Wait 30s after job completion before cleanup
+        )
+
+        # Submit the job
+        print(
+            f"Submitting RayJob '{job_name}' with auto-cluster creation and lifecycle management"
+        )
+        submission_result = rayjob.submit()
+        assert (
+            submission_result == job_name
+        ), f"Job submission failed, expected {job_name}, got {submission_result}"
+        print(
+            f"Successfully submitted RayJob '{job_name}' with cluster '{rayjob.cluster_name}'!"
+        )
+
+        # Monitor the job status until completion
+        self.monitor_rayjob_completion(rayjob)
+
+        # Verify cluster auto-cleanup
+        print("🔍 Verifying cluster auto-cleanup after job completion...")
+        self.verify_cluster_cleanup(rayjob.cluster_name, timeout=60)
+
+    def monitor_rayjob_completion(self, rayjob: RayJob, timeout: int = 900):
+        """
+        Monitor a RayJob until it completes or fails.
+        Args:
+            rayjob: The RayJob instance to monitor
+            timeout: Maximum time to wait in seconds (default: 15 minutes)
+        """
+        print(f"Monitoring RayJob '{rayjob.name}' status...")
+
+        elapsed_time = 0
+        check_interval = 10  # Check every 10 seconds
+
+        while elapsed_time < timeout:
+            status, ready = rayjob.status(print_to_console=True)
+
+            # Check if job has completed (either successfully or failed)
+            if status == CodeflareRayJobStatus.COMPLETE:
+                print(f"RayJob '{rayjob.name}' completed successfully!")
+                return
+            elif status == CodeflareRayJobStatus.FAILED:
+                raise AssertionError(f"RayJob '{rayjob.name}' failed!")
+            elif status == CodeflareRayJobStatus.RUNNING:
+                print(f"RayJob '{rayjob.name}' is still running...")
+            elif status == CodeflareRayJobStatus.UNKNOWN:
+                print(f"RayJob '{rayjob.name}' status is unknown")
+
+            # Wait before next check
+            sleep(check_interval)
+            elapsed_time += check_interval
+
+        # If we reach here, the job has timed out
+        final_status, _ = rayjob.status(print_to_console=True)
+        raise TimeoutError(
+            f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
+            f"Final status: {final_status}"
+        )
+
+    def verify_cluster_cleanup(self, cluster_name: str, timeout: int = 60):
+        """
+        Verify that the cluster created by the RayJob has been cleaned up.
+        Args:
+            cluster_name: The name of the cluster to check for cleanup
+            timeout: Maximum time to wait for cleanup in seconds (default: 1 minute)
+        """
+        from kubernetes import client
+        import kubernetes.client.rest
+
+        elapsed_time = 0
+        check_interval = 5  # Check every 5 seconds
+
+        while elapsed_time < timeout:
+            try:
+                # Try to get the RayCluster resource
+                custom_api = client.CustomObjectsApi()
+                custom_api.get_namespaced_custom_object(
+                    group="ray.io",
+                    version="v1",
+                    namespace=self.namespace,
+                    plural="rayclusters",
+                    name=cluster_name,
+                )
+                print(f"Cluster '{cluster_name}' still exists, waiting for cleanup...")
+                sleep(check_interval)
+                elapsed_time += check_interval
+            except kubernetes.client.rest.ApiException as e:
+                if e.status == 404:
+                    print(
+                        f"✅ Cluster '{cluster_name}' has been successfully cleaned up!"
+                    )
+                    return
+                else:
+                    raise e
+
+        # If we reach here, the cluster was not cleaned up in time
+        raise TimeoutError(
+            f"Cluster '{cluster_name}' was not cleaned up within {timeout} seconds"
+        )
diff --git a/tests/e2e/rayjob/ray_version_validation_oauth_test.py b/tests/e2e/rayjob/ray_version_validation_oauth_test.py
new file mode 100644
index 00000000..68c69aee
--- /dev/null
+++ b/tests/e2e/rayjob/ray_version_validation_oauth_test.py
@@ -0,0 +1,145 @@
+import pytest
+import sys
+import os
+
+# Add the parent directory to the path to import support
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+from support import *
+
+from codeflare_sdk import (
+    TokenAuthentication,
+    RayJob,
+    ManagedClusterConfig,
+)
+
+# This test validates Ray version compatibility checking for RayJob with cluster lifecycling scenarios
+
+
+@pytest.mark.openshift
+class TestRayJobRayVersionValidationOauth:
+    def setup_method(self):
+        initialize_kubernetes_client(self)
+
+    def teardown_method(self):
+        delete_namespace(self)
+        delete_kueue_resources(self)
+
+    def _create_basic_managed_cluster_config(
+        self, ray_image: str
+    ) -> ManagedClusterConfig:
+        """Helper method to create a basic managed cluster configuration."""
+        return ManagedClusterConfig(
+            head_cpu_requests="500m",
+            head_cpu_limits="500m",
+            head_memory_requests=1,
+            head_memory_limits=2,
+            num_workers=1,
+            worker_cpu_requests="500m",
+            worker_cpu_limits="500m",
+            worker_memory_requests=1,
+            worker_memory_limits=2,
+            image=ray_image,
+        )
+
+    def test_rayjob_lifecycled_cluster_incompatible_ray_version_oauth(self):
+        """Test that RayJob submission fails when the cluster config specifies an incompatible Ray version."""
+        self.setup_method()
+        create_namespace(self)
+        create_kueue_resources(self)
+        self.run_rayjob_lifecycled_cluster_incompatible_version()
+
+    def run_rayjob_lifecycled_cluster_incompatible_version(self):
+        """Test Ray version validation with cluster lifecycling using an incompatible image."""
+        auth = TokenAuthentication(
+            token=run_oc_command(["whoami", "--show-token=true"]),
+            server=run_oc_command(["whoami", "--show-server=true"]),
+            skip_tls=True,
+        )
+        auth.login()
+
+        job_name = "incompatible-lifecycle-rayjob"
+
+        # Create cluster configuration with incompatible Ray version (2.46.1 instead of expected 2.47.1)
+        incompatible_ray_image = "quay.io/modh/ray:2.46.1-py311-cu121"
+
+        print(
+            f"Creating RayJob with incompatible Ray image in cluster config: {incompatible_ray_image}"
+        )
+
+        cluster_config = self._create_basic_managed_cluster_config(
+            incompatible_ray_image
+        )
+
+        # Create RayJob with incompatible cluster config - this should fail during submission
+        rayjob = RayJob(
+            job_name=job_name,
+            cluster_config=cluster_config,
+            namespace=self.namespace,
+            entrypoint="python -c 'print(\"This should not run due to version mismatch\")'",
+            shutdown_after_job_finishes=True,
+            ttl_seconds_after_finished=30,
+        )
+
+        print(
+            f"Attempting to submit RayJob '{job_name}' with incompatible Ray version..."
+        )
+
+        # This should fail during submission due to Ray version validation
+        with pytest.raises(ValueError, match="Ray version mismatch detected"):
+            rayjob.submit()
+
+        print(
+            "✅ Ray version validation correctly prevented RayJob submission with incompatible cluster config!"
+        )
+
+    def test_rayjob_lifecycled_cluster_unknown_ray_version_oauth(self):
+        """Test that RayJob submission succeeds with a warning when the Ray version cannot be determined."""
+        self.setup_method()
+        create_namespace(self)
+        create_kueue_resources(self)
+        self.run_rayjob_lifecycled_cluster_unknown_version()
+
+    def run_rayjob_lifecycled_cluster_unknown_version(self):
+        """Test Ray version validation with an unknown image (should warn but not fail)."""
+        auth = TokenAuthentication(
+            token=run_oc_command(["whoami", "--show-token=true"]),
+            server=run_oc_command(["whoami", "--show-server=true"]),
+            skip_tls=True,
+        )
+        auth.login()
+
+        job_name = "unknown-version-rayjob"
+
+        # Use an image where Ray version cannot be determined (SHA digest)
+        unknown_ray_image = "quay.io/modh/ray@sha256:6d076aeb38ab3c34a6a2ef0f58dc667089aa15826fa08a73273c629333e12f1e"
+
+        print(
+            f"Creating RayJob with image where Ray version cannot be determined: {unknown_ray_image}"
+        )
+
+        cluster_config = self._create_basic_managed_cluster_config(unknown_ray_image)
+
+        # Create RayJob with unknown version image - this should succeed with warning
+        rayjob = RayJob(
+            job_name=job_name,
+            cluster_config=cluster_config,
+            namespace=self.namespace,
+            entrypoint="python -c 'print(\"Testing unknown Ray version scenario\")'",
+            shutdown_after_job_finishes=True,
+            ttl_seconds_after_finished=30,
+        )
+
+        print(f"Attempting to submit RayJob '{job_name}' with unknown Ray version...")
+
+        # This should succeed but with a warning
+        with pytest.warns(UserWarning, match="Cannot determine Ray version"):
+            submission_result = rayjob.submit()
+
+        assert (
+            submission_result == job_name
+        ), f"Job submission failed, expected {job_name}, got {submission_result}"
+
+        print("✅ RayJob submission succeeded with warning for unknown Ray version!")
+        print(
+            f"Note: RayJob '{job_name}' was submitted successfully but may need manual cleanup."
+        )
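Note: the two cluster tests above carry identical copies of monitor_rayjob_completion. A minimal sketch of how that polling logic could instead live in the shared e2e support module that both files already star-import (the placement in tests/e2e/support.py and the standalone helper name are assumptions, not part of this diff); the status calls and enum values are the ones used in the tests above:

from time import sleep

from codeflare_sdk.ray.rayjobs import RayJob
from codeflare_sdk.ray.rayjobs.status import CodeflareRayJobStatus


def monitor_rayjob_completion(
    rayjob: RayJob, timeout: int = 900, check_interval: int = 10
):
    """Poll a RayJob until it completes; fail on error, raise TimeoutError otherwise."""
    elapsed_time = 0
    while elapsed_time < timeout:
        status, _ = rayjob.status(print_to_console=True)
        if status == CodeflareRayJobStatus.COMPLETE:
            return
        if status == CodeflareRayJobStatus.FAILED:
            raise AssertionError(f"RayJob '{rayjob.name}' failed!")
        # RUNNING or UNKNOWN: keep polling until the timeout is reached
        sleep(check_interval)
        elapsed_time += check_interval
    final_status, _ = rayjob.status(print_to_console=True)
    raise TimeoutError(
        f"RayJob '{rayjob.name}' did not complete within {timeout} seconds. "
        f"Final status: {final_status}"
    )

Each test class could then call this helper directly instead of defining its own copy.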