diff --git a/.github/workflows/integration-test-k8s.yml b/.github/workflows/integration-test-k8s.yml index 330c904eb..4aa4a52d6 100644 --- a/.github/workflows/integration-test-k8s.yml +++ b/.github/workflows/integration-test-k8s.yml @@ -28,6 +28,7 @@ jobs: strategy: fail-fast: false # Continue testing other profiles even if one fails matrix: + # Note: dynamo profile requires GPU, run manually with: make e2e-test-dynamo profile: [ai-gateway, aibrix, routing-strategies, llm-d, istio, production-stack] steps: diff --git a/config/intelligent-routing/in-tree/generic_categories.yaml b/config/intelligent-routing/in-tree/generic_categories.yaml index 3307d5037..ddf520ca6 100644 --- a/config/intelligent-routing/in-tree/generic_categories.yaml +++ b/config/intelligent-routing/in-tree/generic_categories.yaml @@ -23,8 +23,7 @@ categories: mmlu_categories: ["computer science", "engineering"] - name: finance mmlu_categories: ["economics"] - - name: politics - # If omitted, identity mapping applies when this name matches MMLU + - name: politics # If omitted, identity mapping applies when this name matches MMLU # Decisions define routing logic by combining rules and model selection decisions: diff --git a/deploy/kubernetes/dynamo/dynamo-resources/README.md b/deploy/kubernetes/dynamo/dynamo-resources/README.md new file mode 100644 index 000000000..9185a7f83 --- /dev/null +++ b/deploy/kubernetes/dynamo/dynamo-resources/README.md @@ -0,0 +1,660 @@ +# Dynamo E2E Testing Resources + +This directory contains Kubernetes manifests for E2E testing of **Semantic Router with NVIDIA Dynamo integration**. + +## ⚠️ GPU Requirements + +**This test requires a VM with at least 3 GPUs:** + +| Component | GPU | Description | +|-----------|-----|-------------| +| Frontend | GPU 0 | Dynamo Frontend (coordinates workers via etcd/NATS) | +| VLLMPrefillWorker | GPU 1 | Handles prefill phase of inference | +| VLLMDecodeWorker | GPU 2 | Handles decode phase of inference | + +The E2E framework automatically: + +1. Sets Docker runtime to `nvidia` (required for GPU passthrough) +2. Creates Kind cluster with GPU support +3. Copies NVIDIA libraries to the Kind worker node +4. Deploys the NVIDIA device plugin +5. 
Restores Docker runtime to default after tests complete + +## What We Test + +✅ **What IS Tested:** + +- Dynamo CRD deployment (`DynamoGraphDeployment`) +- Dynamo Frontend coordination with workers via etcd/NATS +- Semantic Router ExtProc integration with Dynamo +- RBAC for accessing Dynamo CRDs +- Request routing through Dynamo Frontend +- **Real vLLM inference with GPU** (TinyLlama/TinyLlama-1.1B-Chat-v1.0) +- Disaggregated serving (Prefill + Decode workers) +- GPU utilization and memory management +- Model name rewriting via Semantic Router + +## Complete Architecture & Request Flow + +### High-Level Overview + +``` +┌─────────────────────────────────────────────────────────────────────────┐ +│ CLIENT (You) │ +│ curl -X POST http://localhost:8080/v1/chat/completions │ +│ -d '{"model": "MoM", "messages": [...]}' │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ ENVOY GATEWAY (Gateway Proxy) │ +│ • Namespace: envoy-gateway-system │ +│ • Service: envoy-default-semantic-router-31cbd78c:80 │ +│ • Role: API Gateway, routes traffic, applies policies │ +│ • EnvoyPatchPolicy: ENABLED (critical for ExtProc) │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ SEMANTIC ROUTER (ExtProc Filter) │ +│ • Namespace: vllm-semantic-router-system │ +│ • Service: semantic-router-ext-proc:9002 │ +│ • Role: Intelligent request routing & classification │ +│ │ +│ Processing Steps: │ +│ 1. Receives request from Envoy (ExtProc protocol) │ +│ 2. Parses body: {"model": "MoM", "messages": [...]} │ +│ 3. Classifies query: "What is 2+2?" → category="math" (93.3%) │ +│ 4. Looks up category in config: │ +│ categories: │ +│ - name: math │ +│ model_scores: │ +│ - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 │ +│ score: 1.0 │ +│ 5. Selects model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 (score: 1.0) │ +│ 6. Rewrites request: model="MoM" → model="TinyLlama/TinyLlama-1.1B-Chat-v1.0" │ +│ 7. 
Returns modified request to Envoy │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ ENVOY GATEWAY (After ExtProc) │ +│ • Request now has: model="TinyLlama/TinyLlama-1.1B-Chat-v1.0" │ +│ • Consults HTTPRoute for routing decision │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ GATEWAY API HTTPRoute (Routing Rules) │ +│ • Name: semantic-router-to-dynamo │ +│ • Namespace: default │ +│ • Rule: Match /v1/* → forward to Dynamo Frontend │ +│ • Backend: Service/vllm-frontend (dynamo-system:8000) │ +└─────────────────────────────────────────────────────────────────────────┘ + │ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ DYNAMO FRONTEND (Coordination Layer) │ +│ • Created by: DynamoGraphDeployment CRD │ +│ • Service: vllm-frontend │ +│ • Namespace: dynamo-system │ +│ • Role: Intelligent routing to workers via etcd/NATS │ +│ • Features: Request queuing, worker selection, coordination │ +└─────────────────────────────────────────────────────────────────────────┘ + │ │ + ▼ ▼ + ┌───────────────────────────┐ ┌───────────────────────────┐ + │ PREFILL WORKER (GPU 1) │ │ DECODE WORKER (GPU 2) │ + │ • Created by: Dynamo CRD │ │ • Created by: Dynamo CRD │ + │ • Namespace: dynamo-sys │ │ • Namespace: dynamo-sys │ + │ • Image: vllm-runtime │ │ • Image: vllm-runtime │ + │ • Model: TinyLlama │ │ • Model: TinyLlama │ + │ • Port: 9090 │ │ • Port: 9090 │ + │ • Coordination: etcd+NATS│ │ • Coordination: etcd+NATS│ + │ │ │ │ + │ Processing: │ │ Processing: │ + │ 1. Receives from Frontend│ │ 1. Receives from Frontend│ + │ 2. Prefill phase (KV) │ │ 2. Decode phase (tokens) │ + │ 3. Real GPU inference │ │ 3. Real GPU inference │ + │ 4. Returns via Frontend │ │ 4. Returns via Frontend │ + └───────────────────────────┘ └───────────────────────────┘ + │ │ + └──────────┬───────────────┘ + ▼ +┌─────────────────────────────────────────────────────────────────────────┐ +│ RESPONSE FLOW (Backwards) │ +│ │ +│ Worker → Service → HTTPRoute → Envoy Gateway │ +│ │ │ +│ ▼ │ +│ SEMANTIC ROUTER (Response Processing) │ +│ • Intercepts response via ExtProc │ +│ • Logs usage metrics │ +│ • Updates cache │ +│ • Returns to Envoy │ +│ │ │ +│ ▼ │ +│ ENVOY GATEWAY │ +│ • Forwards response to client │ +│ │ │ +│ ▼ │ +│ CLIENT │ +│ Receives: {"model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", "choices": [...]} │ +└─────────────────────────────────────────────────────────────────────────┘ +``` + +## Detailed Component Breakdown + +### 1. Envoy Gateway (API Gateway Layer) + +**What it is**: A Kubernetes-native API Gateway based on Envoy Proxy + +**Configuration**: + +- Deployed via Helm with custom values (`envoy-gateway-values.yaml`) +- **Critical setting**: `extensionApis.enableEnvoyPatchPolicy: true` +- Without this, EnvoyPatchPolicy resources are rejected! + +**Role**: + +- Entry point for all HTTP traffic +- Applies routing rules from Gateway API resources +- Integrates with external processors (ExtProc) like Semantic Router +- Handles TLS termination, rate limiting, etc. + +**How it works**: + +1. Receives client request on port 80 +2. Checks EnvoyPatchPolicy for custom filters +3. Calls Semantic Router ExtProc service (if configured) +4. Applies HTTPRoute rules to forward to backend +5. Returns response to client + +### 2. 
Semantic Router (Intelligent Routing Layer) + +**What it is**: An AI-powered router that classifies queries and routes to optimal models + +**Configuration**: + +- Config: `../semantic-router/config.yaml` +- Values: `../semantic-router-values/values.yaml` + +**Role**: + +- Intercepts requests via Envoy ExtProc protocol +- Classifies user queries into categories (math, science, general, etc.) +- Selects the best model based on category and scores +- Rewrites the `model` field in the request +- Logs routing decisions and metrics + +**Classification Models**: + +- Category classifier: `models/category_classifier_modernbert-base_model` +- Embedding model: `models/all-MiniLM-L12-v2` +- Jailbreak detector: `models/jailbreak_classifier_modernbert-base_model` + +**How it works**: + +1. Envoy sends request to ExtProc gRPC service on port 9002 +2. Semantic Router parses JSON body +3. Runs classification: query text → category (with confidence score) +4. Looks up category in config to find best model +5. Rewrites `model` field: "MoM" → "TinyLlama/TinyLlama-1.1B-Chat-v1.0" +6. Returns modified request to Envoy +7. On response path: logs metrics, updates cache + +**Example Classification**: + +``` +Query: "What is 2+2?" + ↓ +Category: "math" (confidence: 93.3%) + ↓ +Config lookup: + categories: + - name: math + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 1.0 + ↓ +Selected: TinyLlama/TinyLlama-1.1B-Chat-v1.0 +``` + +### 3. Gateway API Resources (Routing Configuration) + +**What it is**: Kubernetes-native API for configuring traffic routing + +**Resources deployed** (`gwapi-resources.yaml`): + +#### a. GatewayClass + +- Name: `envoy-gateway` +- Controller: `gateway.envoyproxy.io/gatewayclass-controller` +- Defines the type of Gateway implementation + +#### b. Gateway + +- Name: `semantic-router` +- Namespace: `default` +- Listeners: HTTP on port 80 +- Links to GatewayClass and allows HTTPRoutes + +#### c. HTTPRoute + +- Name: `semantic-router-to-dynamo` +- Matches: All paths starting with `/v1/` +- Backend: `vllm-frontend` (dynamo-system:8000) +- Routes requests through Dynamo Frontend (created by DynamoGraphDeployment) + +#### d. EnvoyPatchPolicy + +- Name: `semantic-router-extproc-patch-policy` +- Applies ExtProc filter to Gateway +- Configures Semantic Router as external processor +- **Status must be "Accepted: True"** or ExtProc won't work! + +#### e. ReferenceGrant + +- Allows HTTPRoute in `default` namespace to reference Service in `dynamo-system` +- Required for cross-namespace routing + +### 4. Dynamo Platform (Coordination Layer) + +**What it is**: NVIDIA's distributed inference platform + +**Components** (deployed via Helm chart `dynamo-platform`): + +#### a. etcd + +- Distributed key-value store +- Stores worker state, model metadata, KV cache mappings +- Workers register themselves in etcd +- Enables KV cache sharing across workers + +#### b. NATS + +- Message queue for asynchronous communication +- Workers subscribe to model-specific topics +- Router publishes requests to appropriate topics +- Enables dynamic load balancing and request batching + +#### c. 
Dynamo Operator
+
+- Kubernetes operator that manages the Dynamo lifecycle
+- Watches for DynamoGraphDeployment CRDs (**actively used in E2E**)
+- Creates Frontend and Worker deployments/services
+- Manages worker scaling and health
+
+**Role in E2E**:
+
+- ✅ **Dynamo Operator**: Actively used to create Frontend and Workers from the CRD
+- ✅ **Dynamo Frontend**: Coordinates routing to workers via etcd/NATS
+- ✅ **etcd**: Used for worker registration and coordination
+- ✅ **NATS**: Used for message queuing between Frontend and Workers
+- ✅ **DynamoGraphDeployment CRD**: Defines Frontend and Worker specifications
+- ✅ **Real vLLM Inference**: GPU-enabled with the TinyLlama model
+
+### 5. DynamoGraphDeployment CRD (Dynamo Configuration)
+
+**What it is**: Kubernetes CRD that defines the Dynamo Frontend and Workers
+
+**Configuration** (`dynamo-graph-deployment.yaml`):
+
+- **CRD**: `nvidia.com/v1alpha1/DynamoGraphDeployment`
+- **Name**: `vllm`
+- **Namespace**: `dynamo-system`
+
+**Frontend Service** (GPU 0):
+
+- **Image**: `nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1`
+- **Command**: `python3 -m dynamo.frontend --http-port 8000`
+- **Replicas**: 1
+- **Role**: HTTP API server, coordinates requests to workers via etcd/NATS
+- **Service Name** (created by operator): `vllm-frontend`
+
+**VLLMPrefillWorker Service** (GPU 1):
+
+- **Image**: `nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1`
+- **Command**: `python3 -m dynamo.vllm --model TinyLlama/TinyLlama-1.1B-Chat-v1.0 --is-prefill-worker`
+- **Replicas**: 1
+- **Role**: Handles the prefill phase (prompt processing, KV cache generation)
+- **Service Name** (created by operator): `vllm-vllmprefillworker`
+
+**VLLMDecodeWorker Service** (GPU 2):
+
+- **Image**: `nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1`
+- **Command**: `python3 -m dynamo.vllm --model TinyLlama/TinyLlama-1.1B-Chat-v1.0`
+- **Replicas**: 1
+- **Role**: Handles the decode phase (token generation)
+- **Service Name** (created by operator): `vllm-vllmdecodeworker`
+
+**How workers process requests** (disaggregated serving):
+
+1. Frontend receives POST to `/v1/chat/completions`
+2. Routes to the Prefill Worker via etcd/NATS
+3. Prefill Worker processes the prompt, generates the KV cache
+4. Decode Worker receives the KV cache, generates tokens
+5. Response flows back through the Frontend to the client
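+
+To see exactly what the operator created from the CRD, here is a quick inspection sketch (it assumes `kubectl` points at the test cluster; the node GPU query is the same check the e2e framework runs after setup):
+
+```bash
+# The CRD instance and its status
+kubectl get dynamographdeployments -n dynamo-system
+
+# Operator-created services and pods (vllm-frontend, vllm-vllmprefillworker, vllm-vllmdecodeworker)
+kubectl get svc,pods -n dynamo-system
+
+# GPUs allocatable on the Kind nodes
+kubectl get nodes -o 'custom-columns=NAME:.metadata.name,GPU:.status.allocatable.nvidia\.com/gpu'
+```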
+### 6. RBAC (Permissions)
+
+**What it is**: Role-based access control for the Semantic Router
+
+**Configuration** (`rbac.yaml`):
+
+- **ClusterRole**: `dynamo-extproc-access`
+  - Allows Semantic Router to `get`, `list`, `watch` DynamoGraphDeployments
+  - Allows access to pods, services, and endpoints for monitoring
+- **ClusterRoleBinding**: Links the role to the `semantic-router` ServiceAccount
+- **ServiceAccount**: `semantic-router` (in `vllm-semantic-router-system`)
+
+**Why needed**: Semantic Router may query Dynamo CRDs for routing decisions.
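+
+To verify the binding without deploying anything extra, `kubectl auth can-i` can impersonate the ServiceAccount (a sketch; all names are taken from `rbac.yaml` above):
+
+```bash
+SA=system:serviceaccount:vllm-semantic-router-system:semantic-router
+
+# Should print "yes" for the verbs granted by the ClusterRole
+kubectl auth can-i list dynamographdeployments.nvidia.com --as="$SA"
+kubectl auth can-i watch pods --as="$SA"
+
+# Should print "no" - the role is intentionally read-only
+kubectl auth can-i delete dynamographdeployments.nvidia.com --as="$SA"
+```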
+### 7. Kubernetes Service (Load Balancing Layer)
+
+**What it is**: The Kubernetes-native load balancer in front of the Dynamo Frontend
+
+**How it works**:
+
+- The operator-created `vllm-frontend` Service selects the Frontend pod
+- Maintains a list of healthy pod IPs (Endpoints)
+- Health checks via readiness probes
+- Automatic failover if a pod becomes unhealthy
+
+**Why this matters**:
+
+- Without the Service: clients would need to know pod IPs
+- With the Service: a single stable endpoint inside the cluster
+- Enables horizontal scaling (add more Frontend replicas)
+
+## What This Tests
+
+### ✅ Semantic Router Capabilities:
+
+1. **Request classification** - Classifies "What is 2+2?" → "math"
+2. **Model selection** - Selects TinyLlama/TinyLlama-1.1B-Chat-v1.0 based on category
+3. **Request rewriting** - Changes model="MoM" → model="TinyLlama/TinyLlama-1.1B-Chat-v1.0"
+4. **ExtProc integration** - Works with Envoy Gateway via gRPC
+5. **Response processing** - Logs metrics, updates cache
+6. **Fallback behavior** - Uses default_model when no category matches
+
+### ✅ Infrastructure Capabilities:
+
+1. **Disaggregated serving** - Frontend dispatches prefill and decode phases to dedicated workers
+2. **Service discovery** - HTTPRoute reaches the Dynamo Frontend via the `vllm-frontend` Service
+3. **Worker coordination** - Frontend tracks worker registration and health via etcd/NATS
+4. **Cross-namespace routing** - default → dynamo-system via ReferenceGrant
+5. **EnvoyPatchPolicy** - Custom filters applied to the Gateway
+
+## Step-by-Step Request Flow Example
+
+Let's trace a single request through the entire system:
+
+### Request: "What is 2+2?"
+
+```
+STEP 1: Client sends request
+    POST http://localhost:8080/v1/chat/completions
+    Body: {"model": "MoM", "messages": [{"role": "user", "content": "What is 2+2?"}]}
+    ↓
+
+STEP 2: Port-forward routes to Envoy Gateway pod
+    kubectl port-forward → envoy-gateway-system/envoy-xxx:80
+    ↓
+
+STEP 3: Envoy Gateway receives request
+    • Checks Gateway resource for listeners
+    • Finds EnvoyPatchPolicy: semantic-router-extproc-patch-policy
+    • Configures ExtProc filter pointing to semantic-router-ext-proc:9002
+    ↓
+
+STEP 4: Envoy calls Semantic Router (ExtProc request phase)
+    gRPC call to semantic-router-ext-proc:9002
+    Sends: HTTP headers + JSON body
+    ↓
+
+STEP 5: Semantic Router processes request
+    a) Parse body: model="MoM", query="What is 2+2?"
+    b) Run jailbreak detection: BENIGN ✅
+    c) Run category classification:
+       - Input: "What is 2+2?"
+       - Model: category_classifier_modernbert-base_model
+       - Output: category="math", confidence=0.933
+    d) Look up category in config.yaml:
+       categories:
+         - name: math
+           model_scores:
+             - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0
+               score: 1.0
+    e) Select model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 (highest score)
+    f) Rewrite request body: model="MoM" → model="TinyLlama/TinyLlama-1.1B-Chat-v1.0"
+    g) Return modified request to Envoy
+    ↓
+
+STEP 6: Envoy routes request using HTTPRoute
+    • HTTPRoute "semantic-router-to-dynamo" matches path /v1/*
+    • Backend: Service/vllm-frontend in dynamo-system namespace
+    • ReferenceGrant allows cross-namespace reference
+    ↓
+
+STEP 7: Dynamo Frontend receives the request
+    • Service: vllm-frontend → Frontend pod (GPU 0)
+    • Frontend selects workers via etcd/NATS
+    ↓
+
+STEP 8: Workers run real vLLM inference
+    POST /v1/chat/completions
+    Body: {"model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", "messages": [...]} ← rewritten!
+    a) Prefill Worker (GPU 1) processes the prompt, builds the KV cache
+    b) Decode Worker (GPU 2) generates tokens from the KV cache
+    c) Frontend assembles the JSON response:
+       {
+         "id": "chatcmpl-xxx",
+         "model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0",
+         "choices": [{
+           "message": {"role": "assistant", "content": "4"}
+         }],
+         "usage": {"prompt_tokens": 15, "completion_tokens": 1, "total_tokens": 16}
+       }
+    ↓
+
+STEP 9: Response flows back through Frontend → Service → HTTPRoute → Envoy
+    ↓
+
+STEP 10: Envoy calls Semantic Router (ExtProc response phase)
+    gRPC call to semantic-router-ext-proc:9002
+    Sends: Response headers + JSON body
+    ↓
+
+STEP 11: Semantic Router processes response
+    a) Extract usage metrics: prompt_tokens=15, completion_tokens=1
+    b) Log metrics: llm_usage event with model, tokens, latency
+    c) Update cache: Store (query, response) for future cache hits
+    d) Return response unmodified to Envoy
+    ↓
+
+STEP 12: Envoy returns response to client
+    ↓
+
+STEP 13: Client receives final response
+    {
+      "model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", ← NOT "MoM" anymore!
+      "choices": [...],
+      "usage": {...}
+    }
+```
+
+### Key Observations
+
+1. **Model rewriting happened**: Client sent "MoM", the workers received "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
+2. **Classification worked**: Query classified as "math" with 93.3% confidence
+3. **Disaggregated serving worked**: Prefill and decode phases ran on dedicated GPU workers
+4. **Metrics logged**: Semantic Router tracked tokens, latency, and the model used
+5. **Cache updated**: The next identical query will be a cache hit
+
+### Components
+
+1. **Envoy Gateway** (deployed via Helm with custom values):
+   - `envoy-gateway-values.yaml`: Enables `extensionApis.enableEnvoyPatchPolicy: true`
+   - **Critical**: EnvoyPatchPolicy MUST be enabled for Semantic Router ExtProc to work
+
+2. **Dynamo Platform** (deployed via Helm):
+   - etcd: Distributed key-value store for coordination
+   - NATS: Message queue for request routing
+   - Dynamo Operator: Manages the Dynamo lifecycle
+
+3. **DynamoGraphDeployment** (`dynamo-graph-deployment.yaml`):
+   - Frontend: HTTP API server on port 8000 (GPU 0)
+   - VLLMPrefillWorker: Prefill phase worker (GPU 1)
+   - VLLMDecodeWorker: Decode phase worker (GPU 2)
+   - Model: `TinyLlama/TinyLlama-1.1B-Chat-v1.0`
+
+4. **Gateway API Resources** (`gwapi-resources.yaml`):
+   - HTTPRoute routing traffic to the Dynamo Frontend
+   - Semantic Router integration via ExtProc
+   - EnvoyPatchPolicy for request/response interception
+
+## GPU-Enabled Testing
+
+This E2E profile uses **real vLLM inference with GPU** instead of simulators:
+
+- **Real GPU inference** - Tests actual model loading and inference
+- **Disaggregated serving** - Prefill and Decode workers on separate GPUs
+- **TinyLlama model** - Uses `TinyLlama/TinyLlama-1.1B-Chat-v1.0` for fast testing
+- **Full Dynamo stack** - Frontend coordinates workers via etcd/NATS
+
+**Note:** This requires a VM with at least 3 GPUs available.
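+
+GPU support also depends on Docker's default runtime being `nvidia`; the e2e framework verifies this before creating the Kind cluster and aborts with a hint otherwise. The one-time host setup it expects (the same commands the framework prints on failure):
+
+```bash
+# Make nvidia the default Docker runtime, then restart Docker
+sudo nvidia-ctk runtime configure --runtime=docker --set-as-default
+sudo systemctl restart docker
+
+# Verify, then run the profile
+docker info | grep "Default Runtime"   # expect: Default Runtime: nvidia
+make e2e-test-dynamo
+```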
+## Deployment
+
+The E2E profile automatically deploys these resources in this order:
+
+1. Dynamo Platform (Helm: `dynamo-crds` + `dynamo-platform`)
+2. DynamoGraphDeployment (Frontend + Prefill Worker + Decode Worker)
+3. Gateway API Resources
+
+## Testing Dynamo Functionality
+
+With disaggregated prefill and decode workers, you can test:
+
+- **Request distribution** - The Frontend dispatches prefill and decode phases to dedicated workers
+- **Dynamic batching** - Multiple requests batched together
+- **Disaggregated serving** - KV cache handoff from the prefill worker to the decode worker
+- **KV cache coordination** - Workers share KV cache state via etcd
+
+## Production Deployment
+
+For production with larger models, update the deployment:
+
+```yaml
+image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1
+command: ["python3", "-m", "dynamo.vllm"]
+args:
+  - --model
+  - meta-llama/Llama-3-8b-hf
+  - --tensor-parallel-size
+  - "1"
+  - --enforce-eager
+resources:
+  requests:
+    nvidia.com/gpu: 1 # Or more for tensor parallelism
+```
+
+**Note:** The E2E test uses `TinyLlama/TinyLlama-1.1B-Chat-v1.0` for faster testing.
+
+## Manual Testing
+
+After deployment, test the Semantic Router + Dynamo integration:
+
+### 1. Port Forward to Envoy Gateway
+
+```bash
+kubectl port-forward -n envoy-gateway-system service/envoy-default-semantic-router-31cbd78c 8080:80
+```
+
+### 2. Send Test Request with "MoM" Model
+
+**IMPORTANT**: Use the `/v1/chat/completions` endpoint (not `/v1/completions`):
+
+```bash
+curl -X POST http://localhost:8080/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -d '{
+    "model": "MoM",
+    "messages": [
+      {
+        "role": "user",
+        "content": "What is 2+2?"
+      }
+    ]
+  }'
+```
+
+### 3. Verify the Response
+
+```json
+{
+  "model": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", // ← Rewritten from "MoM" ✅
+  "usage": {
+    "prompt_tokens": 15,
+    "completion_tokens": 54,
+    "total_tokens": 69
+  },
+  "choices": [...]
+}
+```
+
+**Success indicators:**
+
+- ✅ Request sent with `model="MoM"`
+- ✅ Response shows `model="TinyLlama/TinyLlama-1.1B-Chat-v1.0"` (rewritten by Semantic Router)
+- ✅ No "model does not exist" error
+
+### 4. Check Semantic Router Logs
+
+```bash
+# See classification and routing decisions
+kubectl logs -n vllm-semantic-router-system deployment/semantic-router -f | grep -E "category|routing_decision"
+```
+
+Expected log output:
+
+```
+Classified as category: math (confidence=0.933)
+Selected model TinyLlama/TinyLlama-1.1B-Chat-v1.0 for category math with score 1.0000
+routing_decision: original_model="MoM", selected_model="TinyLlama/TinyLlama-1.1B-Chat-v1.0"
+```
+
+### 5. Verify EnvoyPatchPolicy is Enabled
+
+```bash
+kubectl get envoypatchpolicy -n default -o yaml | grep -A 5 "status:"
+```
+
+Expected status:
+
+```yaml
+status:
+  conditions:
+    - type: Accepted
+      status: "True" # ← Must be True!
+    - type: Programmed
+      status: "True"
+```
+
+If `Accepted: False` with the message "EnvoyPatchPolicy is disabled", the Envoy Gateway was not deployed with the correct values file.
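+
+In that case, redeploy Envoy Gateway with the values file from this directory. A sketch, assuming the upstream `gateway-helm` OCI chart and the `eg` release name used by the e2e profile:
+
+```bash
+# Re-install Envoy Gateway with EnvoyPatchPolicy enabled
+helm upgrade --install eg oci://docker.io/envoyproxy/gateway-helm \
+  -n envoy-gateway-system --create-namespace \
+  -f envoy-gateway-values.yaml
+
+# Re-check the policy status once the controller is back up
+kubectl get envoypatchpolicy -n default -o yaml | grep -A 5 "status:"
+```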
+ +## Files + +- `dynamo-graph-deployment.yaml` - DynamoGraphDeployment CRD (Frontend + Prefill Worker + Decode Worker with GPU) +- `rbac.yaml` - RBAC permissions for Semantic Router to access Dynamo CRDs +- `gwapi-resources.yaml` - Gateway, GatewayClass, HTTPRoute, EnvoyPatchPolicy, ReferenceGrant +- `envoy-gateway-values.yaml` - Envoy Gateway Helm values (enables EnvoyPatchPolicy) +- `README.md` - This file (you are here) diff --git a/deploy/kubernetes/dynamo/dynamo-resources/dynamo-graph-deployment.yaml b/deploy/kubernetes/dynamo/dynamo-resources/dynamo-graph-deployment.yaml new file mode 100644 index 000000000..4e10ca221 --- /dev/null +++ b/deploy/kubernetes/dynamo/dynamo-resources/dynamo-graph-deployment.yaml @@ -0,0 +1,237 @@ +--- +# Disaggregated vLLM Deployment for Dynamo +# GPU-enabled configuration for Kind cluster with NVIDIA support +# +# Architecture: +# Frontend: HTTP API server (GPU 0) +# VLLMPrefillWorker: Specialized prefill-only worker (GPU 1) +# VLLMDecodeWorker: Specialized decode-only worker (GPU 2) +# +# GPU Allocation (4 GPUs total): +# GPU 0: Frontend +# GPU 1: Prefill Worker +# GPU 2: Decode Worker +# GPU 3: (spare) +apiVersion: nvidia.com/v1alpha1 +kind: DynamoGraphDeployment +metadata: + name: vllm + namespace: dynamo-system +spec: + backendFramework: vllm + envs: + - name: DYN_LOG + value: "info" + services: + # Frontend - HTTP API server + Frontend: + dynamoNamespace: dynamo-vllm + componentType: frontend + replicas: 1 + resources: + requests: + cpu: "1" + memory: "4Gi" + gpu: "1" + limits: + cpu: "2" + memory: "8Gi" + gpu: "1" + extraPodSpec: + mainContainer: + image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1 + command: + - /bin/sh + - -c + args: + - "sleep 15 && export CUDA_VISIBLE_DEVICES=0 && export LD_LIBRARY_PATH=/nvidia-driver-libs:/usr/local/cuda/lib64:$LD_LIBRARY_PATH && python3 -m dynamo.frontend --http-port 8000" + securityContext: + privileged: true + livenessProbe: + tcpSocket: + port: 8000 + initialDelaySeconds: 60 + periodSeconds: 30 + failureThreshold: 5 + readinessProbe: + tcpSocket: + port: 8000 + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 10 + startupProbe: + tcpSocket: + port: 8000 + initialDelaySeconds: 20 + periodSeconds: 10 + failureThreshold: 30 + env: + - name: ETCD_ENDPOINTS + value: "dynamo-platform-etcd.dynamo-system.svc.cluster.local:2379" + - name: NATS_URL + value: "nats://dynamo-platform-nats.dynamo-system.svc.cluster.local:4222" + - name: NATS_SERVER + value: "nats://dynamo-platform-nats.dynamo-system:4222" + - name: DYN_SYSTEM_ENABLED + value: "true" + - name: DYN_SYSTEM_PORT + value: "9090" + - name: LD_LIBRARY_PATH + value: "/nvidia-driver-libs:/usr/local/cuda/lib64" + - name: NVIDIA_DRIVER_CAPABILITIES + value: "compute,utility" + volumeMounts: + - name: nvidia-driver-libs + mountPath: /nvidia-driver-libs + readOnly: true + - name: dev + mountPath: /dev + volumes: + - name: nvidia-driver-libs + hostPath: + path: /nvidia-driver-libs + - name: dev + hostPath: + path: /dev + + # VLLMPrefillWorker - Specialized prefill-only worker (GPU 1) + VLLMPrefillWorker: + dynamoNamespace: dynamo-vllm + componentType: worker + replicas: 1 + resources: + requests: + cpu: "1" + memory: "4Gi" + gpu: "1" + limits: + cpu: "2" + memory: "8Gi" + gpu: "1" + extraPodSpec: + mainContainer: + image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1 + command: + - /bin/sh + - -c + args: + - "sleep 15 && export CUDA_VISIBLE_DEVICES=1 && export LD_LIBRARY_PATH=/nvidia-driver-libs:/usr/local/cuda/lib64:$LD_LIBRARY_PATH && 
python3 -m dynamo.vllm --model TinyLlama/TinyLlama-1.1B-Chat-v1.0 --tensor-parallel-size 1 --enforce-eager --is-prefill-worker --connector null" + securityContext: + privileged: true + livenessProbe: + tcpSocket: + port: 9090 + initialDelaySeconds: 180 + periodSeconds: 30 + failureThreshold: 5 + readinessProbe: + tcpSocket: + port: 9090 + initialDelaySeconds: 120 + periodSeconds: 10 + failureThreshold: 10 + startupProbe: + tcpSocket: + port: 9090 + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 30 + env: + - name: ETCD_ENDPOINTS + value: "dynamo-platform-etcd.dynamo-system.svc.cluster.local:2379" + - name: NATS_URL + value: "nats://dynamo-platform-nats.dynamo-system.svc.cluster.local:4222" + - name: NATS_SERVER + value: "nats://dynamo-platform-nats.dynamo-system:4222" + - name: DYN_SYSTEM_ENABLED + value: "true" + - name: DYN_SYSTEM_PORT + value: "9090" + - name: LD_LIBRARY_PATH + value: "/nvidia-driver-libs:/usr/local/cuda/lib64" + - name: NVIDIA_DRIVER_CAPABILITIES + value: "compute,utility" + volumeMounts: + - name: nvidia-driver-libs + mountPath: /nvidia-driver-libs + readOnly: true + - name: dev + mountPath: /dev + volumes: + - name: nvidia-driver-libs + hostPath: + path: /nvidia-driver-libs + - name: dev + hostPath: + path: /dev + + # VLLMDecodeWorker - Specialized decode-only worker (GPU 2) + VLLMDecodeWorker: + dynamoNamespace: dynamo-vllm + componentType: worker + replicas: 1 + resources: + requests: + cpu: "1" + memory: "4Gi" + gpu: "1" + limits: + cpu: "2" + memory: "8Gi" + gpu: "1" + extraPodSpec: + mainContainer: + image: nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1 + command: + - /bin/sh + - -c + args: + - "sleep 15 && export CUDA_VISIBLE_DEVICES=2 && export LD_LIBRARY_PATH=/nvidia-driver-libs:/usr/local/cuda/lib64:$LD_LIBRARY_PATH && python3 -m dynamo.vllm --model TinyLlama/TinyLlama-1.1B-Chat-v1.0 --tensor-parallel-size 1 --enforce-eager --connector null" + securityContext: + privileged: true + livenessProbe: + tcpSocket: + port: 9090 + initialDelaySeconds: 180 + periodSeconds: 30 + failureThreshold: 5 + readinessProbe: + tcpSocket: + port: 9090 + initialDelaySeconds: 120 + periodSeconds: 10 + failureThreshold: 10 + startupProbe: + tcpSocket: + port: 9090 + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 30 + env: + - name: ETCD_ENDPOINTS + value: "dynamo-platform-etcd.dynamo-system.svc.cluster.local:2379" + - name: NATS_URL + value: "nats://dynamo-platform-nats.dynamo-system.svc.cluster.local:4222" + - name: NATS_SERVER + value: "nats://dynamo-platform-nats.dynamo-system:4222" + - name: DYN_SYSTEM_ENABLED + value: "true" + - name: DYN_SYSTEM_PORT + value: "9090" + - name: LD_LIBRARY_PATH + value: "/nvidia-driver-libs:/usr/local/cuda/lib64" + - name: NVIDIA_DRIVER_CAPABILITIES + value: "compute,utility" + volumeMounts: + - name: nvidia-driver-libs + mountPath: /nvidia-driver-libs + readOnly: true + - name: dev + mountPath: /dev + volumes: + - name: nvidia-driver-libs + hostPath: + path: /nvidia-driver-libs + - name: dev + hostPath: + path: /dev diff --git a/deploy/kubernetes/dynamo/dynamo-resources/envoy-gateway-values.yaml b/deploy/kubernetes/dynamo/dynamo-resources/envoy-gateway-values.yaml new file mode 100644 index 000000000..8cbb77998 --- /dev/null +++ b/deploy/kubernetes/dynamo/dynamo-resources/envoy-gateway-values.yaml @@ -0,0 +1,8 @@ +# Envoy Gateway values for Dynamo E2E Testing +# Enables ExtensionAPIs (EnvoyPatchPolicy) for Semantic Router integration + +config: + envoyGateway: + extensionApis: + enableEnvoyPatchPolicy: true 
+ diff --git a/deploy/kubernetes/dynamo/dynamo-resources/gwapi-resources.yaml b/deploy/kubernetes/dynamo/dynamo-resources/gwapi-resources.yaml new file mode 100644 index 000000000..0c4528a99 --- /dev/null +++ b/deploy/kubernetes/dynamo/dynamo-resources/gwapi-resources.yaml @@ -0,0 +1,162 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: semantic-router +spec: + controllerName: gateway.envoyproxy.io/gatewayclass-controller +--- +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyProxy +metadata: + name: semantic-router + namespace: default +spec: + provider: + type: Kubernetes + kubernetes: + envoyDeployment: + container: + resources: {} + logging: + level: + default: trace +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: semantic-router + namespace: default +spec: + gatewayClassName: semantic-router + listeners: + - name: http + protocol: HTTP + port: 80 + infrastructure: + parametersRef: + group: gateway.envoyproxy.io + kind: EnvoyProxy + name: semantic-router +--- +# By default, Envoy Gateway sets the buffer limit to 32kiB which is not sufficient for AI workloads. +# This ClientTrafficPolicy sets the buffer limit to 50MiB as an example. +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: ClientTrafficPolicy +metadata: + name: semantic-router + namespace: default +spec: + targetRefs: + - group: gateway.networking.k8s.io + kind: Gateway + name: semantic-router + connection: + bufferLimit: 50Mi +--- +# ReferenceGrant to allow cross-namespace service references +# This allows HTTPRoutes in the 'default' namespace to reference Services in 'dynamo-system' +apiVersion: gateway.networking.k8s.io/v1beta1 +kind: ReferenceGrant +metadata: + name: allow-default-to-dynamo + namespace: dynamo-system +spec: + from: + - group: gateway.networking.k8s.io + kind: HTTPRoute + namespace: default + to: + - group: "" + kind: Service +--- +# HTTPRoute that routes requests through Dynamo Frontend +# Note: Semantic Router processes requests via ExtProc filter before forwarding +# +# Routing Flow: +# Client → Envoy Gateway → Semantic Router (ExtProc) → Dynamo Frontend → Workers +# +# The Dynamo operator creates a service named: -frontend +# Based on DynamoGraphDeployment metadata.name: vllm +# Expected service name: vllm-frontend +# +# Reference: https://github.com/ai-dynamo/dynamo +apiVersion: gateway.networking.k8s.io/v1 +kind: HTTPRoute +metadata: + name: semantic-router-to-dynamo + namespace: default +spec: + parentRefs: + - name: semantic-router + kind: Gateway + group: gateway.networking.k8s.io + rules: + - matches: + - path: + type: PathPrefix + value: / + backendRefs: + # Route to Dynamo Frontend (created by DynamoGraphDeployment CRD) + # Frontend coordinates with workers via etcd/NATS for intelligent routing + - name: vllm-frontend + namespace: dynamo-system + port: 8000 + weight: 100 +--- +# EnvoyPatchPolicy to configure ExtProc filter for Semantic Router +apiVersion: gateway.envoyproxy.io/v1alpha1 +kind: EnvoyPatchPolicy +metadata: + name: semantic-router-extproc-patch-policy + namespace: default +spec: + jsonPatches: + - name: default/semantic-router/http + operation: + op: add + path: /default_filter_chain/filters/0/typed_config/http_filters/0 + value: + name: semantic-router-extproc + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_proc.v3.ExternalProcessor + allow_mode_override: true + grpcService: + envoyGrpc: + authority: semantic-router.vllm-semantic-router-system:50051 + clusterName: 
semantic-router + timeout: 60s + message_timeout: 60s + processing_mode: + request_body_mode: BUFFERED + request_header_mode: SEND + request_trailer_mode: SKIP + response_body_mode: BUFFERED + response_header_mode: SEND + response_trailer_mode: SKIP + type: type.googleapis.com/envoy.config.listener.v3.Listener + - name: semantic-router + operation: + op: add + path: '' + value: + connect_timeout: 60s + http2_protocol_options: {} + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: semantic-router + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: semantic-router.vllm-semantic-router-system.svc.cluster.local + port_value: 50051 + name: semantic-router + type: STRICT_DNS + type: type.googleapis.com/envoy.config.cluster.v3.Cluster + targetRef: + group: gateway.networking.k8s.io + kind: Gateway + name: semantic-router + type: JSONPatch + diff --git a/deploy/kubernetes/dynamo/dynamo-resources/rbac.yaml b/deploy/kubernetes/dynamo/dynamo-resources/rbac.yaml new file mode 100644 index 000000000..10a49a5af --- /dev/null +++ b/deploy/kubernetes/dynamo/dynamo-resources/rbac.yaml @@ -0,0 +1,39 @@ +--- +# RBAC for Semantic Router to access Dynamo CRDs and resources +# Similar to LLM-D profile's approach for testing integration +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: dynamo-extproc-access +rules: +# Access to Dynamo CRDs +- apiGroups: ["nvidia.com"] + resources: + - dynamographdeployments + - dynamographdeployments/status + verbs: ["get", "list", "watch"] + +# Access to worker pods for monitoring +- apiGroups: [""] + resources: ["pods", "services", "endpoints"] + verbs: ["get", "list", "watch"] + +# Access to deployments created by Dynamo operator +- apiGroups: ["apps"] + resources: ["deployments", "statefulsets"] + verbs: ["get", "list", "watch"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: dynamo-extproc-access-binding +subjects: +# Semantic Router service account (created by Helm chart) +- kind: ServiceAccount + name: semantic-router + namespace: vllm-semantic-router-system +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: dynamo-extproc-access + diff --git a/deploy/kubernetes/dynamo/semantic-router-values/values.yaml b/deploy/kubernetes/dynamo/semantic-router-values/values.yaml new file mode 100644 index 000000000..08423f5a4 --- /dev/null +++ b/deploy/kubernetes/dynamo/semantic-router-values/values.yaml @@ -0,0 +1,114 @@ +# Semantic Router Configuration for Dynamo E2E Testing +# Simplified config without LoRA complexity - focuses on testing: +# 1. Semantic Router: Classification and routing (MoM → TinyLlama/TinyLlama-1.1B-Chat-v1.0) +# 2. Dynamo: Load balancing and batching across 2 worker replicas + +config: + model_config: + "TinyLlama/TinyLlama-1.1B-Chat-v1.0": {} # No reasoning family needed - let Dynamo handle load balancing + + # Simplified categories for E2E testing + # All route to TinyLlama/TinyLlama-1.1B-Chat-v1.0 (the model configured in Dynamo workers) + categories: + - name: math + system_prompt: "You are a mathematics expert. Provide step-by-step solutions." + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 1.0 + use_reasoning: false + + - name: science + system_prompt: "You are a science expert covering biology, chemistry, and physics." + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 0.9 + use_reasoning: false + + - name: general + system_prompt: "You are a helpful assistant." 
+ model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 0.7 + use_reasoning: false + + - name: other + system_prompt: "You are a helpful assistant for general queries." + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 0.6 + use_reasoning: false + + default_model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + + # Classifier configuration + bert_model: + model_id: models/all-MiniLM-L12-v2 + threshold: 0.6 + use_cpu: true + + classifier: + category_model: + model_id: "models/category_classifier_modernbert-base_model" + use_modernbert: true + threshold: 0.6 + use_cpu: true + category_mapping_path: "models/category_classifier_modernbert-base_model/category_mapping.json" + pii_model: + model_id: "models/pii_classifier_modernbert-base_presidio_token_model" + use_modernbert: true + threshold: 0.7 + use_cpu: true + pii_mapping_path: "models/pii_classifier_modernbert-base_presidio_token_model/pii_type_mapping.json" + + # Semantic cache for performance testing + semantic_cache: + enabled: true + backend_type: "memory" + similarity_threshold: 0.8 + max_entries: 1000 + ttl_seconds: 3600 + eviction_policy: "fifo" + use_hnsw: true + hnsw_m: 16 + hnsw_ef_construction: 200 + embedding_model: "bert" + + # Tools configuration + tools: + enabled: true + top_k: 3 + similarity_threshold: 0.2 + tools_db_path: "config/tools_db.json" + fallback_to_empty: true + + # Prompt guard + prompt_guard: + enabled: true + use_modernbert: true + model_id: "models/jailbreak_classifier_modernbert-base_model" + threshold: 0.7 + use_cpu: true + jailbreak_mapping_path: "models/jailbreak_classifier_modernbert-base_model/jailbreak_type_mapping.json" + + # Router configuration + router: + high_confidence_threshold: 0.99 + low_latency_threshold_ms: 2000 + + # Embedding models + embedding_models: + qwen3_model_path: "models/Qwen3-Embedding-0.6B" + gemma_model_path: "models/embeddinggemma-300m" + use_cpu: true + + # API Configuration + api: + batch_classification: + max_batch_size: 100 + concurrency_threshold: 5 + max_concurrency: 8 + + # Observability (disabled for E2E) + observability: + tracing: + enabled: false diff --git a/deploy/kubernetes/dynamo/semantic-router/config.yaml b/deploy/kubernetes/dynamo/semantic-router/config.yaml new file mode 100644 index 000000000..0c0e35ef4 --- /dev/null +++ b/deploy/kubernetes/dynamo/semantic-router/config.yaml @@ -0,0 +1,111 @@ +# Semantic Router configuration for Dynamo E2E Testing +# Simplified config without LoRA complexity - focuses on testing Dynamo load balancing/batching + +model_config: + "TinyLlama/TinyLlama-1.1B-Chat-v1.0": {} # No reasoning family needed - let Dynamo handle load balancing + +# Simplified categories for E2E testing +# All route to TinyLlama/TinyLlama-1.1B-Chat-v1.0 (the model configured in Dynamo workers) +categories: + - name: math + system_prompt: "You are a mathematics expert. Provide step-by-step solutions." + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 1.0 + use_reasoning: false + + - name: science + system_prompt: "You are a science expert covering biology, chemistry, and physics." + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 0.9 + use_reasoning: false + + - name: general + system_prompt: "You are a helpful assistant." + model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 0.7 + use_reasoning: false + + - name: other + system_prompt: "You are a helpful assistant for general queries." 
+ model_scores: + - model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + score: 0.6 + use_reasoning: false + +default_model: TinyLlama/TinyLlama-1.1B-Chat-v1.0 + +# Classifier configuration +bert_model: + model_id: models/all-MiniLM-L12-v2 + threshold: 0.6 + use_cpu: true + +classifier: + category_model: + model_id: "models/category_classifier_modernbert-base_model" + use_modernbert: true + threshold: 0.6 + use_cpu: true + category_mapping_path: "models/category_classifier_modernbert-base_model/category_mapping.json" + pii_model: + model_id: "models/pii_classifier_modernbert-base_presidio_token_model" + use_modernbert: true + threshold: 0.7 + use_cpu: true + pii_mapping_path: "models/pii_classifier_modernbert-base_presidio_token_model/pii_type_mapping.json" + +# Semantic cache for performance testing +semantic_cache: + enabled: true + backend_type: "memory" + similarity_threshold: 0.8 + max_entries: 1000 + ttl_seconds: 3600 + eviction_policy: "fifo" + use_hnsw: true + hnsw_m: 16 + hnsw_ef_construction: 200 + embedding_model: "bert" + +# Tools configuration +tools: + enabled: true + top_k: 3 + similarity_threshold: 0.2 + tools_db_path: "config/tools_db.json" + fallback_to_empty: true + +# Prompt guard +prompt_guard: + enabled: true + use_modernbert: true + model_id: "models/jailbreak_classifier_modernbert-base_model" + threshold: 0.7 + use_cpu: true + jailbreak_mapping_path: "models/jailbreak_classifier_modernbert-base_model/jailbreak_type_mapping.json" + +# Router configuration +router: + high_confidence_threshold: 0.99 + low_latency_threshold_ms: 2000 + +# Embedding models +embedding_models: + qwen3_model_path: "models/Qwen3-Embedding-0.6B" + gemma_model_path: "models/embeddinggemma-300m" + use_cpu: true + +# API Configuration +api: + batch_classification: + max_batch_size: 100 + concurrency_threshold: 5 + max_concurrency: 8 + +# Observability (disabled for E2E) +observability: + tracing: + enabled: false diff --git a/deploy/kubernetes/istio/config.yaml b/deploy/kubernetes/istio/config.yaml index 7ff964fc2..a27f417e0 100644 --- a/deploy/kubernetes/istio/config.yaml +++ b/deploy/kubernetes/istio/config.yaml @@ -427,7 +427,7 @@ api: # - EmbeddingGemma-300M: Up to 8K context, fast inference, Matryoshka support (768/512/256/128) embedding_models: qwen3_model_path: "models/Qwen3-Embedding-0.6B" -# gemma_model_path: "models/embeddinggemma-300m" + # gemma_model_path: "models/embeddinggemma-300m" use_cpu: true # Set to false for GPU acceleration (requires CUDA) # Observability Configuration diff --git a/deploy/kubernetes/istio/vLlama3.yaml b/deploy/kubernetes/istio/vLlama3.yaml index 562bbbe32..9724c0a53 100644 --- a/deploy/kubernetes/istio/vLlama3.yaml +++ b/deploy/kubernetes/istio/vLlama3.yaml @@ -10,7 +10,7 @@ spec: resources: requests: storage: 40Gi -# storageClassName: default + # storageClassName: default volumeMode: Filesystem --- apiVersion: apps/v1 @@ -35,10 +35,10 @@ spec: persistentVolumeClaim: claimName: llama-8b # vLLM needs to access the host's shared memory for tensor parallel inference. 
- # - name: shm - # emptyDir: - # medium: Memory - # sizeLimit: "2Gi" + # - name: shm + # emptyDir: + # medium: Memory + # sizeLimit: "2Gi" containers: - name: llama-8b image: vllm/vllm-openai:latest @@ -66,8 +66,8 @@ spec: volumeMounts: - mountPath: /root/.cache/huggingface name: cache-volume - # - name: shm - # mountPath: /dev/shm + # - name: shm + # mountPath: /dev/shm livenessProbe: httpGet: path: /health diff --git a/deploy/kubernetes/istio/vPhi4.yaml b/deploy/kubernetes/istio/vPhi4.yaml index 303378a86..b5e3d2b17 100644 --- a/deploy/kubernetes/istio/vPhi4.yaml +++ b/deploy/kubernetes/istio/vPhi4.yaml @@ -10,7 +10,7 @@ spec: resources: requests: storage: 20Gi -# storageClassName: default + # storageClassName: default volumeMode: Filesystem --- apiVersion: apps/v1 @@ -35,10 +35,10 @@ spec: persistentVolumeClaim: claimName: phi4-mini # vLLM needs to access the host's shared memory for tensor parallel inference. - # - name: shm - # emptyDir: - # medium: Memory - # sizeLimit: "2Gi" + # - name: shm + # emptyDir: + # medium: Memory + # sizeLimit: "2Gi" containers: - name: phi4-mini image: vllm/vllm-openai:latest @@ -66,8 +66,8 @@ spec: volumeMounts: - mountPath: /root/.cache/huggingface name: cache-volume - # - name: shm - # mountPath: /dev/shm + # - name: shm + # mountPath: /dev/shm livenessProbe: httpGet: path: /health diff --git a/deploy/openshift/openwebui/pvc.yaml b/deploy/openshift/openwebui/pvc.yaml index b395b2fbe..e37233570 100644 --- a/deploy/openshift/openwebui/pvc.yaml +++ b/deploy/openshift/openwebui/pvc.yaml @@ -12,5 +12,5 @@ spec: resources: requests: storage: 2Gi - # Use default storage class for OpenShift - # storageClassName: "" +# Use default storage class for OpenShift +# storageClassName: "" diff --git a/e2e/cmd/e2e/main.go b/e2e/cmd/e2e/main.go index c7c484b1c..9d35102cc 100644 --- a/e2e/cmd/e2e/main.go +++ b/e2e/cmd/e2e/main.go @@ -12,6 +12,7 @@ import ( aigateway "github.com/vllm-project/semantic-router/e2e/profiles/ai-gateway" aibrix "github.com/vllm-project/semantic-router/e2e/profiles/aibrix" dynamicconfig "github.com/vllm-project/semantic-router/e2e/profiles/dynamic-config" + dynamo "github.com/vllm-project/semantic-router/e2e/profiles/dynamo" istio "github.com/vllm-project/semantic-router/e2e/profiles/istio" llmd "github.com/vllm-project/semantic-router/e2e/profiles/llm-d" productionstack "github.com/vllm-project/semantic-router/e2e/profiles/production-stack" @@ -20,6 +21,7 @@ import ( // Import profiles to register test cases _ "github.com/vllm-project/semantic-router/e2e/profiles/ai-gateway" _ "github.com/vllm-project/semantic-router/e2e/profiles/aibrix" + _ "github.com/vllm-project/semantic-router/e2e/profiles/dynamo" _ "github.com/vllm-project/semantic-router/e2e/profiles/istio" _ "github.com/vllm-project/semantic-router/e2e/profiles/llm-d" _ "github.com/vllm-project/semantic-router/e2e/profiles/production-stack" @@ -31,7 +33,7 @@ const version = "v1.0.0" func main() { // Parse command line flags var ( - profile = flag.String("profile", "ai-gateway", "Test profile to run (ai-gateway, istio, etc.)") + profile = flag.String("profile", "ai-gateway", "Test profile to run (ai-gateway, dynamo, istio, etc.)") clusterName = flag.String("cluster", "semantic-router-e2e", "Kind cluster name") imageTag = flag.String("image-tag", "e2e-test", "Docker image tag") keepCluster = flag.Bool("keep-cluster", false, "Keep cluster after tests complete") @@ -109,6 +111,8 @@ func getProfile(name string) (framework.Profile, error) { return aigateway.NewProfile(), nil case 
"dynamic-config": return dynamicconfig.NewProfile(), nil + case "dynamo": + return dynamo.NewProfile(), nil case "aibrix": return aibrix.NewProfile(), nil case "istio": diff --git a/e2e/pkg/cluster/kind.go b/e2e/pkg/cluster/kind.go index 4cddf229b..c1f01971f 100644 --- a/e2e/pkg/cluster/kind.go +++ b/e2e/pkg/cluster/kind.go @@ -11,8 +11,9 @@ import ( // KindCluster manages Kind cluster lifecycle type KindCluster struct { - Name string - Verbose bool + Name string + Verbose bool + GPUEnabled bool // Enable GPU support for the cluster } // NewKindCluster creates a new Kind cluster manager @@ -23,6 +24,11 @@ func NewKindCluster(name string, verbose bool) *KindCluster { } } +// SetGPUEnabled enables GPU support for the cluster +func (k *KindCluster) SetGPUEnabled(enabled bool) { + k.GPUEnabled = enabled +} + // Create creates a new Kind cluster func (k *KindCluster) Create(ctx context.Context) error { k.log("Creating Kind cluster: %s", k.Name) @@ -38,37 +44,33 @@ func (k *KindCluster) Create(ctx context.Context) error { return nil } - // Mount /mnt from host into Kind node so storage provisioner can use it (more disk space) - configContent := fmt.Sprintf(`kind: Cluster -apiVersion: kind.x-k8s.io/v1alpha4 -name: %s -nodes: - - role: control-plane - extraMounts: - - hostPath: /mnt - containerPath: /mnt - - role: worker - extraMounts: - - hostPath: /mnt - containerPath: /mnt -`, k.Name) + // If GPU enabled, verify Docker nvidia runtime first + if k.GPUEnabled { + if err := k.verifyNvidiaRuntime(ctx); err != nil { + return err + } + } - configFile, err := os.CreateTemp("", "kind-config-*.yaml") + // Create cluster config with /mnt mount for storage (and GPU support if enabled) + var cmd *exec.Cmd + configFile, err := k.createClusterConfig() if err != nil { - return fmt.Errorf("failed to create temp config file: %w", err) + return fmt.Errorf("failed to create cluster config: %w", err) } - defer os.Remove(configFile.Name()) - - if _, err := configFile.WriteString(configContent); err != nil { - configFile.Close() - return fmt.Errorf("failed to write config file: %w", err) + defer os.Remove(configFile) + + if k.GPUEnabled { + k.log("Creating cluster with GPU support and /mnt mount for storage...") + cmd = exec.CommandContext(ctx, "kind", "create", "cluster", + "--name", k.Name, + "--config", configFile, + "--wait", "5m") + } else { + k.log("Using Kind config with /mnt mount for storage") + cmd = exec.CommandContext(ctx, "kind", "create", "cluster", + "--name", k.Name, + "--config", configFile) } - configFile.Close() - - k.log("Using Kind config with /mnt mount for storage") - - // Create cluster with config file - cmd := exec.CommandContext(ctx, "kind", "create", "cluster", "--name", k.Name, "--config", configFile.Name()) if k.Verbose { cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr @@ -100,6 +102,13 @@ nodes: exec.CommandContext(ctx, "kubectl", "--kubeconfig", kubeConfig, "rollout", "restart", "deployment/local-path-provisioner", "-n", "local-path-storage").Run() + // If GPU enabled, setup NVIDIA libraries + if k.GPUEnabled { + if err := k.setupGPULibraries(ctx); err != nil { + return fmt.Errorf("failed to setup GPU libraries: %w", err) + } + } + k.log("Cluster %s created successfully", k.Name) return nil } @@ -192,3 +201,235 @@ func (k *KindCluster) log(format string, args ...interface{}) { fmt.Printf("[Kind] "+format+"\n", args...) 
} } + +// verifyNvidiaRuntime checks if Docker's default runtime is nvidia +func (k *KindCluster) verifyNvidiaRuntime(ctx context.Context) error { + k.log("Verifying Docker nvidia runtime...") + cmd := exec.CommandContext(ctx, "docker", "info") + output, err := cmd.Output() + if err != nil { + return fmt.Errorf("failed to get docker info: %w", err) + } + + if !strings.Contains(string(output), "Default Runtime: nvidia") { + k.log("ERROR: Docker default runtime is not nvidia!") + k.log("Run: sudo nvidia-ctk runtime configure --runtime=docker --set-as-default") + k.log("Then restart Docker: sudo systemctl restart docker") + return fmt.Errorf("docker default runtime must be nvidia for GPU support") + } + k.log("✅ Docker default runtime is nvidia") + return nil +} + +// createClusterConfig creates a Kind config file with /mnt mount for storage +// and optionally GPU support if GPUEnabled is true +func (k *KindCluster) createClusterConfig() (string, error) { + // Base config with /mnt mount for storage (always included) + kindConfig := fmt.Sprintf(`kind: Cluster +apiVersion: kind.x-k8s.io/v1alpha4 +name: %s +nodes: + - role: control-plane + extraMounts: + - hostPath: /mnt + containerPath: /mnt`, k.Name) + + // Add GPU mount to worker if GPU is enabled + if k.GPUEnabled { + kindConfig += ` + - role: worker + extraMounts: + - hostPath: /mnt + containerPath: /mnt + - hostPath: /dev/null + containerPath: /var/run/nvidia-container-devices/all +` + } else { + kindConfig += ` + - role: worker + extraMounts: + - hostPath: /mnt + containerPath: /mnt +` + } + + configFile, err := os.CreateTemp("", "kind-config-*.yaml") + if err != nil { + return "", fmt.Errorf("failed to create temp file: %w", err) + } + + if _, err := configFile.WriteString(kindConfig); err != nil { + configFile.Close() + os.Remove(configFile.Name()) + return "", fmt.Errorf("failed to write config: %w", err) + } + configFile.Close() + + return configFile.Name(), nil +} + +// setupGPULibraries copies NVIDIA libraries to the Kind worker +func (k *KindCluster) setupGPULibraries(ctx context.Context) error { + workerName := k.Name + "-worker" + + // Get driver version (same as script: nvidia-smi ... 
| head -1)
+	k.log("Detecting NVIDIA driver version...")
+	driverCmd := exec.CommandContext(ctx, "bash", "-c", "nvidia-smi --query-gpu=driver_version --format=csv,noheader | head -1")
+	driverOutput, err := driverCmd.Output()
+	if err != nil {
+		k.log("nvidia-smi not available, skipping GPU library setup")
+		return nil
+	}
+	driverVersion := strings.TrimSpace(string(driverOutput))
+	// Remove any extra newlines/spaces
+	driverVersion = strings.Split(driverVersion, "\n")[0]
+	k.log("Detected NVIDIA driver version: %s", driverVersion)
+
+	// Verify GPU devices exist in worker
+	checkGPU := exec.CommandContext(ctx, "docker", "exec", workerName, "ls", "/dev/nvidia0")
+	if err := checkGPU.Run(); err != nil {
+		return fmt.Errorf("GPU devices not found in Kind worker - cluster may not have GPU support")
+	}
+	k.log("✅ GPU devices found in Kind worker")
+
+	// Check if libraries already exist
+	checkLibs := exec.CommandContext(ctx, "docker", "exec", workerName, "ls", "/nvidia-driver-libs/nvidia-smi")
+	if checkLibs.Run() == nil {
+		k.log("GPU libraries already set up")
+		return k.deployDevicePlugin(ctx)
+	}
+
+	k.log("Setting up NVIDIA libraries in Kind worker...")
+
+	// Create directory
+	mkdirCmd := exec.CommandContext(ctx, "docker", "exec", workerName, "mkdir", "-p", "/nvidia-driver-libs")
+	if err := mkdirCmd.Run(); err != nil {
+		return fmt.Errorf("failed to create nvidia-driver-libs directory: %w", err)
+	}
+
+	// Copy nvidia-smi
+	copyNvidiaSmi := exec.CommandContext(ctx, "bash", "-c",
+		fmt.Sprintf("tar -cf - -C /usr/bin nvidia-smi | docker exec -i %s tar -xf - -C /nvidia-driver-libs/", workerName))
+	if err := copyNvidiaSmi.Run(); err != nil {
+		return fmt.Errorf("failed to copy nvidia-smi: %w", err)
+	}
+
+	// Copy NVIDIA libraries (same as all-in-one script from docs)
+	k.log("Copying NVIDIA libraries from /usr/lib64...")
+	copyLibsScript := "tar -cf - -C /usr/lib64 libnvidia-ml.so." + driverVersion + " libcuda.so." +
+		driverVersion + " | docker exec -i " + workerName + " tar -xf - -C /nvidia-driver-libs/"
+	copyLibs := exec.CommandContext(ctx, "bash", "-c", copyLibsScript)
+	if k.Verbose {
+		k.log("Running: %s", copyLibsScript)
+	}
+	if output, err := copyLibs.CombinedOutput(); err != nil {
+		return fmt.Errorf("failed to copy NVIDIA libraries: %w\nOutput: %s", err, string(output))
+	}
+
+	// Create symlinks
+	symlinkCmd := exec.CommandContext(ctx, "docker", "exec", workerName, "bash", "-c",
+		fmt.Sprintf("cd /nvidia-driver-libs && ln -sf libnvidia-ml.so.%s libnvidia-ml.so.1 && ln -sf libcuda.so.%s libcuda.so.1 && chmod +x nvidia-smi",
+			driverVersion, driverVersion))
+	if err := symlinkCmd.Run(); err != nil {
+		return fmt.Errorf("failed to create symlinks: %w", err)
+	}
+
+	// Verify nvidia-smi works
+	verifyCmd := exec.CommandContext(ctx, "docker", "exec", workerName, "bash", "-c",
+		"LD_LIBRARY_PATH=/nvidia-driver-libs /nvidia-driver-libs/nvidia-smi")
+	if output, err := verifyCmd.CombinedOutput(); err != nil {
+		return fmt.Errorf("nvidia-smi verification failed: %w\nOutput: %s", err, string(output))
+	}
+	k.log("✅ nvidia-smi verified in Kind worker")
+
+	// Deploy device plugin
+	return k.deployDevicePlugin(ctx)
+}
+
+// deployDevicePlugin deploys the NVIDIA device plugin
+func (k *KindCluster) deployDevicePlugin(ctx context.Context) error {
+	// Check if already deployed
+	checkCmd := exec.CommandContext(ctx, "kubectl", "get", "daemonset",
+		"nvidia-device-plugin-daemonset", "-n", "kube-system")
+	if checkCmd.Run() == nil {
+		k.log("NVIDIA device plugin already deployed")
+		return nil
+	}
+
+	k.log("Deploying NVIDIA device plugin...")
+
+	devicePluginYAML := `apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  name: nvidia-device-plugin-daemonset
+  namespace: kube-system
+spec:
+  selector:
+    matchLabels:
+      name: nvidia-device-plugin-ds
+  template:
+    metadata:
+      labels:
+        name: nvidia-device-plugin-ds
+    spec:
+      tolerations:
+        - key: nvidia.com/gpu
+          operator: Exists
+          effect: NoSchedule
+      containers:
+        - image: nvcr.io/nvidia/k8s-device-plugin:v0.14.1
+          name: nvidia-device-plugin-ctr
+          env:
+            - name: LD_LIBRARY_PATH
+              value: "/nvidia-driver-libs"
+          securityContext:
+            privileged: true
+          volumeMounts:
+            - name: device-plugin
+              mountPath: /var/lib/kubelet/device-plugins
+            - name: dev
+              mountPath: /dev
+            - name: nvidia-driver-libs
+              mountPath: /nvidia-driver-libs
+              readOnly: true
+      volumes:
+        - name: device-plugin
+          hostPath:
+            path: /var/lib/kubelet/device-plugins
+        - name: dev
+          hostPath:
+            path: /dev
+        - name: nvidia-driver-libs
+          hostPath:
+            path: /nvidia-driver-libs`
+
+	tmpFile, err := os.CreateTemp("", "nvidia-device-plugin-*.yaml")
+	if err != nil {
+		return fmt.Errorf("failed to create temp file: %w", err)
+	}
+	defer os.Remove(tmpFile.Name())
+
+	if _, err := tmpFile.WriteString(devicePluginYAML); err != nil {
+		return fmt.Errorf("failed to write device plugin manifest: %w", err)
+	}
+	tmpFile.Close()
+
+	applyCmd := exec.CommandContext(ctx, "kubectl", "apply", "-f", tmpFile.Name())
+	if output, err := applyCmd.CombinedOutput(); err != nil {
+		return fmt.Errorf("failed to apply device plugin: %w\nOutput: %s", err, string(output))
+	}
+
+	k.log("NVIDIA device plugin deployed, waiting for it to be ready...")
+	time.Sleep(20 * time.Second)
+
+	// Verify GPUs are allocatable
+	verifyCmd := exec.CommandContext(ctx, "kubectl", "get", "nodes",
+		"-o", "custom-columns=NAME:.metadata.name,GPU:.status.allocatable.nvidia\\.com/gpu")
+	if output, err := verifyCmd.CombinedOutput(); err != nil {
+		k.log("Warning: Could not verify GPU allocatable: %v", err)
+	} else {
+		k.log("GPU allocatable status:\n%s", string(output))
+	}
+
+	k.log("✅ GPU setup complete")
+	return nil
+}
diff --git a/e2e/pkg/framework/runner.go b/e2e/pkg/framework/runner.go
index 0856b439b..8c6b80861 100644
--- a/e2e/pkg/framework/runner.go
+++ b/e2e/pkg/framework/runner.go
@@ -209,6 +209,13 @@ func (r *Runner) Run(ctx context.Context) error {
 func (r *Runner) setupCluster(ctx context.Context) error {
 	r.log("Setting up Kind cluster: %s", r.opts.ClusterName)
+
+	// Enable GPU support for dynamo profile
+	if r.profile.Name() == "dynamo" {
+		r.log("Enabling GPU support for Dynamo profile")
+		r.cluster.SetGPUEnabled(true)
+	}
+
 	return r.cluster.Create(ctx)
 }
diff --git a/e2e/pkg/helm/deployer.go b/e2e/pkg/helm/deployer.go
index 595d5fcf6..53c1da161 100644
--- a/e2e/pkg/helm/deployer.go
+++ b/e2e/pkg/helm/deployer.go
@@ -94,11 +94,13 @@ func (d *Deployer) WaitForDeployment(ctx context.Context, namespace, deploymentN
 	ctx, cancel := context.WithTimeout(ctx, timeout)
 	defer cancel()
 
+	// Convert timeout to seconds for kubectl
+	timeoutSeconds := int(timeout.Seconds())
 	cmd := exec.CommandContext(ctx, "kubectl", "wait",
 		"--for=condition=Available",
 		fmt.Sprintf("deployment/%s", deploymentName),
 		"-n", namespace,
-		"--timeout=600s",
+		fmt.Sprintf("--timeout=%ds", timeoutSeconds),
 		"--kubeconfig", d.KubeConfig)
 
 	if d.Verbose {
diff --git a/e2e/profiles/dynamo/profile.go b/e2e/profiles/dynamo/profile.go
new file mode 100644
index 000000000..ed507f0c5
--- /dev/null
+++ b/e2e/profiles/dynamo/profile.go
@@ -0,0 +1,661 @@
+package dynamo
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"time"
+
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
+
+	"github.com/vllm-project/semantic-router/e2e/pkg/framework"
+	"github.com/vllm-project/semantic-router/e2e/pkg/helm"
+	"github.com/vllm-project/semantic-router/e2e/pkg/helpers"
+
+	// Import testcases package to register all test cases via their init() functions
+	_ "github.com/vllm-project/semantic-router/e2e/testcases"
+)
+
+// Profile implements the Dynamo test profile
+type Profile struct {
+	verbose bool
+}
+
+// NewProfile creates a new Dynamo profile
+func NewProfile() *Profile {
+	return &Profile{}
+}
+
+// Name returns the profile name
+func (p *Profile) Name() string {
+	return "dynamo"
+}
+
+// Description returns the profile description
+func (p *Profile) Description() string {
+	return "Tests Semantic Router with Nvidia Dynamo integration (GPU-enabled disaggregated vLLM deployment)"
+}
+
+// Setup deploys all required components for Dynamo testing
+// Note: GPU setup is handled by the cluster creation in e2e/pkg/cluster/kind.go
+func (p *Profile) Setup(ctx context.Context, opts *framework.SetupOptions) error {
+	p.verbose = opts.Verbose
+	p.log("Setting up Dynamo test environment")
+
+	deployer := helm.NewDeployer(opts.KubeConfig, opts.Verbose)
+
+	// Step 1: Deploy Nvidia Dynamo components (includes workers)
+	p.log("Step 1/5: Deploying Nvidia Dynamo components")
+	if err := p.deployDynamo(ctx, deployer, opts); err != nil {
+		return fmt.Errorf("failed to deploy dynamo: %w", err)
+	}
+
+	// Step 2: Deploy Envoy Gateway (must be before Semantic Router to install Gateway API CRDs)
+	p.log("Step 2/5: Deploying Envoy Gateway")
+	if err := p.deployEnvoyGateway(ctx, deployer, opts); err != nil {
+		return fmt.Errorf("failed to deploy envoy gateway: %w", err)
+	}
+
+	// Step 3: Deploy Semantic Router with Dynamo integration
+	p.log("Step 3/5: Deploying Semantic Router with Dynamo integration")
+	if err := p.deploySemanticRouter(ctx, deployer, opts); err != nil {
+		return fmt.Errorf("failed to deploy semantic router: %w", err)
+	}
+
+	// Step 4: Configure Gateway API routing
+	p.log("Step 4/5: Configuring Gateway API routing")
+	if err := p.configureDynamoSettings(ctx, opts); err != nil {
+		return fmt.Errorf("failed to configure dynamo settings: %w", err)
+	}
+
+	// Step 5: Verify all components are ready
+	p.log("Step 5/5: Verifying all components are ready")
+	if err := p.verifyEnvironment(ctx, opts); err != nil {
+		return fmt.Errorf("failed to verify environment: %w", err)
+	}
+
+	p.log("Dynamo test environment setup complete")
+	return nil
+}
+
+// Teardown cleans up all deployed resources
+func (p *Profile) Teardown(ctx context.Context, opts *framework.TeardownOptions) error {
+	p.verbose = opts.Verbose
+	p.log("Tearing down Dynamo test environment")
+
+	deployer := helm.NewDeployer(opts.KubeConfig, opts.Verbose)
+
+	// Clean up in reverse order
+	p.log("Cleaning up worker resources")
+	p.cleanupWorkerResources(ctx, opts)
+
+	p.log("Uninstalling Envoy Gateway")
+	deployer.Uninstall(ctx, "eg", "envoy-gateway-system")
+
+	p.log("Uninstalling Semantic Router")
+	deployer.Uninstall(ctx, "semantic-router", "vllm-semantic-router-system")
+
+	p.log("Uninstalling Dynamo components")
+	p.cleanupDynamo(ctx, deployer, opts)
+
+	p.log("Dynamo test environment teardown complete")
+	return nil
+}
+
+// GetTestCases returns the list of test cases for this profile
+func (p *Profile) GetTestCases() []string {
+	return []string{
+		// Dynamo-specific test cases
+		"dynamo-health-check",
+		"dynamo-optimized-inference",
+		"dynamo-performance-comparison",
+		"dynamo-dynamic-batching",
+		"dynamo-gpu-utilization",
+	}
+}
+
+// GetServiceConfig returns the service configuration for accessing the deployed service
+func (p *Profile) GetServiceConfig() framework.ServiceConfig {
+	return framework.ServiceConfig{
+		LabelSelector: "gateway.envoyproxy.io/owning-gateway-namespace=default,gateway.envoyproxy.io/owning-gateway-name=semantic-router",
+		Namespace:     "envoy-gateway-system",
+		PortMapping:   "8080:80",
+	}
+}
+
+func (p *Profile) deployDynamo(ctx context.Context, deployer *helm.Deployer, opts *framework.SetupOptions) error {
+	// Deploy Dynamo using official Helm charts
+	// This installs CRDs, platform (etcd, NATS, operator), then we deploy custom frontend
+	// Reference: https://github.com/ai-dynamo/dynamo/blob/main/docs/kubernetes/README.md
+
+	namespace := "dynamo-system"
+	releaseVersion := "0.6.1" // Using v0.6.1 with minimal spec (no dynamoComponent needed)
+
+	// Step 1: Install Dynamo CRDs (cluster-scoped, install to default namespace)
+	p.log("  Step 1/5: Installing Dynamo CRDs (version: %s)", releaseVersion)
+	crdsChartURL := fmt.Sprintf("https://helm.ngc.nvidia.com/nvidia/ai-dynamo/charts/dynamo-crds-%s.tgz", releaseVersion)
+
+	crdsInstallOpts := helm.InstallOptions{
+		ReleaseName: "dynamo-crds",
+		Chart:       crdsChartURL,
+		Namespace:   "default", // CRDs are cluster-scoped
+		Wait:        true,
+		Timeout:     "5m",
+	}
+	if err := deployer.Install(ctx, crdsInstallOpts); err != nil {
+		return fmt.Errorf("failed to install Dynamo CRDs: %w", err)
+	}
+	p.log("Dynamo CRDs installed successfully")
+
+	// Wait for CRDs to be registered in the API server
+	p.log("Waiting for DynamoGraphDeployment CRD to be registered...")
+	if err := p.waitForCRD(ctx, opts.KubeConfig, "dynamographdeployments.nvidia.com", 2*time.Minute); err != nil {
+		return fmt.Errorf("DynamoGraphDeployment CRD not registered: %w", err)
+	}
+	p.log("✅ DynamoGraphDeployment CRD is ready")
+
+	// Step 2: Install Dynamo Platform (includes etcd, NATS, operator)
+	p.log("  Step 2/5: Installing Dynamo Platform (includes etcd, NATS, operator)")
+	platformChartURL := fmt.Sprintf("https://helm.ngc.nvidia.com/nvidia/ai-dynamo/charts/dynamo-platform-%s.tgz", releaseVersion)
+
+	platformInstallOpts := helm.InstallOptions{
+		ReleaseName: "dynamo-platform",
+		Chart:       platformChartURL,
+		Namespace:   namespace,
+		Wait:        true,
+		Timeout:     "10m",
+	}
+	if err := deployer.Install(ctx, platformInstallOpts); err != nil {
+		return fmt.Errorf("failed to install Dynamo Platform: %w", err)
+	}
+	p.log("Dynamo Platform installed successfully")
+
+	// List what was actually deployed (for debugging)
+	if p.verbose {
+		p.log("Listing resources in %s namespace...", namespace)
+		p.runKubectl(ctx, opts.KubeConfig, "get", "all", "-n", namespace)
+		p.log("Listing StatefulSets in %s namespace...", namespace)
+		p.runKubectl(ctx, opts.KubeConfig, "get", "statefulsets", "-n", namespace)
+	}
+
+	// Step 3: Wait for platform components to be ready
+	p.log("  Step 3/5: Waiting for Dynamo platform components to be ready...")
+
+	// The platform chart may deploy etcd/NATS with different names or as StatefulSets
+	// Check for common resource names and types
+	etcdReady := false
+	natsReady := false
+
+	// Try to find etcd (could be Deployment, StatefulSet, or different name)
+	etcdNames := []string{"dynamo-platform-etcd", "etcd", "dynamo-etcd", "etcd-server"}
+	for _, name := range etcdNames {
+		if err := deployer.WaitForDeployment(ctx, namespace, name, 2*time.Minute); err == nil {
+			p.log("etcd is ready (found as deployment: %s)", name)
+			etcdReady = true
+			break
+		}
+		// Try StatefulSet
+		if err := p.waitForStatefulSet(ctx, opts.KubeConfig, namespace, name, 2*time.Minute); err == nil {
+			p.log("etcd is ready (found as statefulset: %s)", name)
+			etcdReady = true
+			break
+		}
+	}
+	if !etcdReady {
+		p.log("Warning: etcd not found with common names, platform chart may use different naming")
+		p.log("  Continuing anyway - etcd may be deployed differently or not required")
+	}
+
+	// Try to find NATS (could be Deployment, StatefulSet, or different name)
+	natsNames := []string{"dynamo-platform-nats", "nats", "dynamo-nats", "nats-server"}
+	for _, name := range natsNames {
+		if err := deployer.WaitForDeployment(ctx, namespace, name, 2*time.Minute); err == nil {
+			p.log("NATS is ready (found as deployment: %s)", name)
+			natsReady = true
+			break
+		}
+		// Try StatefulSet
+		if err := p.waitForStatefulSet(ctx, opts.KubeConfig, namespace, name, 2*time.Minute); err == nil {
+			p.log("NATS is ready (found as statefulset: %s)", name)
+			natsReady = true
+			break
+		}
+	}
+	if !natsReady {
+		p.log("Warning: NATS not found with common names, platform chart may use different naming")
+		p.log("  Continuing anyway - NATS may be deployed differently or not required")
+	}
+
+	// Wait for Dynamo operator (if deployed)
+	operatorNames := []string{"dynamo-platform-dynamo-operator-controller-manager", "dynamo-operator"}
+	operatorFound := false
+	for _, name := range operatorNames {
+		if err := deployer.WaitForDeployment(ctx, namespace, name, 2*time.Minute); err == nil {
+			p.log("Dynamo operator is ready (found as deployment: %s)", name)
+			operatorFound = true
+			break
+		}
+	}
+	if !operatorFound {
+		p.log("Warning: dynamo-operator not found or not ready (may not be included in platform chart)")
+	}
+
+	// Additional wait for NATS JetStream to be fully initialized (if NATS was found)
+	if natsReady {
+		p.log("Waiting for NATS JetStream to be fully initialized...")
+		time.Sleep(10 * time.Second)
+	}
+
+	// Step 4: Deploy DynamoGraphDeployment CRD
+	// This tests proper Dynamo integration (Frontend coordinates with workers via etcd/NATS)
+	p.log("  Step 4/5: Deploying DynamoGraphDeployment CRD")
+	dynamoManifestsPath := "deploy/kubernetes/dynamo/dynamo-resources"
+
+	// Deploy DynamoGraphDeployment (Frontend + Workers) with backendFramework specified
+	crdPath := fmt.Sprintf("%s/dynamo-graph-deployment.yaml", dynamoManifestsPath)
+	p.log("Deploying DynamoGraphDeployment (Frontend + Prefill Worker + Decode Worker with GPU)...")
+	if err := p.kubectlApply(ctx, opts.KubeConfig, crdPath); err != nil {
+		return fmt.Errorf("failed to deploy DynamoGraphDeployment: %w", err)
+	}
+
+	// Step 5: Wait for Dynamo operator to create resources
+	p.log("  Step 5/5: Waiting for Dynamo operator to create Frontend and Workers...")
+
+	// Wait for operator to create deployments (may take time)
+	time.Sleep(15 * time.Second)
+
+	// Wait for Frontend to be ready
+	p.log("Waiting for Frontend deployment...")
+	if err := deployer.WaitForDeployment(ctx, namespace, "vllm-frontend", 5*time.Minute); err != nil {
+		return fmt.Errorf("frontend deployment not ready: %w", err)
+	}
+	p.log("✅ Frontend is ready")
+
+	// Wait for Prefill Worker (disaggregated deployment)
+	p.log("Waiting for Prefill Worker deployment...")
+	prefillNames := []string{"vllm-vllmprefillworker", "vllm-prefillworker"}
+	prefillFound := false
+	for _, name := range prefillNames {
+		if err := deployer.WaitForDeployment(ctx, namespace, name, 10*time.Minute); err == nil {
+			p.log("✅ Prefill Worker is ready (found as deployment: %s)", name)
+			prefillFound = true
+			break
+		}
+	}
+	if !prefillFound {
+		p.log("⚠️ Prefill Worker not found (may be using aggregated deployment)")
+	}
+
+	// Wait for Decode Worker
+	p.log("Waiting for Decode Worker deployment...")
+	decodeNames := []string{"vllm-vllmdecodeworker", "vllm-decodeworker"}
+	decodeFound := false
+	for _, name := range decodeNames {
+		if err := deployer.WaitForDeployment(ctx, namespace, name, 10*time.Minute); err == nil {
+			p.log("✅ Decode Worker is ready (found as deployment: %s)", name)
+			decodeFound = true
+			break
+		}
+	}
+	if !decodeFound {
+		return fmt.Errorf("decode worker deployment not ready: no decode worker deployment found")
+	}
+
+	p.log("✅ DynamoGraphDeployment created successfully!")
+	p.log("  Disaggregated Deployment: Frontend + Prefill Worker + Decode Worker")
+	p.log("  All components register with ETCD/NATS for KV-aware routing")
+
+	return nil
+}
+
+func (p *Profile) deploySemanticRouter(ctx context.Context, deployer *helm.Deployer, opts *framework.SetupOptions) error {
+	// Use local Helm chart instead of remote OCI registry (path relative to project root)
+	chartPath := "deploy/helm/semantic-router"
+	valuesFile := "deploy/kubernetes/dynamo/semantic-router-values/values.yaml"
+
+	// Check if Dynamo-specific values file exists, otherwise use default
+	if _, err := os.Stat(valuesFile); err != nil {
+		p.log("Dynamo-specific values file not found, using default values")
+		valuesFile = "deploy/kubernetes/ai-gateway/semantic-router-values/values.yaml"
+	}
+
+	// Override image to use locally built image
+	imageRepo := "ghcr.io/vllm-project/semantic-router/extproc"
+	imageTag := opts.ImageTag
+
+	installOpts := helm.InstallOptions{
+		ReleaseName: "semantic-router",
+		Chart:       chartPath,
+		Namespace:   "vllm-semantic-router-system",
+		ValuesFiles: []string{valuesFile},
+		Set: map[string]string{
+			"image.repository": imageRepo,
+			"image.tag":        imageTag,
+			"image.pullPolicy": "Never", // Use local image, don't pull from registry
+		},
+		Wait:    true,
+		Timeout: "20m", // Increased timeout for Semantic Router (model downloads can take time)
+	}
+
+	if err := deployer.Install(ctx, installOpts); err != nil {
+		// If Helm install fails but the pod is running, continue anyway.
+		// This can happen if Helm times out but the deployment is actually ready.
+		p.log("Warning: Helm install reported error, but checking if deployment is ready anyway: %v", err)
+		// Don't return the error immediately - check whether the deployment is actually ready
+	}
+
+	// Wait for deployment separately with longer timeout
+	// This ensures we wait even if Helm's wait timed out
+	return deployer.WaitForDeployment(ctx, "vllm-semantic-router-system", "semantic-router", 20*time.Minute)
+}
+
+func (p *Profile) deployEnvoyGateway(ctx context.Context, deployer *helm.Deployer, _ *framework.SetupOptions) error {
+	// Use Dynamo-specific values file that enables EnvoyPatchPolicy
+	// This is required for Semantic Router ExtProc integration
+	valuesFile := "deploy/kubernetes/dynamo/dynamo-resources/envoy-gateway-values.yaml"
+
+	installOpts := helm.InstallOptions{
+		ReleaseName: "eg",
+		Chart:       "oci://docker.io/envoyproxy/gateway-helm",
+		Namespace:   "envoy-gateway-system",
+		Version:     "v0.0.0-latest",
+		ValuesFiles: []string{valuesFile}, // Enable extensionAPIs.enableEnvoyPatchPolicy
+		Wait:        true,
+		Timeout:     "5m",
+	}
+
+	if err := deployer.Install(ctx, installOpts); err != nil {
+		return err
+	}
+
+	return deployer.WaitForDeployment(ctx, "envoy-gateway-system", "envoy-gateway", 5*time.Minute)
+}
+
+func (p *Profile) configureDynamoSettings(ctx context.Context, opts *framework.SetupOptions) error {
+	dynamoManifestsPath := "deploy/kubernetes/dynamo/dynamo-resources"
+
+	// Deploy RBAC for Semantic Router to access Dynamo CRDs
+	// Must be done AFTER Semantic Router deployment (which creates vllm-semantic-router-system namespace)
+	rbacPath := fmt.Sprintf("%s/rbac.yaml", dynamoManifestsPath)
+	p.log("Deploying RBAC for Semantic Router to access Dynamo CRDs...")
+	if err := p.kubectlApply(ctx, opts.KubeConfig, rbacPath); err != nil {
+		return fmt.Errorf("failed to deploy RBAC: %w", err)
+	}
+	p.log("RBAC configured successfully")
+
+	// Configure Dynamo optimization settings via ConfigMap
+	// This includes:
+	// - KV cache management settings
+	// - Worker pool configuration
+	// - Routing optimization parameters (KV-aware routing, load balancing)
+	// - NATS configuration for message queuing
+
+	// Apply Dynamo configuration ConfigMap
+	configPath := fmt.Sprintf("%s/dynamo-config.yaml", dynamoManifestsPath)
+	if _, err := os.Stat(configPath); err == nil {
+		p.log("Applying Dynamo optimization settings (KV cache, routing, worker pool config)")
+		if err := p.kubectlApply(ctx, opts.KubeConfig, configPath); err != nil {
+			return fmt.Errorf("failed to apply dynamo config: %w", err)
+		}
+		p.log("Dynamo optimization settings configured")
+	} else {
+		p.log("Warning: Dynamo config not found at %s, using platform defaults", configPath)
+	}
+
+	// Configure Envoy to route to Dynamo backend
+	// This must be done AFTER Envoy Gateway is deployed (which installs Gateway API CRDs)
+	gatewayResourcesPath := fmt.Sprintf("%s/gwapi-resources.yaml", dynamoManifestsPath)
+	if _, err := os.Stat(gatewayResourcesPath); err == nil {
+		p.log("Applying Gateway API resources for Dynamo routing (CRDs should be installed by Envoy Gateway)")
+		// Wait a bit to ensure CRDs are fully registered
+		time.Sleep(5 * time.Second)
+		if err := p.kubectlApply(ctx, opts.KubeConfig, gatewayResourcesPath); err != nil {
+			return fmt.Errorf("failed to apply gateway resources: %w", err)
+		}
+		p.log("Gateway API resources configured for Dynamo routing")
+	} else {
+		p.log("Warning: Gateway API resources not found at %s", gatewayResourcesPath)
+	}
+
+	return nil
+}
+
+func (p *Profile) deployWorkerResources(ctx context.Context, opts *framework.SetupOptions) error {
+	// Deploy demo worker pools (vLLM workers) for testing
+	// These would be model-specific worker deployments (path relative to project root)
+
+	workerResourcesPath := "deploy/kubernetes/dynamo/dynamo-resources/worker-pool.yaml"
+	if _, err := os.Stat(workerResourcesPath); err == nil {
+		p.log("Deploying worker resources from %s", workerResourcesPath)
+		if err := p.kubectlApply(ctx, opts.KubeConfig, workerResourcesPath); err != nil {
+			return fmt.Errorf("failed to apply worker resources: %w", err)
+		}
+	} else {
+		p.log("Worker resources not found at %s, skipping worker deployment", workerResourcesPath)
+		p.log("Note: Worker pools should be deployed separately for E2E testing")
+	}
+
+	return nil
+}
+
+func (p *Profile) verifyEnvironment(ctx context.Context, opts *framework.SetupOptions) error {
+	// Create Kubernetes client
+	config, err := clientcmd.BuildConfigFromFlags("", opts.KubeConfig)
+	if err != nil {
+		return fmt.Errorf("failed to build kubeconfig: %w", err)
+	}
+
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		return fmt.Errorf("failed to create kube client: %w", err)
+	}
+
+	// Wait for Envoy Gateway service to be ready with retry
+	retryTimeout := 10 * time.Minute
+	retryInterval := 5 * time.Second
+	startTime := time.Now()
+
+	p.log("Waiting for Envoy Gateway service to be ready...")
+
+	// Label selector for the semantic-router gateway service
+	labelSelector := "gateway.envoyproxy.io/owning-gateway-namespace=default,gateway.envoyproxy.io/owning-gateway-name=semantic-router"
+
+	var envoyService string
+	for {
+		// Try to get Envoy service name
+		envoyService, err = helpers.GetEnvoyServiceName(ctx, client, labelSelector, p.verbose)
+		if err == nil {
+			// Verify that the service has exactly 1 pod running with all containers ready
+			podErr := helpers.VerifyServicePodsRunning(ctx, client, "envoy-gateway-system", envoyService, p.verbose)
+			if podErr == nil {
+				p.log("Envoy Gateway service is ready: %s", envoyService)
+				break
+			}
+			if p.verbose {
+				p.log("Envoy service found but pods not ready: %v", podErr)
+			}
+			err = fmt.Errorf("service pods not ready: %w", podErr)
+		}
+
+		if time.Since(startTime) >= retryTimeout {
+			return fmt.Errorf("failed to get Envoy service with running pods after %v: %w", retryTimeout, err)
+		}
+
+		if p.verbose {
+			p.log("Envoy service not ready, retrying in %v... (elapsed: %v)",
+				retryInterval, time.Since(startTime).Round(time.Second))
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(retryInterval):
+			// Continue retry
+		}
+	}
+
+	// Check all deployments are healthy
+	p.log("Verifying all deployments are healthy...")
+
+	// Check semantic-router deployment
+	if err := helpers.CheckDeployment(ctx, client, "vllm-semantic-router-system", "semantic-router", p.verbose); err != nil {
+		return fmt.Errorf("semantic-router deployment not healthy: %w", err)
+	}
+
+	// Check envoy-gateway deployment
+	if err := helpers.CheckDeployment(ctx, client, "envoy-gateway-system", "envoy-gateway", p.verbose); err != nil {
+		return fmt.Errorf("envoy-gateway deployment not healthy: %w", err)
+	}
+
+	// Check Dynamo operator (managed by platform chart)
+	if err := helpers.CheckDeployment(ctx, client, "dynamo-system", "dynamo-platform-dynamo-operator-controller-manager", p.verbose); err != nil {
+		if p.verbose {
+			p.log("Dynamo operator deployment not found or not healthy: %v", err)
+		}
+	}
+
+	p.log("All deployments are healthy")
+
+	return nil
+}
+
+func (p *Profile) cleanupDynamo(ctx context.Context, deployer *helm.Deployer, opts *framework.TeardownOptions) error {
+	// Clean up Dynamo resources
+	namespace := "dynamo-system"
+
+	// Step 1: Clean up DynamoGraphDeployment and resources
+	dynamoManifestsPath := "deploy/kubernetes/dynamo/dynamo-resources"
+	cleanupFiles := []string{
+		"dynamo-graph-deployment.yaml", // DynamoGraphDeployment CRD
+		"rbac.yaml",                    // RBAC for Dynamo CRDs
+	}
+
+	for _, file := range cleanupFiles {
+		filePath := fmt.Sprintf("%s/%s", dynamoManifestsPath, file)
+		if _, err := os.Stat(filePath); err == nil {
+			p.log("Cleaning up %s", file)
+			p.kubectlDelete(ctx, opts.KubeConfig, filePath)
+		}
+	}
+
+	// Step 2: Uninstall Dynamo Platform Helm release (includes etcd, NATS, operator)
+	p.log("Uninstalling Dynamo Platform Helm release")
+	if err := deployer.Uninstall(ctx, "dynamo-platform", namespace); err != nil {
+		p.log("Warning: Failed to uninstall dynamo-platform: %v", err)
+	}
+
+	// Step 3: Uninstall Dynamo CRDs Helm release (cluster-scoped)
+	p.log("Uninstalling Dynamo CRDs Helm release")
+	if err := deployer.Uninstall(ctx, "dynamo-crds", "default"); err != nil {
+		p.log("Warning: Failed to uninstall dynamo-crds: %v", err)
+	}
+
+	return nil
+}
+
+func (p *Profile) cleanupWorkerResources(ctx context.Context, opts *framework.TeardownOptions) error {
+	// Clean up DynamoGraphDeployment and RBAC
+	dynamoManifestsPath := "deploy/kubernetes/dynamo/dynamo-resources"
+
+	cleanupFiles := []string{
+		"dynamo-graph-deployment.yaml", // DynamoGraphDeployment CRD
+		"rbac.yaml",                    // RBAC for Dynamo CRDs
+	}
+
+	for _, file := range cleanupFiles {
+		filePath := fmt.Sprintf("%s/%s", dynamoManifestsPath, file)
+		if _, err := os.Stat(filePath); err == nil {
+			p.log("Cleaning up %s", file)
+			p.kubectlDelete(ctx, opts.KubeConfig, filePath)
+		} else {
+			p.log("%s not found, skipping cleanup", file)
+		}
+	}
+
+	// Clean up Gateway API resources
+	gatewayResourcesPath := "deploy/kubernetes/dynamo/dynamo-resources/gwapi-resources.yaml"
+	if _, err := os.Stat(gatewayResourcesPath); err == nil {
+		p.log("Cleaning up Gateway API resources")
+		p.kubectlDelete(ctx, opts.KubeConfig, gatewayResourcesPath)
+	}
+
+	return nil
+}
+
+func (p *Profile) kubectlApply(ctx context.Context, kubeConfig, manifest string) error {
+	return p.runKubectl(ctx, kubeConfig, "apply", "-f", manifest)
+}
+
+// waitForStatefulSet waits for a StatefulSet to be ready
+func (p *Profile) waitForStatefulSet(ctx context.Context, kubeConfig, namespace, name string, timeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	// StatefulSets expose no "Ready" condition for `kubectl wait` to watch,
+	// so use `kubectl rollout status`, which tracks ready replicas directly.
+	timeoutSeconds := int(timeout.Seconds())
+	cmd := exec.CommandContext(ctx, "kubectl", "rollout", "status",
+		fmt.Sprintf("statefulset/%s", name),
+		"-n", namespace,
+		fmt.Sprintf("--timeout=%ds", timeoutSeconds),
+		"--kubeconfig", kubeConfig)
+
+	if p.verbose {
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+	}
+
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("statefulset failed to become ready: %w", err)
+	}
+
+	return nil
+}
+
+// waitForCRD waits for a CRD to be registered in the API server
+func (p *Profile) waitForCRD(ctx context.Context, kubeConfig, crdName string, timeout time.Duration) error {
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
+
+	ticker := time.NewTicker(2 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("timeout waiting for CRD %s to be registered", crdName)
+		case <-ticker.C:
+			// Try to get the CRD
+			cmd := exec.CommandContext(ctx, "kubectl", "get", "crd", crdName, "--kubeconfig", kubeConfig)
+			if err := cmd.Run(); err == nil {
+				// CRD exists and is registered
+				return nil
+			}
+			// CRD not ready yet, continue waiting
+			if p.verbose {
+				p.log("Waiting for CRD %s to be registered...", crdName)
+			}
+		}
+	}
+}
+
+func (p *Profile) kubectlDelete(ctx context.Context, kubeConfig, manifest string) error {
+	return p.runKubectl(ctx, kubeConfig, "delete", "-f", manifest)
+}
+
+func (p *Profile) runKubectl(ctx context.Context, kubeConfig string, args ...string) error {
+	args = append(args, "--kubeconfig", kubeConfig)
+	cmd := exec.CommandContext(ctx, "kubectl", args...)
+	if p.verbose {
+		cmd.Stdout = os.Stdout
+		cmd.Stderr = os.Stderr
+	}
+	return cmd.Run()
+}
+
+func (p *Profile) log(format string, args ...interface{}) {
+	if p.verbose {
+		fmt.Printf("[Dynamo] "+format+"\n", args...)
+	}
+}
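The test-case names returned by `GetTestCases` must match cases registered in `e2e/testcases` via `init()`, which is why the profile blank-imports that package. A minimal sketch of such a registration (the `dynamo-example-check` name is hypothetical; the real cases follow in the files below):

```go
package testcases

import (
	"context"
	"fmt"

	pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases"
	"k8s.io/client-go/kubernetes"
)

func init() {
	// Registering here is enough: the profile's blank import of this
	// package runs init(), and the runner looks the case up by name.
	pkgtestcases.Register("dynamo-example-check", pkgtestcases.TestCase{
		Description: "Sketch of a minimal Dynamo test case",
		Tags:        []string{"dynamo", "example"},
		Fn: func(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error {
			if opts.Verbose {
				fmt.Println("[Test] example check ran")
			}
			return nil
		},
	})
}
```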
diff --git a/e2e/testcases/dynamo_dynamic_batching.go b/e2e/testcases/dynamo_dynamic_batching.go
new file mode 100644
index 000000000..d95f1e74e
--- /dev/null
+++ b/e2e/testcases/dynamo_dynamic_batching.go
@@ -0,0 +1,202 @@
+package testcases
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+
+	pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases"
+	"k8s.io/client-go/kubernetes"
+)
+
+func init() {
+	pkgtestcases.Register("dynamo-dynamic-batching", pkgtestcases.TestCase{
+		Description: "Test Dynamo's dynamic batching functionality with concurrent requests",
+		Tags:        []string{"dynamo", "batching", "concurrency"},
+		Fn:          testDynamoDynamicBatching,
+	})
+}
+
+func testDynamoDynamicBatching(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error {
+	if opts.Verbose {
+		fmt.Println("[Test] Testing Dynamo dynamic batching")
+	}
+
+	// Setup service connection and get local port
+	localPort, stopPortForward, err := setupServiceConnection(ctx, client, opts)
+	if err != nil {
+		return err
+	}
+	defer stopPortForward()
+
+	// Send concurrent requests to test batching
+	const concurrentRequests = 5
+	testContent := "What is 2+2? Provide a brief answer."
+
+	if opts.Verbose {
+		fmt.Printf("[Test] Sending %d concurrent requests to test batching\n", concurrentRequests)
+	}
+
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	var results []BatchResult
+
+	startTime := time.Now()
+
+	for i := 0; i < concurrentRequests; i++ {
+		wg.Add(1)
+		go func(requestID int) {
+			defer wg.Done()
+
+			result := sendBatchedRequest(ctx, localPort, testContent, requestID, opts.Verbose)
+
+			mu.Lock()
+			results = append(results, result)
+			mu.Unlock()
+		}(i)
+	}
+
+	wg.Wait()
+	totalTime := time.Since(startTime)
+
+	// Analyze results
+	successCount := 0
+	var totalLatency time.Duration
+	for _, result := range results {
+		if result.Success {
+			successCount++
+			totalLatency += result.Latency
+		}
+	}
+
+	avgLatency := time.Duration(0)
+	if successCount > 0 {
+		avgLatency = totalLatency / time.Duration(successCount)
+	}
+
+	// Estimate batching efficiency. With effective batching the wall-clock
+	// time stays well below the sum of individual latencies, so the speedup
+	// (sequential/total) approaches the number of concurrent requests.
+	sequentialTime := totalLatency
+	speedup := 0.0
+	if totalTime > 0 {
+		speedup = float64(sequentialTime) / float64(totalTime)
+	}
+	// Normalize against the ideal fully-parallel case: 1.0 means perfect batching.
+	batchingEfficiency := speedup / float64(concurrentRequests)
+	if batchingEfficiency > 1.0 {
+		batchingEfficiency = 1.0
+	}
+
+	// Set details for reporting
+	if opts.SetDetails != nil {
+		opts.SetDetails(map[string]interface{}{
+			"concurrent_requests": concurrentRequests,
+			"successful":          successCount,
+			"success_rate":        fmt.Sprintf("%.2f%%", float64(successCount)/float64(concurrentRequests)*100),
+			"avg_latency_ms":      avgLatency.Milliseconds(),
+			"total_time_ms":       totalTime.Milliseconds(),
+			"batching_efficiency": fmt.Sprintf("%.2f", batchingEfficiency),
+			"sequential_time_ms":  sequentialTime.Milliseconds(),
+		})
+	}
+
+	if opts.Verbose {
+		separator := strings.Repeat("=", 80)
+		fmt.Println("\n" + separator)
+		fmt.Println("Dynamic Batching Test Results")
+		fmt.Println(separator)
+		fmt.Printf("Concurrent Requests: %d\n", concurrentRequests)
+		fmt.Printf("Successful: %d (%.2f%%)\n", successCount, float64(successCount)/float64(concurrentRequests)*100)
+		fmt.Printf("Average Latency: %v (%.2f ms)\n", avgLatency, float64(avgLatency.Milliseconds()))
+		fmt.Printf("Total Time: %v (%.2f ms)\n", totalTime, float64(totalTime.Milliseconds()))
+		fmt.Printf("Sequential Time: %v (%.2f ms)\n", sequentialTime, float64(sequentialTime.Milliseconds()))
+		fmt.Printf("Batching Efficiency: %.2f%%\n", batchingEfficiency*100)
+		fmt.Println(separator)
+
+		if batchingEfficiency > 0.5 {
+			fmt.Println("[Test] ✅ Dynamic batching appears to be working (efficiency > 50%)")
+		} else {
+			fmt.Println("[Test] ⚠️ Batching efficiency is low - may need tuning")
+		}
+	}
+
+	return nil
+}
+
+type BatchResult struct {
+	RequestID int
+	Success   bool
+	Latency   time.Duration
+	Error     string
+}
+
+func sendBatchedRequest(ctx context.Context, localPort, content string, requestID int, verbose bool) BatchResult {
+	result := BatchResult{
+		RequestID: requestID,
+		Success:   false,
+	}
+
+	start := time.Now()
+
+	requestBody := map[string]interface{}{
+		"model": "MoM",
+		"messages": []map[string]string{
+			{
+				"role":    "user",
+				"content": fmt.Sprintf("[Request %d] %s", requestID, content),
+			},
+		},
+	}
+
+	jsonData, err := json.Marshal(requestBody)
+	if err != nil {
+		result.Error = fmt.Sprintf("marshal error: %v", err)
+		result.Latency = time.Since(start)
+		return result
+	}
+
+	url := fmt.Sprintf("http://localhost:%s/v1/chat/completions", localPort)
+	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		result.Error = fmt.Sprintf("create request error: %v", err)
+		result.Latency = time.Since(start)
+		return result
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+
+	httpClient := &http.Client{
+		Timeout: 30 * time.Second,
+	}
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		result.Error = fmt.Sprintf("send request error: %v", err)
+		result.Latency = time.Since(start)
+		return result
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		result.Error = fmt.Sprintf("read response error: %v", err)
+		result.Latency = time.Since(start)
+		result.Success = resp.StatusCode == http.StatusOK
+		return result
+	}
+
+	result.Latency = time.Since(start)
+	result.Success = resp.StatusCode == http.StatusOK
+
+	if !result.Success {
+		result.Error = fmt.Sprintf("status %d: %s", resp.StatusCode, string(body))
+	}
+
+	if verbose {
+		fmt.Printf("[Test] Request %d: %s (latency: %v)\n", requestID,
+			map[bool]string{true: "success", false: "failed"}[result.Success], result.Latency)
+	}
+
+	return result
+}
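To make the normalized efficiency metric above concrete, here is a small worked example with toy numbers (the values are illustrative, not measurements):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Toy numbers: 5 concurrent requests, each ~2s of latency,
	// finishing in ~2.5s of wall-clock time.
	latencies := []time.Duration{
		2 * time.Second, 2 * time.Second, 2 * time.Second,
		2 * time.Second, 2 * time.Second,
	}
	wallClock := 2500 * time.Millisecond

	var sequential time.Duration
	for _, l := range latencies {
		sequential += l
	}
	speedup := float64(sequential) / float64(wallClock) // 10s / 2.5s = 4.0x
	efficiency := speedup / float64(len(latencies))     // 4.0 / 5 = 0.8 => 80% of ideal
	fmt.Printf("speedup=%.1fx efficiency=%.0f%%\n", speedup, efficiency*100)
}
```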
diff --git a/e2e/testcases/dynamo_gpu_utilization.go b/e2e/testcases/dynamo_gpu_utilization.go
new file mode 100644
index 000000000..d74207a31
--- /dev/null
+++ b/e2e/testcases/dynamo_gpu_utilization.go
@@ -0,0 +1,147 @@
+package testcases
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases"
+)
+
+func init() {
+	pkgtestcases.Register("dynamo-gpu-utilization", pkgtestcases.TestCase{
+		Description: "Monitor GPU utilization and efficiency for Dynamo workers",
+		Tags:        []string{"dynamo", "gpu", "monitoring"},
+		Fn:          testDynamoGPUUtilization,
+	})
+}
+
+func testDynamoGPUUtilization(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error {
+	if opts.Verbose {
+		fmt.Println("[Test] Monitoring GPU utilization for Dynamo workers")
+	}
+
+	namespace := "dynamo-system"
+
+	// Get worker pods - try Dynamo labels first, then fallback
+	workerPods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: "nvidia.com/dynamo-graph-deployment-name=vllm",
+	})
+	if err != nil {
+		// Listing failed (API error); tolerate it rather than fail the E2E run
+		if opts.Verbose {
+			fmt.Printf("[Test] Could not list worker pods (tolerated in E2E): %v\n", err)
+		}
+		return nil
+	}
+
+	// Filter to only include worker pods (exclude frontend)
+	var filteredPods []corev1.Pod
+	for _, pod := range workerPods.Items {
+		if _, ok := pod.Labels["nvidia.com/dynamo-component"]; ok &&
+			pod.Labels["nvidia.com/dynamo-component-type"] == "worker" {
+			filteredPods = append(filteredPods, pod)
+		}
+	}
+	workerPods.Items = filteredPods
+
+	if len(workerPods.Items) == 0 {
+		if opts.Verbose {
+			fmt.Println("[Test] No worker pods found - skipping GPU utilization check")
+		}
+		return nil
+	}
+
+	var gpuInfo []map[string]interface{}
+	totalGPUs := 0
+	workersWithGPU := 0
+
+	for _, pod := range workerPods.Items {
+		if pod.Status.Phase != corev1.PodRunning {
+			continue
+		}
+
+		podInfo := map[string]interface{}{
+			"pod_name":  pod.Name,
+			"namespace": pod.Namespace,
+			"node_name": pod.Spec.NodeName,
+			"phase":     string(pod.Status.Phase),
+		}
+
+		// Check for GPU resources in pod spec
+		gpuCount := int64(0)
+		for _, container := range pod.Spec.Containers {
+			if container.Resources.Limits != nil {
+				if gpuLimit, ok := container.Resources.Limits["nvidia.com/gpu"]; ok {
+					gpuCount = gpuLimit.Value()
+					totalGPUs += int(gpuCount)
+					workersWithGPU++
+					podInfo["gpu_count"] = gpuCount
+				}
+			}
+			if container.Resources.Requests != nil {
+				if gpuRequest, ok := container.Resources.Requests["nvidia.com/gpu"]; ok {
+					podInfo["gpu_requested"] = gpuRequest.Value()
+				}
+			}
+		}
+
+		// Try to get GPU metrics from pod (if metrics server is available)
+		// This is a best-effort check
+		if opts.Verbose {
+			fmt.Printf("[Test] Pod %s: %d GPU(s) allocated\n", pod.Name, gpuCount)
+		}
+
+		// Check container status
+		var containerStatuses []string
+		for _, status := range pod.Status.ContainerStatuses {
+			containerStatuses = append(containerStatuses, fmt.Sprintf("%s:%v", status.Name, status.Ready))
+		}
+		podInfo["containers"] = containerStatuses
+
+		gpuInfo = append(gpuInfo, podInfo)
+	}
+
+	// Set details for reporting
+	if opts.SetDetails != nil {
+		opts.SetDetails(map[string]interface{}{
+			"worker_pods":      len(workerPods.Items),
+			"workers_with_gpu": workersWithGPU,
+			"total_gpus":       totalGPUs,
+			"gpu_info":         gpuInfo,
+			"monitoring_note":  "GPU utilization metrics require nvidia-smi or metrics server",
+		})
+	}
+
+	if opts.Verbose {
+		fmt.Println("\n" + strings.Repeat("=", 80))
+		fmt.Println("GPU Utilization Summary")
+		fmt.Println(strings.Repeat("=", 80))
+		fmt.Printf("Worker Pods: %d\n", len(workerPods.Items))
+		fmt.Printf("Workers with GPU: %d\n", workersWithGPU)
+		fmt.Printf("Total GPUs: %d\n", totalGPUs)
+
+		if totalGPUs > 0 {
+			fmt.Printf("\nGPU Allocation per Pod:\n")
+			for _, info := range gpuInfo {
+				if gpuCount, ok := info["gpu_count"].(int64); ok {
+					fmt.Printf("  - %s: %d GPU(s)\n", info["pod_name"], gpuCount)
+				}
+			}
+		}
+
+		fmt.Println(strings.Repeat("=", 80))
+		fmt.Println("[Test] ✅ GPU utilization check completed")
+		fmt.Println("[Test] Note: Detailed GPU metrics require nvidia-smi or Kubernetes metrics server")
+	}
+
+	return nil
+}
diff --git a/e2e/testcases/dynamo_health_check.go b/e2e/testcases/dynamo_health_check.go
new file mode 100644
index 000000000..bdd06f3bc
--- /dev/null
+++ b/e2e/testcases/dynamo_health_check.go
@@ -0,0 +1,263 @@
+package testcases
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases"
+)
+
+func init() {
+	pkgtestcases.Register("dynamo-health-check", pkgtestcases.TestCase{
+		Description: "Verify Dynamo runtime components are healthy",
+		Tags:        []string{"dynamo", "health", "functional"},
+		Fn:          testDynamoHealthCheck,
+	})
+}
+
+func testDynamoHealthCheck(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error {
+	if opts.Verbose {
+		fmt.Println("[Test] Checking Dynamo health")
+	}
+
+	namespace := "dynamo-system"
+
+	// Check Dynamo Platform components (deployed via Helm)
+	// The platform chart deploys: etcd, NATS, and operator
+	if opts.Verbose {
+		fmt.Println("[Test] Checking Dynamo Platform components...")
+	}
+
+	// Check Dynamo operator (controller manager)
+	operatorDeployment, err := client.AppsV1().Deployments(namespace).Get(ctx, "dynamo-platform-dynamo-operator-controller-manager", metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to get dynamo-operator deployment: %w", err)
+	}
+
+	if operatorDeployment.Status.ReadyReplicas == 0 {
+		return fmt.Errorf("dynamo-operator has 0 ready replicas")
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] Dynamo Operator: %d/%d replicas ready\n",
+			operatorDeployment.Status.ReadyReplicas, operatorDeployment.Status.Replicas)
+	}
+
+	// Check etcd StatefulSet
+	etcdStatefulSet, err := client.AppsV1().StatefulSets(namespace).Get(ctx, "dynamo-platform-etcd", metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to get etcd statefulset: %w", err)
+	}
+
+	if etcdStatefulSet.Status.ReadyReplicas == 0 {
+		return fmt.Errorf("etcd has 0 ready replicas")
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] etcd: %d/%d replicas ready\n",
+			etcdStatefulSet.Status.ReadyReplicas, etcdStatefulSet.Status.Replicas)
+	}
+
+	// Check NATS StatefulSet
+	natsStatefulSet, err := client.AppsV1().StatefulSets(namespace).Get(ctx, "dynamo-platform-nats", metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to get nats statefulset: %w", err)
+	}
+
+	if natsStatefulSet.Status.ReadyReplicas == 0 {
+		return fmt.Errorf("NATS has 0 ready replicas")
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] NATS: %d/%d replicas ready\n",
+			natsStatefulSet.Status.ReadyReplicas, natsStatefulSet.Status.Replicas)
+	}
+
+	// Check operator pods
+	operatorPods, err := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: "control-plane=controller-manager",
+	})
+	if err != nil {
+		return fmt.Errorf("failed to list dynamo-operator pods: %w", err)
+	}
+
+	healthyOperatorPods := 0
+	for _, pod := range operatorPods.Items {
+		if pod.Status.Phase == corev1.PodRunning {
+			allContainersReady := true
+			for _, containerStatus := range pod.Status.ContainerStatuses {
+				if !containerStatus.Ready {
+					allContainersReady = false
+					break
+				}
+			}
+			if allContainersReady {
+				healthyOperatorPods++
+			}
+		}
+	}
+
+	if healthyOperatorPods == 0 {
+		return fmt.Errorf("no healthy dynamo-operator pods found")
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] Dynamo Operator: %d healthy pod(s)\n", healthyOperatorPods)
+	}
+
+	// Check for Dynamo Frontend (if operator created it from DynamoGraphDeployment)
+	if opts.Verbose {
+		fmt.Println("[Test] Checking for Dynamo Frontend (operator-created)...")
+	}
+
+	// Check Frontend deployment
+	frontendDeployment, err := client.AppsV1().Deployments(namespace).Get(ctx, "vllm-frontend", metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("failed to get frontend deployment: %w", err)
+	}
+	if frontendDeployment.Status.ReadyReplicas == 0 {
+		return fmt.Errorf("frontend has 0 ready replicas")
+	}
+	if opts.Verbose {
+		fmt.Printf("[Test] ✅ Frontend: %d/%d replicas ready\n",
+			frontendDeployment.Status.ReadyReplicas, frontendDeployment.Status.Replicas)
+	}
+
+	// Check for Prefill Worker (disaggregated deployment)
+	prefillWorkerNames := []string{"vllm-vllmprefillworker", "vllm-prefillworker"}
+	prefillFound := false
+	for _, name := range prefillWorkerNames {
+		deployment, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
+		if err == nil && deployment.Status.ReadyReplicas > 0 {
+			if opts.Verbose {
+				fmt.Printf("[Test] ✅ Prefill Worker %s: %d/%d replicas ready\n",
+					name, deployment.Status.ReadyReplicas, deployment.Status.Replicas)
+			}
+			prefillFound = true
+			break
+		}
+	}
+	if !prefillFound && opts.Verbose {
+		fmt.Println("[Test] ⚠️ Prefill Worker not found (may be using aggregated deployment)")
+	}
+
+	// Check for Decode Worker
+	decodeWorkerNames := []string{"vllm-vllmdecodeworker", "vllm-decodeworker"}
+	decodeFound := false
+	for _, name := range decodeWorkerNames {
+		deployment, err := client.AppsV1().Deployments(namespace).Get(ctx, name, metav1.GetOptions{})
+		if err == nil && deployment.Status.ReadyReplicas > 0 {
+			if opts.Verbose {
+				fmt.Printf("[Test] ✅ Decode Worker %s: %d/%d replicas ready\n",
+					name, deployment.Status.ReadyReplicas, deployment.Status.Replicas)
+			}
+			decodeFound = true
+			break
+		}
+	}
+	if !decodeFound {
+		return fmt.Errorf("decode worker deployment not found or not ready")
+	}
+
+	// Check worker pods (these coordinate via Dynamo's etcd/NATS)
+	// Dynamo operator creates pods with labels like nvidia.com/dynamo-component=VLLMDecodeWorker
+	if opts.Verbose {
+		fmt.Println("[Test] Checking worker pods...")
+	}
+
+	// Try multiple label selectors that Dynamo operator might use
+	// Includes both Prefill and Decode workers for disaggregated deployment
+	workerLabelSelectors := []string{
+		"nvidia.com/dynamo-component=VLLMDecodeWorker",
+		"nvidia.com/dynamo-component=VLLMPrefillWorker",
+		"nvidia.com/dynamo-component-type=worker",
+		"nvidia.com/dynamo-graph-deployment-name=vllm",
+	}
+
+	var workerPods *corev1.PodList
+	for _, selector := range workerLabelSelectors {
+		workerPods, err = client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
+			LabelSelector: selector,
+		})
+		if err == nil && len(workerPods.Items) > 0 {
+			if opts.Verbose {
+				fmt.Printf("[Test] Found worker pods using selector: %s\n", selector)
+			}
+			break
+		}
+	}
+
+	if workerPods == nil || len(workerPods.Items) == 0 {
+		// If no worker pods found with specific labels, try listing all pods in namespace
+		// and filter by name pattern
+		allPods, listErr := client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+		if listErr != nil {
+			return fmt.Errorf("failed to list pods: %w", listErr)
+		}
+
+		workerPods = &corev1.PodList{}
+		for _, pod := range allPods.Items {
+			// Match pods with "vllmdecodeworker" in the name (Dynamo operator naming)
+			if strings.Contains(strings.ToLower(pod.Name), "vllmdecodeworker") ||
+				strings.Contains(strings.ToLower(pod.Name), "worker") {
+				workerPods.Items = append(workerPods.Items, pod)
+			}
+		}
+
+		if len(workerPods.Items) == 0 {
+			return fmt.Errorf("no worker pods found in namespace %s", namespace)
+		}
+
+		if opts.Verbose {
+			fmt.Printf("[Test] Found %d worker pods by name pattern\n", len(workerPods.Items))
+		}
+	}
+
+	healthyWorkerPods := 0
+	for _, pod := range workerPods.Items {
+		if pod.Status.Phase == corev1.PodRunning {
+			allContainersReady := true
+			for _, containerStatus := range pod.Status.ContainerStatuses {
+				if !containerStatus.Ready {
+					allContainersReady = false
+					break
+				}
+			}
+			if allContainersReady {
+				healthyWorkerPods++
+			}
+		}
+	}
+
+	if healthyWorkerPods == 0 {
+		return fmt.Errorf("no healthy worker pods found")
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] Worker pods: %d/%d healthy\n", healthyWorkerPods, len(workerPods.Items))
+	}
+
+	// Set details for reporting
+	if opts.SetDetails != nil {
+		opts.SetDetails(map[string]interface{}{
+			"operator_replicas_ready": operatorDeployment.Status.ReadyReplicas,
+			"operator_replicas_total": operatorDeployment.Status.Replicas,
+			"etcd_replicas_ready":     etcdStatefulSet.Status.ReadyReplicas,
+			"etcd_replicas_total":     etcdStatefulSet.Status.Replicas,
+			"nats_replicas_ready":     natsStatefulSet.Status.ReadyReplicas,
+			"nats_replicas_total":     natsStatefulSet.Status.Replicas,
+			"healthy_operator_pods":   healthyOperatorPods,
+		})
+	}
+
+	if opts.Verbose {
+		fmt.Println("[Test] ✅ Dynamo health check passed")
+	}
+
+	return nil
+}
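The health check above repeats the same "pod is Running with every container Ready" loop for operator and worker pods. A small helper sketch of that pattern (the `podReady` name is hypothetical, not part of the diff):

```go
package testcases

import corev1 "k8s.io/api/core/v1"

// podReady reports whether a pod is Running with every container Ready -
// the pattern the health check repeats for operator and worker pods.
func podReady(pod corev1.Pod) bool {
	if pod.Status.Phase != corev1.PodRunning {
		return false
	}
	for _, cs := range pod.Status.ContainerStatuses {
		if !cs.Ready {
			return false
		}
	}
	return true
}
```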
diff --git a/e2e/testcases/dynamo_optimized_inference.go b/e2e/testcases/dynamo_optimized_inference.go
new file mode 100644
index 000000000..f602c0a4b
--- /dev/null
+++ b/e2e/testcases/dynamo_optimized_inference.go
@@ -0,0 +1,190 @@
+package testcases
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases"
+	"k8s.io/client-go/kubernetes"
+)
+
+func init() {
+	pkgtestcases.Register("dynamo-optimized-inference", pkgtestcases.TestCase{
+		Description: "Test inference with Dynamo optimizations enabled (KV cache, dynamic batching)",
+		Tags:        []string{"dynamo", "inference", "optimization"},
+		Fn:          testDynamoOptimizedInference,
+	})
+}
+
+func testDynamoOptimizedInference(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error {
+	if opts.Verbose {
+		fmt.Println("[Test] Testing optimized inference with Dynamo")
+	}
+
+	// Setup service connection and get local port
+	localPort, stopPortForward, err := setupServiceConnection(ctx, client, opts)
+	if err != nil {
+		return err
+	}
+	defer stopPortForward()
+
+	// Send multiple requests to test dynamic batching and KV cache
+	testCases := []struct {
+		name    string
+		content string
+	}{
+		{"math", "What is 15 * 23? Show your work."},
+		{"science", "Explain how photosynthesis works in plants."},
+		{"general", "What are the main benefits of renewable energy?"},
+	}
+
+	var results []InferenceResult
+	totalLatency := time.Duration(0)
+	successCount := 0
+
+	for i, tc := range testCases {
+		if opts.Verbose {
+			fmt.Printf("[Test] Sending request %d/%d: %s\n", i+1, len(testCases), tc.name)
+		}
+
+		result := sendInferenceRequest(ctx, localPort, tc.content, opts.Verbose)
+		results = append(results, result)
+		totalLatency += result.Latency
+
+		if result.Success {
+			successCount++
+		} else if opts.Verbose {
+			fmt.Printf("[Test] Request %d (%s) failed: %s\n", i+1, tc.name, result.Error)
+		}
+
+		// Small delay between requests to allow batching
+		if i < len(testCases)-1 {
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+
+	// Calculate metrics
+	actualSuccessRate := float64(successCount) / float64(len(testCases))
+	avgLatency := time.Duration(0)
+	if successCount > 0 {
+		avgLatency = totalLatency / time.Duration(successCount)
+	}
+
+	// Collect error messages for debugging
+	var errors []string
+	for _, r := range results {
+		if !r.Success && r.Error != "" {
+			errors = append(errors, r.Error)
+		}
+	}
+
+	// Set details for reporting BEFORE checking success rate (so we can see what happened on failure)
+	if opts.SetDetails != nil {
+		opts.SetDetails(map[string]interface{}{
+			"total_requests": len(testCases),
+			"successful":     successCount,
+			"success_rate":   fmt.Sprintf("%.0f%%", actualSuccessRate*100),
+			"avg_latency_ms": avgLatency.Milliseconds(),
+			"optimizations":  []string{"kv_cache", "dynamic_batching"},
+			"errors":         errors,
+		})
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] Requests successful: %d/%d (%.0f%%)\n", successCount, len(testCases), actualSuccessRate*100)
+		if len(errors) > 0 {
+			fmt.Printf("[Test] Errors: %v\n", errors)
+		}
+	}
+
+	// Require at least 50% success rate (allows for transient failures)
+	minSuccessRate := 0.5
+	if actualSuccessRate < minSuccessRate {
+		return fmt.Errorf("success rate %.0f%% below minimum %.0f%% (%d/%d requests succeeded)",
+			actualSuccessRate*100, minSuccessRate*100, successCount, len(testCases))
+	}
+
+	if opts.Verbose {
+		fmt.Printf("[Test] ✅ Optimized inference test completed\n")
+		fmt.Printf("[Test] Average latency: %v\n", avgLatency)
+	}
+
+	return nil
+}
+
+type InferenceResult struct {
+	Success bool
+	Latency time.Duration
+	Error   string
+}
+
+func sendInferenceRequest(ctx context.Context, localPort, content string, verbose bool) InferenceResult {
+	result := InferenceResult{
+		Success: false,
+	}
+
+	start := time.Now()
+
+	requestBody := map[string]interface{}{
+		"model": "MoM",
+		"messages": []map[string]string{
+			{
+				"role":    "user",
+				"content": content,
+			},
+		},
+		// Streaming disabled so the measured latency covers the complete response
+		"stream": false,
+	}
+
+	jsonData, err := json.Marshal(requestBody)
+	if err != nil {
+		result.Error = fmt.Sprintf("marshal error: %v", err)
+		result.Latency = time.Since(start)
+		return result
+	}
+
+	url := fmt.Sprintf("http://localhost:%s/v1/chat/completions", localPort)
+	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		result.Error = fmt.Sprintf("create request error: %v", err)
+		result.Latency = time.Since(start)
+		return result
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+
+	httpClient := &http.Client{
+		Timeout: 60 * time.Second,
+	}
+
+	resp, err := httpClient.Do(req)
+	if err != nil {
+		result.Error = fmt.Sprintf("send request error: %v", err)
+		result.Latency = time.Since(start)
+		return result
+	}
+	defer resp.Body.Close()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		result.Error = fmt.Sprintf("read response error: %v", err)
+		result.Latency = time.Since(start)
+		result.Success = resp.StatusCode == http.StatusOK
+		return result
+	}
+
+	result.Latency = time.Since(start)
+	result.Success = resp.StatusCode == http.StatusOK
+
+	if !result.Success {
+		result.Error = fmt.Sprintf("status %d: %s", resp.StatusCode, string(body))
+	}
+
+	return result
+}
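The inference tests above only assert on HTTP status. A sketch of additionally checking that Semantic Router rewrote the virtual "MoM" model name (field names follow the OpenAI chat-completions schema; the concrete upstream model name is deployment-specific, and `assertModelRewritten` is a hypothetical helper):

```go
package testcases

import (
	"encoding/json"
	"fmt"
)

// chatResponse captures only the field we need from the OpenAI-style reply.
type chatResponse struct {
	Model string `json:"model"`
}

// assertModelRewritten fails if the response still carries the virtual
// "MoM" model name, i.e. the Semantic Router rewrite did not happen.
func assertModelRewritten(body []byte) error {
	var cr chatResponse
	if err := json.Unmarshal(body, &cr); err != nil {
		return fmt.Errorf("decode response: %w", err)
	}
	if cr.Model == "" || cr.Model == "MoM" {
		return fmt.Errorf("model not rewritten, got %q", cr.Model)
	}
	return nil
}
```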
diff --git a/e2e/testcases/dynamo_performance_comparison.go b/e2e/testcases/dynamo_performance_comparison.go
new file mode 100644
index 000000000..699294711
--- /dev/null
+++ b/e2e/testcases/dynamo_performance_comparison.go
@@ -0,0 +1,183 @@
+package testcases
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"sort"
+	"strings"
+	"time"
+
+	pkgtestcases "github.com/vllm-project/semantic-router/e2e/pkg/testcases"
+	"k8s.io/client-go/kubernetes"
+)
+
+func init() {
+	pkgtestcases.Register("dynamo-performance-comparison", pkgtestcases.TestCase{
+		Description: "Compare performance metrics with Dynamo optimizations (latency, throughput)",
+		Tags:        []string{"dynamo", "performance", "benchmark"},
+		Fn:          testDynamoPerformanceComparison,
+	})
+}
+
+func testDynamoPerformanceComparison(ctx context.Context, client *kubernetes.Clientset, opts pkgtestcases.TestCaseOptions) error {
+	if opts.Verbose {
+		fmt.Println("[Test] Running performance comparison test")
+	}
+
+	// Setup service connection and get local port
+	localPort, stopPortForward, err := setupServiceConnection(ctx, client, opts)
+	if err != nil {
+		return err
+	}
+	defer stopPortForward()
+
+	// Test with a batch of requests to measure throughput
+	const batchSize = 10
+	testContent := "What is the capital of France? Provide a brief explanation."
+
+	if opts.Verbose {
+		fmt.Printf("[Test] Sending %d requests to measure performance\n", batchSize)
+	}
+
+	var latencies []time.Duration
+	successCount := 0
+	startTime := time.Now()
+
+	for i := 0; i < batchSize; i++ {
+		reqStart := time.Now()
+
+		requestBody := map[string]interface{}{
+			"model": "MoM",
+			"messages": []map[string]string{
+				{
+					"role":    "user",
+					"content": testContent,
+				},
+			},
+		}
+
+		jsonData, err := json.Marshal(requestBody)
+		if err != nil {
+			if opts.Verbose {
+				fmt.Printf("[Test] Request %d: marshal error: %v\n", i+1, err)
+			}
+			continue
+		}
+
+		url := fmt.Sprintf("http://localhost:%s/v1/chat/completions", localPort)
+		req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
+		if err != nil {
+			if opts.Verbose {
+				fmt.Printf("[Test] Request %d: create request error: %v\n", i+1, err)
+			}
+			continue
+		}
+
+		req.Header.Set("Content-Type", "application/json")
+
+		httpClient := &http.Client{
+			Timeout: 30 * time.Second,
+		}
+
+		resp, err := httpClient.Do(req)
+		if err != nil {
+			if opts.Verbose {
+				fmt.Printf("[Test] Request %d: send error: %v\n", i+1, err)
+			}
+			continue
+		}
+
+		_, err = io.ReadAll(resp.Body)
+		resp.Body.Close()
+
+		latency := time.Since(reqStart)
+		latencies = append(latencies, latency)
+
+		if resp.StatusCode == http.StatusOK && err == nil {
+			successCount++
+		}
+
+		// Small delay between requests
+		if i < batchSize-1 {
+			time.Sleep(50 * time.Millisecond)
+		}
+	}
+
+	totalTime := time.Since(startTime)
+	throughput := float64(successCount) / totalTime.Seconds() // requests per second
+
+	// Calculate latency statistics
+	var totalLatency time.Duration
+	minLatency := time.Hour
+	maxLatency := time.Duration(0)
+
+	for _, lat := range latencies {
+		totalLatency += lat
+		if lat < minLatency {
+			minLatency = lat
+		}
+		if lat > maxLatency {
+			maxLatency = lat
+		}
+	}
+
+	// Guard against all requests failing before a latency was recorded
+	avgLatency := time.Duration(0)
+	if len(latencies) > 0 {
+		avgLatency = totalLatency / time.Duration(len(latencies))
+	}
+	p50Latency := calculatePercentile(latencies, 50)
+	p95Latency := calculatePercentile(latencies, 95)
+	p99Latency := calculatePercentile(latencies, 99)
+
+	// Set details for reporting
+	if opts.SetDetails != nil {
+		opts.SetDetails(map[string]interface{}{
+			"total_requests":     batchSize,
+			"successful":         successCount,
+			"success_rate":       fmt.Sprintf("%.2f%%", float64(successCount)/float64(batchSize)*100),
+			"throughput_rps":     fmt.Sprintf("%.2f", throughput),
+			"avg_latency_ms":     avgLatency.Milliseconds(),
+			"min_latency_ms":     minLatency.Milliseconds(),
+			"max_latency_ms":     maxLatency.Milliseconds(),
+			"p50_latency_ms":     p50Latency.Milliseconds(),
+			"p95_latency_ms":     p95Latency.Milliseconds(),
+			"p99_latency_ms":     p99Latency.Milliseconds(),
+			"total_time_seconds": totalTime.Seconds(),
+		})
+	}
+
+	// Print summary
+	if opts.Verbose {
+		separator := strings.Repeat("=", 80)
+		fmt.Println("\n" + separator)
+		fmt.Println("Performance Comparison Results")
+		fmt.Println(separator)
+		fmt.Printf("Total Requests: %d\n", batchSize)
+		fmt.Printf("Successful: %d (%.2f%%)\n", successCount, float64(successCount)/float64(batchSize)*100)
+		fmt.Printf("Throughput: %.2f requests/second\n", throughput)
+		fmt.Printf("Average Latency: %v (%.2f ms)\n", avgLatency, float64(avgLatency.Milliseconds()))
+		fmt.Printf("Min Latency: %v (%.2f ms)\n", minLatency, float64(minLatency.Milliseconds()))
+		fmt.Printf("Max Latency: %v (%.2f ms)\n", maxLatency, float64(maxLatency.Milliseconds()))
+		fmt.Printf("P50 Latency: %v (%.2f ms)\n", p50Latency, float64(p50Latency.Milliseconds()))
+		fmt.Printf("P95 Latency: %v (%.2f ms)\n", p95Latency, float64(p95Latency.Milliseconds()))
+		fmt.Printf("P99 Latency: %v (%.2f ms)\n", p99Latency, float64(p99Latency.Milliseconds()))
+		fmt.Printf("Total Time: %v\n", totalTime)
+		fmt.Println(separator)
+		fmt.Println("[Test] ✅ Performance comparison test completed")
+	}
+
+	return nil
+}
+
+func calculatePercentile(latencies []time.Duration, percentile int) time.Duration {
+	if len(latencies) == 0 {
+		return 0
+	}
+
+	// Nearest-rank percentile over a sorted copy; percentiles are only
+	// meaningful once the samples are ordered.
+	sorted := append([]time.Duration(nil), latencies...)
+	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
+
+	index := (len(sorted) * percentile) / 100
+	if index >= len(sorted) {
+		index = len(sorted) - 1
+	}
+	return sorted[index]
+}
diff --git a/tools/make/e2e.mk b/tools/make/e2e.mk
index 9627a9c5d..f2a57de32 100644
--- a/tools/make/e2e.mk
+++ b/tools/make/e2e.mk
@@ -44,6 +44,11 @@ e2e-test-ai-gateway: ## Run E2E tests with AI Gateway profile
 e2e-test-ai-gateway: E2E_PROFILE=ai-gateway
 e2e-test-ai-gateway: e2e-test
 
+# Run E2E tests with Dynamo profile (requires GPU)
+e2e-test-dynamo: ## Run E2E tests with Dynamo profile (requires 3+ GPUs)
+e2e-test-dynamo: E2E_PROFILE=dynamo
+e2e-test-dynamo: e2e-test
+
 # Run E2E tests and keep cluster for debugging
 e2e-test-debug: ## Run E2E tests and keep cluster for debugging
 e2e-test-debug: E2E_KEEP_CLUSTER=true
@@ -96,6 +101,7 @@ e2e-help: ## Show help for E2E testing
 	@echo "Available Profiles:"
 	@echo "  ai-gateway       - Test Semantic Router with Envoy AI Gateway"
 	@echo "  aibrix           - Test Semantic Router with vLLM AIBrix"
+	@echo "  dynamo           - Test Semantic Router with Nvidia Dynamo (requires 3+ GPUs)"
 	@echo "  istio            - Test Semantic Router with Istio service mesh"
 	@echo "  llm-d            - Test Semantic Router with LLM-D"
 	@echo "  production-stack - Test Semantic Router in production-like stack (HA/LB/Obs)"
@@ -115,6 +121,7 @@ e2e-help: ## Show help for E2E testing
 	@echo "Common Commands:"
 	@echo "  make e2e-test                          # Run all tests with default profile"
 	@echo "  make e2e-test E2E_PROFILE=ai-gateway   # Run AI Gateway tests"
+	@echo "  make e2e-test-dynamo                   # Run Dynamo tests (requires GPU)"
 	@echo "  make e2e-test-debug                    # Run tests and keep cluster"
 	@echo "  make e2e-test-specific E2E_TESTS=\"test1,test2\"  # Run specific tests"
 	@echo "  make e2e-cleanup                       # Clean up test cluster"
diff --git a/tools/observability/prometheus.yaml b/tools/observability/prometheus.yaml
index 741721248..437ae8579 100644
--- a/tools/observability/prometheus.yaml
+++ b/tools/observability/prometheus.yaml
@@ -22,9 +22,9 @@ scrape_configs:
           service: semantic-router
           environment: docker
 
-  # Optional: Envoy proxy metrics
-  # Uncomment if Envoy is running
-  # - job_name: envoy
+# Optional: Envoy proxy metrics
+# Uncomment if Envoy is running
+# - job_name: envoy
 #     metrics_path: /stats/prometheus
 #     static_configs:
 #       - targets: ["${ENVOY_TARGET:-envoy-proxy:19000}"]
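For reference, a worked example of the nearest-rank indexing used by the `calculatePercentile` helper above (in dynamo_performance_comparison.go); the sample values are illustrative only:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Ten pre-sorted samples; nearest-rank p95 picks index (10*95)/100 = 9,
	// i.e. the slowest sample, matching calculatePercentile above.
	latencies := []time.Duration{
		10 * time.Millisecond, 11 * time.Millisecond, 12 * time.Millisecond,
		13 * time.Millisecond, 14 * time.Millisecond, 15 * time.Millisecond,
		16 * time.Millisecond, 18 * time.Millisecond, 25 * time.Millisecond,
		90 * time.Millisecond,
	}
	idx := (len(latencies) * 95) / 100
	fmt.Printf("p95 index=%d value=%v\n", idx, latencies[idx]) // p95 index=9 value=90ms
}
```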