diff --git a/tools/openwebui-pipe/vllm_semantic_router_pipe.py b/tools/openwebui-pipe/vllm_semantic_router_pipe.py
new file mode 100644
index 00000000..730418b8
--- /dev/null
+++ b/tools/openwebui-pipe/vllm_semantic_router_pipe.py
@@ -0,0 +1,545 @@
+"""
+title: vLLM Semantic Router Pipe
+author: open-webui
+date: 2025-10-01
+version: 1.0
+license: Apache-2.0
+description: A pipe for proxying requests to vLLM Semantic Router and displaying decision headers (category, reasoning, model, injection).
+requirements: requests, pydantic
+"""
+
+import json
+from typing import Generator, Iterator, List, Union
+
+import requests
+from pydantic import BaseModel
+
+
+class Pipeline:
+    class Valves(BaseModel):
+        # vLLM Semantic Router endpoint URL
+        vsr_base_url: str = "http://localhost:8000"
+
+        # API key for authentication (if required)
+        api_key: str = ""
+
+        # Enable/disable displaying VSR headers in the UI
+        show_vsr_info: bool = True
+
+        # Enable/disable logging VSR headers to console
+        log_vsr_info: bool = True
+
+        # Enable/disable debug logging
+        debug: bool = True
+
+        # Request timeout in seconds
+        timeout: int = 300
+
+    def __init__(self):
+        # Important: type should be "manifold" instead of "pipe".
+        # A manifold-type Pipeline is displayed in the model list.
+        self.type = "manifold"
+        self.id = "vllm_semantic_router"
+        self.name = "vllm-semantic-router/"
+
+        # Initialize valves
+        self.valves = self.Valves(
+            **{
+                "vsr_base_url": "http://localhost:8000",
+                "api_key": "",
+                "show_vsr_info": True,
+                "log_vsr_info": True,
+                "debug": True,
+                "timeout": 300,
+            }
+        )
+
+        # Store VSR headers from the last request
+        self.last_vsr_headers = {}
+
+        print("=" * 80)
+        print("šŸš€ vLLM Semantic Router Pipe - Initialization")
+        print("=" * 80)
+        print(f"  Type: {self.type}")
+        print(f"  ID: {self.id}")
+        print(f"  Name: {self.name}")
+        print(f"  VSR Base URL: {self.valves.vsr_base_url}")
+        print(f"  Debug Mode: {self.valves.debug}")
+        print("=" * 80)
+
+    async def on_startup(self):
+        print("\n" + "=" * 80)
+        print("šŸ”„ on_startup: vLLM Semantic Router Pipe initialized")
+        print("=" * 80)
+        print(f"  VSR Base URL: {self.valves.vsr_base_url}")
+        print(f"  API Key: {'***' if self.valves.api_key else '(not set)'}")
+        print(f"  Show VSR Info: {self.valves.show_vsr_info}")
+        print(f"  Log VSR Info: {self.valves.log_vsr_info}")
+        print(f"  Debug: {self.valves.debug}")
+        print(f"  Timeout: {self.valves.timeout}s")
+
+        # Verify that pipelines() returns the expected model list
+        pipelines_list = self.pipelines()
+        print("\nšŸ“‹ Available Pipelines/Models:")
+        for pipeline in pipelines_list:
+            print(f"  - ID: {pipeline['id']}")
+            print(f"    Name: {pipeline['name']}")
+        print("=" * 80 + "\n")
+
+    async def on_shutdown(self):
+        print("\n" + "=" * 80)
+        print("šŸ›‘ on_shutdown: vLLM Semantic Router Pipe")
+        print("=" * 80 + "\n")
+
+    async def on_valves_updated(self):
+        print("\n" + "=" * 80)
+        print("āš™ļø on_valves_updated: vLLM Semantic Router Pipe valves updated")
+        print("=" * 80)
+        print(f"  VSR Base URL: {self.valves.vsr_base_url}")
+        print(f"  API Key: {'***' if self.valves.api_key else '(not set)'}")
+        print(f"  Show VSR Info: {self.valves.show_vsr_info}")
+        print(f"  Log VSR Info: {self.valves.log_vsr_info}")
+        print(f"  Debug: {self.valves.debug}")
+        print(f"  Timeout: {self.valves.timeout}s")
+        print("=" * 80 + "\n")
+
+    def pipelines(self) -> List[dict]:
+        """
+        Important: the manifold type uses the pipelines() method instead of pipes().
+        The returned model list is displayed in Open WebUI's model selector.
+        """
+        pipelines_list = [
+            {
+                "id": "vllm-semantic-router-auto",
"name": "vllm-semantic-router/auto", + } + ] + + if self.valves.debug: + print("\n" + "=" * 80) + print("šŸ“ž pipelines() method called - Returning available models") + print("=" * 80) + for pipeline in pipelines_list: + print(f" - ID: {pipeline['id']}") + print(f" Name: {pipeline['name']}") + print("=" * 80 + "\n") + + return pipelines_list + + def _extract_vsr_headers(self, headers: dict) -> dict: + """ + Extract VSR-specific headers from response headers. + """ + vsr_headers = {} + + # List of VSR headers to extract + vsr_header_keys = [ + "x-vsr-selected-category", + "x-vsr-selected-reasoning", + "x-vsr-selected-model", + "x-vsr-injected-system-prompt", + "x-vsr-cache-hit", + ] + + # Extract headers (case-insensitive) + for key in vsr_header_keys: + # Try lowercase + value = headers.get(key) + if not value: + # Try uppercase + value = headers.get(key.upper()) + if not value: + # Try title case + value = headers.get(key.title()) + + if value: + vsr_headers[key] = value + + return vsr_headers + + def _format_vsr_info(self, vsr_headers: dict, position: str = "prefix") -> str: + """ + Format VSR headers into a readable message for display. + + Args: + vsr_headers: VSR decision headers + position: "prefix" (before response) or "suffix" (after response) + """ + if not vsr_headers: + return "" + + vsr_message_parts = [] + + if vsr_headers.get("x-vsr-selected-category"): + vsr_message_parts.append( + f"šŸ“‚ **User Intent Category**: {vsr_headers['x-vsr-selected-category']}" + ) + + if vsr_headers.get("x-vsr-selected-reasoning"): + reasoning = vsr_headers["x-vsr-selected-reasoning"] + reasoning_emoji = "🧠" if reasoning == "on" else "⚔" + vsr_message_parts.append( + f"{reasoning_emoji} **Chain-of-Thought**: {reasoning}" + ) + + if vsr_headers.get("x-vsr-selected-model"): + vsr_message_parts.append( + f"🄷 **Hidden Model**: {vsr_headers['x-vsr-selected-model']}" + ) + + if vsr_headers.get("x-vsr-injected-system-prompt"): + injection = vsr_headers["x-vsr-injected-system-prompt"] + injection_emoji = "šŸŽÆ" if injection == "true" else "🚫" + vsr_message_parts.append( + f"{injection_emoji} **System Prompt Optimized**: {injection}" + ) + + # Add cache hit information + if vsr_headers.get("x-vsr-cache-hit"): + cache_hit = vsr_headers["x-vsr-cache-hit"].lower() + if cache_hit == "true": + vsr_message_parts.append(f"šŸ”„ **Semantic Cache**: Hit (Fast Response)") + + if vsr_message_parts: + if position == "prefix": + # Before response: VSR info + separator + response content + return ( + "**šŸ”€ vLLM Semantic Router Decision šŸ”€**\n\n" + + "\n\n".join(vsr_message_parts) + + "\n\n---\n\n" + ) + else: + # After response: response content + separator + VSR info + return ( + "\n\n---\n\n**šŸ”€ vLLM Semantic Router Decision šŸ”€**\n\n" + + "\n\n".join(vsr_message_parts) + ) + + return "" + + def _log_vsr_info(self, vsr_headers: dict): + """ + Log VSR information to console. 
+ """ + if not vsr_headers or not self.valves.log_vsr_info: + return + + print("=" * 60) + print("vLLM Semantic Router Decision:") + print("=" * 60) + + if vsr_headers.get("x-vsr-selected-category"): + print(f" Category: {vsr_headers['x-vsr-selected-category']}") + + if vsr_headers.get("x-vsr-selected-reasoning"): + print(f" Reasoning Mode: {vsr_headers['x-vsr-selected-reasoning']}") + + if vsr_headers.get("x-vsr-selected-model"): + print(f" Selected Model: {vsr_headers['x-vsr-selected-model']}") + + if vsr_headers.get("x-vsr-injected-system-prompt"): + print( + f" System Prompt Injected: {vsr_headers['x-vsr-injected-system-prompt']}" + ) + + if vsr_headers.get("x-vsr-cache-hit"): + cache_hit = vsr_headers["x-vsr-cache-hit"].lower() + print(f" Cache Hit: {cache_hit}") + + print("=" * 60) + + def pipe( + self, user_message: str, model_id: str, messages: List[dict], body: dict + ) -> Union[str, Generator, Iterator]: + """ + Main pipe function that handles the request/response flow. + + Manifold type pipe() method signature: + - user_message: User's last message + - model_id: Selected model ID + - messages: Complete message history + - body: Complete request body + """ + + if self.valves.debug: + print("\n" + "=" * 80) + print("šŸ”„ pipe() method called - Processing request") + print("=" * 80) + print( + f" User message: {user_message[:100]}..." + if len(user_message) > 100 + else f" User message: {user_message}" + ) + print(f" Model ID: {model_id}") + print(f" Model requested: {body.get('model', 'N/A')}") + print(f" Stream mode: {body.get('stream', False)}") + print(f" Messages count: {len(messages)}") + print("=" * 80) + + # Prepare the request to vLLM Semantic Router + url = f"{self.valves.vsr_base_url}/v1/chat/completions" + + if self.valves.debug: + print(f"\nšŸ“” Sending request to: {url}") + + headers = { + "Content-Type": "application/json", + } + + if self.valves.api_key: + headers["Authorization"] = f"Bearer {self.valves.api_key}" + if self.valves.debug: + print(f" Authorization: Bearer ***") + + # Important: Change model in body to "auto" + # VSR backend only accepts model="auto", then automatically selects model based on request content + request_body = body.copy() + original_model = request_body.get("model", "N/A") + request_body["model"] = "auto" + + if self.valves.debug: + print(f"\nšŸ”„ Model mapping:") + print(f" Original model: {original_model}") + print(f" Sending to VSR: auto") + + # Check if streaming is requested + is_streaming = request_body.get("stream", False) + + if self.valves.debug: + print(f" Streaming: {is_streaming}") + print(f" Timeout: {self.valves.timeout}s") + + try: + if self.valves.debug: + print(f"\nšŸ”Œ Connecting to vLLM Semantic Router...") + + response = requests.post( + url, + json=request_body, # Use modified request_body + headers=headers, + timeout=self.valves.timeout, + stream=request_body.get("stream", False), + ) + + if self.valves.debug: + print(f"āœ… Response received - Status: {response.status_code}") + print(f" Response headers count: {len(response.headers)}") + + # Check for HTTP errors + if response.status_code != 200: + error_msg = f"Error: vLLM Semantic Router returned status {response.status_code}" + if self.valves.debug: + print(f"\nāŒ {error_msg}") + print(f" Response text: {response.text[:500]}") + print("=" * 80 + "\n") + return f"{error_msg}: {response.text}" + + # Extract VSR headers from response + vsr_headers = self._extract_vsr_headers(dict(response.headers)) + self.last_vsr_headers = vsr_headers + + if self.valves.debug: + 
print(f" VSR headers found: {len(vsr_headers)}") + for key, value in vsr_headers.items(): + print(f" {key}: {value}") + + # Print all response headers for debugging + print(f"\n All response headers:") + for key, value in response.headers.items(): + if key.lower().startswith("x-vsr"): + print(f" {key}: {value}") + + # Log VSR information + self._log_vsr_info(vsr_headers) + + if is_streaming: + if self.valves.debug: + print(f"\nšŸ“ŗ Handling streaming response...") + # Handle streaming response + return self._handle_streaming_response(response, vsr_headers) + else: + if self.valves.debug: + print(f"\nšŸ“„ Handling non-streaming response...") + print(f" Response status: {response.status_code}") + print(f" Response content length: {len(response.content)}") + print( + f" Response content type: {response.headers.get('content-type', 'unknown')}" + ) + + # Check if response is empty + if not response.content: + error_msg = "Error: Empty response from vLLM Semantic Router" + if self.valves.debug: + print(f"\nāŒ {error_msg}") + print("=" * 80 + "\n") + return error_msg + + # Try to parse JSON response + try: + response_data = response.json() + except json.JSONDecodeError as e: + error_msg = ( + f"Error: Invalid JSON response from vLLM Semantic Router" + ) + if self.valves.debug: + print(f"\nāŒ {error_msg}") + print(f" JSON error: {str(e)}") + print( + f" Response text (first 500 chars): {response.text[:500]}" + ) + print("=" * 80 + "\n") + return f"{error_msg}: {str(e)}" + + if self.valves.debug: + print(f" Response data keys: {list(response_data.keys())}") + if "choices" in response_data: + print(f" Choices count: {len(response_data['choices'])}") + + # Add VSR info to the response if enabled + if self.valves.show_vsr_info and vsr_headers: + vsr_info = self._format_vsr_info(vsr_headers, position="prefix") + + if self.valves.debug: + print( + f" Adding VSR info to response (length: {len(vsr_info)})" + ) + + # Prepend to the assistant's message + if "choices" in response_data and len(response_data["choices"]) > 0: + for choice in response_data["choices"]: + if "message" in choice and "content" in choice["message"]: + choice["message"]["content"] = ( + vsr_info + choice["message"]["content"] + ) + if self.valves.debug: + print(f" āœ… VSR info prepended to response") + + if self.valves.debug: + print(f"\nāœ… Request completed successfully") + print("=" * 80 + "\n") + + return response_data + + except requests.exceptions.Timeout: + error_msg = f"Error: Request to vLLM Semantic Router timed out after {self.valves.timeout} seconds" + if self.valves.debug: + print(f"\nāŒ {error_msg}") + print("=" * 80 + "\n") + return error_msg + except Exception as e: + error_msg = ( + f"Error: Failed to communicate with vLLM Semantic Router: {str(e)}" + ) + if self.valves.debug: + print(f"\nāŒ {error_msg}") + print(f" Exception type: {type(e).__name__}") + print(f" Exception details: {str(e)}") + print("=" * 80 + "\n") + return error_msg + + def _handle_streaming_response( + self, response: requests.Response, vsr_headers: dict + ) -> Generator: + """ + Handle streaming SSE response from vLLM Semantic Router. + Manually parse SSE stream, no need for sseclient-py dependency. + + Strategy: + 1. Add VSR info before the first content chunk (if enabled) + 2. Detect VSR header updates during streaming (via SSE events) + 3. 
+        """
+        vsr_info_added = False
+        first_content_chunk = True  # Mark whether this is the first content chunk
+        # Start from the initial vsr_headers; they may be updated during streaming
+        current_vsr_headers = vsr_headers.copy()
+
+        if self.valves.debug:
+            print("\nšŸ“ Initial VSR headers:")
+            for key, value in current_vsr_headers.items():
+                print(f"  {key}: {value}")
+
+        # Read the streaming response line by line
+        for line in response.iter_lines(decode_unicode=True):
+            if not line:
+                continue
+
+            # SSE format: data: {...}
+            if line.startswith("data: "):
+                data_str = line[6:].strip()  # Remove "data: " prefix
+
+                if data_str == "[DONE]":
+                    yield "data: [DONE]\n\n"
+
+                    if self.valves.debug:
+                        print(
+                            f"āœ… Streaming completed, VSR info added: {vsr_info_added}"
+                        )
+                else:
+                    try:
+                        chunk_data = json.loads(data_str)
+
+                        # Check if the chunk contains updated VSR header information.
+                        # Some SSE implementations may include updated headers in chunk metadata.
+                        if "vsr_headers" in chunk_data:
+                            if self.valves.debug:
+                                print("šŸ”„ VSR headers updated in stream:")
+                            for key, value in chunk_data["vsr_headers"].items():
+                                full_key = (
+                                    f"x-vsr-{key}"
+                                    if not key.startswith("x-vsr-")
+                                    else key
+                                )
+                                if current_vsr_headers.get(full_key) != value:
+                                    if self.valves.debug:
+                                        print(
+                                            f"  {full_key}: {current_vsr_headers.get(full_key)} → {value}"
+                                        )
+                                    current_vsr_headers[full_key] = value
+
+                        # Add VSR info before the first content chunk
+                        if (
+                            first_content_chunk
+                            and self.valves.show_vsr_info
+                            and not vsr_info_added
+                        ):
+                            if (
+                                "choices" in chunk_data
+                                and len(chunk_data["choices"]) > 0
+                            ):
+                                choice = chunk_data["choices"][0]
+                                delta = choice.get("delta", {})
+
+                                # Check if there is content (role or content)
+                                if "role" in delta or "content" in delta:
+                                    if self.valves.debug:
+                                        print(
+                                            "āœ… Adding VSR info at first content chunk"
+                                        )
+                                        print("  VSR headers:")
+                                        for key, value in current_vsr_headers.items():
+                                            print(f"    {key}: {value}")
+
+                                    # Format VSR info (using prefix mode)
+                                    vsr_info = self._format_vsr_info(
+                                        current_vsr_headers, position="prefix"
+                                    )
+
+                                    # Add VSR info before the first content
+                                    current_content = delta.get("content", "")
+                                    delta["content"] = vsr_info + current_content
+                                    chunk_data["choices"][0]["delta"] = delta
+                                    vsr_info_added = True
+                                    first_content_chunk = False
+
+                        # Once a chunk with content has been seen, it is no longer the first
+                        if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
+                            choice = chunk_data["choices"][0]
+                            delta = choice.get("delta", {})
+                            if "role" in delta or "content" in delta:
+                                first_content_chunk = False
+
+                        yield f"data: {json.dumps(chunk_data)}\n\n"
+                    except json.JSONDecodeError:
+                        # If not valid JSON, pass the line through as-is
+                        yield f"data: {data_str}\n\n"
diff --git a/website/docs/tutorials/observability/open-webui-integration.md b/website/docs/tutorials/observability/open-webui-integration.md
new file mode 100644
index 00000000..e9a40289
--- /dev/null
+++ b/website/docs/tutorials/observability/open-webui-integration.md
@@ -0,0 +1,102 @@
+# Open WebUI Integration
+
+This guide shows how to integrate vLLM Semantic Router with Open WebUI using the provided pipe.
+
+## Architecture
+
+```mermaid
+graph LR
+    A[Open WebUI] --> B[vLLM Semantic Router Pipe]
+    B --> C[vLLM Semantic Router]
+    C --> D{Route Decision}
+    D --> E[Model A]
+    D --> F[Model B]
+    D --> G[Model C]
+
+    E --> H[Response]
+    F --> H
+    G --> H
+    H --> B
+    B --> A
+
+    style B fill:#FFB6C1
+    style C fill:#87CEEB
+    style H fill:#90EE90
+```
+
+## Prerequisites
+
+- **vLLM Semantic Router** deployed and accessible (recommended: Kubernetes deployment via `kubectl apply -k deploy/kubernetes/`)
+- **Open WebUI** installed and running
+
+## Installation
+
+### Step 1: Install the Pipe in Open WebUI
+
+1. Open your Open WebUI instance and go to **Admin Panel** → **Settings** → **Pipelines**
+2. Click **"+"** to add a new pipeline
+3. Import the pipe from URL:
+
+   ```text
+   https://raw.githubusercontent.com/vllm-project/semantic-router/main/tools/openwebui-pipe/vllm_semantic_router_pipe.py
+   ```
+
+4. Toggle the pipeline to **"Enabled"** and click **"Save"**
+
+### Step 2: Configure the Pipe
+
+Click the **gear icon** next to the pipeline to configure its settings:
+
+| Setting | Description | Example |
+|---------|-------------|---------|
+| `vsr_base_url` | Semantic Router endpoint URL | `http://semantic-router.vllm-semantic-router-system.svc.cluster.local:8000` |
+| `show_vsr_info` | Display routing decisions in chat | `true` |
+| `timeout` | Request timeout in seconds | `300` |
+
+**For Kubernetes deployments**, use the service DNS name:
+
+```text
+http://semantic-router.vllm-semantic-router-system.svc.cluster.local:8000
+```
+
+Click **"Save"** to apply the configuration.
+
+### Step 3: Use the Model
+
+1. Go to the **Chat** interface
+2. Select **"vllm-semantic-router/auto"** from the model dropdown
+3. Start chatting!
+
+## Usage
+
+The pipe displays routing information in the chat interface:
+
+```text
+šŸ”€ vLLM Semantic Router Decision šŸ”€
+
+šŸ“‚ User Intent Category: general-qa
+⚔ Chain-of-Thought: off
+🄷 Hidden Model: qwen2.5-7b-instruct
+šŸŽÆ System Prompt Optimized: true
+šŸ”„ Semantic Cache: Hit (Fast Response)
+```
+
+To hide this information, set `show_vsr_info` to `false` in the pipe configuration.
+
+## Troubleshooting
+
+### Connection Issues
+
+If you see connection errors:
+
+1. Verify the Semantic Router is running
+2. Check that the `vsr_base_url` is correct
+3. For Kubernetes, ensure the service DNS name is reachable from the Open WebUI pod
+
+You can also call the router directly to rule out Open WebUI itself; see *Verifying the Router Endpoint* below.
+
+### Model Not Appearing
+
+If the model doesn't appear in the selector:
+
+1. Verify the pipe is **enabled** in Admin Panel → Pipelines
+2. Refresh the Open WebUI page
+3. Restart Open WebUI if needed
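+
+### Verifying the Router Endpoint
+
+If the pipe reports connection or routing errors, it can help to call the Semantic Router directly, bypassing Open WebUI. The snippet below is a minimal sketch using the same `requests` library the pipe depends on; it assumes the router is reachable at the default `http://localhost:8000` (adjust the URL to match your `vsr_base_url`), sends `model: "auto"` exactly as the pipe does, and prints the `x-vsr-*` decision headers.
+
+```python
+import requests
+
+# Adjust to match the vsr_base_url configured in the pipe
+VSR_BASE_URL = "http://localhost:8000"
+
+response = requests.post(
+    f"{VSR_BASE_URL}/v1/chat/completions",
+    json={
+        "model": "auto",  # the router selects the actual model
+        "messages": [{"role": "user", "content": "What is 2 + 2?"}],
+    },
+    timeout=60,
+)
+
+print("Status:", response.status_code)
+# The router returns its routing decision in x-vsr-* response headers
+for name, value in response.headers.items():
+    if name.lower().startswith("x-vsr"):
+        print(f"{name}: {value}")
+```
+
+If this request succeeds but the pipe still fails, the problem is most likely in the pipe configuration or in network access from the Open WebUI pod.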
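+
+### Testing the Pipe Outside Open WebUI
+
+To confirm the pipe itself loads and can reach the router, you can also drive it from a plain Python script. This is a minimal sketch: it assumes `vllm_semantic_router_pipe.py` is in the current directory, that its requirements (`requests`, `pydantic`) are installed, and that the router is reachable at the configured `vsr_base_url`.
+
+```python
+from vllm_semantic_router_pipe import Pipeline
+
+pipe = Pipeline()
+pipe.valves.vsr_base_url = "http://localhost:8000"  # adjust as needed
+
+# The model list Open WebUI would show for this manifold pipe
+print(pipe.pipelines())
+
+# Send one non-streaming request through the pipe, as Open WebUI would
+messages = [{"role": "user", "content": "Hello"}]
+result = pipe.pipe(
+    user_message="Hello",
+    model_id="vllm-semantic-router-auto",
+    messages=messages,
+    body={"model": "vllm-semantic-router-auto", "messages": messages, "stream": False},
+)
+print(result)
+```
+
+A successful run prints the `vllm-semantic-router-auto` entry and a chat completion response (with the routing decision prepended when `show_vsr_info` is enabled); an error string returned by the pipe points at the connection or configuration problem.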