diff --git a/tests/trainer/kubeflow_sdk_test.go b/tests/trainer/kubeflow_sdk_test.go index dd743ed1e..659dcff27 100644 --- a/tests/trainer/kubeflow_sdk_test.go +++ b/tests/trainer/kubeflow_sdk_test.go @@ -88,3 +88,35 @@ func TestRhaiFeaturesRocm(t *testing.T) { Tags(t, KftoRocm, MultiNodeGpu(2, support.AMD)) sdktests.RunRhaiFeaturesAllTest(t, support.AMD) } + +// Multi-GPU CUDA tests - 2 nodes, 2 GPUs each (requires 4 total NVIDIA GPUs) +func TestRhaiTrainingProgressionMultiGpuCuda(t *testing.T) { + Tags(t, KftoCuda, MultiNodeMultiGpu(2, support.NVIDIA, 2)) + sdktests.RunRhaiFeaturesProgressionMultiGpuTest(t, support.NVIDIA, 2, 2) +} + +func TestRhaiJitCheckpointingMultiGpuCuda(t *testing.T) { + Tags(t, KftoCuda, MultiNodeMultiGpu(2, support.NVIDIA, 2)) + sdktests.RunRhaiFeaturesCheckpointMultiGpuTest(t, support.NVIDIA, 2, 2) +} + +func TestRhaiFeaturesMultiGpuCuda(t *testing.T) { + Tags(t, KftoCuda, MultiNodeMultiGpu(2, support.NVIDIA, 2)) + sdktests.RunRhaiFeaturesAllMultiGpuTest(t, support.NVIDIA, 2, 2) +} + +// Multi-GPU ROCm tests - 2 nodes, 2 GPUs each (requires 4 total AMD GPUs) +func TestRhaiTrainingProgressionMultiGpuRocm(t *testing.T) { + Tags(t, KftoRocm, MultiNodeMultiGpu(2, support.AMD, 2)) + sdktests.RunRhaiFeaturesProgressionMultiGpuTest(t, support.AMD, 2, 2) +} + +func TestRhaiJitCheckpointingMultiGpuRocm(t *testing.T) { + Tags(t, KftoRocm, MultiNodeMultiGpu(2, support.AMD, 2)) + sdktests.RunRhaiFeaturesCheckpointMultiGpuTest(t, support.AMD, 2, 2) +} + +func TestRhaiFeaturesMultiGpuRocm(t *testing.T) { + Tags(t, KftoRocm, MultiNodeMultiGpu(2, support.AMD, 2)) + sdktests.RunRhaiFeaturesAllMultiGpuTest(t, support.AMD, 2, 2) +} diff --git a/tests/trainer/resources/disconnected_env/README.md b/tests/trainer/resources/disconnected_env/README.md new file mode 100644 index 000000000..4705c605c --- /dev/null +++ b/tests/trainer/resources/disconnected_env/README.md @@ -0,0 +1,469 @@ +# Disconnected Environment Setup for Trainer Tests + +This guide covers setting up and running trainer v2/SDK tests in a disconnected OpenShift environment (no internet access from cluster nodes). + +## Prerequisites + +- OpenShift cluster with RHOAI/ODH installed +- Bastion host with internet access for mirroring +- MinIO/S3-compatible storage accessible from cluster +- PyPI mirror (Nexus/DevPI) accessible from cluster +- Image registry accessible from cluster (e.g., Quay, internal registry) +- **Go 1.24** (not 1.25+) for running tests +- **Notebook image with Python 3.9+** (required for kubeflow-trainer-api>=2.0.0) + +## Overview + +Disconnected environments require: +1. **Container images** mirrored to internal registry +2. **Models/datasets** pre-staged to S3/MinIO +3. **Python packages** available via PyPI mirror or S3 +4. 
**Environment variables** configured for tests + +--- + +## Available Test Cases + +### RHAI Features Tests + +| Test Name | Description | Resources | +|-----------|-------------|-----------| +| `TestRhaiTrainingProgressionCPU` | Progression tracking on CPU | 2 nodes, 2 CPUs each | +| `TestRhaiJitCheckpointingCPU` | JIT checkpoint save/resume on CPU | 2 nodes, 2 CPUs each | +| `TestRhaiFeaturesCPU` | All RHAI features combined (CPU) | 2 nodes, 2 CPUs each | +| `TestRhaiTrainingProgressionCuda` | Progression tracking on NVIDIA GPU | 2 nodes, 1 GPU each | +| `TestRhaiJitCheckpointingCuda` | JIT checkpoint on NVIDIA GPU | 2 nodes, 1 GPU each | +| `TestRhaiFeaturesCuda` | All RHAI features (NVIDIA GPU) | 2 nodes, 1 GPU each | +| `TestRhaiTrainingProgressionRocm` | Progression tracking on AMD GPU | 2 nodes, 1 GPU each | +| `TestRhaiJitCheckpointingRocm` | JIT checkpoint on AMD GPU | 2 nodes, 1 GPU each | +| `TestRhaiFeaturesRocm` | All RHAI features (AMD GPU) | 2 nodes, 1 GPU each | + +### Multi-GPU Tests + +| Test Name | Description | Resources | +|-----------|-------------|-----------| +| `TestRhaiTrainingProgressionMultiGpuCuda` | Multi-GPU progression (NVIDIA) | 2 nodes, 2 GPUs each | +| `TestRhaiJitCheckpointingMultiGpuCuda` | Multi-GPU checkpoint (NVIDIA) | 2 nodes, 2 GPUs each | +| `TestRhaiFeaturesMultiGpuCuda` | All features multi-GPU (NVIDIA) | 2 nodes, 2 GPUs each | +| `TestRhaiTrainingProgressionMultiGpuRocm` | Multi-GPU progression (AMD) | 2 nodes, 2 GPUs each | +| `TestRhaiJitCheckpointingMultiGpuRocm` | Multi-GPU checkpoint (AMD) | 2 nodes, 2 GPUs each | +| `TestRhaiFeaturesMultiGpuRocm` | All features multi-GPU (AMD) | 2 nodes, 2 GPUs each | + +--- + +## Step 1: Mirror Container Images + +### 1.1 Required Images + +Get image digests for the notebook and training images: + +```bash +# Notebook image - MUST use version with Python 3.9+ (2024.2 or later) +# Python 3.9+ is required for kubeflow-trainer-api>=2.0.0 +skopeo inspect docker://quay.io/modh/odh-generic-data-science-notebook:2024.2 | jq -r '.Digest' + +# Training runtime images are built into the cluster +``` + +### 1.2 Mirror Images + +```bash +# Mirror notebook image +skopeo copy --all \ + docker://quay.io/modh/odh-generic-data-science-notebook@sha256: \ + docker:///modh/odh-generic-data-science-notebook@sha256: +``` + +### 1.3 Configure ImageDigestMirrorSet (if needed) + +```yaml +apiVersion: config.openshift.io/v1 +kind: ImageDigestMirrorSet +metadata: + name: notebook-mirror +spec: + imageDigestMirrors: + - mirrors: + - /modh + source: quay.io/modh +``` + +--- + +## Step 2: Pre-Stage Models and Datasets to S3/MinIO + +### 2.1 Install Required Packages (on bastion with internet) + +```bash +pip install boto3 huggingface_hub datasets tqdm urllib3 +``` + +### 2.2 Set Environment Variables + +```bash +export AWS_DEFAULT_ENDPOINT="https://:9000" +export AWS_ACCESS_KEY_ID="" +export AWS_SECRET_ACCESS_KEY="" +export AWS_STORAGE_BUCKET="" +export HF_TOKEN="" # Optional, for gated models +``` + +### 2.3 Run Pre-Stage Script + +The `prestage_models_datasets.py` script downloads models/datasets from HuggingFace and uploads to S3. 
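+It reads the same `AWS_*` variables exported in step 2.2 and skips any model or dataset already present in the bucket unless `--force` is passed. To preview the source-to-S3 mapping without downloading or uploading anything, the script's `--dry-run` flag can be used first:
+
+```bash
+# From tests/trainer/resources/disconnected_env: preview what would be mirrored
+python3 prestage_models_datasets.py --preset rhai --dry-run
+```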
+ +```bash +cd tests/trainer/resources/disconnected_env + +# List available presets +python3 prestage_models_datasets.py --list-presets + +# Pre-stage for RHAI tests (distilgpt2 + alpaca-cleaned) +python3 prestage_models_datasets.py --preset rhai + +# Pre-stage custom model/dataset +python3 prestage_models_datasets.py \ + --model "distilgpt2" \ + --dataset "yahma/alpaca-cleaned" + +# Force re-upload (overwrites existing) +python3 prestage_models_datasets.py --preset rhai --force + +# Check what exists in S3 +python3 prestage_models_datasets.py --preset rhai --check +``` + +### 2.4 Verify S3 Contents + +After pre-staging, verify files are uploaded: + +```bash +# Using aws CLI or mc (MinIO client) +mc ls myminio//models/distilgpt2/ + +# Expected files for distilgpt2: +# - config.json +# - generation_config.json +# - merges.txt +# - model.safetensors (or pytorch_model.bin) +# - tokenizer.json +# - tokenizer_config.json +# - vocab.json +``` + +### 2.5 S3 Bucket Structure + +The prestage script creates this structure: + +``` +/ +├── models/ +│ └── distilgpt2/ +│ ├── config.json +│ ├── model.safetensors +│ ├── tokenizer.json +│ ├── vocab.json +│ └── ... +├── alpaca-cleaned-datasets/ +│ ├── train/ +│ │ └── data-*.arrow +│ ├── dataset_dict.json +│ └── dataset_info.json +└── wheels/ + └── kubeflow-0.2.1+rhai0-py3-none-any.whl # See Step 3 +``` + +--- + +## Step 3: Prepare Kubeflow SDK Wheel + +The RHAI features require a specific version of the Kubeflow SDK that may not be on public PyPI. + +### 3.1 Download SDK Wheel + +```bash +# Clone and build the wheel +git clone https://github.com/opendatahub-io/kubeflow-sdk.git +cd kubeflow-sdk +git checkout v0.2.1+rhai0 +pip install build +python -m build --wheel + +# Or download pre-built wheel +pip download "kubeflow @ git+https://github.com/opendatahub-io/kubeflow-sdk.git@v0.2.1+rhai0" \ + --no-deps -d /tmp/wheels +``` + +### 3.2 Upload to S3 + +```bash +# Upload wheel to S3 +mc cp kubeflow-0.2.1+rhai0-py3-none-any.whl myminio//wheels/ +``` + +### 3.3 (Alternative) Upload to PyPI Mirror + +If you have a PyPI mirror (Nexus, DevPI): + +```bash +twine upload --repository-url https:///repository/pypi/ \ + dist/kubeflow-0.2.1+rhai0-py3-none-any.whl +``` + +--- + +## Step 4: Configure Test Environment Variables + +### 4.1 Required Variables + +```bash +# Notebook configuration +export NOTEBOOK_USER_NAME="" +export NOTEBOOK_USER_PASSWORD="" +export NOTEBOOK_IMAGE="quay.io/modh/odh-generic-data-science-notebook@sha256:" + +# S3/MinIO configuration +export AWS_DEFAULT_ENDPOINT="https://:9000" +export AWS_ACCESS_KEY_ID="" +export AWS_SECRET_ACCESS_KEY="" +export AWS_STORAGE_BUCKET="" + +# PyPI mirror (optional but recommended) +export PIP_INDEX_URL="https:///simple/" +export PIP_TRUSTED_HOST="" + +# Test timeout (increase for slow environments) +export TEST_TIMEOUT_LONG="15m" +``` + +### 4.2 Optional Variables + +```bash +# Model/dataset S3 paths (defaults shown) +export MODEL_S3_PREFIX="models/distilgpt2" +export DATASET_S3_PREFIX="alpaca-cleaned-datasets" + +# Kubeflow wheel S3 path (default shown) +export KUBEFLOW_WHEEL_S3_KEY="wheels/kubeflow-0.2.1+rhai0-py3-none-any.whl" +export KUBEFLOW_REQUIRED_VERSION="0.2.1" + +# SSL verification (set to "false" for self-signed certs on S3) +export VERIFY_SSL="false" +``` + +--- + +## Step 5: Run Tests + +### 5.1 Verify Go Version + +The tests require Go 1.24. 
If you have multiple Go versions: + +```bash +# macOS with Homebrew +export PATH=/opt/homebrew/opt/go@1.24/bin:$PATH +export GOROOT=/opt/homebrew/opt/go@1.24/libexec + +# Verify +go version # Should show go1.24.x +``` + +### 5.2 RHAI Features Test (CPU) + +```bash +NOTEBOOK_USER_NAME="htpasswd-cluster-admin-user" \ +NOTEBOOK_USER_PASSWORD="" \ +NOTEBOOK_IMAGE="quay.io/modh/odh-generic-data-science-notebook@sha256:" \ +AWS_DEFAULT_ENDPOINT="https://:9000" \ +AWS_ACCESS_KEY_ID="" \ +AWS_SECRET_ACCESS_KEY="" \ +AWS_STORAGE_BUCKET="" \ +PIP_INDEX_URL="https:///simple/" \ +PIP_TRUSTED_HOST="" \ +go test -v ./tests/trainer -run TestRhaiTrainingProgressionCPU -timeout 30m +``` + +### 5.3 All Trainer Tests + +```bash +go test -v ./tests/trainer/... -timeout 60m +``` + +### 5.4 Monitor Test Progress + +While tests are running, monitor pods in another terminal: + +```bash +# Watch pods in test namespace +oc get pods -n -w + +# Check training pod logs +oc logs -n -node-0-0 -f + +# Check notebook pod logs +oc logs -n jupyter-nb- -f +``` + +--- + +## Troubleshooting + +### Issue: Model files missing in S3 + +**Symptom:** +``` +TypeError: expected str, bytes or os.PathLike object, not NoneType +``` + +**Cause:** Tokenizer files (vocab.json, etc.) not uploaded to S3. + +**Fix:** Re-run prestage with `--force`: +```bash +python3 prestage_models_datasets.py --preset rhai --force +``` + +### Issue: Kubeflow SDK not found + +**Symptom:** +``` +ModuleNotFoundError: No module named 'kubeflow.trainer' +``` + +**Cause:** Wrong kubeflow version installed or wheel not in S3. + +**Fix:** +1. Upload correct wheel to S3: + ```bash + mc cp kubeflow-0.2.1+rhai0-py3-none-any.whl myminio//wheels/ + ``` +2. Verify `KUBEFLOW_WHEEL_S3_KEY` points to correct path + +### Issue: SSL Certificate errors + +**Symptom:** +``` +SSL: CERTIFICATE_VERIFY_FAILED +``` + +**Cause:** Self-signed certificates on PyPI mirror or S3. + +**Fix:** Set trusted host: +```bash +export PIP_TRUSTED_HOST="" +``` + +### Issue: Training pods stuck in ContainerCreating + +**Symptom:** Pods stay in ContainerCreating for extended time. + +**Cause:** NFS volume provisioning is slow. + +**Fix:** Increase timeout: +```bash +export TEST_TIMEOUT_LONG="15m" +``` + +### Issue: Image pull errors + +**Symptom:** +``` +ImagePullBackOff +``` + +**Cause:** Images not mirrored or IDMS not configured. + +**Fix:** +1. Mirror images to internal registry +2. Configure ImageDigestMirrorSet +3. Use digest-based image references + +## Verifying Success + +A successful `TestRhaiTrainingProgressionCPU` test shows: + +``` +=== RUN TestRhaiTrainingProgressionCPU + rhai_features_tests.go:XXX: S3 mode: endpoint=https://..., bucket=... + rhai_features_tests.go:XXX: PyPI mirror: https://... + rhai_features_tests.go:XXX: Notebook created successfully + rhai_features_tests.go:XXX: Training completed, progression tracked +--- PASS: TestRhaiTrainingProgressionCPU (XXXs) +``` + +In the training pod logs, you should see: +``` +[Kubeflow] Progression tracking enabled +Detected accelerator: CPU +Using backend: gloo +Local mode: loading from shared PVC +Loading model from: /workspace/models/distilgpt2 +Loading dataset from: /workspace/datasets/alpaca-cleaned +Training started... 
+{'loss': X.XXX, 'epoch': X.X, ...} +[Kubeflow] Progression: epoch=X, step=X +Training completed successfully +``` + +--- + +## Test Flow Diagram + +``` +┌─────────────────┐ +│ Bastion Host │ +│ (has internet) │ +└────────┬────────┘ + │ + ▼ +┌─────────────────┐ ┌─────────────────┐ +│ prestage script │────▶│ S3/MinIO │ +│ downloads from │ │ - models/ │ +│ HuggingFace │ │ - datasets/ │ +└─────────────────┘ │ - wheels/ │ + └────────┬────────┘ + │ + ┌───────────────────────┼───────────────────────┐ + │ │ │ + ▼ ▼ ▼ +┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ +│ Notebook Pod │ │ Training Pod │ │ Training Pod │ +│ - kubeflow SDK │ │ (node-0) │ │ (node-1) │ +│ - downloads S3 │ │ - loads from │ │ - loads from │ +│ to shared PVC │ │ shared PVC │ │ shared PVC │ +└────────┬────────┘ └────────┬────────┘ └────────┬────────┘ + │ │ │ + └─────────────────────┴─────────────────────┘ + │ + ▼ + ┌─────────────────┐ + │ Shared PVC │ + │ /workspace/ │ + │ ├─ models/ │ + │ └─ datasets/ │ + └─────────────────┘ +``` +--- + +## Quick Reference + +| Component | Location | Notes | +|-----------|----------|-------| +| Prestage script | `tests/trainer/resources/disconnected_env/prestage_models_datasets.py` | Run on bastion | +| Kubeflow install | `tests/trainer/resources/disconnected_env/install_kubeflow.py` | Used by notebook | +| RHAI notebook | `tests/trainer/resources/rhai_features.ipynb` | Main test notebook | +| Go test | `tests/trainer/sdk_tests/rhai_features_tests.go` | Test runner | + +| Env Variable | Required | Description | +|--------------|----------|-------------| +| `NOTEBOOK_USER_NAME` | Yes | OpenShift username | +| `NOTEBOOK_USER_PASSWORD` | Yes | OpenShift password | +| `NOTEBOOK_IMAGE` | Yes | Notebook image with digest (must have Python 3.9+) | +| `AWS_DEFAULT_ENDPOINT` | Yes | S3/MinIO endpoint | +| `AWS_ACCESS_KEY_ID` | Yes | S3 access key | +| `AWS_SECRET_ACCESS_KEY` | Yes | S3 secret key | +| `AWS_STORAGE_BUCKET` | Yes | S3 bucket name | +| `PIP_INDEX_URL` | Recommended | PyPI mirror URL | +| `PIP_TRUSTED_HOST` | Recommended | PyPI mirror hostname (for SSL bypass) | +| `TEST_TIMEOUT_LONG` | Recommended | Test timeout (default 5m, set to 15m) | +| `VERIFY_SSL` | Optional | Set to "false" for self-signed S3 certs | + diff --git a/tests/trainer/resources/disconnected_env/install_kubeflow.py b/tests/trainer/resources/disconnected_env/install_kubeflow.py new file mode 100644 index 000000000..aa7911140 --- /dev/null +++ b/tests/trainer/resources/disconnected_env/install_kubeflow.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +""" +Install kubeflow SDK with S3 fallback for disconnected environments. + +Usage: + python install_kubeflow.py + +Tries PyPI first, falls back to S3 wheel if PyPI fails. 
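+A PyPI install is only accepted if the installed version exactly matches KUBEFLOW_REQUIRED_VERSION; otherwise the script falls through to the S3 wheel.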
+Requires env vars for S3 fallback: + - AWS_DEFAULT_ENDPOINT + - AWS_ACCESS_KEY_ID + - AWS_SECRET_ACCESS_KEY + - AWS_STORAGE_BUCKET + - KUBEFLOW_WHEEL_S3_KEY (optional, default: wheels/kubeflow-0.2.1+rhai0-py3-none-any.whl) + - KUBEFLOW_REQUIRED_VERSION (optional, default: 0.2.1) - minimum version required +""" + +import subprocess +import sys +import os +import warnings + +# Suppress SSL warnings for self-signed certs in disconnected environments +try: + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) +except ImportError: + pass +warnings.filterwarnings("ignore", message=".*InsecureRequestWarning.*") + + +def get_required_version(): + """Get required kubeflow version from env or use default.""" + return os.environ.get("KUBEFLOW_REQUIRED_VERSION", "0.2.1+rhai0") + + +def verify_kubeflow_version(): + """Verify kubeflow version matches required version exactly.""" + required_version = get_required_version() + try: + import kubeflow + installed_version = getattr(kubeflow, "__version__", "unknown") + # Exact match required (e.g., 0.2.1+rhai0 must match 0.2.1+rhai0) + if installed_version == required_version: + print(f"Verified: kubeflow version {installed_version} matches required {required_version}") + return True + else: + print(f"Version mismatch: installed '{installed_version}', required '{required_version}'") + return False + except ImportError: + print("kubeflow module not found") + return False + + +def install_from_pypi(): + """Try installing kubeflow from PyPI.""" + required_version = get_required_version() + print(f"Attempting to install kubeflow=={required_version} from PyPI...") + result = subprocess.run( + [sys.executable, "-m", "pip", "install", "--quiet", f"kubeflow=={required_version}"], + capture_output=True, + text=True + ) + if result.returncode == 0: + # Verify the correct version is installed + if verify_kubeflow_version(): + print("Successfully installed kubeflow SDK from PyPI") + return True + else: + print("PyPI kubeflow package version doesn't match, will try S3 fallback") + return False + print(f"PyPI install failed (version {required_version} not found): {result.stderr}") + return False + + +def install_from_s3(): + """Download wheel from S3 and install.""" + print("Falling back to S3 wheel...") + + endpoint = os.environ.get("AWS_DEFAULT_ENDPOINT", "") + access_key = os.environ.get("AWS_ACCESS_KEY_ID", "") + secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY", "") + bucket = os.environ.get("AWS_STORAGE_BUCKET", "") + + if not all([endpoint, access_key, secret_key, bucket]): + print("S3 credentials not configured, cannot fallback to S3") + return False + + try: + import boto3 + from botocore.config import Config + import urllib3 + urllib3.disable_warnings() + + config = Config( + signature_version="s3v4", + s3={"addressing_style": "path"}, + ) + + endpoint_url = endpoint if endpoint.startswith("http") else f"https://{endpoint}" + + s3 = boto3.client( + "s3", + endpoint_url=endpoint_url, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + config=config, + verify=False, + ) + + # Get wheel path from env var or use default + wheel_key = os.environ.get( + "KUBEFLOW_WHEEL_S3_KEY", + "wheels/kubeflow-0.2.1+rhai0-py3-none-any.whl" + ) + # Preserve original wheel filename (pip requires valid wheel name) + wheel_filename = wheel_key.split("/")[-1] + local_path = f"/tmp/{wheel_filename}" + + print(f"Downloading s3://{bucket}/{wheel_key} to {local_path}") + s3.download_file(bucket, wheel_key, local_path) + + # Install the 
wheel + result = subprocess.run( + [sys.executable, "-m", "pip", "install", "--quiet", local_path], + capture_output=True, + text=True + ) + + if result.returncode == 0: + print("Successfully installed kubeflow from S3 wheel") + return True + + print(f"Wheel install failed: {result.stderr}") + return False + + except Exception as e: + print(f"S3 fallback failed: {e}") + return False + + +def main(): + # Try PyPI first + if install_from_pypi(): + return 0 + + # Fallback to S3 + if install_from_s3(): + return 0 + + print("ERROR: Could not install kubeflow from PyPI or S3") + return 1 + + +if __name__ == "__main__": + sys.exit(main()) + diff --git a/tests/trainer/resources/disconnected_env/prestage_models_datasets.py b/tests/trainer/resources/disconnected_env/prestage_models_datasets.py new file mode 100644 index 000000000..8fb1d1456 --- /dev/null +++ b/tests/trainer/resources/disconnected_env/prestage_models_datasets.py @@ -0,0 +1,852 @@ +#!/usr/bin/env python3 +""" +Pre-stage HuggingFace models and datasets to MinIO for disconnected trainer tests. + +This script downloads models and datasets from HuggingFace Hub and uploads them +to an internal MinIO/S3 storage for use in air-gapped/disconnected environments. + +Usage: + # Set S3/MinIO credentials + export AWS_DEFAULT_ENDPOINT="https://bastion.example.com:9000" + export AWS_ACCESS_KEY_ID="minioadmin" + export AWS_SECRET_ACCESS_KEY="minioadmin" + export AWS_STORAGE_BUCKET="rhoai-dw" + + # Mirror a specific model (auto-generates S3 path: models/distilgpt2/) + python prestage_models_datasets.py --model distilgpt2 + + # Mirror with custom S3 path (format: SOURCE:S3_PATH) + python prestage_models_datasets.py --model distilgpt2:my-models/gpt2 + python prestage_models_datasets.py --dataset yahma/alpaca-cleaned:training-data/alpaca + + # Mirror multiple models and datasets + python prestage_models_datasets.py \ + --model distilgpt2 \ + --model Qwen/Qwen2.5-1.5B-Instruct:models/qwen-1.5b \ + --dataset yahma/alpaca-cleaned + + # Use presets (predefined sets for specific tests) + python prestage_models_datasets.py --preset rhai + python prestage_models_datasets.py --preset sft + python prestage_models_datasets.py --preset all + +Environment Variables: + AWS_DEFAULT_ENDPOINT - MinIO/S3 endpoint URL (required) + AWS_ACCESS_KEY_ID - Access key (required) + AWS_SECRET_ACCESS_KEY - Secret key (required) + AWS_STORAGE_BUCKET - Target bucket name (default: rhoai-dw) + AWS_DEFAULT_REGION - S3 region (default: us-east-1) + VERIFY_SSL - Verify SSL certificate (default: false) + DOWNLOAD_DIR - Local directory for downloads (default: ./downloads) + SKIP_UPLOAD - Skip upload, only download (default: false) + SKIP_DOWNLOAD - Skip download, only upload existing (default: false) +""" + +import os +import sys +import argparse +from pathlib import Path +from typing import Dict, List, Tuple + +# Suppress SSL warnings for self-signed certs +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + +# ============================================================================= +# Presets - Predefined sets for specific tests +# Paths match existing MinIO folder structure: +# - models// +# - -datasets/ +# ============================================================================= + +PRESETS: Dict[str, Dict[str, List[str]]] = { + "rhai": { + # Format: "source" or "source:custom/path" + "models": ["distilgpt2:models/distilgpt2"], + "datasets": ["yahma/alpaca-cleaned:alpaca-cleaned-datasets"], + "description": "RHAI Features test - small model 
for quick testing", + }, + "sft": { + "models": ["Qwen/Qwen2.5-1.5B-Instruct:models/Qwen2.5-1.5B-Instruct"], + "datasets": ["LipengCS/Table-GPT:table-gpt-data"], # Table-GPT dataset for SFT + "description": "SFT Training test - Qwen model (~3GB) + Table-GPT dataset", + }, + "osft": { + "models": ["Qwen/Qwen2.5-1.5B-Instruct:models/Qwen2.5-1.5B-Instruct"], + "datasets": ["LipengCS/Table-GPT:table-gpt-data"], # Same dataset as SFT + "description": "OSFT Training test - Qwen model + Table-GPT dataset", + }, + "all": { + "models": [ + "distilgpt2:models/distilgpt2", + "Qwen/Qwen2.5-1.5B-Instruct:models/Qwen2.5-1.5B-Instruct", + ], + "datasets": [ + "yahma/alpaca-cleaned:alpaca-cleaned-datasets", + "LipengCS/Table-GPT:table-gpt-data", + ], + "description": "All models and datasets for trainer tests", + }, +} + + +# ============================================================================= +# Helper Functions +# ============================================================================= + +def get_env_or_fail(key: str) -> str: + """Get required environment variable or exit.""" + value = os.environ.get(key) + if not value: + print(f"ERROR: Required environment variable '{key}' is not set.") + sys.exit(1) + return value + + +def get_env_bool(key: str, default: bool = False) -> bool: + """Get boolean environment variable.""" + value = os.environ.get(key, str(default)).lower() + return value in ("true", "1", "yes") + + +def format_size(size_bytes: int) -> str: + """Format file size in human-readable format.""" + for unit in ["B", "KB", "MB", "GB", "TB"]: + if size_bytes < 1024: + return f"{size_bytes:.1f} {unit}" if unit != "B" else f"{size_bytes} {unit}" + size_bytes /= 1024 + return f"{size_bytes:.1f} PB" + + +def get_s3_client(endpoint: str, access_key: str, secret_key: str, region: str, verify_ssl: bool): + """Create S3/MinIO client.""" + import boto3 + from botocore.config import Config + + config = Config( + signature_version="s3v4", + s3={"addressing_style": "path"}, + retries={"max_attempts": 3}, + ) + + return boto3.client( + "s3", + endpoint_url=endpoint, + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region, + config=config, + verify=verify_ssl, + ) + + +def get_s3_prefix(name: str, resource_type: str) -> str: + """ + Generate S3 prefix from model/dataset name. + + Matches existing MinIO folder structure: + - Models: models// + - Datasets: -datasets/ + """ + # Extract the model/dataset name (last part after /) + short_name = name.split("/")[-1] + + if resource_type == "models": + return f"models/{short_name}" + elif resource_type == "datasets": + # Match existing pattern: fashion-mnist-datasets, mnist-datasets + return f"{short_name}-datasets" + else: + return f"{resource_type}/{short_name}" + + +def parse_resource_spec(spec: str, resource_type: str) -> Tuple[str, str]: + """ + Parse resource specification in format 'source' or 'source:s3_path'. + + The delimiter is the LAST colon that is followed by a path-like string. + HuggingFace sources may contain '/' but not ':' in the source name. 
+ + Examples: + 'distilgpt2' -> ('distilgpt2', 'models/distilgpt2') + 'distilgpt2:my-models/gpt2' -> ('distilgpt2', 'my-models/gpt2') + 'distilgpt2:models/distilgpt2' -> ('distilgpt2', 'models/distilgpt2') + 'yahma/alpaca-cleaned' -> ('yahma/alpaca-cleaned', 'alpaca-cleaned-datasets') + 'yahma/alpaca-cleaned:alpaca-cleaned-datasets' -> ('yahma/alpaca-cleaned', 'alpaca-cleaned-datasets') + 'yahma/alpaca-cleaned:data/alpaca' -> ('yahma/alpaca-cleaned', 'data/alpaca') + + Returns: + (source_name, s3_prefix) + """ + # Find the last colon - everything after is the custom path + last_colon = spec.rfind(":") + + if last_colon != -1: + source = spec[:last_colon] + s3_path = spec[last_colon + 1:].strip("/") + + # Validate: source should not be empty + if source and s3_path: + return (source, s3_path) + + # No custom path, generate default + return (spec, get_s3_prefix(spec, resource_type)) + + +def check_s3_prefix_exists(s3_client, bucket: str, s3_prefix: str) -> Tuple[bool, int]: + """ + Check if an S3 prefix already has objects. + Returns (exists, file_count). + """ + try: + paginator = s3_client.get_paginator("list_objects_v2") + file_count = 0 + for page in paginator.paginate(Bucket=bucket, Prefix=s3_prefix + "/", MaxKeys=100): + contents = page.get("Contents", []) + file_count += len(contents) + if file_count > 0: + # Early exit once we confirm existence + break + return (file_count > 0, file_count) + except Exception: + return (False, 0) + + +def list_bucket_tree(s3_client, bucket: str, prefix: str = "") -> Dict[str, any]: + """ + List all objects in bucket and return as nested dict for tree display. + """ + tree = {} + try: + paginator = s3_client.get_paginator("list_objects_v2") + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + key = obj["Key"] + size = obj["Size"] + parts = key.split("/") + current = tree + for i, part in enumerate(parts): + if not part: # Skip empty parts + continue + if i == len(parts) - 1: # Last part is the file + current[part] = size + else: # Directory + if part not in current: + current[part] = {} + current = current[part] + except Exception as e: + print(f" Error listing bucket: {e}") + return tree + + +def print_tree(tree: Dict, prefix: str = "", is_last: bool = True, indent: str = ""): + """ + Print tree structure with proper indentation. + """ + items = sorted(tree.items(), key=lambda x: (isinstance(x[1], dict), x[0])) + + for i, (name, value) in enumerate(items): + is_last_item = i == len(items) - 1 + connector = "└── " if is_last_item else "├── " + + if isinstance(value, dict): + # Directory + print(f"{indent}{connector}{name}/") + new_indent = indent + (" " if is_last_item else "│ ") + print_tree(value, name, is_last_item, new_indent) + else: + # File with size + size_str = format_size(value) + print(f"{indent}{connector}{name} ({size_str})") + + +def upload_directory(s3_client, local_dir: str, bucket: str, s3_prefix: str, skip_existing: bool = True): + """ + Upload local directory to S3, preserving structure. + Supports resume by skipping files that already exist in S3 with matching size. + Shows progress bar with ETA for large uploads. 
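+    Hidden files (any path component starting with '.') are skipped, and the resume
+    check compares object size only, not content checksums.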
+ """ + from tqdm import tqdm + from tqdm.utils import CallbackIOWrapper + + local_path = Path(local_dir) + file_count = 0 + skipped_count = 0 + + # First pass: collect files and calculate total size + files_to_upload = [] + total_size = 0 + + for file_path in local_path.rglob("*"): + if file_path.is_file(): + relative = file_path.relative_to(local_path) + + # Skip .git and other hidden files + if any(part.startswith(".") for part in relative.parts): + continue + + s3_key = f"{s3_prefix}/{relative}" + file_size = file_path.stat().st_size + + # Check if file already exists in S3 with same size (for resume) + if skip_existing: + try: + response = s3_client.head_object(Bucket=bucket, Key=s3_key) + s3_size = response.get("ContentLength", 0) + if s3_size == file_size: + skipped_count += 1 + continue # File already uploaded + except Exception: + pass # File doesn't exist, proceed with upload + + files_to_upload.append((file_path, s3_key, file_size, relative)) + total_size += file_size + + if not files_to_upload: + if skipped_count > 0: + print(f" ✅ All {skipped_count} files already uploaded") + return 0 + + # Upload with progress bar + print(f" Uploading {len(files_to_upload)} files ({format_size(total_size)})") + + with tqdm( + total=total_size, + unit="B", + unit_scale=True, + unit_divisor=1024, + desc=" Progress", + bar_format="{desc}: {percentage:3.0f}%|{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]" + ) as pbar: + for file_path, s3_key, file_size, relative in files_to_upload: + # Upload with callback for progress + with open(file_path, "rb") as f: + wrapped_file = CallbackIOWrapper(pbar.update, f, "read") + s3_client.upload_fileobj(wrapped_file, bucket, s3_key) + file_count += 1 + + if skipped_count > 0: + print(f" (Skipped {skipped_count} already-uploaded files)") + + return file_count + + +def download_model(model_name: str, local_dir: str): + """Download model from HuggingFace Hub. Automatically resumes incomplete downloads.""" + from huggingface_hub import snapshot_download, list_repo_files + + # Check if already downloaded (has config.json or model files) + local_path = Path(local_dir) + if local_path.exists() and any(local_path.glob("*.json")): + file_count = len(list(local_path.rglob("*"))) + print(f" Model already downloaded ({file_count} files), verifying...") + else: + # Show expected file count + try: + repo_files = list_repo_files(model_name) + print(f" Downloading model: {model_name} ({len(repo_files)} files)") + except Exception: + print(f" Downloading model: {model_name}") + + # HuggingFace's snapshot_download has built-in progress bars with ETA + snapshot_download( + model_name, + local_dir=local_dir, + local_dir_use_symlinks=False, + ) + + # Show final size + total_size = sum(f.stat().st_size for f in Path(local_dir).rglob("*") if f.is_file()) + print(f" ✅ Model ready at: {local_dir} ({format_size(total_size)})") + + +def download_dataset(dataset_name: str, local_dir: str): + """Download dataset from HuggingFace Hub. Caches to HF cache dir for resume. 
+ + Special handling for specific datasets: + - LipengCS/Table-GPT: Saved as JSONL files (required by SFT notebook) + - Others: Saved in Arrow format using save_to_disk() + """ + from datasets import load_dataset + import time + import json + + local_path = Path(local_dir) + + # Special handling for Table-GPT dataset (SFT tests require JSONL format) + if "Table-GPT" in dataset_name or "table-gpt" in dataset_name.lower(): + jsonl_file = local_path / "train" / "train_All_100.jsonl" + if jsonl_file.exists(): + total_size = jsonl_file.stat().st_size + print(f" Table-GPT dataset already downloaded ({format_size(total_size)}), skipping...") + print(f" ✅ Dataset ready at: {jsonl_file}") + return + + print(f" Downloading Table-GPT dataset: {dataset_name}") + print(f" (Saving as JSONL for SFT notebook compatibility)") + start_time = time.time() + + # Load with "All" config as specified in sft.ipynb + ds = load_dataset(dataset_name, "All") + train_data = ds["train"] + + # Save first 100 samples as JSONL (matching sft.ipynb behavior) + subset = train_data.select(range(min(100, len(train_data)))) + + # Create directory structure + train_dir = local_path / "train" + train_dir.mkdir(parents=True, exist_ok=True) + + # Save as JSONL + with open(jsonl_file, "w") as f: + for example in subset: + f.write(json.dumps(example) + "\n") + + elapsed = time.time() - start_time + total_size = jsonl_file.stat().st_size + print(f" ✅ Table-GPT dataset ready at: {jsonl_file} ({format_size(total_size)}, took {elapsed:.1f}s)") + return + + # Standard dataset handling (Arrow format) + if local_path.exists() and (local_path / "dataset_info.json").exists(): + total_size = sum(f.stat().st_size for f in local_path.rglob("*") if f.is_file()) + print(f" Dataset already downloaded ({format_size(total_size)}), skipping...") + print(f" ✅ Dataset ready at: {local_dir}") + return + + print(f" Downloading dataset: {dataset_name}") + print(f" (HuggingFace datasets library shows download progress)") + start_time = time.time() + + # HuggingFace datasets has its own progress bars + ds = load_dataset(dataset_name) + + print(f" Saving dataset to disk...") + ds.save_to_disk(local_dir) + + elapsed = time.time() - start_time + total_size = sum(f.stat().st_size for f in Path(local_dir).rglob("*") if f.is_file()) + print(f" ✅ Dataset ready at: {local_dir} ({format_size(total_size)}, took {elapsed:.1f}s)") + + +def check_dependencies(): + """Check if required Python packages are installed.""" + missing = [] + + try: + import boto3 + except ImportError: + missing.append("boto3") + + try: + from huggingface_hub import snapshot_download + except ImportError: + missing.append("huggingface_hub") + + try: + from datasets import load_dataset + except ImportError: + missing.append("datasets") + + try: + from tqdm import tqdm + except ImportError: + missing.append("tqdm") + + if missing: + print("ERROR: Missing required packages. Install with:") + print(f" pip install {' '.join(missing)}") + sys.exit(1) + + +def process_model(model_name: str, s3_client, bucket: str, download_dir: str, + skip_download: bool, skip_upload: bool, force: bool = False, + custom_s3_prefix: str = None) -> Tuple[str, int, str]: + """ + Process a single model: download and upload. + Returns (s3_prefix, file_count, status) where status is 'uploaded', 'skipped', or 'exists'. 
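+    With force=True an existing S3 prefix is re-uploaded instead of being skipped.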
+ """ + s3_prefix = custom_s3_prefix if custom_s3_prefix else get_s3_prefix(model_name, "models") + local_dir = os.path.join(download_dir, "models", model_name.replace("/", "_")) + + print(f"\n>>> Processing model: {model_name}") + print(f" S3 target: s3://{bucket}/{s3_prefix}/") + + # Check if already exists in S3 + if s3_client and not skip_upload: + exists, existing_count = check_s3_prefix_exists(s3_client, bucket, s3_prefix) + if exists and not force: + print(f" ⏭️ SKIPPED: Already exists in S3 ({existing_count}+ files)") + print(f" Use --force to overwrite existing data") + return s3_prefix, existing_count, "exists" + elif exists and force: + print(f" ⚠️ EXISTS: Found {existing_count}+ files, will overwrite (--force)") + + # Download + if not skip_download: + download_model(model_name, local_dir) + else: + print(f" Skipping download (using existing: {local_dir})") + + # Upload + file_count = 0 + if not skip_upload and s3_client: + print(f" Uploading to s3://{bucket}/{s3_prefix}/") + file_count = upload_directory(s3_client, local_dir, bucket, s3_prefix) + print(f" ✅ Uploaded {file_count} files") + + return s3_prefix, file_count, "uploaded" + + +def process_dataset(dataset_name: str, s3_client, bucket: str, download_dir: str, + skip_download: bool, skip_upload: bool, force: bool = False, + custom_s3_prefix: str = None) -> Tuple[str, int, str]: + """ + Process a single dataset: download and upload. + Returns (s3_prefix, file_count, status) where status is 'uploaded', 'skipped', or 'exists'. + """ + s3_prefix = custom_s3_prefix if custom_s3_prefix else get_s3_prefix(dataset_name, "datasets") + local_dir = os.path.join(download_dir, "datasets", dataset_name.replace("/", "_")) + + print(f"\n>>> Processing dataset: {dataset_name}") + print(f" S3 target: s3://{bucket}/{s3_prefix}/") + + # Check if already exists in S3 + if s3_client and not skip_upload: + exists, existing_count = check_s3_prefix_exists(s3_client, bucket, s3_prefix) + if exists and not force: + print(f" ⏭️ SKIPPED: Already exists in S3 ({existing_count}+ files)") + print(f" Use --force to overwrite existing data") + return s3_prefix, existing_count, "exists" + elif exists and force: + print(f" ⚠️ EXISTS: Found {existing_count}+ files, will overwrite (--force)") + + # Download + if not skip_download: + download_dataset(dataset_name, local_dir) + else: + print(f" Skipping download (using existing: {local_dir})") + + # Upload + file_count = 0 + if not skip_upload and s3_client: + print(f" Uploading to s3://{bucket}/{s3_prefix}/") + file_count = upload_directory(s3_client, local_dir, bucket, s3_prefix) + print(f" ✅ Uploaded {file_count} files") + + return s3_prefix, file_count, "uploaded" + + +# ============================================================================= +# Main +# ============================================================================= + +def main(): + parser = argparse.ArgumentParser( + description="Pre-stage HuggingFace models and datasets to MinIO for disconnected tests.", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=__doc__, + ) + + # Resource selection + parser.add_argument( + "--model", "-m", + action="append", + dest="models", + metavar="MODEL[:PATH]", + help="HuggingFace model ID to mirror. Format: 'model_id' or 'model_id:custom/s3/path'. Can be specified multiple times." + ) + parser.add_argument( + "--dataset", "-d", + action="append", + dest="datasets", + metavar="DATASET[:PATH]", + help="HuggingFace dataset ID to mirror. Format: 'dataset_id' or 'dataset_id:custom/s3/path'. 
Can be specified multiple times." + ) + parser.add_argument( + "--preset", "-p", + choices=list(PRESETS.keys()), + help="Use a predefined preset (rhai, sft, osft, all)" + ) + + # Actions + parser.add_argument( + "--list-presets", + action="store_true", + help="List available presets and their contents" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Show what would be mirrored without actually doing it" + ) + parser.add_argument( + "--force", "-f", + action="store_true", + help="Force upload even if model/dataset already exists in S3" + ) + parser.add_argument( + "--check", + action="store_true", + help="Only check if models/datasets exist in S3, don't download or upload" + ) + + args = parser.parse_args() + + # List presets mode + if args.list_presets: + print("Available presets:\n") + for name, preset in PRESETS.items(): + print(f" {name}:") + print(f" Description: {preset['description']}") + if preset['models']: + print(f" Models: {', '.join(preset['models'])}") + if preset['datasets']: + print(f" Datasets: {', '.join(preset['datasets'])}") + print() + return + + # Collect models and datasets to process as (source, s3_path) tuples + models_to_process: List[Tuple[str, str]] = [] + datasets_to_process: List[Tuple[str, str]] = [] + + # From preset (presets use default paths) + if args.preset: + preset = PRESETS[args.preset] + for m in preset["models"]: + models_to_process.append(parse_resource_spec(m, "models")) + for d in preset["datasets"]: + datasets_to_process.append(parse_resource_spec(d, "datasets")) + print(f"Using preset: {args.preset} - {preset['description']}") + + # From CLI arguments (may include custom paths) + if args.models: + for m in args.models: + models_to_process.append(parse_resource_spec(m, "models")) + if args.datasets: + for d in args.datasets: + datasets_to_process.append(parse_resource_spec(d, "datasets")) + + # Remove duplicates while preserving order (based on source name) + seen_models = set() + unique_models = [] + for source, path in models_to_process: + if source not in seen_models: + seen_models.add(source) + unique_models.append((source, path)) + models_to_process = unique_models + + seen_datasets = set() + unique_datasets = [] + for source, path in datasets_to_process: + if source not in seen_datasets: + seen_datasets.add(source) + unique_datasets.append((source, path)) + datasets_to_process = unique_datasets + + # Validate we have something to do + if not models_to_process and not datasets_to_process: + print("ERROR: No models or datasets specified.") + print("\nUsage examples:") + print(" python prestage_models_datasets.py --model distilgpt2") + print(" python prestage_models_datasets.py --model distilgpt2:custom/path") + print(" python prestage_models_datasets.py --dataset yahma/alpaca-cleaned") + print(" python prestage_models_datasets.py --preset rhai") + print("\nUse --list-presets to see available presets.") + sys.exit(1) + + # Dry run mode + if args.dry_run: + print("\n" + "=" * 60) + print("DRY RUN - Would mirror the following:") + print("=" * 60) + if models_to_process: + print("\nModels:") + for source, s3_path in models_to_process: + print(f" {source} -> {s3_path}/") + if datasets_to_process: + print("\nDatasets:") + for source, s3_path in datasets_to_process: + print(f" {source} -> {s3_path}/") + return + + # Check mode - only verify if resources exist in S3 + if args.check: + check_dependencies() + endpoint = get_env_or_fail("AWS_DEFAULT_ENDPOINT") + access_key = get_env_or_fail("AWS_ACCESS_KEY_ID") + secret_key = 
get_env_or_fail("AWS_SECRET_ACCESS_KEY") + bucket = os.environ.get("AWS_STORAGE_BUCKET", "rhoai-dw") + region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") + verify_ssl = get_env_bool("VERIFY_SSL", False) + + print("\n" + "=" * 60) + print("CHECK MODE - S3 Bucket Contents:") + print("=" * 60) + + s3 = get_s3_client(endpoint, access_key, secret_key, region, verify_ssl) + + # Display full bucket tree structure + print(f"\n📦 s3://{bucket}/") + tree = list_bucket_tree(s3, bucket) + if tree: + print_tree(tree) + else: + print(" (empty)") + + # Summary of requested resources + print("\n" + "-" * 60) + print("Requested Resources Status:") + print("-" * 60) + + all_exist = True + if models_to_process: + print("\nModels:") + for source, s3_path in models_to_process: + exists, count = check_s3_prefix_exists(s3, bucket, s3_path) + status = f"✅ EXISTS ({count}+ files)" if exists else "❌ NOT FOUND" + print(f" {source} -> {s3_path}/: {status}") + if not exists: + all_exist = False + + if datasets_to_process: + print("\nDatasets:") + for source, s3_path in datasets_to_process: + exists, count = check_s3_prefix_exists(s3, bucket, s3_path) + status = f"✅ EXISTS ({count}+ files)" if exists else "❌ NOT FOUND" + print(f" {source} -> {s3_path}/: {status}") + if not exists: + all_exist = False + + print("\n" + "-" * 60) + if all_exist: + print("✅ All requested models and datasets exist in S3") + else: + print("❌ Some models or datasets are missing from S3") + print(" Run without --check to upload them") + return + + # Check dependencies + check_dependencies() + + # Get configuration from environment (same env vars as v1 tests) + endpoint = get_env_or_fail("AWS_DEFAULT_ENDPOINT") + access_key = get_env_or_fail("AWS_ACCESS_KEY_ID") + secret_key = get_env_or_fail("AWS_SECRET_ACCESS_KEY") + bucket = os.environ.get("AWS_STORAGE_BUCKET", "rhoai-dw") + region = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") + verify_ssl = get_env_bool("VERIFY_SSL", False) + download_dir = os.environ.get("DOWNLOAD_DIR", "./downloads") + skip_upload = get_env_bool("SKIP_UPLOAD", False) + skip_download = get_env_bool("SKIP_DOWNLOAD", False) + + print("\n" + "=" * 60) + print("Pre-staging Models and Datasets for Disconnected Environment") + print("=" * 60) + print(f"AWS_DEFAULT_ENDPOINT: {endpoint}") + print(f"AWS_STORAGE_BUCKET: {bucket}") + print(f"AWS_DEFAULT_REGION: {region}") + print(f"Download Dir: {download_dir}") + print(f"Skip Download: {skip_download}") + print(f"Skip Upload: {skip_upload}") + + if models_to_process: + print(f"\nModels to mirror: {len(models_to_process)}") + for source, s3_path in models_to_process: + print(f" - {source} -> {s3_path}/") + + if datasets_to_process: + print(f"\nDatasets to mirror: {len(datasets_to_process)}") + for source, s3_path in datasets_to_process: + print(f" - {source} -> {s3_path}/") + + print("=" * 60) + + # Create S3 client and test connection + s3 = None + if not skip_upload: + print("\nConnecting to S3/MinIO...") + s3 = get_s3_client(endpoint, access_key, secret_key, region, verify_ssl) + try: + s3.head_bucket(Bucket=bucket) + print(f"Connected! 
Bucket '{bucket}' exists.") + except Exception as e: + print(f"ERROR: Cannot access bucket '{bucket}': {e}") + sys.exit(1) + + # Create download directory + Path(download_dir).mkdir(parents=True, exist_ok=True) + + # Track results + results: Dict[str, List[Tuple[str, str, int, str]]] = {"models": [], "datasets": []} + + # Process models + if models_to_process: + print("\n" + "=" * 60) + print("MODELS") + print("=" * 60) + + for model_source, custom_s3_path in models_to_process: + s3_prefix, file_count, status = process_model( + model_source, s3, bucket, download_dir, skip_download, skip_upload, + args.force, custom_s3_prefix=custom_s3_path + ) + results["models"].append((model_source, s3_prefix, file_count, status)) + + # Process datasets + if datasets_to_process: + print("\n" + "=" * 60) + print("DATASETS") + print("=" * 60) + + for dataset_source, custom_s3_path in datasets_to_process: + s3_prefix, file_count, status = process_dataset( + dataset_source, s3, bucket, download_dir, skip_download, skip_upload, + args.force, custom_s3_prefix=custom_s3_path + ) + results["datasets"].append((dataset_source, s3_prefix, file_count, status)) + + # Summary + print("\n" + "=" * 60) + print("SUMMARY") + print("=" * 60) + + uploaded_count = 0 + skipped_count = 0 + + print(f"\nResults for s3://{bucket}/:") + if results["models"]: + print("\n Models:") + for name, prefix, count, status in results["models"]: + if status == "exists": + icon = "⏭️ " + skipped_count += 1 + else: + icon = "✅" + uploaded_count += 1 + print(f" {icon} {name} -> {prefix}/ ({count} files) [{status}]") + + if results["datasets"]: + print("\n Datasets:") + for name, prefix, count, status in results["datasets"]: + if status == "exists": + icon = "⏭️ " + skipped_count += 1 + else: + icon = "✅" + uploaded_count += 1 + print(f" {icon} {name} -> {prefix}/ ({count} files) [{status}]") + + print(f"\n Uploaded: {uploaded_count}, Skipped (already exists): {skipped_count}") + + print("\n" + "-" * 60) + print("To use in tests, ensure these environment variables are set:") + print(f" export AWS_DEFAULT_ENDPOINT=\"{endpoint}\"") + print(f" export AWS_ACCESS_KEY_ID=\"\"") + print(f" export AWS_SECRET_ACCESS_KEY=\"\"") + print(f" export AWS_STORAGE_BUCKET=\"{bucket}\"") + + +if __name__ == "__main__": + main() diff --git a/tests/trainer/resources/rhai_features.ipynb b/tests/trainer/resources/rhai_features.ipynb index 930f8a5a9..46b7031c7 100644 --- a/tests/trainer/resources/rhai_features.ipynb +++ b/tests/trainer/resources/rhai_features.ipynb @@ -18,17 +18,38 @@ "outputs": [], "source": [ "def train_bloom():\n", + " \"\"\"Training function for distributed training.\n", + " \n", + " Auto-detects data source:\n", + " - If local data exists on shared PVC (pre-downloaded by notebook) > use it\n", + " - Otherwise > download from HuggingFace Hub\n", + " \n", + " For disconnected environments:\n", + " - The notebook pre-downloads model/dataset from S3 to shared PVC\n", + " - Training pods just load from local paths (no S3 access needed)\n", + " \"\"\"\n", " import os\n", " os.environ[\"HF_HOME\"] = \"/workspace/hf_cache\"\n", " \n", " import torch\n", " import torch.distributed as dist\n", - " from datasets import load_dataset\n", + " from datasets import load_dataset, load_from_disk\n", " from transformers import (\n", " AutoTokenizer, AutoModelForCausalLM, Trainer,\n", " TrainingArguments, DataCollatorForLanguageModeling,\n", " )\n", "\n", + " # Local paths on shared PVC (pre-downloaded by notebook in disconnected mode)\n", + " model_local_path = 
\"/workspace/models/distilgpt2\"\n", + " dataset_local_path = \"/workspace/datasets/alpaca-cleaned\"\n", + " \n", + " # HuggingFace model/dataset names (for connected environments)\n", + " model_name = \"distilgpt2\"\n", + " dataset_name = \"yahma/alpaca-cleaned\"\n", + " \n", + " # Auto-detect: use local data if it exists\n", + " use_local_data = os.path.exists(os.path.join(model_local_path, \"config.json\"))\n", + "\n", " # ========== Auto-detect accelerator and configure ==========\n", " if torch.cuda.is_available():\n", " # NVIDIA GPU or AMD ROCm (ROCm exposes as CUDA)\n", @@ -71,34 +92,47 @@ " if torch.cuda.is_available():\n", " torch.cuda.set_device(local_rank)\n", "\n", - " # ========== Load model and tokenizer ==========\n", - " model_name = \"distilgpt2\"\n", - " \n", - " if rank == 0:\n", - " print(f\"Downloading model: {model_name}\")\n", - " tokenizer = AutoTokenizer.from_pretrained(model_name)\n", - " model = AutoModelForCausalLM.from_pretrained(model_name)\n", - " print(\"Model downloaded\")\n", - " dist.barrier()\n", - " \n", - " if rank != 0:\n", - " tokenizer = AutoTokenizer.from_pretrained(model_name)\n", - " model = AutoModelForCausalLM.from_pretrained(model_name)\n", + " # ========== Load model and dataset ==========\n", + " if use_local_data:\n", + " # Local mode: load from shared PVC (pre-downloaded by notebook)\n", + " print(f\"Local mode: loading from shared PVC\")\n", + " print(f\"Loading model from: {model_local_path}\")\n", + " tokenizer = AutoTokenizer.from_pretrained(model_local_path)\n", + " model = AutoModelForCausalLM.from_pretrained(model_local_path)\n", + " \n", + " print(f\"Loading dataset from: {dataset_local_path}\")\n", + " dataset = load_from_disk(dataset_local_path)\n", + " # load_from_disk returns DatasetDict, get train split\n", + " if hasattr(dataset, \"keys\") and \"train\" in dataset.keys():\n", + " dataset = dataset[\"train\"]\n", + " # Take only first 100 samples for testing\n", + " dataset = dataset.select(range(min(100, len(dataset))))\n", + " else:\n", + " # HuggingFace mode: download from internet\n", + " print(f\"HuggingFace mode: model={model_name}, dataset={dataset_name}\")\n", + " \n", + " if rank == 0:\n", + " print(f\"Downloading model from HuggingFace: {model_name}\")\n", + " tokenizer = AutoTokenizer.from_pretrained(model_name)\n", + " model = AutoModelForCausalLM.from_pretrained(model_name)\n", + " print(\"Model downloaded\")\n", + " dist.barrier()\n", + " \n", + " if rank != 0:\n", + " tokenizer = AutoTokenizer.from_pretrained(model_name)\n", + " model = AutoModelForCausalLM.from_pretrained(model_name)\n", + " \n", + " if rank == 0:\n", + " print(f\"Downloading dataset from HuggingFace: {dataset_name}\")\n", + " dataset = load_dataset(dataset_name, split=\"train[:100]\")\n", + " print(\"Dataset downloaded\")\n", + " dist.barrier()\n", + " \n", + " if rank != 0:\n", + " dataset = load_dataset(dataset_name, split=\"train[:100]\")\n", " \n", " if tokenizer.pad_token is None:\n", " tokenizer.pad_token = tokenizer.eos_token\n", - "\n", - " # ========== Load and prepare dataset ==========\n", - " dataset_name = \"yahma/alpaca-cleaned\"\n", - " \n", - " if rank == 0:\n", - " print(f\"Downloading dataset: {dataset_name}\")\n", - " dataset = load_dataset(dataset_name, split=\"train[:100]\")\n", - " print(\"Dataset downloaded\")\n", - " dist.barrier()\n", - " \n", - " if rank != 0:\n", - " dataset = load_dataset(dataset_name, split=\"train[:100]\")\n", " \n", " dataset = dataset.train_test_split(test_size=0.2, shuffle=False)\n", " train_ds 
= dataset[\"train\"]\n", @@ -158,6 +192,14 @@ "outputs": [], "source": [ "import os\n", + "import warnings\n", + "import urllib3\n", + "\n", + "# Suppress SSL warnings for self-signed certs in disconnected environments\n", + "urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)\n", + "warnings.filterwarnings(\"ignore\", message=\".*Unverified HTTPS.*\")\n", + "warnings.filterwarnings(\"ignore\", category=urllib3.exceptions.InsecureRequestWarning)\n", + "\n", "from kubernetes import client as k8s_client\n", "from kubeflow.trainer import TrainerClient\n", "from kubeflow.common.types import KubernetesBackendConfig\n", @@ -197,6 +239,107 @@ "print(f\"Got runtime: {torch_runtime.name}\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Pre-download model/dataset from S3 to shared PVC (disconnected environments only)\n", + "# Training pods will then load from these local paths\n", + "#\n", + "# NOTE: Shared PVC is mounted at different paths:\n", + "# - Notebook pod: /opt/app-root/src (we write here)\n", + "# - Training pods: /workspace (they read from here)\n", + "# Same PVC, different mount points - data is shared!\n", + "\n", + "import os\n", + "import shutil\n", + "\n", + "# Clean up any leftover checkpoints from previous runs to avoid resume conflicts\n", + "# (Notebook path = /opt/app-root/src, training pods see it as /workspace)\n", + "checkpoints_path = \"/opt/app-root/src/checkpoints\"\n", + "if os.path.exists(checkpoints_path):\n", + " print(f\"Cleaning up old checkpoints at {checkpoints_path}\")\n", + " shutil.rmtree(checkpoints_path)\n", + " print(\" ✅ Old checkpoints removed\")\n", + "\n", + "s3_endpoint = os.getenv(\"AWS_DEFAULT_ENDPOINT\", \"\")\n", + "s3_access_key = os.getenv(\"AWS_ACCESS_KEY_ID\", \"\")\n", + "s3_secret_key = os.getenv(\"AWS_SECRET_ACCESS_KEY\", \"\")\n", + "s3_bucket = os.getenv(\"AWS_STORAGE_BUCKET\", \"\")\n", + "model_s3_prefix = os.getenv(\"MODEL_S3_PREFIX\", \"models/distilgpt2\")\n", + "dataset_s3_prefix = os.getenv(\"DATASET_S3_PREFIX\", \"alpaca-cleaned-datasets\")\n", + "\n", + "# Notebook writes to /opt/app-root/src (its PVC mount point)\n", + "# Training pods will read from /workspace (their PVC mount point)\n", + "# Same underlying storage!\n", + "notebook_pvc_path = \"/opt/app-root/src\"\n", + "model_local_path = f\"{notebook_pvc_path}/models/distilgpt2\"\n", + "dataset_local_path = f\"{notebook_pvc_path}/datasets/alpaca-cleaned\"\n", + "\n", + "use_s3 = bool(s3_endpoint and s3_bucket and s3_access_key and s3_secret_key)\n", + "\n", + "if use_s3:\n", + " print(f\"S3 mode: downloading to shared PVC for training pods\")\n", + " print(f\" Endpoint: {s3_endpoint}\")\n", + " print(f\" Bucket: {s3_bucket}\")\n", + " \n", + " import boto3\n", + " from botocore.config import Config\n", + " from pathlib import Path\n", + " \n", + " config = Config(\n", + " signature_version=\"s3v4\",\n", + " s3={\"addressing_style\": \"path\"},\n", + " )\n", + " \n", + " endpoint_url = s3_endpoint if s3_endpoint.startswith(\"http\") else f\"https://{s3_endpoint}\"\n", + " s3_client = boto3.client(\n", + " \"s3\",\n", + " endpoint_url=endpoint_url,\n", + " aws_access_key_id=s3_access_key,\n", + " aws_secret_access_key=s3_secret_key,\n", + " config=config,\n", + " verify=False,\n", + " )\n", + " \n", + " def download_from_s3(s3_prefix: str, local_path: str):\n", + " \"\"\"Download files from S3/MinIO to local path.\"\"\"\n", + " print(f\" Downloading s3://{s3_bucket}/{s3_prefix}/ -> 
{local_path}\")\n", + " Path(local_path).mkdir(parents=True, exist_ok=True)\n", + " \n", + " paginator = s3_client.get_paginator(\"list_objects_v2\")\n", + " count = 0\n", + " for page in paginator.paginate(Bucket=s3_bucket, Prefix=s3_prefix):\n", + " for obj in page.get(\"Contents\", []):\n", + " key = obj[\"Key\"]\n", + " rel_path = key[len(s3_prefix):].lstrip(\"/\")\n", + " if not rel_path:\n", + " continue\n", + " local_file = os.path.join(local_path, rel_path)\n", + " os.makedirs(os.path.dirname(local_file), exist_ok=True)\n", + " s3_client.download_file(s3_bucket, key, local_file)\n", + " count += 1\n", + " print(f\" ✅ Downloaded {count} files to {local_path}\")\n", + " \n", + " # Download model if not already present\n", + " if not os.path.exists(os.path.join(model_local_path, \"config.json\")):\n", + " download_from_s3(model_s3_prefix, model_local_path)\n", + " else:\n", + " print(f\" Model already exists at {model_local_path}, skipping\")\n", + " \n", + " # Download dataset if not already present\n", + " if not os.path.exists(os.path.join(dataset_local_path, \"dataset_dict.json\")):\n", + " download_from_s3(dataset_s3_prefix, dataset_local_path)\n", + " else:\n", + " print(f\" Dataset already exists at {dataset_local_path}, skipping\")\n", + " \n", + " print(\"✅ S3 download complete - training pods will load from shared PVC\")\n", + "else:\n", + " print(\"HuggingFace mode: training pods will download directly from HF Hub\")\n" + ] + }, { "cell_type": "code", "execution_count": null, @@ -217,27 +360,35 @@ "print(f\"Progression tracking: {enable_progression}\")\n", "print(f\"JIT Checkpoint: {enable_checkpoint}\")\n", "\n", - "# Read GPU config from environment (passed from Go test)\n", + "\n", + "\n", + "# Read GPU and multi-node config from environment (passed from Go test)\n", "gpu_resource_label = os.environ.get(\"GPU_RESOURCE_LABEL\", \"\")\n", + "num_nodes = int(os.environ.get(\"NUM_NODES\", \"2\"))\n", + "num_gpus_per_node = int(os.environ.get(\"NUM_GPUS_PER_NODE\", \"1\"))\n", + "\n", + "print(f\"Training config: num_nodes={num_nodes}, num_gpus_per_node={num_gpus_per_node}\")\n", "\n", "# Configure resources - GPU label tells k8s to schedule on GPU node\n", "if gpu_resource_label:\n", " resources_per_node = {\n", " \"cpu\": 2, \n", " \"memory\": \"8Gi\",\n", - " gpu_resource_label: 1 # e.g., \"nvidia.com/gpu\": 1, \"amd.com/gpu\": 1\n", + " gpu_resource_label: num_gpus_per_node # e.g., \"nvidia.com/gpu\": 2\n", " }\n", - " print(f\"GPU mode: requesting {gpu_resource_label}: 1\")\n", + " print(f\"GPU mode: requesting {gpu_resource_label}: {num_gpus_per_node}\")\n", "else:\n", " resources_per_node = {\"cpu\": 2, \"memory\": \"8Gi\"}\n", " print(\"CPU mode: no GPU requested\")\n", "\n", + "# Note: train_bloom() auto-detects local vs HuggingFace mode\n", + "# No config needed - it checks if local data exists on shared PVC\n", + "\n", "# Build trainer config\n", "trainer_kwargs = {\n", " \"func\": train_bloom,\n", - " \"func_args\": {},\n", - " \"num_nodes\": 2,\n", - " \"resources_per_node\": resources_per_node, \n", + " \"num_nodes\": num_nodes,\n", + " \"resources_per_node\": resources_per_node,\n", "}\n", "\n", "# Add progression tracking config (must explicitly set to False to disable SDK default)\n", diff --git a/tests/trainer/sdk_tests/rhai_features_tests.go b/tests/trainer/sdk_tests/rhai_features_tests.go index ad9f864ce..cbacee7ff 100644 --- a/tests/trainer/sdk_tests/rhai_features_tests.go +++ b/tests/trainer/sdk_tests/rhai_features_tests.go @@ -66,6 +66,8 @@ type 
RhaiFeatureConfig struct { CheckpointSaveStrategy string CheckpointSaveTotalLimit string Accelerator Accelerator // CPU, NVIDIA, or AMD + NumNodes int // Number of training nodes (default: 2) + NumGpusPerNode int // GPUs per node for multi-GPU tests (default: 1) } // RunRhaiFeaturesProgressionTest runs the e2e test for RHAI features with progression tracking @@ -77,6 +79,8 @@ func RunRhaiFeaturesProgressionTest(t *testing.T, accelerator Accelerator) { CheckpointSaveStrategy: "epoch", CheckpointSaveTotalLimit: "3", Accelerator: accelerator, + NumNodes: 2, // Default: 2 nodes + NumGpusPerNode: 1, // Default: 1 GPU per node }) } @@ -89,6 +93,8 @@ func RunRhaiFeaturesCheckpointTest(t *testing.T, accelerator Accelerator) { CheckpointSaveStrategy: "epoch", CheckpointSaveTotalLimit: "3", Accelerator: accelerator, + NumNodes: 2, // Default: 2 nodes + NumGpusPerNode: 1, // Default: 1 GPU per node }) } @@ -101,6 +107,50 @@ func RunRhaiFeaturesAllTest(t *testing.T, accelerator Accelerator) { CheckpointSaveStrategy: "epoch", CheckpointSaveTotalLimit: "3", Accelerator: accelerator, + NumNodes: 2, // Default: 2 nodes + NumGpusPerNode: 1, // Default: 1 GPU per node + }) +} + +// RunRhaiFeaturesProgressionMultiGpuTest runs multi-GPU test with progression tracking only +func RunRhaiFeaturesProgressionMultiGpuTest(t *testing.T, accelerator Accelerator, numNodes, numGpusPerNode int) { + runRhaiFeaturesTestWithConfig(t, RhaiFeatureConfig{ + EnableProgressionTracking: true, + EnableJitCheckpoint: false, + CheckpointOutputDir: "/workspace/checkpoints", + CheckpointSaveStrategy: "epoch", + CheckpointSaveTotalLimit: "3", + Accelerator: accelerator, + NumNodes: numNodes, + NumGpusPerNode: numGpusPerNode, + }) +} + +// RunRhaiFeaturesCheckpointMultiGpuTest runs multi-GPU test with JIT checkpointing only +func RunRhaiFeaturesCheckpointMultiGpuTest(t *testing.T, accelerator Accelerator, numNodes, numGpusPerNode int) { + runRhaiFeaturesTestWithConfig(t, RhaiFeatureConfig{ + EnableProgressionTracking: false, + EnableJitCheckpoint: true, + CheckpointOutputDir: "/workspace/checkpoints", + CheckpointSaveStrategy: "epoch", + CheckpointSaveTotalLimit: "3", + Accelerator: accelerator, + NumNodes: numNodes, + NumGpusPerNode: numGpusPerNode, + }) +} + +// RunRhaiFeaturesAllMultiGpuTest runs multi-GPU test with all RHAI features enabled +func RunRhaiFeaturesAllMultiGpuTest(t *testing.T, accelerator Accelerator, numNodes, numGpusPerNode int) { + runRhaiFeaturesTestWithConfig(t, RhaiFeatureConfig{ + EnableProgressionTracking: true, + EnableJitCheckpoint: true, + CheckpointOutputDir: "/workspace/checkpoints", + CheckpointSaveStrategy: "epoch", + CheckpointSaveTotalLimit: "3", + Accelerator: accelerator, + NumNodes: numNodes, + NumGpusPerNode: numGpusPerNode, }) } @@ -121,11 +171,20 @@ func runRhaiFeaturesTestWithConfig(t *testing.T, config RhaiFeatureConfig) { // ClusterRoleBinding for cluster-scoped resources (ClusterTrainingRuntimes) - minimal get/list/watch access trainerutils.CreateUserClusterRoleBindingForTrainerRuntimes(test, userName) - // Create ConfigMap with notebook + // Create ConfigMap with notebook and install script localPath := rhaiFeaturesNotebookPath nb, err := os.ReadFile(localPath) test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read notebook: %s", localPath)) - cm := CreateConfigMap(test, namespace.Name, map[string][]byte{rhaiFeaturesNotebookName: nb}) + + // Read the kubeflow install helper script + installScriptPath := "resources/disconnected_env/install_kubeflow.py" + installScript, err := 
os.ReadFile(installScriptPath) + test.Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("failed to read install script: %s", installScriptPath)) + + cm := CreateConfigMap(test, namespace.Name, map[string][]byte{ + rhaiFeaturesNotebookName: nb, + "install_kubeflow.py": installScript, + }) // Create shared RWX PVC for distributed training (HF cache shared across nodes) storageClass, err := GetRWXStorageClass(test) @@ -152,6 +211,73 @@ func runRhaiFeaturesTestWithConfig(t *testing.T, config RhaiFeatureConfig) { } } + // S3/MinIO configuration for disconnected environments (optional) + s3Endpoint, _ := GetStorageBucketDefaultEndpoint() + s3AccessKey, _ := GetStorageBucketAccessKeyId() + s3SecretKey, _ := GetStorageBucketSecretKey() + s3Bucket, _ := GetStorageBucketName() + modelS3Prefix := os.Getenv("MODEL_S3_PREFIX") + if modelS3Prefix == "" { + modelS3Prefix = "models/distilgpt2" + } + datasetS3Prefix := os.Getenv("DATASET_S3_PREFIX") + if datasetS3Prefix == "" { + datasetS3Prefix = "alpaca-cleaned-datasets" + } + + // Build S3 export commands (only if configured) + s3Exports := "" + if s3Endpoint != "" && s3Bucket != "" { + test.T().Logf("S3 mode: endpoint=%s, bucket=%s", s3Endpoint, s3Bucket) + s3Exports = fmt.Sprintf( + "export AWS_DEFAULT_ENDPOINT='%s'; "+ + "export AWS_ACCESS_KEY_ID='%s'; "+ + "export AWS_SECRET_ACCESS_KEY='%s'; "+ + "export AWS_STORAGE_BUCKET='%s'; "+ + "export MODEL_S3_PREFIX='%s'; "+ + "export DATASET_S3_PREFIX='%s'; ", + s3Endpoint, s3AccessKey, s3SecretKey, s3Bucket, modelS3Prefix, datasetS3Prefix, + ) + } else { + test.T().Log("HuggingFace mode: S3 not configured, will download from HF Hub") + } + + // PyPI mirror configuration for disconnected environments (optional) + pipIndexUrl := os.Getenv("PIP_INDEX_URL") + pipExports := "" + pipInstallFlags := "" + if pipIndexUrl != "" { + test.T().Logf("PyPI mirror: %s", pipIndexUrl) + // Extract hostname for trusted-host + pipTrustedHost := os.Getenv("PIP_TRUSTED_HOST") + if pipTrustedHost == "" { + // Extract hostname from URL + pipTrustedHost = strings.TrimPrefix(strings.TrimPrefix(pipIndexUrl, "https://"), "http://") + if idx := strings.Index(pipTrustedHost, ":"); idx > 0 { + pipTrustedHost = pipTrustedHost[:idx] + } else if idx := strings.Index(pipTrustedHost, "/"); idx > 0 { + pipTrustedHost = pipTrustedHost[:idx] + } + } + pipExports = fmt.Sprintf( + "export PIP_INDEX_URL='%s'; "+ + "export PIP_TRUSTED_HOST='%s'; "+ + "export PYTHONHTTPSVERIFY='0'; ", + pipIndexUrl, pipTrustedHost, + ) + pipInstallFlags = fmt.Sprintf("--index-url '%s' --trusted-host '%s' ", pipIndexUrl, pipTrustedHost) + } + + // Set defaults for num_nodes and num_gpus_per_node if not specified + numNodes := config.NumNodes + if numNodes <= 0 { + numNodes = 2 + } + numGpusPerNode := config.NumGpusPerNode + if numGpusPerNode <= 0 { + numGpusPerNode = 1 + } + shellCmd := fmt.Sprintf( "set -e; "+ "export IPYTHONDIR='/tmp/.ipython'; "+ @@ -166,8 +292,12 @@ func runRhaiFeaturesTestWithConfig(t *testing.T, config RhaiFeatureConfig) { "export CHECKPOINT_SAVE_TOTAL_LIMIT='%s'; "+ "export GPU_RESOURCE_LABEL='%s'; "+ "export TRAINING_RUNTIME='%s'; "+ - "python -m pip install --quiet --no-cache-dir papermill && "+ - "python -m pip install --quiet --no-cache-dir git+https://github.com/opendatahub-io/kubeflow-sdk.git@main && "+ + "export NUM_NODES='%d'; "+ + "export NUM_GPUS_PER_NODE='%d'; "+ + "%s"+ // S3 exports (if configured) + "%s"+ // PyPI exports (if configured) + "python -m pip install --quiet --no-cache-dir %s papermill && "+ + "python 
/opt/app-root/notebooks/install_kubeflow.py && "+ "python -m papermill -k python3 /opt/app-root/notebooks/%s /opt/app-root/src/out.ipynb --log-output; "+ "sleep infinity", GetOpenShiftApiUrl(test), userToken, namespace.Name, sharedPVC.Name, @@ -178,10 +308,16 @@ func runRhaiFeaturesTestWithConfig(t *testing.T, config RhaiFeatureConfig) { config.CheckpointSaveTotalLimit, gpuResourceLabel, trainingRuntime, + numNodes, + numGpusPerNode, + s3Exports, + pipExports, + pipInstallFlags, rhaiFeaturesNotebookName, ) - test.T().Logf("Feature config: ProgressionTracking=%v, JitCheckpoint=%v, Accelerator=%s", config.EnableProgressionTracking, config.EnableJitCheckpoint, config.Accelerator.Type) + test.T().Logf("Feature config: ProgressionTracking=%v, JitCheckpoint=%v, Accelerator=%s, NumNodes=%d, NumGpusPerNode=%d", + config.EnableProgressionTracking, config.EnableJitCheckpoint, config.Accelerator.Type, numNodes, numGpusPerNode) command := []string{"/bin/sh", "-c", shellCmd} // Create Notebook CR using the RWX PVC
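
Aside on the `PIP_TRUSTED_HOST` derivation in the hunk above: the helper trims the `http(s)://` prefix and then cuts at the first `:` or `/` to get a bare hostname for pip's `--trusted-host` flag. The sketch below is a hypothetical alternative (not part of this diff; `trustedHostFromIndexURL` and the example URL are invented for illustration) that does the same with `net/url` and only falls back to manual trimming for scheme-less values such as `host:port/simple`.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// trustedHostFromIndexURL returns the bare hostname (no scheme, port, or path)
// suitable for pip's --trusted-host flag. Name and behavior are illustrative only.
func trustedHostFromIndexURL(indexURL string) (string, error) {
	u, err := url.Parse(indexURL)
	if err != nil {
		return "", err
	}
	host := u.Hostname() // strips scheme, userinfo, and port when a scheme is present
	if host == "" {
		// Scheme-less input (e.g. "nexus.internal:8080/simple"): cut at the first
		// "/" and ":" manually, mirroring the trimming done in the test helper.
		host = strings.SplitN(strings.SplitN(indexURL, "/", 2)[0], ":", 2)[0]
	}
	return host, nil
}

func main() {
	h, err := trustedHostFromIndexURL("https://nexus.example.internal:8443/repository/pypi/simple")
	if err != nil {
		panic(err)
	}
	fmt.Println(h) // nexus.example.internal
}
```

The string-trimming approach used in the test avoids the extra import and covers the same cases; the `net/url` version is shown only as a point of comparison.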