70 changes: 69 additions & 1 deletion .github/workflows/e2e_tests.yaml
@@ -70,6 +70,74 @@ jobs:
- name: Install NVidia GPU operator for KinD
uses: ./common/github-actions/nvidia-gpu-operator

- name: Verify GPU availability in KinD
run: |
echo "Checking for available GPUs in the KinD cluster..."

# Wait for GPU operator pods to be ready (with timeout)
echo "Waiting for GPU operator pods to be ready..."
TIMEOUT=300 # 5 minutes timeout
END=$((SECONDS + TIMEOUT))

while [ $SECONDS -lt $END ]; do
# Get total number of pods in the namespace
TOTAL_PODS=$(kubectl get pods -n gpu-operator --no-headers | wc -l)

# Count pods that are either running and ready or completed successfully
# Exclude pods that are still initializing
READY_PODS=$(kubectl get pods -n gpu-operator --no-headers | grep -E 'Running|Completed' | grep -v 'PodInitializing' | wc -l)

if [ "$READY_PODS" -eq "$TOTAL_PODS" ] && [ "$TOTAL_PODS" -gt 0 ]; then
echo "All GPU operator pods are ready or completed successfully!"
break
fi

echo "Waiting for GPU operator pods to be ready... ($READY_PODS/$TOTAL_PODS)"
echo "Pod status:"
kubectl get pods -n gpu-operator
sleep 10
done

if [ $SECONDS -ge $END ]; then
echo "::error::Timeout waiting for GPU operator pods to be ready"
echo "GPU operator pod status:"
kubectl get pods -n gpu-operator -o wide
echo "GPU operator pod logs:"
kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator
echo "GPU operator pod events:"
kubectl get events -n gpu-operator
exit 1
fi

echo "Node details:"
kubectl describe nodes | grep -E 'nvidia.com/gpu|Allocatable:|Capacity:|Name:'

# Check that the GPU operator has exposed nvidia.com/gpu resources on the nodes
GPU_LABELS=$(kubectl describe nodes | grep -c "nvidia.com/gpu")
if [ "$GPU_LABELS" -eq 0 ]; then
echo "::error::No NVIDIA GPU labels found on nodes. GPU operator may not be running correctly."
echo "Full node descriptions for debugging:"
kubectl describe nodes
exit 1
fi

# Check that at least one node reports allocatable nvidia.com/gpu resources
GPU_ALLOCATABLE=$(kubectl get nodes -o jsonpath='{.items[*].status.allocatable.nvidia\.com/gpu}' | tr ' ' '\n' | grep -v '^$' | wc -l)
if [ "$GPU_ALLOCATABLE" -eq 0 ]; then
echo "::error::GPU operator is running but no GPUs are allocatable. Check GPU operator logs."
echo "Checking GPU operator pods:"
kubectl get pods -n gpu-operator -o wide
echo "GPU operator pod logs:"
kubectl logs -n gpu-operator -l app.kubernetes.io/name=gpu-operator
echo "GPU operator pod events:"
kubectl get events -n gpu-operator
echo "GPU operator pod descriptions:"
kubectl describe pods -n gpu-operator
exit 1
fi

echo "Successfully found $GPU_ALLOCATABLE allocatable GPU(s) in the cluster."

- name: Deploy CodeFlare stack
id: deploy
run: |
@@ -117,7 +185,7 @@ jobs:
pip install poetry
poetry install --with test,docs
echo "Running e2e tests..."
-poetry run pytest -v -s ./tests/e2e -m 'kind and nvidia_gpu' > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
poetry run pytest -v -s --log-cli-level=INFO ./tests/e2e/local_interactive_sdk_kind_test.py > ${CODEFLARE_TEST_OUTPUT_DIR}/pytest_output.log 2>&1
env:
GRPC_DNS_RESOLVER: "native"

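For reuse inside the test suite, the same allocatable-GPU check could be expressed with the Python kubernetes client. This is only a sketch: it assumes the kubernetes package is installed and a kubeconfig pointing at the KinD cluster, and the helper name allocatable_gpus is hypothetical, not part of this PR. Unlike the shell check above, it sums GPU counts rather than counting nodes.

# Sketch: count allocatable nvidia.com/gpu resources via the Kubernetes API.
from kubernetes import client, config

def allocatable_gpus(resource_name="nvidia.com/gpu"):
    config.load_kube_config()  # assumes a kubeconfig for the KinD cluster
    total = 0
    for node in client.CoreV1Api().list_node().items:
        # allocatable maps resource names to quantity strings, e.g. "1"
        total += int(node.status.allocatable.get(resource_name, "0"))
    return total

assert allocatable_gpus() > 0, "No allocatable NVIDIA GPUs found in the cluster"
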
9 changes: 9 additions & 0 deletions src/codeflare_sdk/common/utils/generate_cert.py
@@ -243,9 +243,18 @@ def export_env(cluster_name, namespace):
- RAY_TLS_SERVER_CERT: Path to the TLS server certificate.
- RAY_TLS_SERVER_KEY: Path to the TLS server private key.
- RAY_TLS_CA_CERT: Path to the CA certificate.
- RAY_CLIENT_SKIP_TLS_VERIFY: When set to "1", the client skips TLS certificate verification (enabled here for the e2e tests).
"""
tls_dir = os.path.join(os.getcwd(), f"tls-{cluster_name}-{namespace}")
os.environ["RAY_USE_TLS"] = "1"
os.environ["RAY_TLS_SERVER_CERT"] = os.path.join(tls_dir, "tls.crt")
os.environ["RAY_TLS_SERVER_KEY"] = os.path.join(tls_dir, "tls.key")
os.environ["RAY_TLS_CA_CERT"] = os.path.join(tls_dir, "ca.crt")
os.environ["RAY_CLIENT_SKIP_TLS_VERIFY"] = "1" # Skip verification for E2E

# Log the exported values so e2e runs can verify the TLS configuration
print(f"generate_cert.export_env: RAY_USE_TLS set to: {os.environ.get('RAY_USE_TLS')}")
print(f"generate_cert.export_env: RAY_TLS_CA_CERT set to: {os.environ.get('RAY_TLS_CA_CERT')}")
print(f"generate_cert.export_env: RAY_TLS_SERVER_CERT set to: {os.environ.get('RAY_TLS_SERVER_CERT')}")
print(f"generate_cert.export_env: RAY_TLS_SERVER_KEY set to: {os.environ.get('RAY_TLS_SERVER_KEY')}")
print(f"generate_cert.export_env: RAY_CLIENT_SKIP_TLS_VERIFY set to: {os.environ.get('RAY_CLIENT_SKIP_TLS_VERIFY')}")
87 changes: 82 additions & 5 deletions tests/e2e/local_interactive_sdk_kind_test.py
@@ -8,37 +8,54 @@
import pytest
import ray
import math
import logging
import time
import os

from support import *

# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


@pytest.mark.kind
class TestRayLocalInteractiveOauth:
def setup_method(self):
logger.info("Setting up test environment...")
initialize_kubernetes_client(self)
logger.info("Kubernetes client initialized")

def teardown_method(self):
logger.info("Cleaning up test environment...")
delete_namespace(self)
delete_kueue_resources(self)
logger.info("Cleanup completed")

def test_local_interactives(self):
logger.info("Starting test_local_interactives...")
self.setup_method()
create_namespace(self)
create_kueue_resources(self)
self.run_local_interactives()
logger.info("test_local_interactives completed")

@pytest.mark.nvidia_gpu
def test_local_interactives_nvidia_gpu(self):
logger.info("Starting test_local_interactives_nvidia_gpu...")
self.setup_method()
create_namespace(self)
create_kueue_resources(self)
self.run_local_interactives(number_of_gpus=1)
logger.info("test_local_interactives_nvidia_gpu completed")

def run_local_interactives(
self, gpu_resource_name="nvidia.com/gpu", number_of_gpus=0
):
cluster_name = "test-ray-cluster-li"
logger.info(f"Starting run_local_interactives with {number_of_gpus} GPUs")

logger.info("Creating cluster configuration...")
cluster = Cluster(
ClusterConfiguration(
name=cluster_name,
@@ -57,37 +74,97 @@ def run_local_interactives(
verify_tls=False,
)
)
logger.info("Cluster configuration created")

logger.info("Starting cluster deployment...")
cluster.up()
logger.info("Cluster deployment initiated")

logger.info("Waiting for cluster to be ready...")
cluster.wait_ready()
logger.info("Cluster is ready")

logger.info("Generating TLS certificates...")
generate_cert.generate_tls_cert(cluster_name, self.namespace)
logger.info("TLS certificates generated")

logger.info("Exporting environment variables...")
generate_cert.export_env(cluster_name, self.namespace)
logger.info("Environment variables exported")

client_url = cluster.local_client_url()
logger.info(f"Ray client URL: {client_url}")

-print(cluster.local_client_url())
logger.info("Checking cluster status...")
status = cluster.status()
logger.info(f"Cluster status: {status}")

logger.info("Checking cluster dashboard URI...")
dashboard_uri = cluster.cluster_dashboard_uri()
logger.info(f"Dashboard URI: {dashboard_uri}")

logger.info("Checking cluster URI...")
cluster_uri = cluster.cluster_uri()
logger.info(f"Cluster URI: {cluster_uri}")

logger.info("Shutting down any existing Ray connections...")
ray.shutdown()
-ray.init(address=cluster.local_client_url(), logging_level="DEBUG")
logger.info("Ray shutdown completed")

logger.info("Initializing Ray connection...")
try:
ray.init(address=client_url, logging_level="DEBUG")
logger.info("Ray initialization successful")
except Exception as e:
logger.error(f"Ray initialization failed: {str(e)}")
logger.error(f"Error type: {type(e)}")
raise

logger.info("Defining Ray remote functions...")

@ray.remote(num_gpus=number_of_gpus / 2)
def heavy_calculation_part(num_iterations):
logger.info(
f"Starting heavy_calculation_part with {num_iterations} iterations"
)
result = 0.0
for i in range(num_iterations):
for j in range(num_iterations):
for k in range(num_iterations):
result += math.sin(i) * math.cos(j) * math.tan(k)
logger.info("heavy_calculation_part completed")
return result

@ray.remote(num_gpus=number_of_gpus / 2)
def heavy_calculation(num_iterations):
logger.info(f"Starting heavy_calculation with {num_iterations} iterations")
results = ray.get(
[heavy_calculation_part.remote(num_iterations // 30) for _ in range(30)]
)
logger.info("heavy_calculation completed")
return sum(results)

logger.info("Submitting calculation task...")
ref = heavy_calculation.remote(3000)
-result = ray.get(ref)
-assert result == 1789.4644387076714
-ray.cancel(ref)
logger.info("Task submitted, waiting for result...")

try:
result = ray.get(ref)
logger.info(f"Calculation completed with result: {result}")
assert result == 1789.4644387076714
logger.info("Result assertion passed")
except Exception as e:
logger.error(f"Error during calculation: {str(e)}")
raise
finally:
logger.info("Cancelling task reference...")
ray.cancel(ref)
logger.info("Task cancelled")

logger.info("Shutting down Ray...")
ray.shutdown()
logger.info("Ray shutdown completed")

logger.info("Tearing down cluster...")
cluster.down()
logger.info("Cluster teardown completed")