diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 361acc6..c0405d4 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -9,18 +9,24 @@ "Bash(/bashes)", "Bash(taskkill:*)", "Bash(uv add:*)", - "Bash(.scriptsbuild.ps1)", + "Bash(.\\scripts\\build.ps1)", "Bash(del test_idle_functionality.py)", "Bash(powershell:*)", - "Bash(.distfluid-server.exe --help)", + "Bash(.\\dist\\fluid-server.exe --help)", "Bash(timeout:*)", - "Bash(dir C:Usersbrandsourcereposfluid-server-2tests)", + "Bash(dir C:\\Users\\brand\\source\\repos\\fluid-server-2\\tests)", "Bash(dir:*)", "Bash(uv sync:*)", "Bash(uv pip install:*)", - "Bash(.distfluid-server.exe --whisper-model whisper-large-v3-turbo-qnn --log-level DEBUG --host 127.0.0.1 --port 8080)" + "Bash(.\\dist\\fluid-server.exe --whisper-model whisper-large-v3-turbo-qnn --log-level DEBUG --host 127.0.0.1 --port 8080)", + "Bash(.\\dist\\fluid-server.exe:*)", + "Bash(./dist/fluid-server.exe:*)", + "Bash(del:*)", + "Bash(gh release:*)", + "Bash(uv pip:*)", + "Bash(findstr:*)" ], "deny": [], - "ask": [], + "ask": [] } -} +} \ No newline at end of file diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml deleted file mode 100644 index b421117..0000000 --- a/.github/workflows/main.yml +++ /dev/null @@ -1,155 +0,0 @@ -name: Main Branch CI - -on: - push: - branches: [ main ] - paths: - - 'src/**' - - 'tests/**' - - 'pyproject.toml' - - '.github/workflows/main.yml' - -jobs: - auto-format: - name: Auto-format and Commit - runs-on: ubuntu-latest - permissions: - contents: write - - steps: - - uses: actions/checkout@v5 - with: - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Run ruff format - run: | - uv run ruff format src/ tests/ - uv run ruff 
check --fix src/ tests/ - - - name: Check for changes - id: verify-diff - run: | - git diff --quiet || echo "changed=true" >> $GITHUB_OUTPUT - - - name: Commit formatting changes - if: steps.verify-diff.outputs.changed == 'true' - run: | - git config --local user.email "github-actions[bot]@users.noreply.github.com" - git config --local user.name "github-actions[bot]" - git add -A - git commit -m "Auto-format code with ruff [skip ci]" - git push - - type-check: - name: Type Check - runs-on: ubuntu-latest - needs: auto-format - - steps: - - uses: actions/checkout@v5 - with: - ref: main - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Run type checking - run: | - if [[ "${{ runner.os }}" == "Windows" ]]; then - pwsh -File scripts/typecheck.ps1 - else - bash scripts/typecheck.sh - fi - shell: bash - - build-and-release: - name: Build Release Artifacts (Windows x64) - runs-on: windows-latest - needs: [auto-format, type-check] - - steps: - - uses: actions/checkout@v5 - with: - ref: main - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Build executable with PyInstaller (Windows x64) - shell: pwsh - run: | - # Use the build.ps1 script which detects and builds for current architecture - # GitHub Actions runners are x64, so this will build for x64 - .\scripts\build.ps1 - - - name: Create release archive (Windows x64) - shell: pwsh - run: | - Compress-Archive -Path dist\* -DestinationPath fluid-server-windows-x64.zip - - - name: Upload build artifact - uses: actions/upload-artifact@v4 - with: - name: fluid-server-windows-x64 - path: fluid-server-windows-x64.zip - 
retention-days: 30 - - - name: Get version from pyproject.toml - id: get-version - shell: bash - run: | - VERSION=$(grep '^version = ' pyproject.toml | sed 's/version = "\(.*\)"/\1/') - echo "version=$VERSION" >> $GITHUB_OUTPUT - - - name: Create draft release (if tag pushed) - if: startsWith(github.ref, 'refs/tags/v') - uses: softprops/action-gh-release@v2 - with: - draft: true - prerelease: false - files: fluid-server-windows-x64.zip - tag_name: ${{ github.ref_name }} - name: Release ${{ github.ref_name }} - body: | - ## Changes in ${{ github.ref_name }} - - ### Downloads - - Windows x64: `fluid-server-windows-x64.zip` - - ### Installation - Extract the zip archive and run the `fluid-server.exe` executable. - - ### System Requirements - - Windows 10/11 x64 - - .NET Framework may be required for some dependencies - generate_release_notes: true \ No newline at end of file diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml deleted file mode 100644 index c333c32..0000000 --- a/.github/workflows/pr.yml +++ /dev/null @@ -1,117 +0,0 @@ -name: PR Checks - -on: - pull_request: - branches: [ main ] - paths: - - 'src/**' - - 'tests/**' - - 'pyproject.toml' - - '.github/workflows/pr.yml' - -jobs: - lint-and-type-check: - name: Lint and Type Check - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v5 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Run ruff format check - run: uv run ruff format --check src/ tests/ - - - name: Run ruff linter - run: uv run ruff check src/ tests/ - - - name: Run type checking - run: | - if [[ "${{ runner.os }}" == "Windows" ]]; then - pwsh -File scripts/typecheck.ps1 - else - bash scripts/typecheck.sh - fi - shell: bash - - test-build: - name: Test Build (Windows x64) - runs-on: windows-latest - - steps: - - 
uses: actions/checkout@v5 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Verify imports - run: uv run python -c "from fluid_server import app; print('Import successful')" - - - name: Run server health check (Windows x64) - run: | - Start-Process -FilePath "uv" -ArgumentList "run", "python", "-m", "fluid_server" -PassThru -WindowStyle Hidden | Out-Null - Start-Sleep -Seconds 5 - Invoke-WebRequest -Uri http://localhost:8080/health -UseBasicParsing - Get-Process python | Stop-Process -Force - shell: pwsh - - build-executable: - name: Build Executable (Windows x64) - runs-on: windows-latest - - steps: - - uses: actions/checkout@v5 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Build executable with PyInstaller (Windows x64) - shell: pwsh - run: | - # Use the build.ps1 script which detects and builds for current architecture - # GitHub Actions runners are x64, so this will build for x64 - .\scripts\build.ps1 - - - name: Test executable (Windows x64) - shell: pwsh - run: | - .\dist\fluid-server.exe --help - Start-Process -FilePath ".\dist\fluid-server.exe" -PassThru -WindowStyle Hidden | Out-Null - Start-Sleep -Seconds 5 - Invoke-WebRequest -Uri http://localhost:8080/health -UseBasicParsing - Get-Process fluid-server | Stop-Process -Force - - - name: Upload artifact - uses: actions/upload-artifact@v4 - with: - name: fluid-server-windows-x64 - path: dist/ - retention-days: 7 \ No newline at end of file diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml deleted file mode 100644 index 58a15cd..0000000 --- 
a/.github/workflows/release.yml +++ /dev/null @@ -1,81 +0,0 @@ -name: Release Builds - -on: - workflow_dispatch: - inputs: - version: - description: 'Version to release (e.g., v0.1.0)' - required: true - type: string - release: - types: [created] - -jobs: - build-matrix: - name: Build Windows x64 - runs-on: windows-latest - - steps: - - uses: actions/checkout@v5 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Build executable with PyInstaller (Windows x64) - shell: pwsh - run: | - # Use the build.ps1 script which detects and builds for current architecture - # GitHub Actions runners are x64, so this will build for x64 - .\scripts\build.ps1 - - - name: Create archive (Windows x64) - shell: pwsh - run: | - Compress-Archive -Path dist\* -DestinationPath "fluid-server-windows-x64.zip" - - - name: Upload build artifact - uses: actions/upload-artifact@v4 - with: - name: fluid-server-windows-x64 - path: fluid-server-windows-x64.zip - retention-days: 30 - - - name: Upload to Release - if: github.event_name == 'release' - uses: softprops/action-gh-release@v2 - with: - files: fluid-server-windows-x64.zip - fail_on_unmatched_files: false - - verify-builds: - name: Verify All Builds - runs-on: ubuntu-latest - needs: build-matrix - if: always() - - steps: - - name: Check build results - run: | - echo "Build completed" - echo "Artifacts available:" - echo "- Windows x64" - - - name: Download all artifacts - uses: actions/download-artifact@v5 - with: - path: artifacts/ - - - name: List artifacts - run: | - echo "Downloaded artifacts:" - dir artifacts\ /s - shell: cmd \ No newline at end of file diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml deleted file mode 100644 index e217793..0000000 --- a/.github/workflows/test.yml +++ /dev/null @@ -1,96 +0,0 @@ 
-name: Test Workflow - -on: - workflow_dispatch: - inputs: - test-type: - description: 'Type of test to run' - required: true - default: 'quick' - type: choice - options: - - quick - - full - - build-only - -jobs: - quick-test: - if: inputs.test-type == 'quick' || inputs.test-type == 'full' - name: Quick Tests - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v5 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Run ruff checks - run: | - uv run ruff format --check src/ tests/ - uv run ruff check src/ tests/ - - - name: Run type checking - run: | - if [[ "${{ runner.os }}" == "Windows" ]]; then - pwsh -File scripts/typecheck.ps1 - else - bash scripts/typecheck.sh - fi - shell: bash - - - name: Test server startup - run: | - timeout 10 uv run python -m fluid_server || true - - build-test: - if: inputs.test-type == 'build-only' || inputs.test-type == 'full' - name: Build Test (${{ matrix.os }}) - runs-on: ${{ matrix.os }} - strategy: - matrix: - os: [ubuntu-latest, windows-latest, macos-latest] - - steps: - - uses: actions/checkout@v5 - - - name: Install uv - uses: astral-sh/setup-uv@v6 - with: - enable-cache: true - cache-dependency-glob: "pyproject.toml" - - - name: Set up Python - run: uv python install 3.10 - - - name: Install dependencies - run: uv sync --all-extras --dev - - - name: Build with PyInstaller - shell: ${{ runner.os == 'Windows' && 'pwsh' || 'bash' }} - run: | - if [[ "${{ runner.os }}" == "Windows" ]]; then - # Use the build script on Windows which detects architecture - .\scripts\build.ps1 - else - # Direct PyInstaller call for non-Windows platforms - uv run pyinstaller fluid-server.spec --noconfirm --clean - fi - - - name: Check executable exists - shell: bash - run: | - if [[ "${{ runner.os }}" == "Windows" ]]; then - test -f 
dist/fluid-server.exe - else - test -f dist/fluid-server - fi \ No newline at end of file diff --git a/.github/workflows/typecheck.yml b/.github/workflows/typecheck.yml new file mode 100644 index 0000000..b293d1b --- /dev/null +++ b/.github/workflows/typecheck.yml @@ -0,0 +1,62 @@ +name: Type Check + +on: + push: + branches: [ main ] + paths: + - 'src/**' + - 'tests/**' + - 'pyproject.toml' + - '.github/workflows/typecheck.yml' + pull_request: + branches: [ main ] + paths: + - 'src/**' + - 'tests/**' + - 'pyproject.toml' + - '.github/workflows/typecheck.yml' + +jobs: + type-check: + name: Type Check + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [windows-latest, windows-11-arm] + include: + - os: windows-latest + arch: x64 + - os: windows-11-arm + arch: arm64 + + steps: + - uses: actions/checkout@v5 + + - name: Install uv + uses: astral-sh/setup-uv@v6 + with: + enable-cache: true + cache-dependency-glob: "pyproject.toml" + + - name: Set up Python + run: uv python install 3.10 + + - name: Cache Python installation + uses: actions/cache@v4 + with: + path: ~/.cache/uv + key: ${{ runner.os }}-${{ matrix.arch }}-python-3.10-${{ hashFiles('pyproject.toml') }} + restore-keys: | + ${{ runner.os }}-${{ matrix.arch }}-python-3.10- + + - name: Install dependencies + run: uv sync --all-extras --dev + + - name: Run type checking + run: | + if [ "${{ matrix.arch }}" = "x64" ]; then + uv run ty check --exclude src/fluid_server/runtimes/qnn_whisper.py + else + uv run ty check + fi + shell: bash \ No newline at end of file diff --git a/.gitignore b/.gitignore index faac6ee..6125179 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,8 @@ venv/ .venv/ .env +data/ + # PyInstaller build/ dist/ diff --git a/CLAUDE.md b/CLAUDE.md index bd5a25c..e31f1f0 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Project Overview -Fluid Server is an OpenAI-compatible API server designed to run AI 
models with OpenVINO backend on Windows. The server provides REST endpoints for chat completions and audio transcription, with model management capabilities including automatic downloading, caching, and memory-efficient runtime switching. +Fluid Server is an OpenAI-compatible API server designed to run AI models with OpenVINO backend on Windows. The server provides REST endpoints for chat completions, audio transcription, text embeddings, and vector storage with model management capabilities including automatic downloading, caching, and memory-efficient runtime switching. ## Development Commands @@ -15,6 +15,9 @@ uv sync # Install development dependencies uv add --dev ty + +# Fix OpenVINO tokenizers compatibility issues +uv pip install -U openvino openvino_tokenizers ``` ### Running the Server @@ -64,6 +67,13 @@ uv run ruff check --fix . curl http://localhost:8080/health curl http://localhost:8080/v1/models +# Test embeddings endpoint +curl -X POST http://localhost:8080/v1/embeddings -H "Content-Type: application/json" -d '{"input": ["test text"], "model": "sentence-transformers/all-MiniLM-L6-v2"}' + +# Test vector store operations +curl -X GET http://localhost:8080/v1/vector_store/collections +curl -X POST http://localhost:8080/v1/vector_store/insert -H "Content-Type: application/json" -d '{"collection": "test_docs", "documents": [{"text": "sample document", "metadata": {"source": "test"}}]}' + # Kill server if needed .\scripts\kill_server.ps1 ``` @@ -94,30 +104,52 @@ curl http://localhost:8080/v1/models - `v1/chat.py` - OpenAI-compatible chat completions with streaming support - `v1/audio.py` - Audio transcription endpoints - `v1/models.py` - Model listing and management +- `v1/embeddings.py` - Text and multimodal embeddings generation +- `v1/vector_store.py` - LanceDB vector database operations - `health.py` - Health check with OpenVINO status +**Storage and Embeddings (`storage/`, `managers/`)**: +- `LanceDBClient` - Vector database client for multimodal 
storage +- `EmbeddingManager` - Manages text and multimodal embedding models +- `BaseEmbeddingRuntime` - Abstract base for embedding runtimes +- `OpenVINOEmbeddingRuntime` - Text embeddings using OpenVINO backend +- `WhisperEmbeddingRuntime` - Audio embeddings for semantic search + ### Model Organization Expected directory structure under `model_path`: ``` models/ ├── llm/ -│ ├── qwen3-8b-int8-ov/ # LLM model directories +│ ├── qwen3-8b-int8-ov/ # LLM model directories │ └── phi-4-mini/ ├── whisper/ -│ ├── whisper-tiny/ # Whisper model directories +│ ├── whisper-tiny/ # Whisper model directories │ └── whisper-large-v3/ -└── cache/ # Compiled model cache +├── embeddings/ # Text embedding models +│ └── sentence-transformers/ +└── cache/ # Compiled model cache +``` + +**Data Directory Structure** under `data_root`: +``` +data/ +├── models/ # Model files (see above) +├── cache/ # Runtime cache and compiled models +└── databases/ # LanceDB vector databases + └── embeddings/ # Default embeddings database ``` ### Key Design Patterns -- **Dual Runtime Architecture**: Separate LLM and Whisper models can be loaded simultaneously for optimal performance +- **Multi-Runtime Architecture**: Separate LLM, Whisper, and embedding models can be loaded simultaneously for optimal performance - **Background Model Management**: Automatic downloading, warm-up, and idle cleanup with progress tracking - **Device-Specific Runtime Selection**: QNN backend automatically used on ARM64, OpenVINO/llama.cpp on other architectures - **OpenAI Compatibility**: Request/response formats match OpenAI API for drop-in replacement - **PyInstaller Ready**: Handles frozen executable detection for simplified deployment - **Graceful Degradation**: Falls back to alternative runtimes if primary backend unavailable +- **Vector Database Integration**: LanceDB provides multimodal storage for embeddings, text, and metadata +- **Memory Management**: Configurable idle timeout and memory limits prevent resource 
exhaustion ### Configuration @@ -125,15 +157,44 @@ Server behavior controlled via `ServerConfig` dataclass: - Model paths and device selection (CPU/GPU/NPU) - Memory limits and idle timeout settings - Generation defaults (max_tokens, temperature, top_p) -- Feature flags (warm_up, idle cleanup) +- Feature flags (warm_up, idle cleanup, embeddings) +- Embedding model configuration and vector database settings +- Default models: `qwen3-8b-int4-ov` (LLM), `whisper-large-v3-turbo-fp16-ov-npu` (Whisper), `sentence-transformers/all-MiniLM-L6-v2` (embeddings) Command-line arguments override configuration defaults. The server validates model availability on startup and provides informative warnings for missing models. ## Important Development Notes - **Python Version**: Project requires exactly Python 3.10 (`==3.10.*`) +- **OpenVINO Compatibility**: OpenVINO and OpenVINO Tokenizers versions must be binary compatible. Update both packages together if encountering DLL loading errors - **Architecture Support**: QNN backend only available on ARM64 with conditional imports to prevent PyInstaller issues -- **Model Management**: Use `RuntimeManager` for all model operations - it handles downloading, loading, and resource management -- **Memory Optimization**: Prefer the dual runtime architecture over single model switching for production use +- **Model Management**: Use `RuntimeManager` and `EmbeddingManager` for all model operations - they handle downloading, loading, and resource management +- **Memory Optimization**: Prefer the multi-runtime architecture over single model switching for production use - **Error Handling**: All runtimes implement graceful loading/unloading with proper resource cleanup -- Check the system architecture before making assumptions. 
ARM we test QNN, x64 intel we test openvino \ No newline at end of file +- **Testing Architecture**: ARM64 systems test QNN backend, x64 Intel systems test OpenVINO backend +- **Vector Database**: LanceDB integration requires embedding models to be loaded before vector operations +- **Multimodal Support**: Text embeddings via sentence-transformers, image embeddings via CLIP, audio embeddings via Whisper +- **Build System**: PyInstaller creates single-file executable with architecture detection and runtime selection +- **Model Loading Issues**: If models fail to load with "Failed to create llama_context", check GPU memory availability and consider reducing model size or switching to CPU backend + +## Code Style Guidelines + +**Type Hints and Imports**: +- Use absolute imports from `fluid_server` package +- Required type hints for all function signatures +- Use `Path` objects for filesystem paths, `Optional[T]` for nullable types + +**Async and Threading**: +- Use `async/await` for I/O operations +- Run OpenVINO inference in ThreadPoolExecutor for CPU-bound operations +- Handle device selection gracefully (CPU/GPU/NPU) + +**Error Handling and Logging**: +- Use module-level `logger = logging.getLogger(__name__)` +- Log errors with `logger.error()` before raising exceptions +- Use specific exception types, avoid generic `Exception` + +**FastAPI Patterns**: +- Use Pydantic models for request/response validation +- Leverage dependency injection for shared state (managers, clients) +- Implement proper lifespan management for resource cleanup \ No newline at end of file diff --git a/README.md b/README.md index 07a3f7b..5d0d34d 100644 --- a/README.md +++ b/README.md @@ -1,183 +1,106 @@ -# Fluid Server — AI server integrated into your Windows apps - -**THIS PROJECT IS UNDER ACTIVE DEVELOPMENT** +# Fluid Server: Local AI server for your Windows apps [![Discord](https://img.shields.io/badge/Discord-Join%20Chat-7289da.svg)](https://discord.gg/WNsvaCtmDe) 
[![Models](https://img.shields.io/badge/%F0%9F%A4%97%20Hugging%20Face-Model-blue)](https://huggingface.co/collections/FluidInference) -The goal is to provide a portable, packaged OpenAI‑like server that any Windows desktop application can integrate with, offering optimal model configurations for each chipset. We prioritize AI accelerators where possible; for LLM inference we currently use llama.cpp. - -We plan to provide features including LLM, transcription, text‑to‑speech, speaker diarization, VAD, and more. - -It is designed to bundle into a single binary for easy integration into existing desktop applications. - -**Currently supported NPU runtimes for transcription:** -- **Intel NPU** via OpenVINO backend -- **Qualcomm NPU** via QNN (Snapdragon X Elite) - -The server automatically detects your model format and selects the appropriate runtime for optimal performance. For macOS, see [FluidAudio](https://github.com/FluidInference/FluidAudio). +**THIS PROJECT IS UNDER ACTIVE DEVELOPMENT** It's not ready for production usage but serves as a good reference for how to run whisper on Qualcomm and Intel NPUs. -We built this due to fragmented support across Windows devices. There is no clear standard for running local inference across chipsets—especially on AI accelerators. +A portable, packaged OpenAI-compatible server for Windows desktop applications. LLM, Transcription, embeddings, and vector DB, all out of the box. -## NPU Support +Note that this does require you to run the .exe as a separate async process, like a local serving server in your application, and you will need to make requests to serve inference. 
-Fluid Server supports multiple NPU runtimes for optimal performance on different hardware: +## Features -### Intel NPU (OpenVINO) -- **Models**: Uses OpenVINO IR format (.xml/.bin files) -- **Location**: `models/whisper/whisper-large-v3-turbo-fp16-ov-npu/` -- **Performance**: Optimized for Intel NPU and integrated graphics +**Core Capabilities** +- **LLM Chat Completions** - OpenAI-compatible API with streaming, backed by llama.cpp and OpenVINO +- **Audio Transcription** - Whisper models with NPU acceleration, backed by OpenVINO and Qualcomm QNN +- **Text Embeddings** - Vector embeddings for search and RAG +- **Vector Database** - LanceDB integration for multimodal storage -### Qualcomm NPU (QNN) -- **Models**: Uses ONNX format with device-specific compilation -- **Location**: `models/whisper/whisper-large-v3-turbo-qnn/snapdragon-x-elite/` -- **Performance**: 16× real‑time transcription on Snapdragon X Elite -- **Hardware**: Snapdragon X Elite devices with HTP (Hexagon Tensor Processor) +**Hardware Acceleration** +- **Intel NPU** via OpenVINO backend +- **Qualcomm NPU** via QNN (Snapdragon X Elite) +- **Vulkan GPU** via llama-cpp ## Quick Start -### Prerequisites - -- Windows 10/11 -- Python 3.10+ with `uv` package manager -- **For Intel NPU:** OpenVINO 2025.2.0+ runtime -- **For Qualcomm NPU:** ONNX Runtime QNN (bundled with dependencies) -- 8GB+ RAM (16GB recommended for 8B models) -- **Recommended:** Snapdragon X Elite device for optimal QNN performance +### 1. Download or Build -### Development - -1. Install dependencies: +**Option A: Download Release** +- Download `fluid-server.exe` from [releases](https://github.com/FluidInference/fluid-server/releases) +**Option B: Run from Source** ```powershell +# Install dependencies and run uv sync +uv run ``` -1. Run the development server: - -```powershell -uv run python src/main.py -``` - -1. 
Test endpoints: - -- - Welcome page -- - Health check with OpenVINO status -- - Interactive API documentation -- - List models (mock) -- POST - Test OpenVINO operation - -### Build Executable - -Run the build script: - -```powershell -.\build.ps1 -``` - -This creates `dist/fluid-server.exe` (approximately 276 MB with OpenVINO + QNN bundled). - -### Test Executable - -Quick test with the provided script: - -```powershell -.\test_exe.ps1 -``` - -Or run manually: +### 2. Run the Server ```powershell +# Run with default settings .\dist\fluid-server.exe -``` - -Command-line options: -```powershell +# Or with custom options .\dist\fluid-server.exe --host 127.0.0.1 --port 8080 ``` -## Example Usage +### 3. Test the API -### Testing with curl +- **Health Check**: http://localhost:8080/health +- **API Docs**: http://localhost:8080/docs +- **Models**: http://localhost:8080/v1/models -```powershell -# Check server health -curl http://localhost:8080/health - -# Chat completion (non‑streaming) -curl -X POST http://localhost:8080/v1/chat/completions ` - -H "Content-Type: application/json" ` - -d '{"model": "qwen3-8b-int8-ov", "messages": [{"role": "user", "content": "Hello!"}], "max_tokens": 100}' - -# Audio transcription with QNN model -curl -X POST http://localhost:8080/v1/audio/transcriptions ` - -F "file=@audio.wav" ` - -F "model=whisper-large-v3-turbo-qnn" ` - -F "response_format=json" +## Usage Examples + +### Basic Chat Completion + +```bash +curl -X POST http://localhost:8080/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{"model": "qwen3-8b-int8-ov", "messages": [{"role": "user", "content": "Hello!"}]}' ``` -### Integration with OpenAI SDK +### Python Integration ```python from openai import OpenAI -# Point to local server -client = OpenAI( - base_url="http://localhost:8080/v1", - api_key="local" # Can be anything for local server -) +client = OpenAI(base_url="http://localhost:8080/v1", api_key="local") -# Chat completion -response = 
client.chat.completions.create( +# Chat with streaming +for chunk in client.chat.completions.create( model="qwen3-8b-int8-ov", messages=[{"role": "user", "content": "Hello!"}], - stream=True # Streaming supported -) - -for chunk in response: + stream=True +): print(chunk.choices[0].delta.content or "", end="") - -# Audio transcription with QNN -with open("audio.wav", "rb") as audio_file: - transcript = client.audio.transcriptions.create( - model="whisper-large-v3-turbo-qnn", - file=audio_file - ) - print(transcript.text) -``` - -### Integration with .NET Application - -```csharp -// Use with OpenAI SDK for .NET -var client = new OpenAIClient( - new Uri("http://localhost:8080/v1"), - new ApiKeyCredential("local") -); - -var response = await client.GetChatCompletionsAsync( - "qwen3-8b-int8-ov", - new ChatCompletionsOptions { - Messages = { new ChatRequestUserMessage("Hello!") }, - MaxTokens = 100 - } -); ``` -### FAQ +### Audio Transcription -### Why Python? +```bash +curl -X POST http://localhost:8080/v1/audio/transcriptions \ + -F "file=@audio.wav" \ + -F "model=whisper-large-v3-turbo-qnn" +``` -Good question. It is the easiest to support. Most ML work is done in Python, so it is the best supported across the runtimes we target. PyInstaller lets us bundle everything into a single .exe, which is very helpful. +## Documentation -C++ and Rust are options we have considered, but they require more investment, and the team is not yet familiar enough with Rust to make that jump. We may build a C++ server later, but we want to avoid heavy lifting on the inference side where possible. 
+📖 **Comprehensive Guides** +- [NPU Support Guide](docs/npu-support.md) - Intel & Qualcomm NPU configuration +- [Integration Guide](docs/integration-guide.md) - Python, .NET, Node.js examples +- [Development Guide](docs/development.md) - Setup, building, and contributing +- [LanceDB Integration](docs/lancedb.md) - Vector database and embeddings +- [GGUF Model Support](docs/GGUF-model-support.md) - Using any GGUF model +- [Compilation Guide](docs/compilation-guide.md) - Build system details -Tools like `uv`, `ty`, `FastAPI`, and `Pydantic` have also made Python much more manageable. +## FAQ -### Why not just llama.cpp or whisper.cpp? +**Why Python?** Best ML ecosystem support and PyInstaller packaging. -Those are solid options, but the goal is to support other runtimes and model formats beyond GGML. We want to leverage AI accelerators available on various devices, and this is the simplest way to achieve that. +**Why not llama.cpp?** We support multiple runtimes and AI accelerators beyond GGML. ## Acknowledgements @@ -187,5 +110,3 @@ Built using `ty`, `FastAPI`, `Pydantic`, `ONNX Runtime`, `OpenAI Whisper`, and v - `OpenVINO` - Intel NPU and GPU acceleration - `Qualcomm QNN` - Snapdragon NPU optimization with HTP backend - `ONNX Runtime` - Cross-platform AI inference - -[**SearchSavior/OpenArc**](https://github.com/SearchSavior/OpenArc) - for the idea! 
diff --git a/docs/README.md b/docs/README.md index 1cf1137..2dc07c0 100644 --- a/docs/README.md +++ b/docs/README.md @@ -4,21 +4,14 @@ This directory contains comprehensive documentation for the Fluid Server project ## Documents -### 📋 [Compilation Guide](./compilation-guide.md) +### [Compilation Guide](./compilation-guide.md) Complete guide for building PyInstaller executables, including: - Critical import fixes for PyInstaller compatibility - Build performance optimization - Common compilation issues and solutions - Build environment setup requirements -### 🚀 [Streaming Improvements](./streaming-improvements.md) -Documentation of streaming fixes that resolve client connection issues: -- SSE keepalive heartbeat implementation -- Progressive token streaming with async queues -- Sentence-based buffering and timeout handling -- Performance impact analysis and testing results - -### 🤖 [GGUF Model Support](./GGUF-model-support.md) +### [GGUF Model Support](./GGUF-model-support.md) Complete guide for using any GGUF model from HuggingFace Hub: - Flexible model format support (repo, repo/file, legacy names) - Popular model recommendations and quantization guidance @@ -44,30 +37,6 @@ curl -X POST http://localhost:3847/v1/chat/completions \ -d '{"model": "current", "messages": [{"role": "user", "content": "Hello"}], "stream": true}' ``` -## Key Improvements Made - -### ✅ Fixed Client Connection Drops -- Added SSE keepalive heartbeat every 20 seconds -- Implemented progressive token streaming -- Added sentence-based buffering with smart flushing -- Improved error handling and connection monitoring - -### ✅ Resolved PyInstaller Import Issues -- Implemented try/except import pattern for development vs executable modes -- Documented all critical compilation considerations -- Provided multiple build methods and troubleshooting guides - -### ✅ Enhanced Model Support -- Support for any GGUF model from HuggingFace without predefined mappings -- Flexible model identifier formats 
(repo, repo/file, legacy) -- Automatic download and caching with resume capability -- Hardware-optimized configurations (GPU/CPU, memory management) - -### ✅ Improved Default Configuration -- Changed default port from 8080 to 3847 to avoid conflicts -- Optimized streaming parameters for better performance -- Enhanced logging and monitoring capabilities - ## Architecture Overview ``` @@ -88,47 +57,3 @@ Fluid Server ├── Model downloading └── Platform detection ``` - -## Deployment Ready Features - -The current implementation includes all production-ready features: - -- ✅ **Standalone Executable**: Complete PyInstaller build (309.8 MB) -- ✅ **OpenAI API Compatibility**: Drop-in replacement for OpenAI API -- ✅ **Streaming Support**: Fixed progressive streaming with keepalive -- ✅ **GGUF Model Support**: Use any quantized model from HuggingFace -- ✅ **GPU Acceleration**: Vulkan backend for LlamaCpp, OpenVINO optimization -- ✅ **Memory Management**: Single model in memory with idle timeout -- ✅ **Error Handling**: Graceful connection handling and recovery -- ✅ **Configuration**: Flexible command-line and environment configuration - -## Support and Troubleshooting - -### Common Issues -1. **Import Errors**: See [Compilation Guide](./compilation-guide.md#critical-import-issue-and-solution) -2. **Connection Drops**: See [Streaming Improvements](./streaming-improvements.md#solutions-implemented) -3. **Model Loading**: See [GGUF Model Support](./GGUF-model-support.md#error-handling) - -### Getting Help -- Review the relevant documentation file for your issue -- Check the build logs and warnings -- Verify system requirements and dependencies -- Test in development mode before building executable - -## Contributing - -When making changes to the Fluid Server: - -1. **Update Documentation**: Keep these docs current with any changes -2. **Test Executable**: Always test PyInstaller builds after changes -3. **Verify Streaming**: Test streaming functionality with real clients -4. 
**Document Issues**: Add new compilation or runtime issues to relevant docs - -## Version History - -### Current Version -- **Streaming Fixes**: SSE keepalive, progressive tokens, sentence buffering -- **GGUF Support**: Any HuggingFace GGUF model without mappings -- **PyInstaller Compatibility**: Fixed import issues for executable builds -- **Port Change**: Default port 3847 instead of 8080 -- **Production Ready**: Complete standalone executable with all dependencies \ No newline at end of file diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 0000000..b371cf3 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,329 @@ +# Development Guide + +This guide covers everything you need to know for developing with Fluid Server, from initial setup to building and testing. + +## Prerequisites + +### System Requirements +- **OS**: Windows 10/11 +- **Python**: 3.10+ with `uv` package manager +- **Memory**: 8GB+ RAM (16GB recommended for 8B models) +- **Storage**: 10GB+ free space for models + +### Hardware-Specific Requirements + +#### Intel NPU Support +- **Runtime**: OpenVINO 2025.2.0+ runtime +- **Hardware**: Intel Arc graphics or Intel NPU + +#### Qualcomm NPU Support +- **Runtime**: ONNX Runtime QNN (bundled with dependencies) +- **Hardware**: Snapdragon X Elite device with HTP (Hexagon Tensor Processor) + +### Installing Prerequisites + +#### Install uv Package Manager +```powershell +# Install uv (if not already installed) +curl -LsSf https://astral.sh/uv/install.ps1 | powershell +``` + +#### Install OpenVINO (for Intel Devices) +```powershell +# Download and install OpenVINO runtime from Intel's website +# https://docs.openvino.ai/2025/get-started/install-openvino.html +``` + +## Initial Setup + +### 1. Clone the Repository +```bash +git clone https://github.com/FluidInference/fluid-server.git +cd fluid-server +``` + +### 2. 
Install Dependencies +```powershell +# Install all dependencies including development tools +uv sync + +# Install development dependencies separately (if needed) +uv add --dev ty +``` + +### 3. Verify Installation +```powershell +# Check that dependencies are installed correctly +uv run python -c "import fluid_server; print('Setup successful')" +``` + +## Development Workflow + +### Running the Development Server + +#### Basic Development Mode +```powershell +# Run with auto-reload for development +uv run python -m fluid_server --reload +``` + +#### Development with Custom Options +```powershell +# Run with custom model path and debug logging +uv run python -m fluid_server --model-path ./models --log-level DEBUG --reload +``` + +#### Using Convenience Scripts +```powershell +# Use the convenience script +.\scripts\start_server.ps1 +``` + +### Development Server Options +- `--reload` - Auto-reload on code changes (development only) +- `--model-path` - Custom path to model directory +- `--log-level` - Set logging level (DEBUG, INFO, WARNING, ERROR) +- `--host` - Server host (default: 127.0.0.1) +- `--port` - Server port (default: 8080) + +## Code Quality and Testing + +### Type Checking +```powershell +# Run type checking with ty +.\scripts\typecheck.ps1 + +# Or run directly +uv run ty +``` + +### Code Formatting and Linting +```powershell +# Format code with ruff +uv run ruff format . + +# Check for linting issues +uv run ruff check . + +# Fix auto-fixable linting issues +uv run ruff check --fix . 
+``` + +### Testing the Server + +#### Development Testing +```powershell +# Test health endpoint +curl http://localhost:8080/health + +# Test models endpoint +curl http://localhost:8080/v1/models + +# Test basic chat completion +curl -X POST http://localhost:8080/v1/chat/completions ` + -H "Content-Type: application/json" ` + -d '{\"model\": \"current\", \"messages\": [{\"role\": \"user\", \"content\": \"Hello\"}]}' +``` + +#### Automated Testing Scripts +```powershell +# Test with actual models (requires model downloads) +.\scripts\test_with_models.ps1 + +# Kill server if needed +.\scripts\kill_server.ps1 +``` + +## Building and Distribution + +### Building the Executable + +#### Standard Build +```powershell +# Build standalone .exe with PyInstaller +.\scripts\build.ps1 +``` + +The build script creates `dist/fluid-server.exe` (approximately 276 MB with OpenVINO + QNN bundled). + +#### Build Configuration +The build process: +1. Installs dependencies with `uv sync` +2. Runs type checking with `ty` +3. Creates executable with PyInstaller +4. 
Includes all necessary runtime libraries + +### Testing the Built Executable + +#### Quick Test +```powershell +# Test the built executable +.\scripts\test_exe.ps1 +``` + +#### Manual Testing +```powershell +# Run the executable directly +.\dist\fluid-server.exe + +# Run with custom options +.\dist\fluid-server.exe --host 127.0.0.1 --port 8080 --log-level DEBUG +``` + +## Project Structure + +### Source Code Organization +``` +src/fluid_server/ +├── __main__.py # CLI entry point +├── app.py # FastAPI application factory +├── config.py # Server configuration +├── api/ # API endpoints +│ ├── v1/ +│ │ ├── chat.py # Chat completions +│ │ ├── audio.py # Audio transcription +│ │ ├── models.py # Model management +│ │ └── embeddings.py # Text embeddings +│ └── health.py # Health checks +├── managers/ # Core business logic +│ ├── runtime_manager.py # Model loading/unloading +│ └── embedding_manager.py # Embedding generation +├── runtimes/ # Model runtime implementations +│ ├── base.py # Abstract base runtime +│ ├── openvino_llm.py # OpenVINO LLM runtime +│ ├── openvino_whisper.py # OpenVINO Whisper runtime +│ ├── llamacpp.py # Llama.cpp runtime +│ └── qnn_whisper.py # QNN Whisper runtime +├── storage/ # Data persistence +│ └── lancedb_client.py # LanceDB vector storage +└── utils/ # Utilities + ├── model_utils.py # Model discovery/downloading + └── platform_utils.py # Platform detection +``` + +### Model Directory Structure +``` +models/ +├── llm/ # Language models +│ ├── qwen3-8b-int8-ov/ # OpenVINO LLM models +│ └── phi-4-mini/ # Additional LLM models +├── whisper/ # Audio transcription models +│ ├── whisper-large-v3-turbo-ov-npu/ # OpenVINO Whisper +│ ├── whisper-large-v3-turbo-qnn/ # QNN Whisper +│ └── whisper-tiny/ # Smaller models +├── embeddings/ # Text embedding models +│ └── sentence-transformers_all-MiniLM-L6-v2/ +└── cache/ # Compiled model cache +``` + +## Development Configuration + +### Environment Variables +```powershell +# Set development environment 
variables +$env:PYTHONPATH = "src" +$env:FLUID_LOG_LEVEL = "DEBUG" +$env:FLUID_MODEL_PATH = "./models" +``` + +### IDE Configuration + +#### VS Code Settings +Create `.vscode/settings.json`: +```json +{ + "python.defaultInterpreterPath": ".venv/Scripts/python.exe", + "[python]": { + "editor.defaultFormatter": "charliermarsh.ruff", + "editor.formatOnSave": true, + "editor.codeActionsOnSave": { "source.fixAll.ruff": "explicit" } + } +} +``` + +## Debugging and Troubleshooting + +### Common Development Issues + +#### Module Import Errors +```powershell +# Ensure PYTHONPATH includes src directory +$env:PYTHONPATH = "src" +uv run python -m fluid_server +``` + +#### Model Loading Issues +```powershell +# Run with debug logging to see model loading details +uv run python -m fluid_server --log-level DEBUG +``` + +#### Port Already in Use +```powershell +# Kill any existing server processes +.\scripts\kill_server.ps1 + +# Or use a different port +uv run python -m fluid_server --port 8081 +``` + +### Debug Logging +```python +import logging + +# Enable debug logging for specific components +logging.getLogger("fluid_server.managers.runtime_manager").setLevel(logging.DEBUG) +logging.getLogger("fluid_server.runtimes").setLevel(logging.DEBUG) +``` + +### Performance Profiling +```powershell +# Run with performance profiling +uv run python -m cProfile -o profile_output.pstats -m fluid_server + +# Analyze profile results +uv run python -c "import pstats; pstats.Stats('profile_output.pstats').sort_stats('cumulative').print_stats(20)" +``` + +## Contributing Guidelines + +### Code Style +- Follow PEP 8 style guidelines +- Use type hints for all function parameters and return values +- Format code with `ruff format` +- Ensure all linting checks pass with `ruff check` + +### Commit Guidelines +- Use conventional commit messages +- Include relevant tests for new features +- Ensure all existing tests pass +- Update documentation for API 
changes + +### Pull Request Process +1. Fork the repository +2. Create a feature branch from `main` +3. Make your changes with proper testing +4. Ensure all checks pass (linting, type checking, tests) +5. Submit a pull request with a clear description + +## Advanced Development Topics + +### Adding New Model Runtimes +1. Implement the `BaseRuntime` abstract class +2. Add the runtime to the `RuntimeManager` +3. Update configuration options +4. Add appropriate tests + +### Extending API Endpoints +1. Create new endpoint modules in `api/v1/` +2. Register routes in the main application +3. Add request/response models using Pydantic +4. Include comprehensive error handling + +### Performance Optimization +- Use async/await for I/O operations +- Implement connection pooling for external services +- Cache frequently accessed data +- Monitor memory usage with large models + +This development guide provides the foundation for contributing to and extending Fluid Server. For specific implementation details, refer to the existing codebase and follow the established patterns. \ No newline at end of file diff --git a/docs/integration-guide.md b/docs/integration-guide.md new file mode 100644 index 0000000..bed2138 --- /dev/null +++ b/docs/integration-guide.md @@ -0,0 +1,361 @@ +# Integration Guide + +This guide provides comprehensive examples for integrating Fluid Server with various programming languages and frameworks. Fluid Server provides OpenAI-compatible APIs, making it a drop-in replacement for OpenAI services in your applications. 
+ +## API Endpoints Overview + +- **Base URL**: `http://localhost:8080/v1` +- **Health Check**: `http://localhost:8080/health` +- **API Documentation**: `http://localhost:8080/docs` + +### Core Endpoints +- `POST /v1/chat/completions` - Chat completions with streaming support +- `POST /v1/audio/transcriptions` - Audio transcription +- `POST /v1/embeddings` - Text embeddings generation +- `GET /v1/models` - List available models + +## Command Line Testing (curl) + +### Health Check +```bash +curl http://localhost:8080/health +``` + +### Chat Completion (Non-streaming) +```powershell +curl -X POST http://localhost:8080/v1/chat/completions ` + -H "Content-Type: application/json" ` + -d '{ + "model": "qwen3-8b-int8-ov", + "messages": [{"role": "user", "content": "Hello!"}], + "max_tokens": 100 + }' +``` + +### Chat Completion (Streaming) +```powershell +curl -X POST http://localhost:8080/v1/chat/completions ` + -H "Content-Type: application/json" ` + -d '{ + "model": "qwen3-8b-int8-ov", + "messages": [{"role": "user", "content": "Tell me a story"}], + "stream": true, + "max_tokens": 200 + }' +``` + +### Audio Transcription +```powershell +# Using QNN model (Snapdragon) +curl -X POST http://localhost:8080/v1/audio/transcriptions ` + -F "file=@audio.wav" ` + -F "model=whisper-large-v3-turbo-qnn" ` + -F "response_format=json" + +# Using OpenVINO model (Intel) +curl -X POST http://localhost:8080/v1/audio/transcriptions ` + -F "file=@audio.wav" ` + -F "model=whisper-large-v3-turbo-ov-npu" ` + -F "response_format=verbose_json" +``` + +### Text Embeddings +```powershell +curl -X POST http://localhost:8080/v1/embeddings ` + -H "Content-Type: application/json" ` + -d '{ + "input": ["Hello world", "Vector database"], + "model": "sentence-transformers/all-MiniLM-L6-v2" + }' +``` + +### List Models +```bash +curl http://localhost:8080/v1/models +``` + +## Python Integration + +### Using OpenAI SDK + +Install the OpenAI Python SDK: +```bash +pip install openai +``` + +#### Basic 
Setup +```python +from openai import OpenAI + +# Point to local Fluid Server +client = OpenAI( + base_url="http://localhost:8080/v1", + api_key="local" # Can be anything for local server +) +``` + +#### Chat Completions +```python +# Non-streaming completion +response = client.chat.completions.create( + model="qwen3-8b-int8-ov", + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "Explain quantum computing in simple terms."} + ], + max_tokens=150, + temperature=0.7 +) + +print(response.choices[0].message.content) +``` + +#### Streaming Chat Completions +```python +# Streaming completion +response = client.chat.completions.create( + model="qwen3-8b-int8-ov", + messages=[{"role": "user", "content": "Write a short poem about AI"}], + stream=True, + max_tokens=100 +) + +print("AI Response:") +for chunk in response: + if chunk.choices[0].delta.content: + print(chunk.choices[0].delta.content, end="", flush=True) +print() # New line at end +``` + +#### Audio Transcription +```python +# Transcribe audio file +with open("audio.wav", "rb") as audio_file: + transcript = client.audio.transcriptions.create( + model="whisper-large-v3-turbo-qnn", # or whisper-large-v3-turbo-ov-npu + file=audio_file, + response_format="verbose_json" + ) + +print(f"Transcribed text: {transcript.text}") +print(f"Language: {transcript.language}") +print(f"Duration: {transcript.duration}s") +``` + +#### Text Embeddings +```python +# Generate embeddings +embeddings = client.embeddings.create( + model="sentence-transformers/all-MiniLM-L6-v2", + input=["Text to embed", "Another piece of text", "Vector search query"] +) + +for i, embedding in enumerate(embeddings.data): + print(f"Text {i+1} embedding dimensions: {len(embedding.embedding)}") + print(f"First 5 values: {embedding.embedding[:5]}") +``` + +### Error Handling +```python +from openai import OpenAI, APIError, APIConnectionError + +client = OpenAI(base_url="http://localhost:8080/v1", 
api_key="local") + +try: + response = client.chat.completions.create( + model="qwen3-8b-int8-ov", + messages=[{"role": "user", "content": "Hello!"}] + ) + print(response.choices[0].message.content) + +except APIConnectionError: + print("Failed to connect to Fluid Server. Is it running?") +except APIError as e: + print(f"API Error: {e}") +except Exception as e: + print(f"Unexpected error: {e}") +``` + +## .NET Integration + +### Using OpenAI SDK for .NET + +Install the Azure OpenAI SDK for .NET: +```xml +<PackageReference Include="Azure.AI.OpenAI" Version="1.0.0-beta.17" /> +``` + +#### Basic Setup +```csharp +using Azure.AI.OpenAI; +using Azure; + +var client = new OpenAIClient( + new Uri("http://localhost:8080/v1"), + new AzureKeyCredential("local") // Can be anything for local server +); +``` + +#### Chat Completions +```csharp +var chatOptions = new ChatCompletionsOptions() +{ + DeploymentName = "qwen3-8b-int8-ov", + Messages = { + new ChatRequestSystemMessage("You are a helpful assistant."), + new ChatRequestUserMessage("Explain machine learning briefly.") + }, + MaxTokens = 150, + Temperature = 0.7f +}; + +var response = await client.GetChatCompletionsAsync(chatOptions); +Console.WriteLine(response.Value.Choices[0].Message.Content); +``` + +#### Streaming Chat Completions +```csharp +var chatOptions = new ChatCompletionsOptions() +{ + DeploymentName = "qwen3-8b-int8-ov", + Messages = { new ChatRequestUserMessage("Write a haiku about programming") }, + MaxTokens = 100 +}; + +await foreach (var choice in client.GetChatCompletionsStreaming(chatOptions)) +{ + if (choice.ContentUpdate != null) + { + Console.Write(choice.ContentUpdate); + } +} +Console.WriteLine(); +``` + +#### Audio Transcription +```csharp +using var audioStream = File.OpenRead("audio.wav"); + +var transcriptionOptions = new AudioTranscriptionOptions() +{ + DeploymentName = "whisper-large-v3-turbo-qnn", + AudioData = BinaryData.FromStream(audioStream), + ResponseFormat = AudioTranscriptionFormat.VerboseJson +}; + +var transcription = await 
client.GetAudioTranscriptionAsync(transcriptionOptions); +Console.WriteLine($"Transcribed: {transcription.Value.Text}"); +Console.WriteLine($"Language: {transcription.Value.Language}"); +``` + +## Node.js Integration + +### Using OpenAI SDK for Node.js + +Install the OpenAI Node.js SDK: +```bash +npm install openai +``` + +#### Basic Setup +```javascript +import OpenAI from 'openai'; + +const openai = new OpenAI({ + baseURL: 'http://localhost:8080/v1', + apiKey: 'local', // Can be anything for local server +}); +``` + +#### Chat Completions +```javascript +async function chatCompletion() { + try { + const completion = await openai.chat.completions.create({ + model: 'qwen3-8b-int8-ov', + messages: [ + { role: 'system', content: 'You are a helpful assistant.' }, + { role: 'user', content: 'Explain async/await in JavaScript' } + ], + max_tokens: 200, + temperature: 0.7 + }); + + console.log(completion.choices[0].message.content); + } catch (error) { + console.error('Chat completion error:', error); + } +} +``` + +#### Streaming Chat Completions +```javascript +async function streamingChat() { + try { + const stream = await openai.chat.completions.create({ + model: 'qwen3-8b-int8-ov', + messages: [{ role: 'user', content: 'Tell me about Node.js' }], + stream: true, + max_tokens: 150 + }); + + for await (const chunk of stream) { + const content = chunk.choices[0]?.delta?.content; + if (content) { + process.stdout.write(content); + } + } + console.log(); // New line + } catch (error) { + console.error('Streaming error:', error); + } +} +``` + +#### Audio Transcription +```javascript +import fs from 'fs'; + +async function transcribeAudio() { + try { + const transcription = await openai.audio.transcriptions.create({ + file: fs.createReadStream('audio.wav'), + model: 'whisper-large-v3-turbo-qnn', + response_format: 'verbose_json' + }); + + console.log('Transcription:', transcription.text); + console.log('Language:', transcription.language); + console.log('Duration:', 
transcription.duration); + } catch (error) { + console.error('Transcription error:', error); + } +} +``` + +## Best Practices + +### Connection Management +- Use connection pooling for high-throughput applications +- Implement proper retry logic with exponential backoff +- Monitor connection health and implement graceful degradation + +### Performance Optimization +- Use streaming for long responses to improve perceived performance +- Batch multiple requests when possible +- Consider model warm-up time for the first request + +### Error Handling +- Implement comprehensive error handling for network issues +- Handle model loading delays during server startup +- Provide fallback mechanisms for service unavailability + +### Security Considerations +- Run Fluid Server on localhost for development +- Use proper network security for production deployments +- Validate all inputs before sending to the API + +### Model Selection +- Choose appropriate models based on your hardware capabilities +- Consider the trade-off between model size and performance +- Test different models to find the best fit for your use case \ No newline at end of file diff --git a/docs/lancedb.md b/docs/lancedb.md new file mode 100644 index 0000000..ad75885 --- /dev/null +++ b/docs/lancedb.md @@ -0,0 +1,228 @@ +# LanceDB Integration Guide for Fluid Server + +## Overview + +Fluid Server integrates **LanceDB** as its primary vector database solution for storing and retrieving high-dimensional embeddings. LanceDB provides a modern, embedded vector database specifically designed for AI applications with native multimodal support and superior performance characteristics. + +## Why LanceDB Over Chroma? + +### 1. **Native .NET Client Support** +LanceDB offers comprehensive client libraries including full .NET support, making it ideal for Windows desktop applications that need to integrate with C# and .NET frameworks. Chroma only provides a client-side solution for .NET environments. + +### 2. 
**Native Multimodal Embeddings** +LanceDB supports multimodal embeddings (text, image, audio) natively without requiring additional configuration or separate collections. This allows unified storage and cross-modal search capabilities. + +### 3. **Superior Performance** +- **Embedded Architecture**: LanceDB runs as an embedded solution with lower latency and no network overhead +- **Columnar Storage**: Uses Apache Arrow and Lance format for efficient storage and retrieval +- **Optimized Indexing**: Advanced indexing algorithms specifically designed for high-dimensional vectors + +### 4. **Simplified Deployment** +As an embedded solution, LanceDB eliminates the need for separate database server infrastructure, making deployment and management significantly simpler. + +## Architecture Overview + +### Core Components + +``` +Fluid Server Architecture +├── API Layer (FastAPI) +│ ├── /v1/embeddings # OpenAI-compatible embeddings +│ ├── /v1/embeddings/multimodal # Multimodal embedding support +│ └── /v1/vector_store/* # Vector storage operations +├── Embedding Manager +│ ├── Text Embeddings (OpenVINO) +│ ├── Image Embeddings (CLIP-based) +│ └── Audio Embeddings (Whisper-based) +└── LanceDB Storage Layer + ├── Collections (Tables) + ├── Vector Search Engine + └── Document Storage +``` + +### Model Directory Structure + +``` +models/ +├── embeddings/ +│ ├── sentence-transformers_all-MiniLM-L6-v2/ # Text models +│ ├── openai_clip-vit-base-patch32/ # Multimodal models +│ └── openai_whisper-base/ # Audio models +└── cache/ # Compiled model cache +``` + +## Installation and Configuration + +### Dependencies + +LanceDB is automatically installed with Fluid Server: + +```toml +# pyproject.toml +dependencies = [ + "lancedb>=0.14.0", + "sentence-transformers>=2.2.0", + "pillow>=10.0.0", +] +``` + +### Configuration + +Enable embeddings in your server configuration: + +```python +# Server startup +config = ServerConfig( + enable_embeddings=True, + 
embedding_model="sentence-transformers/all-MiniLM-L6-v2", + multimodal_model="openai/clip-vit-base-patch32", + embedding_device="CPU", # or "GPU" + embeddings_db_path=Path("./data/embeddings"), + embeddings_db_name="vectors" +) +``` + +## API Usage Examples + +### 1. Text Embeddings + +#### Generate Text Embeddings +```bash +curl -X POST "http://localhost:8080/v1/embeddings" \ + -H "Content-Type: application/json" \ + -d '{ + "input": ["Hello world", "Machine learning with Python"], + "model": "sentence-transformers/all-MiniLM-L6-v2" + }' +``` + +#### Store Documents with Automatic Embedding +```bash +curl -X POST "http://localhost:8080/v1/vector_store/insert" \ + -H "Content-Type: application/json" \ + -d '{ + "collection": "documents", + "documents": [ + { + "content": "LanceDB provides efficient vector storage", + "metadata": {"source": "documentation", "category": "database"} + }, + { + "content": "Fluid Server enables AI model deployment on Windows", + "metadata": {"source": "readme", "category": "deployment"} + } + ], + "model": "sentence-transformers/all-MiniLM-L6-v2" + }' +``` + +### 2. Multimodal Embeddings + +#### Image Embeddings +```bash +curl -X POST "http://localhost:8080/v1/embeddings/multimodal" \ + -F "input_type=image" \ + -F "model=openai/clip-vit-base-patch32" \ + -F "file=@image.jpg" +``` + +### 3. 
Vector Search + +#### Text-based Search +```bash +curl -X POST "http://localhost:8080/v1/vector_store/search" \ + -H "Content-Type: application/json" \ + -d '{ + "collection": "documents", + "query": "vector database performance", + "query_type": "text", + "limit": 5, + "model": "sentence-transformers/all-MiniLM-L6-v2" + }' +``` + +#### Cross-modal Search (Image to Text) +```bash +curl -X POST "http://localhost:8080/v1/vector_store/search/multimodal" \ + -F "collection=documents" \ + -F "query_type=image" \ + -F "limit=10" \ + -F "model=openai/clip-vit-base-patch32" \ + -F "file_query=@query_image.jpg" +``` + +## Collection Management + +### Create Collections +```bash +curl -X POST "http://localhost:8080/v1/vector_store/collections" \ + -H "Content-Type: application/json" \ + -d '{ + "name": "my_collection", + "dimension": 384, + "content_type": "text", + "overwrite": false + }' +``` + +### List Collections +```bash +curl -X GET "http://localhost:8080/v1/vector_store/collections" +``` + +### Get Collection Statistics +```bash +curl -X GET "http://localhost:8080/v1/vector_store/my_collection/stats" +``` + +## Programmatic Usage (Python) + +## Advanced Features + +### 1. Filtering + +LanceDB supports SQL-like filtering expressions: + +```python +# Filter by metadata +results = await lancedb_client.search_vectors( + collection_name="documents", + query_vector=query_vector, + limit=10, + filter_condition="metadata->>'category' = 'technical'" +) +``` + +### 2. Batch Operations + +```python +# Batch insert +documents = [VectorDocument(...) for _ in range(1000)] +await lancedb_client.insert_documents("large_collection", documents) + +# Batch embedding generation +texts = ["text " + str(i) for i in range(100)] +embeddings = await embedding_manager.get_text_embeddings(texts) +``` + +### 3. 
Memory Management + +The server automatically manages embedding model memory: + +```python +# Models are automatically loaded/unloaded based on usage +config = ServerConfig( + idle_timeout_minutes=30, # Unload models after 30 minutes of inactivity + max_memory_gb=8.0 # Maximum memory usage +) +``` + +### Debug Commands + +```bash +# Check available models +curl -X GET "http://localhost:8080/v1/embeddings/models" + +# Verify collection status +curl -X GET "http://localhost:8080/v1/vector_store/collections" +``` \ No newline at end of file diff --git a/docs/npu-support.md b/docs/npu-support.md new file mode 100644 index 0000000..a992169 --- /dev/null +++ b/docs/npu-support.md @@ -0,0 +1,142 @@ +# NPU Support Guide + +Fluid Server supports multiple NPU (Neural Processing Unit) runtimes for optimal performance on different hardware architectures. This guide covers the supported NPU backends and their specific configurations. + +## Supported NPU Runtimes + +### Intel NPU (OpenVINO) + +Intel NPU support is provided through the OpenVINO runtime, optimized for Intel NPU and integrated graphics. + +#### Model Format +- **Format**: OpenVINO IR format (.xml/.bin files) +- **Location**: `models/whisper/whisper-large-v3-turbo-fp16-ov-npu/` +- **Optimization**: Optimized for Intel NPU and integrated graphics + +#### Performance Characteristics +- Excellent performance on Intel Arc graphics and NPU +- Low power consumption +- Optimized for Intel's AI acceleration hardware + +#### Model Directory Structure +``` +models/whisper/whisper-large-v3-turbo-fp16-ov-npu/ +├── openvino_model.xml +├── openvino_model.bin +└── config.json +``` + +### Qualcomm NPU (QNN) + +Qualcomm NPU support uses the Qualcomm Neural Network (QNN) SDK with device-specific compilation for Snapdragon processors. 
+ +#### Model Format +- **Format**: ONNX format with device-specific compilation +- **Location**: `models/whisper/whisper-large-v3-turbo-qnn/snapdragon-x-elite/` +- **Performance**: 16× real-time transcription on Snapdragon X Elite +- **Hardware**: Snapdragon X Elite devices with HTP (Hexagon Tensor Processor) + +#### Performance Characteristics +- Exceptional performance on Snapdragon X Elite devices +- Leverages Hexagon Tensor Processor (HTP) for AI acceleration +- 16× real-time transcription performance +- Optimized for ARM64 architecture + +#### Model Directory Structure +``` +models/whisper/whisper-large-v3-turbo-qnn/snapdragon-x-elite/ +├── whisper_encoder.onnx +├── whisper_decoder.onnx +└── config.json +``` + +## Runtime Selection + +The server automatically detects your hardware and selects the appropriate NPU runtime: + +### Automatic Detection +- **ARM64 Architecture**: QNN backend is automatically preferred +- **Intel x64 Architecture**: OpenVINO backend is automatically preferred +- **Fallback**: CPU-based inference if NPU is unavailable + +### Manual Runtime Selection +You can explicitly specify the runtime through command-line arguments: + +```powershell +# Force OpenVINO runtime +.\dist\fluid-server.exe --whisper-model whisper-large-v3-turbo-ov-npu + +# Force QNN runtime (ARM64 only) +.\dist\fluid-server.exe --whisper-model whisper-large-v3-turbo-qnn +``` + +## Hardware Requirements + +### Intel NPU Requirements +- **OS**: Windows 10/11 +- **Hardware**: Intel Arc graphics or Intel NPU +- **Runtime**: OpenVINO 2025.2.0+ runtime +- **Memory**: 8GB+ RAM recommended + +### Qualcomm NPU Requirements +- **OS**: Windows 11 (ARM64) +- **Hardware**: Snapdragon X Elite with HTP +- **Runtime**: ONNX Runtime QNN (bundled with dependencies) +- **Memory**: 8GB+ RAM recommended + +## Performance Optimization + +### Intel NPU Optimization +1. **Driver Updates**: Ensure latest Intel graphics drivers +2. **OpenVINO Version**: Use OpenVINO 2025.2.0 or later +3. 
**Model Precision**: FP16 models provide the best performance/accuracy balance + +### Qualcomm NPU Optimization +1. **Device Compatibility**: Verify Snapdragon X Elite compatibility +2. **Power Settings**: Use high-performance power profile +3. **Memory Management**: Close unnecessary applications for optimal memory usage + +## Troubleshooting + +### Common Issues + +#### Intel NPU Issues +- **Driver Problems**: Update Intel graphics drivers +- **OpenVINO Installation**: Verify OpenVINO runtime is properly installed +- **Model Loading Errors**: Check model file integrity and paths + +#### Qualcomm NPU Issues +- **Architecture Mismatch**: Verify ARM64 Windows environment +- **QNN Availability**: Ensure Snapdragon X Elite with HTP support +- **Model Compilation**: Check ONNX Runtime QNN installation + +### Debug Commands + +```powershell +# Check NPU availability +.\dist\fluid-server.exe --log-level DEBUG + +# Test specific runtime +curl -X POST http://localhost:8080/v1/test -H "Content-Type: application/json" + +# Verify model loading +curl http://localhost:8080/v1/models +``` + +### Performance Monitoring + +```powershell +# Monitor transcription performance +curl -X POST http://localhost:8080/v1/audio/transcriptions ` + -F "file=@test_audio.wav" ` + -F "model=whisper-large-v3-turbo-qnn" ` + -F "response_format=verbose_json" +``` + +## Best Practices + +1. **Hardware Matching**: Use the NPU runtime that matches your hardware +2. **Model Selection**: Choose the appropriate model size for your use case +3. **Memory Management**: Monitor memory usage with larger models +4. **Performance Testing**: Benchmark different runtimes on your specific hardware +5. 
**Regular Updates**: Keep NPU drivers and runtimes updated \ No newline at end of file diff --git a/docs/streaming-improvements.md b/docs/streaming-improvements.md deleted file mode 100644 index 37905a4..0000000 --- a/docs/streaming-improvements.md +++ /dev/null @@ -1,271 +0,0 @@ -# Streaming Improvements Documentation - -This document details the streaming improvements implemented to resolve client connection drop issues. - -## Problem Statement - -The original issue reported by the client: -``` -ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response')) -``` - -This occurred during LLM inference streaming, causing the client application to crash when trying to stream responses. - -## Root Causes Identified - -1. **No Connection Keepalive**: Long pauses between tokens caused connection timeouts -2. **Token Blocking**: Synchronous token collection before streaming caused delays -3. **No Error Recovery**: Connection drops weren't handled gracefully -4. **No Timeout Handling**: Hung generators could block indefinitely - -## Solutions Implemented - -### 1. SSE Keepalive Heartbeat - -**Implementation**: Added Server-Sent Events (SSE) compatible keepalive messages every 20 seconds. - -**Location**: `src/fluid_server/api/v1/chat.py` - -```python -# Send heartbeat if needed (every 20 seconds) -if current_time - last_heartbeat_time >= 20.0: - yield ": keepalive\\n\\n" - last_heartbeat_time = current_time -``` - -**Key Points**: -- Uses SSE comment syntax (`: keepalive\n\n`) to avoid confusion with actual content -- Prevents connection timeouts during model processing -- Client-safe (ignored by SSE parsers as comments) - -### 2. 
Progressive Token Streaming - -**Problem**: Original implementation collected all tokens before streaming: -```python -# OLD: Blocking approach -full_response = "" -for token in stream: - full_response += token -# Then stream all at once -``` - -**Solution**: Implemented async queue pattern for immediate token streaming: - -**Location**: `src/fluid_server/runtimes/llamacpp_llm.py` and `openvino_llm.py` - -```python -def generate_stream_sync(self, prompt: str, max_tokens: int, temperature: float, top_p: float, - token_queue: asyncio.Queue, loop: asyncio.AbstractEventLoop) -> None: - # Send tokens immediately via queue - for output in stream: - token = extract_token(output) - if token: - asyncio.run_coroutine_threadsafe( - token_queue.put(token), loop - ) -``` - -### 3. Sentence-Based Buffering - -**Implementation**: Smart buffering that flushes on sentence boundaries or timeout. - -**Location**: `src/fluid_server/api/v1/chat.py` - -```python -def should_flush_buffer(buffer: str, last_flush_time: float, current_time: float) -> bool: - # Flush on sentence endings - if buffer.rstrip().endswith(('.', '!', '?', ':', ';')): - return True - - # Flush after timeout (1.5 seconds) - if current_time - last_flush_time >= 1.5: - return True - - return False -``` - -**Benefits**: -- Natural sentence-by-sentence streaming -- Prevents indefinite buffering with timeout -- Better user experience with coherent chunks - -### 4. Connection State Monitoring - -**Implementation**: Track connection state and handle disconnects gracefully. 
- -```python -async def stream_chat_completion(): - try: - async for token in model.generate_stream(...): - yield f"data: {json.dumps(chunk)}\\n\\n" - await asyncio.sleep(0) # Allow other async operations - except asyncio.CancelledError: - # Client disconnected - clean shutdown - logger.info("Client disconnected, stopping stream") - return - except Exception as e: - # Handle other errors gracefully - error_chunk = create_error_chunk(str(e)) - yield f"data: {json.dumps(error_chunk)}\\n\\n" -``` - -### 5. Timeout Handling - -**Implementation**: Added timeouts to prevent hung generators. - -**Location**: Both runtime implementations - -```python -async def generate_stream(self, ...): - try: - # Get token with timeout - token = await asyncio.wait_for(token_queue.get(), timeout=30.0) - - if token is None: # End of stream signal - break - - yield token - except asyncio.TimeoutError: - logger.warning("Token generation timeout - ending stream") - break -``` - -## HTTP Response Headers - -**Added Headers** for optimal streaming: -```python -headers = { - "Content-Type": "text/plain; charset=utf-8", - "Cache-Control": "no-cache", - "Connection": "keep-alive", - "X-Accel-Buffering": "no", # Disable nginx buffering -} -``` - -## Testing Results - -### Before Implementation -``` -2025-08-30 17:30:15,420 - Token: Hello -2025-08-30 17:30:15,420 - Token: world -2025-08-30 17:30:15,420 - Token: how -2025-08-30 17:30:15,420 - Token: are -2025-08-30 17:30:15,420 - Token: you? -``` -*All tokens had identical timestamps - indicating blocking behavior* - -### After Implementation -``` -2025-08-30 17:31:01,234 - Token: Hello -2025-08-30 17:31:01,456 - Token: world -2025-08-30 17:31:01,678 - Token: how -2025-08-30 17:31:01,890 - Token: are -2025-08-30 17:31:02,123 - Token: you? 
-``` -*Progressive timestamps showing true streaming* - -## Model Runtime Support - -### LlamaCpp Runtime (`llamacpp_llm.py`) -- ✅ Async queue streaming implemented -- ✅ GGUF model support with any HuggingFace format -- ✅ Vulkan backend for GPU acceleration -- ✅ Timeout handling and graceful shutdown - -### OpenVINO Runtime (`openvino_llm.py`) -- ✅ Async queue streaming implemented -- ✅ OpenVINO GenAI integration -- ✅ Timeout handling and graceful shutdown -- ✅ Memory-efficient token processing - -## Configuration - -### Default Settings -```python -# In config.py -DEFAULT_KEEPALIVE_INTERVAL = 20.0 # seconds -DEFAULT_SENTENCE_FLUSH_TIMEOUT = 1.5 # seconds -DEFAULT_TOKEN_QUEUE_SIZE = 100 # tokens -DEFAULT_GENERATION_TIMEOUT = 30.0 # seconds -``` - -### Customizable via Environment -```bash -FLUID_KEEPALIVE_INTERVAL=15 -FLUID_FLUSH_TIMEOUT=2.0 -FLUID_QUEUE_SIZE=50 -``` - -## Client Compatibility - -### OpenAI API Compatibility -All improvements maintain full OpenAI API compatibility: -- Same request/response format -- Same SSE event structure -- Same error handling patterns - -### Supported Clients -- ✅ OpenAI Python client -- ✅ cURL with SSE parsing -- ✅ Browser EventSource API -- ✅ Custom HTTP clients with streaming support - -## Performance Impact - -### Latency -- **First token**: No change (same model loading time) -- **Token streaming**: Significant improvement (immediate vs batched) -- **Connection stability**: Greatly improved with keepalive - -### Memory -- **Queue overhead**: Minimal (~100 tokens × ~50 bytes = ~5KB) -- **Model memory**: Unchanged -- **Connection overhead**: Slight increase due to heartbeat - -### Throughput -- **Tokens/second**: Same generation speed -- **Concurrent connections**: Better handling due to async improvements -- **Error recovery**: Faster with graceful disconnect handling - -## Monitoring and Debugging - -### Log Messages -```python -logger.info("Starting streaming generation") -logger.debug(f"Buffered {len(buffer)} 
characters") -logger.info("Client disconnected, stopping stream") -logger.warning("Token generation timeout - ending stream") -``` - -### Health Checks -The streaming improvements include health check endpoints: -- `/health` - Basic server health -- `/v1/models` - Available models status -- Connection state visible in logs - -## Future Improvements - -### Potential Enhancements -1. **Adaptive Keepalive**: Adjust interval based on model speed -2. **Client Capability Detection**: Optimize based on client features -3. **Compression**: Add gzip support for bandwidth efficiency -4. **Metrics**: Add streaming performance metrics -5. **Backpressure Handling**: Better queue management under load - -### Monitoring -Consider adding: -- Connection duration metrics -- Token streaming rate tracking -- Error rate monitoring -- Client disconnect patterns - -## Rollback Instructions - -If issues arise, the streaming improvements can be reverted by: - -1. **Revert token streaming**: Remove async queue, return to blocking collection -2. **Remove keepalive**: Comment out heartbeat sending code -3. **Remove buffering**: Stream tokens immediately without sentence detection -4. **Restore original headers**: Remove streaming optimization headers - -The changes are modular and can be selectively disabled if needed. 
\ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 9989022..615a246 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,9 +19,12 @@ dependencies = [ "librosa>=0.10.0", "pyinstaller>=6.15.0", "soundfile>=0.13.1", - "onnxruntime-qnn==1.22.0", + "onnxruntime-qnn==1.22.0; sys_platform == 'win32'", "openai-whisper>=20231117", "llama-cpp-python>=0.3.0", + "lancedb>=0.14.0", + "sentence-transformers>=2.2.0", + "pillow>=10.0.0", ] [tool.uv] diff --git a/src/fluid_server/__main__.py b/src/fluid_server/__main__.py index ee2f0cc..2435dd4 100644 --- a/src/fluid_server/__main__.py +++ b/src/fluid_server/__main__.py @@ -36,9 +36,9 @@ def main() -> None: if getattr(sys, 'frozen', False): import io if sys.stdout is None: - sys.stdout = io.TextIOWrapper(io.BufferedWriter(io.BytesIO()), encoding='utf-8') + sys.stdout = io.TextIOWrapper(io.BytesIO(), encoding='utf-8') if sys.stderr is None: - sys.stderr = io.TextIOWrapper(io.BufferedWriter(io.BytesIO()), encoding='utf-8') + sys.stderr = io.TextIOWrapper(io.BytesIO(), encoding='utf-8') parser = argparse.ArgumentParser( description="Fluid Server - OpenAI-compatible API with multiple model support", @@ -176,7 +176,7 @@ def main() -> None: ) # Validate that model path exists - create if missing instead of exiting - if not config.model_path.exists(): + if config.model_path and not config.model_path.exists(): logger.warning(f"Model path does not exist: {config.model_path}") logger.info("Creating model directory automatically...") try: @@ -193,8 +193,8 @@ def main() -> None: ) # Discover available models - logger.info(f"Discovering models in {config.model_path}") - available_models = ModelDiscovery.find_models(config.model_path, config.llm_model) + logger.info(f"Discovering models in {config.model_path_resolved}") + available_models = ModelDiscovery.find_models(config.model_path_resolved, config.llm_model) # Log available models if available_models.get("llm"): @@ -209,7 +209,7 @@ def main() -> None: # 
Validate requested models exist if config.llm_model not in available_models.get("llm", []): - logger.warning(f"LLM model '{config.llm_model}' not found in {config.model_path / 'llm'}") + logger.warning(f"LLM model '{config.llm_model}' not found in {config.model_path_resolved / 'llm'}") if available_models.get("llm"): logger.info(f"Available LLM models: {available_models['llm']}") logger.info("Continuing without LLM support...") diff --git a/src/fluid_server/api/v1/embeddings.py b/src/fluid_server/api/v1/embeddings.py new file mode 100644 index 0000000..68ef2cf --- /dev/null +++ b/src/fluid_server/api/v1/embeddings.py @@ -0,0 +1,303 @@ +""" +OpenAI-compatible embeddings endpoint +""" + +import logging +import time +from typing import Annotated, List, Union + +from fastapi import APIRouter, Depends, HTTPException, Request, File, UploadFile, Form +from fastapi.responses import JSONResponse + +from ...managers.embedding_manager import EmbeddingManager +from ...managers.runtime_manager import RuntimeManager +from ...models.openai import ( + EmbeddingRequest, + EmbeddingResponse, + EmbeddingData, + EmbeddingUsage +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/v1") + + +def get_embedding_manager(request: Request) -> EmbeddingManager: + """Dependency to get embedding manager""" + return request.app.embedding_manager + + +def get_runtime_manager(request: Request) -> RuntimeManager: + """Dependency to get runtime manager""" + return request.app.runtime_manager + + +@router.post("/embeddings") +async def create_embeddings( + request: EmbeddingRequest, + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)] +) -> EmbeddingResponse: + """ + Create embeddings for text inputs (OpenAI-compatible) + + This endpoint is compatible with OpenAI's embeddings API and can be used + as a drop-in replacement. 
+ """ + try: + # Validate that embeddings are enabled + if not embedding_manager.config.enable_embeddings: + raise HTTPException( + status_code=503, + detail="Embeddings functionality is disabled" + ) + + # Ensure input is a list + inputs = request.input if isinstance(request.input, list) else [request.input] + + # Generate embeddings + start_time = time.time() + embeddings = await embedding_manager.get_text_embeddings( + texts=inputs, + model_name=request.model + ) + processing_time = time.time() - start_time + + # Create response data + embedding_data = [] + for i, embedding in enumerate(embeddings): + embedding_data.append( + EmbeddingData( + embedding=embedding, + index=i + ) + ) + + # Calculate usage statistics (approximate) + total_tokens = sum(len(text.split()) for text in inputs) + usage = EmbeddingUsage( + prompt_tokens=total_tokens, + total_tokens=total_tokens + ) + + # Create response + response = EmbeddingResponse( + data=embedding_data, + model=request.model, + usage=usage + ) + + logger.info( + f"Generated embeddings for {len(inputs)} inputs " + f"using model '{request.model}' in {processing_time:.2f}s" + ) + + return response + + except Exception as e: + logger.error(f"Error generating embeddings: {e}") + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/embeddings/batch") +async def create_embeddings_batch( + requests: List[EmbeddingRequest], + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)] +) -> List[EmbeddingResponse]: + """ + Create embeddings for multiple requests in batch + """ + try: + if not embedding_manager.config.enable_embeddings: + raise HTTPException( + status_code=503, + detail="Embeddings functionality is disabled" + ) + + responses = [] + for request in requests: + # Process each request individually but return as batch + response = await create_embeddings(request, embedding_manager) + responses.append(response) + + return responses + + 
except Exception as e: + logger.error(f"Error in batch embeddings: {e}") + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/embeddings/multimodal") +async def create_multimodal_embeddings( + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)], + input_type: str = Form(..., description="Type of input: text, image, or audio"), + model: str = Form(..., description="Model to use"), + text: str = Form(None, description="Text input (if input_type is text)"), + file: UploadFile = File(None, description="File input (for image or audio)"), + encoding_format: str = Form("float", description="Encoding format"), + dimensions: int = Form(None, description="Number of dimensions"), + user: str = Form(None, description="User identifier") +) -> EmbeddingResponse: + """ + Create embeddings for multimodal inputs (text, image, or audio) + """ + try: + if not embedding_manager.config.enable_embeddings: + raise HTTPException( + status_code=503, + detail="Embeddings functionality is disabled" + ) + + # Validate input based on type + if input_type == "text": + if not text: + raise HTTPException( + status_code=400, + detail="Text input required when input_type is 'text'" + ) + inputs = [text] + embeddings = await embedding_manager.get_text_embeddings( + texts=inputs, + model_name=model + ) + + elif input_type == "image": + if not file: + raise HTTPException( + status_code=400, + detail="File input required when input_type is 'image'" + ) + + # Validate file type + if not file.content_type or not file.content_type.startswith("image/"): + raise HTTPException( + status_code=400, + detail="File must be an image" + ) + + # Read file data + file_data = await file.read() + embeddings = await embedding_manager.get_image_embeddings( + image_bytes=file_data, + model_name=model + ) + inputs = [f"image:{file.filename}"] + + elif input_type == "audio": + if not file: + raise HTTPException( + status_code=400, + 
detail="File input required when input_type is 'audio'" + ) + + # Validate file type + if not file.content_type or not file.content_type.startswith("audio/"): + raise HTTPException( + status_code=400, + detail="File must be an audio file" + ) + + # Read file data + file_data = await file.read() + embeddings = await embedding_manager.get_audio_embeddings( + audio_bytes=file_data, + model_name=model + ) + inputs = [f"audio:{file.filename}"] + + else: + raise HTTPException( + status_code=400, + detail="input_type must be 'text', 'image', or 'audio'" + ) + + # Create response data + embedding_data = [] + for i, embedding in enumerate(embeddings): + embedding_data.append( + EmbeddingData( + embedding=embedding, + index=i + ) + ) + + # Calculate usage statistics + if input_type == "text": + total_tokens = sum(len(text.split()) for text in inputs) + else: + # For non-text, use a rough estimate based on file size + total_tokens = len(file_data) // 100 if file_data else 1 + + usage = EmbeddingUsage( + prompt_tokens=total_tokens, + total_tokens=total_tokens + ) + + # Create response + response = EmbeddingResponse( + data=embedding_data, + model=model, + usage=usage + ) + + logger.info( + f"Generated {input_type} embeddings using model '{model}'" + ) + + return response + + except Exception as e: + logger.error(f"Error generating multimodal embeddings: {e}") + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/embeddings/models") +async def list_embedding_models( + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)] +) -> JSONResponse: + """ + List available embedding models + """ + try: + info = embedding_manager.get_embedding_info() + + models = [] + for model_type, model_list in info["available_models"].items(): + for model_name in model_list: + models.append({ + "id": model_name, + "object": "model", + "created": int(time.time()), + "owned_by": "fluid-server", + "model_type": 
f"embedding_{model_type}" + }) + + return JSONResponse(content={ + "object": "list", + "data": models + }) + + except Exception as e: + logger.error(f"Error listing embedding models: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/embeddings/info") +async def get_embedding_info( + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)] +) -> JSONResponse: + """ + Get detailed information about embedding system status + """ + try: + info = embedding_manager.get_embedding_info() + return JSONResponse(content=info) + + except Exception as e: + logger.error(f"Error getting embedding info: {e}") + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/src/fluid_server/api/v1/models.py b/src/fluid_server/api/v1/models.py index 3f7b6d9..c24d21c 100644 --- a/src/fluid_server/api/v1/models.py +++ b/src/fluid_server/api/v1/models.py @@ -142,7 +142,7 @@ async def download_model( ) # Get model directory - model_base_path = runtime_manager.config.model_path / request.model_type + model_base_path = runtime_manager.config.model_path_resolved / request.model_type model_base_path.mkdir(parents=True, exist_ok=True) # Determine local name diff --git a/src/fluid_server/api/v1/vector_store.py b/src/fluid_server/api/v1/vector_store.py new file mode 100644 index 0000000..7f192ab --- /dev/null +++ b/src/fluid_server/api/v1/vector_store.py @@ -0,0 +1,429 @@ +""" +Vector store management endpoints for LanceDB operations +""" + +import logging +import uuid +from typing import Annotated, List, Optional + +from fastapi import APIRouter, Depends, HTTPException, Request, File, UploadFile, Form +from fastapi.responses import JSONResponse + +from ...managers.embedding_manager import EmbeddingManager +from ...storage.lancedb_client import LanceDBClient, VectorDocument +from ...models.openai import ( + VectorStoreInsertRequest, + VectorStoreInsertResponse, + VectorStoreSearchRequest, + 
VectorStoreSearchResponse, + VectorStoreSearchResult, + CollectionListResponse, + CollectionInfo, + CreateCollectionRequest +) + +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/v1/vector_store") + + +def get_embedding_manager(request: Request) -> EmbeddingManager: + """Dependency to get embedding manager""" + return request.app.embedding_manager + + +def get_lancedb_client(request: Request) -> LanceDBClient: + """Dependency to get LanceDB client""" + return request.app.lancedb_client + + +@router.post("/collections") +async def create_collection( + request: CreateCollectionRequest, + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> JSONResponse: + """ + Create a new vector store collection + """ + try: + await lancedb_client.create_collection( + collection_name=request.name, + dimension=request.dimension, + content_type=request.content_type, + overwrite=request.overwrite + ) + + return JSONResponse(content={ + "collection_name": request.name, + "dimension": request.dimension, + "content_type": request.content_type, + "created": True + }) + + except Exception as e: + logger.error(f"Error creating collection '{request.name}': {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/collections") +async def list_collections( + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> CollectionListResponse: + """ + List all vector store collections + """ + try: + collection_names = await lancedb_client.list_collections() + collections = [] + + for name in collection_names: + try: + stats = await lancedb_client.get_collection_stats(name) + collections.append( + CollectionInfo( + name=name, + num_documents=stats["num_documents"], + content_types=["text"] # Default, could be enhanced + ) + ) + except Exception as e: + logger.warning(f"Could not get stats for collection '{name}': {e}") + collections.append( + CollectionInfo( + name=name, + num_documents=0, + content_types=["text"] + ) + 
) + + return CollectionListResponse( + collections=collections, + total_collections=len(collections) + ) + + except Exception as e: + logger.error(f"Error listing collections: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/insert") +async def insert_documents( + request: VectorStoreInsertRequest, + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)], + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> VectorStoreInsertResponse: + """ + Insert documents into a vector store collection + """ + try: + if not embedding_manager.config.enable_embeddings: + raise HTTPException( + status_code=503, + detail="Embeddings functionality is disabled" + ) + + # Extract text content for embedding generation + texts = [doc.content for doc in request.documents] + + # Generate embeddings using specified model or default + model_name = request.model or embedding_manager.config.embedding_model + embeddings = await embedding_manager.get_text_embeddings( + texts=texts, + model_name=model_name + ) + + # Create VectorDocument objects + vector_documents = [] + inserted_ids = [] + + for i, (doc, embedding) in enumerate(zip(request.documents, embeddings)): + # Use provided ID or generate one + doc_id = doc.id if doc.id else str(uuid.uuid4()) + inserted_ids.append(doc_id) + + vector_doc = VectorDocument( + id=doc_id, + content=doc.content, + vector=embedding, + metadata=doc.metadata or {}, + content_type=doc.content_type + ) + vector_documents.append(vector_doc) + + # Insert into LanceDB + await lancedb_client.insert_documents( + collection_name=request.collection, + documents=vector_documents + ) + + logger.info( + f"Inserted {len(vector_documents)} documents into collection '{request.collection}'" + ) + + return VectorStoreInsertResponse( + inserted_count=len(vector_documents), + collection=request.collection, + ids=inserted_ids + ) + + except Exception as e: + logger.error(f"Error inserting documents: {e}") 
+ if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/search") +async def search_vectors( + request: VectorStoreSearchRequest, + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)], + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> VectorStoreSearchResponse: + """ + Search for similar vectors in a collection + """ + try: + if not embedding_manager.config.enable_embeddings: + raise HTTPException( + status_code=503, + detail="Embeddings functionality is disabled" + ) + + # Generate query embedding based on query type + if request.query_type == "text": + if isinstance(request.query, bytes): + raise HTTPException( + status_code=400, + detail="Text query must be a string" + ) + + model_name = request.model or embedding_manager.config.embedding_model + query_embeddings = await embedding_manager.get_text_embeddings( + texts=[request.query], + model_name=model_name + ) + query_vector = query_embeddings[0] + + elif request.query_type == "image": + if not isinstance(request.query, bytes): + raise HTTPException( + status_code=400, + detail="Image query must be bytes" + ) + + model_name = request.model or embedding_manager.config.multimodal_model + query_embeddings = await embedding_manager.get_image_embeddings( + image_bytes=request.query, + model_name=model_name + ) + query_vector = query_embeddings[0] + + elif request.query_type == "audio": + if not isinstance(request.query, bytes): + raise HTTPException( + status_code=400, + detail="Audio query must be bytes" + ) + + query_embeddings = await embedding_manager.get_audio_embeddings( + audio_bytes=request.query, + model_name=request.model + ) + query_vector = query_embeddings[0] + + else: + raise HTTPException( + status_code=400, + detail="query_type must be 'text', 'image', or 'audio'" + ) + + # Perform vector search + search_results = await lancedb_client.search_vectors( + collection_name=request.collection, 
+ query_vector=query_vector, + limit=request.limit, + filter_condition=request.filter + ) + + # Convert results to response format + results = [] + for result in search_results: + results.append( + VectorStoreSearchResult( + id=result["id"], + content=result["content"], + metadata=result.get("metadata", {}), + similarity_score=result.get("similarity_score", 0.0), + content_type=result.get("content_type", "text") + ) + ) + + logger.info( + f"Found {len(results)} results for {request.query_type} query in collection '{request.collection}'" + ) + + return VectorStoreSearchResponse( + results=results, + collection=request.collection, + query_type=request.query_type, + total_results=len(results) + ) + + except Exception as e: + logger.error(f"Error searching vectors: {e}") + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/search/multimodal") +async def search_multimodal( + embedding_manager: Annotated[EmbeddingManager, Depends(get_embedding_manager)], + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)], + collection: str = Form(..., description="Collection name"), + query_type: str = Form(..., description="Query type: text, image, or audio"), + limit: int = Form(10, description="Maximum results to return"), + text_query: str = Form(None, description="Text query (if query_type is text)"), + file_query: UploadFile = File(None, description="File query (for image or audio)"), + filter: str = Form(None, description="Optional filter condition"), + model: str = Form(None, description="Model to use") +) -> VectorStoreSearchResponse: + """ + Search for similar vectors using multimodal queries (text, image, or audio files) + """ + try: + if not embedding_manager.config.enable_embeddings: + raise HTTPException( + status_code=503, + detail="Embeddings functionality is disabled" + ) + + # Prepare query data based on type + if query_type == "text": + if not text_query: + raise HTTPException( + 
status_code=400, + detail="text_query required when query_type is 'text'" + ) + query_data = text_query + + elif query_type in ["image", "audio"]: + if not file_query: + raise HTTPException( + status_code=400, + detail="file_query required for image/audio queries" + ) + + # Validate file type + expected_prefix = query_type + "/" + if not file_query.content_type or not file_query.content_type.startswith(expected_prefix): + raise HTTPException( + status_code=400, + detail=f"File must be a {query_type} file" + ) + + query_data = await file_query.read() + + else: + raise HTTPException( + status_code=400, + detail="query_type must be 'text', 'image', or 'audio'" + ) + + # Create search request + search_request = VectorStoreSearchRequest( + collection=collection, + query=query_data, + query_type=query_type, + limit=limit, + filter=filter, + model=model + ) + + # Perform search using the main search function + return await search_vectors(search_request, embedding_manager, lancedb_client) + + except Exception as e: + logger.error(f"Error in multimodal search: {e}") + if isinstance(e, HTTPException): + raise + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/{collection}/{document_id}") +async def get_document( + collection: str, + document_id: str, + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> JSONResponse: + """ + Get a specific document by ID + """ + try: + document = await lancedb_client.get_document(collection, document_id) + + if not document: + raise HTTPException( + status_code=404, + detail=f"Document '{document_id}' not found in collection '{collection}'" + ) + + return JSONResponse(content={ + "id": document.id, + "content": document.content, + "metadata": document.metadata, + "content_type": document.content_type + }) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting document: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + 
+@router.delete("/{collection}/{document_id}") +async def delete_document( + collection: str, + document_id: str, + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> JSONResponse: + """ + Delete a document by ID + """ + try: + success = await lancedb_client.delete_document(collection, document_id) + + if success: + return JSONResponse(content={ + "deleted": True, + "collection": collection, + "document_id": document_id + }) + else: + raise HTTPException( + status_code=404, + detail=f"Document '{document_id}' not found in collection '{collection}'" + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting document: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/{collection}/stats") +async def get_collection_stats( + collection: str, + lancedb_client: Annotated[LanceDBClient, Depends(get_lancedb_client)] +) -> JSONResponse: + """ + Get statistics for a collection + """ + try: + stats = await lancedb_client.get_collection_stats(collection) + return JSONResponse(content=stats) + + except Exception as e: + logger.error(f"Error getting collection stats: {e}") + raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/src/fluid_server/app.py b/src/fluid_server/app.py index e05c731..a2194e7 100644 --- a/src/fluid_server/app.py +++ b/src/fluid_server/app.py @@ -11,14 +11,18 @@ from fastapi.responses import JSONResponse from .api import health -from .api.v1 import audio, chat, models +from .api.v1 import audio, chat, models, embeddings, vector_store from .config import ServerConfig from .managers.runtime_manager import RuntimeManager +from .managers.embedding_manager import EmbeddingManager +from .storage.lancedb_client import LanceDBClient logger = logging.getLogger(__name__) -# Global runtime manager instance +# Global manager instances runtime_manager: RuntimeManager | None = None +embedding_manager: EmbeddingManager | None = None 
+lancedb_client: LanceDBClient | None = None @asynccontextmanager @@ -30,20 +34,45 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: app: FastAPI application instance """ # Startup - global runtime_manager + global runtime_manager, embedding_manager, lancedb_client config: ServerConfig = app.state.config logger.info("Starting Fluid Server...") + logger.info(f"Data root: {config.data_root}") logger.info(f"Model path: {config.model_path}") logger.info(f"Cache directory: {config.cache_dir}") + logger.info(f"Embeddings enabled: {config.enable_embeddings}") logger.info("Device assignment: GPU for LLM, NPU for Whisper") # Initialize runtime manager runtime_manager = RuntimeManager(config) await runtime_manager.initialize() + # Initialize embedding manager if enabled + if config.enable_embeddings: + try: + embedding_manager = EmbeddingManager(config) + await embedding_manager.initialize() + + # Initialize LanceDB client + lancedb_client = LanceDBClient( + db_path=config.embeddings_db_path, + db_name=config.embeddings_db_name + ) + await lancedb_client.initialize() + + logger.info("Embedding system initialized successfully") + except Exception as e: + logger.warning(f"Failed to initialize embedding system: {e}") + embedding_manager = None + lancedb_client = None + else: + logger.info("Embeddings disabled in configuration") + # Store in app state for dependency injection app.state.runtime_manager = runtime_manager + app.state.embedding_manager = embedding_manager + app.state.lancedb_client = lancedb_client yield @@ -51,6 +80,10 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: logger.info("Shutting down Fluid Server...") if runtime_manager: await runtime_manager.unload_all() + if embedding_manager: + await embedding_manager.shutdown() + if lancedb_client: + await lancedb_client.close() def create_app(config: ServerConfig) -> FastAPI: @@ -87,24 +120,57 @@ def create_app(config: ServerConfig) -> FastAPI: from .api.v1.audio import get_runtime_manager as 
audio_get_runtime_manager from .api.v1.chat import get_runtime_manager as chat_get_runtime_manager from .api.v1.models import get_runtime_manager as models_get_runtime_manager - - # Dependency to get runtime manager + + # Import embedding dependencies if they exist + try: + from .api.v1.embeddings import get_embedding_manager, get_runtime_manager as embeddings_get_runtime_manager + from .api.v1.vector_store import get_embedding_manager as vs_get_embedding_manager, get_lancedb_client + EMBEDDINGS_AVAILABLE = True + except ImportError: + EMBEDDINGS_AVAILABLE = False + + # Dependency functions def get_runtime_manager() -> RuntimeManager: """Get runtime manager dependency""" return app.state.runtime_manager - - app.dependency_overrides = { + + def get_embedding_manager_dep() -> EmbeddingManager: + """Get embedding manager dependency""" + return app.state.embedding_manager + + def get_lancedb_client_dep() -> LanceDBClient: + """Get LanceDB client dependency""" + return app.state.lancedb_client + + # Set up dependency overrides + overrides = { chat_get_runtime_manager: get_runtime_manager, models_get_runtime_manager: get_runtime_manager, audio_get_runtime_manager: get_runtime_manager, health_get_runtime_manager: get_runtime_manager, } + + # Add embedding dependencies if available + if EMBEDDINGS_AVAILABLE: + overrides.update({ + get_embedding_manager: get_embedding_manager_dep, + embeddings_get_runtime_manager: get_runtime_manager, + vs_get_embedding_manager: get_embedding_manager_dep, + get_lancedb_client: get_lancedb_client_dep, + }) + + app.dependency_overrides = overrides # Include routers app.include_router(health.router) app.include_router(chat.router) app.include_router(models.router) app.include_router(audio.router) + + # Include embedding routers if embeddings are available + if EMBEDDINGS_AVAILABLE and config.enable_embeddings: + app.include_router(embeddings.router) + app.include_router(vector_store.router) # Global exception handler 
@app.exception_handler(Exception) @@ -130,10 +196,13 @@ def create_worker_app() -> FastAPI: config = ServerConfig( host=os.getenv("FLUID_HOST", "127.0.0.1"), port=int(os.getenv("FLUID_PORT", "3847")), - model_path=Path(os.getenv("FLUID_MODEL_PATH", "./models")), + data_root=Path(os.getenv("FLUID_DATA_ROOT", "./data")), + model_path=Path(os.getenv("FLUID_MODEL_PATH", "")) if os.getenv("FLUID_MODEL_PATH") else None, cache_dir=Path(os.getenv("FLUID_CACHE_DIR", "")) if os.getenv("FLUID_CACHE_DIR") else None, llm_model=os.getenv("FLUID_LLM_MODEL", "qwen3-8b-int4-ov"), whisper_model=os.getenv("FLUID_WHISPER_MODEL", "whisper-tiny"), + enable_embeddings=os.getenv("FLUID_ENABLE_EMBEDDINGS", "true").lower() == "true", + embedding_model=os.getenv("FLUID_EMBEDDING_MODEL", "BAAI/bge-small-en-v1.5"), warm_up=os.getenv("FLUID_WARM_UP", "true").lower() == "true", idle_timeout_minutes=int(os.getenv("FLUID_IDLE_TIMEOUT", "5")), ) diff --git a/src/fluid_server/config.py b/src/fluid_server/config.py index 26857f0..4c0beff 100644 --- a/src/fluid_server/config.py +++ b/src/fluid_server/config.py @@ -14,15 +14,24 @@ class ServerConfig: host: str = "127.0.0.1" port: int = 3847 - # Model paths - model_path: Path = Path("./models") # Base path for all models - cache_dir: Path | None = None # Defaults to model_path/cache + # Data paths + data_root: Path = Path("./data") # Root directory for all server data + model_path: Path | None = None # Defaults to data_root/models + cache_dir: Path | None = None # Defaults to data_root/cache + embeddings_db_path: Path | None = None # Defaults to data_root/databases # Model selection llm_model: str = "qwen3-8b-int4-ov" # Which LLM to load whisper_model: str = "whisper-large-v3-turbo-fp16-ov-npu" # Which Whisper to load device: str = "AUTO" # Device for inference: AUTO, CPU, GPU, NPU + # Embeddings configuration + enable_embeddings: bool = True # Enable embeddings functionality + embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2" # Default 
text embedding model (384 dim) 22M parameters + embedding_device: str = "CPU" # Device for embeddings: AUTO, CPU, GPU, NPU (CPU more stable for sentence-transformers) + embeddings_db_name: str = "embeddings" # LanceDB database name + multimodal_model: str = "openai/clip-vit-base-patch32" # For image embeddings + # Features warm_up: bool = True # Warm up models on startup max_memory_gb: float = 4.0 # Memory limit @@ -43,15 +52,26 @@ def __post_init__(self) -> None: if self._initialized: return - # Convert to Path object and ensure it's absolute - path_obj = Path(self.model_path) - if not path_obj.is_absolute(): - self.model_path = path_obj.resolve() + # Convert data_root to Path object and ensure it's absolute + data_root_obj = Path(self.data_root) + if not data_root_obj.is_absolute(): + self.data_root = data_root_obj.resolve() else: - self.model_path = path_obj + self.data_root = data_root_obj + # Set model_path default if not provided + if self.model_path is None: + self.model_path = self.data_root / "models" + else: + model_path_obj = Path(self.model_path) + if not model_path_obj.is_absolute(): + self.model_path = model_path_obj.resolve() + else: + self.model_path = model_path_obj + + # Set cache_dir default if not provided if self.cache_dir is None: - self.cache_dir = self.model_path / "cache" + self.cache_dir = self.data_root / "cache" else: cache_path_obj = Path(self.cache_dir) if not cache_path_obj.is_absolute(): @@ -59,8 +79,40 @@ def __post_init__(self) -> None: else: self.cache_dir = cache_path_obj - # Create cache directory if it doesn't exist + # Set embeddings_db_path default if not provided + if self.embeddings_db_path is None: + self.embeddings_db_path = self.data_root / "databases" + else: + db_path_obj = Path(self.embeddings_db_path) + if not db_path_obj.is_absolute(): + self.embeddings_db_path = db_path_obj.resolve() + else: + self.embeddings_db_path = db_path_obj + + # Create necessary directories + self.data_root.mkdir(parents=True, 
exist_ok=True) + self.model_path.mkdir(parents=True, exist_ok=True) self.cache_dir.mkdir(parents=True, exist_ok=True) + if self.enable_embeddings: + self.embeddings_db_path.mkdir(parents=True, exist_ok=True) # Mark as initialized self._initialized = True + + @property + def model_path_resolved(self) -> Path: + """Get model_path as guaranteed Path (after __post_init__)""" + assert self.model_path is not None, "model_path should be set after __post_init__" + return self.model_path + + @property + def cache_dir_resolved(self) -> Path: + """Get cache_dir as guaranteed Path (after __post_init__)""" + assert self.cache_dir is not None, "cache_dir should be set after __post_init__" + return self.cache_dir + + @property + def embeddings_db_path_resolved(self) -> Path: + """Get embeddings_db_path as guaranteed Path (after __post_init__)""" + assert self.embeddings_db_path is not None, "embeddings_db_path should be set after __post_init__" + return self.embeddings_db_path diff --git a/src/fluid_server/managers/embedding_manager.py b/src/fluid_server/managers/embedding_manager.py new file mode 100644 index 0000000..c01c999 --- /dev/null +++ b/src/fluid_server/managers/embedding_manager.py @@ -0,0 +1,484 @@ +""" +Embedding manager for handling multiple embedding models +""" + +import asyncio +import logging +import time +from typing import Any, Dict, List, Optional, Union + +from ..config import ServerConfig +from ..runtimes.base_embedding import BaseEmbeddingRuntime, EmbeddingType +from ..runtimes.openvino_embedding import OpenVINOEmbeddingRuntime +from ..runtimes.whisper_embedding import WhisperEmbeddingRuntime +from ..utils.model_discovery import ModelDiscovery +from ..utils.model_downloader import ModelDownloader +from ..utils.retry import retry_async + +logger = logging.getLogger(__name__) + + +class EmbeddingManager: + """Manages embedding model runtimes for text, image, and audio embeddings""" + + def __init__(self, config: ServerConfig) -> None: + """ + Initialize 
embedding manager + + Args: + config: Server configuration + """ + self.config = config + + # Keep separate runtimes for different embedding types + self.text_runtime: BaseEmbeddingRuntime | None = None + self.multimodal_runtime: BaseEmbeddingRuntime | None = None # For text + images + self.audio_runtime: BaseEmbeddingRuntime | None = None + + # Track loaded models + self.loaded_text_model: str | None = None + self.loaded_multimodal_model: str | None = None + self.loaded_audio_model: str | None = None + + # Model discovery and downloading + self.available_models: Dict[str, List[str]] = {} + self.downloader = ModelDownloader( + config.model_path_resolved, config.cache_dir_resolved + ) + + # Idle cleanup task + self._idle_task: asyncio.Task | None = None + self.download_status: dict[str, str] = {} # Track download status for models + + # Track warm-up status + self.warmup_status = { + "in_progress": False, + "text": "pending", # pending/loading/ready/failed + "multimodal": "pending", # pending/loading/ready/failed + "audio": "pending", # pending/loading/ready/failed + "current_task": "", + "start_time": None, + } + + async def initialize(self) -> None: + """Initialize embedding manager and optionally warm up models""" + if not self.config.enable_embeddings: + logger.info("Embeddings disabled in configuration") + return + + logger.info(f"Initializing embedding manager with models from {self.config.model_path}") + + # Discover available embedding models + self._discover_embedding_models() + logger.info(f"Available embedding models: {self.available_models}") + + # Start idle cleanup monitoring + self._schedule_idle_cleanup() + + if self.config.warm_up: + logger.info("Starting background embedding model warm-up...") + asyncio.create_task(self._warm_up_models()) + + def _discover_embedding_models(self) -> None: + """Discover available embedding models in the model directory""" + embeddings_dir = self.config.model_path / "embeddings" + + if not embeddings_dir.exists(): + 
embeddings_dir.mkdir(parents=True, exist_ok=True) + + self.available_models = { + "text": [], + "multimodal": [], + "audio": [] + } + + # Scan for embedding models + for model_dir in embeddings_dir.iterdir(): + if model_dir.is_dir(): + model_dir_name = model_dir.name + + # Convert directory name back to HuggingFace model ID + model_name = model_dir_name.replace("_", "/") + + # Categorize by model name patterns + if "clip" in model_name.lower() or "multimodal" in model_name.lower(): + self.available_models["multimodal"].append(model_name) + elif "whisper" in model_name.lower(): + self.available_models["audio"].append(model_name) + else: + # Default to text model + self.available_models["text"].append(model_name) + + async def _warm_up_models(self) -> None: + """Warm up embedding models in the background""" + try: + self.warmup_status["in_progress"] = True + self.warmup_status["start_time"] = time.time() + self.warmup_status["current_task"] = "Starting embedding model warm-up..." + + logger.info("Starting comprehensive embedding model warm-up...") + + # Phase 1: Download models if needed + await self._download_models_if_needed() + + # Phase 2: Pre-load models + await self._preload_models() + + self.warmup_status["in_progress"] = False + elapsed = time.time() - self.warmup_status["start_time"] + logger.info(f"Embedding model warm-up completed in {elapsed:.1f}s") + + except Exception as e: + logger.error(f"Error during embedding model warm-up: {e}") + self.warmup_status["in_progress"] = False + + async def _download_models_if_needed(self) -> None: + """Download embedding models if not available locally from HuggingFace""" + # Check for default text embedding model + if self.config.embedding_model not in self.available_models.get("text", []): + logger.info(f"Text embedding model '{self.config.embedding_model}' not found locally, downloading...") + await self._download_huggingface_model( + self.config.embedding_model, + "text" + ) + + # Check for multimodal model + if 
self.config.multimodal_model not in self.available_models.get("multimodal", []): + logger.info(f"Multimodal model '{self.config.multimodal_model}' not found locally, downloading...") + await self._download_huggingface_model( + self.config.multimodal_model, + "multimodal" + ) + + async def _preload_models(self) -> None: + """Pre-load embedding models for faster first requests""" + tasks = [] + + # Pre-load text model + if self.available_models.get("text"): + model_name = self.config.embedding_model + if model_name in self.available_models["text"]: + tasks.append(self._load_text_model(model_name)) + + # Pre-load multimodal model + if self.available_models.get("multimodal"): + model_name = self.config.multimodal_model + # Use first available if configured model not found + available_multimodal = self.available_models["multimodal"] + if available_multimodal: + actual_model = model_name if model_name in available_multimodal else available_multimodal[0] + tasks.append(self._load_multimodal_model(actual_model)) + + # Pre-load audio model if available + if self.available_models.get("audio"): + model_name = self.available_models["audio"][0] + tasks.append(self._load_audio_model(model_name)) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + async def _download_huggingface_model(self, model_id: str, model_type: str) -> None: + """Download model from HuggingFace Hub""" + try: + import asyncio + from concurrent.futures import ThreadPoolExecutor + + # Use the existing executor or create one + executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="EmbeddingDownload") + + loop = asyncio.get_event_loop() + await loop.run_in_executor(executor, self._download_huggingface_model_sync, model_id, model_type) + + # Refresh available models after download + self._discover_embedding_models() + + except Exception as e: + logger.error(f"Failed to download model '{model_id}': {e}") + raise + + def _download_huggingface_model_sync(self, model_id: str, 
model_type: str) -> None: + """Synchronous HuggingFace model download""" + try: + from sentence_transformers import SentenceTransformer + import os + + # Create target directory + target_dir = self.config.model_path / "embeddings" / model_id.replace("/", "_") + target_dir.mkdir(parents=True, exist_ok=True) + + logger.info(f"Downloading {model_type} model '{model_id}' to {target_dir}") + + # Download the model using sentence-transformers + # This will cache it in the HuggingFace cache, then we copy to our structure + model = SentenceTransformer(model_id) + + # Save to our directory structure + model.save(str(target_dir)) + + logger.info(f"Successfully downloaded model '{model_id}'") + + except Exception as e: + logger.error(f"Error downloading model '{model_id}': {e}") + raise + + async def get_text_embeddings( + self, + texts: Union[str, List[str]], + model_name: Optional[str] = None + ) -> List[List[float]]: + """Generate text embeddings""" + if not self.config.enable_embeddings: + raise RuntimeError("Embeddings are disabled") + + target_model = model_name or self.config.embedding_model + + # Load model if not already loaded or different model requested + if self.loaded_text_model != target_model: + await self._load_text_model(target_model) + + if not self.text_runtime or not self.text_runtime.is_loaded: + raise RuntimeError("Text embedding model not available") + + return await self.text_runtime.embed(texts, EmbeddingType.TEXT) + + async def get_image_embeddings( + self, + image_bytes: bytes, + model_name: Optional[str] = None + ) -> List[List[float]]: + """Generate image embeddings""" + if not self.config.enable_embeddings: + raise RuntimeError("Embeddings are disabled") + + target_model = model_name or self.config.multimodal_model + + # Load model if not already loaded or different model requested + if self.loaded_multimodal_model != target_model: + await self._load_multimodal_model(target_model) + + if not self.multimodal_runtime or not 
self.multimodal_runtime.is_loaded: + raise RuntimeError("Multimodal embedding model not available") + + return await self.multimodal_runtime.embed(image_bytes, EmbeddingType.IMAGE) + + async def get_audio_embeddings( + self, + audio_bytes: bytes, + model_name: Optional[str] = None + ) -> List[List[float]]: + """Generate audio embeddings""" + if not self.config.enable_embeddings: + raise RuntimeError("Embeddings are disabled") + + # Use first available audio model if none specified + if not model_name: + if not self.available_models.get("audio"): + raise RuntimeError("No audio embedding models available") + model_name = self.available_models["audio"][0] + + # Load model if not already loaded or different model requested + if self.loaded_audio_model != model_name: + await self._load_audio_model(model_name) + + if not self.audio_runtime or not self.audio_runtime.is_loaded: + raise RuntimeError("Audio embedding model not available") + + return await self.audio_runtime.embed(audio_bytes, EmbeddingType.AUDIO) + + async def _load_text_model(self, model_name: str) -> None: + """Load text embedding model""" + try: + self.warmup_status["text"] = "loading" + self.warmup_status["current_task"] = f"Loading text embedding model '{model_name}'..." 
+ + # Convert HuggingFace model ID to directory name + model_dir_name = model_name.replace("/", "_") + model_path = self.config.model_path / "embeddings" / model_dir_name + + # If model doesn't exist locally, download it + if not model_path.exists(): + logger.info(f"Model {model_name} not found locally, downloading...") + await self._download_huggingface_model(model_name, "text") + + # Create runtime + runtime = OpenVINOEmbeddingRuntime( + model_path=model_path, + cache_dir=self.config.cache_dir, + device=self.config.embedding_device, + model_type="text" + ) + + await runtime.load() + + # Replace current runtime + if self.text_runtime: + await self.text_runtime.unload() + + self.text_runtime = runtime + self.loaded_text_model = model_name + self.warmup_status["text"] = "ready" + + logger.info(f"Text embedding model '{model_name}' loaded successfully") + + except Exception as e: + self.warmup_status["text"] = "failed" + logger.error(f"Failed to load text embedding model '{model_name}': {e}") + raise + + async def _load_multimodal_model(self, model_name: str) -> None: + """Load multimodal embedding model""" + try: + self.warmup_status["multimodal"] = "loading" + self.warmup_status["current_task"] = f"Loading multimodal embedding model '{model_name}'..." 
+ + model_path = self.config.model_path / "embeddings" / model_name + + # Create runtime + runtime = OpenVINOEmbeddingRuntime( + model_path=model_path, + cache_dir=self.config.cache_dir, + device=self.config.embedding_device, + model_type="multimodal" + ) + + await runtime.load() + + # Replace current runtime + if self.multimodal_runtime: + await self.multimodal_runtime.unload() + + self.multimodal_runtime = runtime + self.loaded_multimodal_model = model_name + self.warmup_status["multimodal"] = "ready" + + logger.info(f"Multimodal embedding model '{model_name}' loaded successfully") + + except Exception as e: + self.warmup_status["multimodal"] = "failed" + logger.error(f"Failed to load multimodal embedding model '{model_name}': {e}") + raise + + async def _load_audio_model(self, model_name: str) -> None: + """Load audio embedding model""" + try: + self.warmup_status["audio"] = "loading" + self.warmup_status["current_task"] = f"Loading audio embedding model '{model_name}'..." + + model_path = self.config.model_path / "embeddings" / model_name + + # Create runtime + runtime = WhisperEmbeddingRuntime( + model_path=model_path, + cache_dir=self.config.cache_dir, + device=self.config.embedding_device, + max_memory_gb=self.config.max_memory_gb + ) + + await runtime.load() + + # Replace current runtime + if self.audio_runtime: + await self.audio_runtime.unload() + + self.audio_runtime = runtime + self.loaded_audio_model = model_name + self.warmup_status["audio"] = "ready" + + logger.info(f"Audio embedding model '{model_name}' loaded successfully") + + except Exception as e: + self.warmup_status["audio"] = "failed" + logger.error(f"Failed to load audio embedding model '{model_name}': {e}") + raise + + def _schedule_idle_cleanup(self) -> None: + """Schedule idle cleanup task""" + if self._idle_task is None or self._idle_task.done(): + self._idle_task = asyncio.create_task(self._idle_cleanup_loop()) + + async def _idle_cleanup_loop(self) -> None: + """Background task to 
unload idle models""" + while True: + try: + await asyncio.sleep(self.config.idle_check_interval_seconds) + await self._check_and_unload_idle_models() + except asyncio.CancelledError: + break + except Exception as e: + logger.error(f"Error in embedding idle cleanup: {e}") + + async def _check_and_unload_idle_models(self) -> None: + """Check for idle models and unload them""" + idle_timeout = self.config.idle_timeout_minutes * 60 # Convert to seconds + + runtimes = [ + ("text", self.text_runtime), + ("multimodal", self.multimodal_runtime), + ("audio", self.audio_runtime) + ] + + for runtime_type, runtime in runtimes: + if runtime and runtime.is_loaded and runtime.get_idle_time() > idle_timeout: + logger.info(f"Unloading idle {runtime_type} embedding model: {runtime.model_name}") + await runtime.unload() + + # Reset loaded model tracking + if runtime_type == "text": + self.loaded_text_model = None + self.text_runtime = None + elif runtime_type == "multimodal": + self.loaded_multimodal_model = None + self.multimodal_runtime = None + elif runtime_type == "audio": + self.loaded_audio_model = None + self.audio_runtime = None + + def get_embedding_info(self) -> dict[str, Any]: + """Get information about loaded embedding models""" + info = { + "enabled": self.config.enable_embeddings, + "available_models": self.available_models, + "warmup_status": self.warmup_status, + "loaded_models": { + "text": self.loaded_text_model, + "multimodal": self.loaded_multimodal_model, + "audio": self.loaded_audio_model + }, + "runtime_info": {} + } + + # Add runtime info for loaded models + if self.text_runtime: + info["runtime_info"]["text"] = self.text_runtime.get_info() + if self.multimodal_runtime: + info["runtime_info"]["multimodal"] = self.multimodal_runtime.get_info() + if self.audio_runtime: + info["runtime_info"]["audio"] = self.audio_runtime.get_info() + + return info + + async def shutdown(self) -> None: + """Shutdown embedding manager and unload all models""" + 
logger.info("Shutting down embedding manager...") + + # Cancel idle cleanup task + if self._idle_task and not self._idle_task.done(): + self._idle_task.cancel() + try: + await self._idle_task + except asyncio.CancelledError: + pass + + # Unload all models + tasks = [] + if self.text_runtime: + tasks.append(self.text_runtime.unload()) + if self.multimodal_runtime: + tasks.append(self.multimodal_runtime.unload()) + if self.audio_runtime: + tasks.append(self.audio_runtime.unload()) + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + logger.info("Embedding manager shutdown complete") \ No newline at end of file diff --git a/src/fluid_server/managers/runtime_manager.py b/src/fluid_server/managers/runtime_manager.py index 40105ff..03a44ee 100644 --- a/src/fluid_server/managers/runtime_manager.py +++ b/src/fluid_server/managers/runtime_manager.py @@ -5,6 +5,7 @@ import asyncio import logging import time +from pathlib import Path from typing import Any from ..config import ServerConfig @@ -22,10 +23,10 @@ from ..runtimes.qnn_whisper import QNNWhisperRuntime QNN_AVAILABLE = True except ImportError: - QNNWhisperRuntime = None + QNNWhisperRuntime: type | None = None # Explicit annotation for shadowing QNN_AVAILABLE = False else: - QNNWhisperRuntime = None + QNNWhisperRuntime: type | None = None # Explicit annotation for shadowing QNN_AVAILABLE = False from ..utils.model_discovery import ModelDiscovery from ..utils.model_downloader import ModelDownloader @@ -50,9 +51,9 @@ def __init__(self, config: ServerConfig) -> None: self.whisper_runtime: BaseRuntime | None = None self.loaded_llm_model: str | None = None self.loaded_whisper_model: str | None = None - self.available_models = ModelDiscovery.find_models(config.model_path, config.llm_model) + self.available_models = ModelDiscovery.find_models(config.model_path_resolved, config.llm_model) self.downloader = ModelDownloader( - config.model_path, config.cache_dir or config.model_path / "cache" + 
config.model_path_resolved, config.cache_dir_resolved ) self._idle_task: asyncio.Task | None = None self.download_status: dict[str, str] = {} # Track download status for models @@ -221,7 +222,7 @@ async def load_llm(self, model_name: str | None = None) -> OpenVINOLLMRuntime | gc.collect() # Get model path - model_path = ModelDiscovery.get_model_path(self.config.model_path, "llm", model_to_load) + model_path = ModelDiscovery.get_model_path(self.config.model_path_resolved, "llm", model_to_load) if model_path is None: logger.info(f"LLM model '{model_to_load}' not found locally, attempting to download...") @@ -229,8 +230,8 @@ async def load_llm(self, model_name: str | None = None) -> OpenVINOLLMRuntime | model_available = await self.downloader.ensure_model_available("llm", model_to_load) if model_available: # Refresh model discovery after download - self.available_models = ModelDiscovery.find_models(self.config.model_path) - model_path = ModelDiscovery.get_model_path(self.config.model_path, "llm", model_to_load) + self.available_models = ModelDiscovery.find_models(self.config.model_path_resolved) + model_path = ModelDiscovery.get_model_path(self.config.model_path_resolved, "llm", model_to_load) logger.info(f"Successfully downloaded LLM model '{model_to_load}'") else: logger.error(f"Failed to download LLM model '{model_to_load}'") @@ -243,6 +244,10 @@ async def load_llm(self, model_name: str | None = None) -> OpenVINOLLMRuntime | # Determine runtime type based on model format runtime_type = ModelDiscovery.get_llm_runtime_type(model_path, model_to_load) logger.info(f"Loading LLM model '{model_to_load}' using {runtime_type.upper()} runtime") + + # Type assertion for type checker - model_path is guaranteed to be Path here + assert model_path is not None, "model_path should not be None at this point" + model_path_resolved: Path = model_path # Explicit type narrowing @retry_async( max_attempts=3, @@ -256,15 +261,15 @@ async def load_llm(self, model_name: str | None = None) 
-> OpenVINOLLMRuntime | async def _load_with_retry(): if runtime_type == "llamacpp": runtime = LlamaCppRuntime( - model_path=model_path, - cache_dir=self.config.cache_dir or self.config.model_path / "cache", + model_path=model_path_resolved, + cache_dir=self.config.cache_dir_resolved, device="GPU", # Will use Vulkan backend model_name=model_to_load, # Pass the actual model identifier ) else: # openvino runtime = OpenVINOLLMRuntime( - model_path=model_path, - cache_dir=self.config.cache_dir or self.config.model_path / "cache", + model_path=model_path_resolved, + cache_dir=self.config.cache_dir_resolved, device="GPU", max_memory_gb=self.config.max_memory_gb, ) @@ -318,7 +323,7 @@ async def load_whisper(self, model_name: str | None = None) -> BaseRuntime | Non gc.collect() # Get model path - model_path = ModelDiscovery.get_model_path(self.config.model_path, "whisper", model_to_load) + model_path = ModelDiscovery.get_model_path(self.config.model_path_resolved, "whisper", model_to_load) if model_path is None: logger.error(f"Whisper model '{model_to_load}' not found") @@ -343,20 +348,20 @@ async def _load_with_retry(): logger.warning("QNN runtime requested but not available, falling back to OpenVINO") runtime = OpenVINOWhisperRuntime( model_path=model_path, - cache_dir=self.config.cache_dir or self.config.model_path / "cache", + cache_dir=self.config.cache_dir_resolved, device="NPU", max_memory_gb=self.config.max_memory_gb, ) else: runtime = QNNWhisperRuntime( model_path=model_path, - cache_dir=self.config.cache_dir or self.config.model_path / "cache", + cache_dir=self.config.cache_dir_resolved, device="NPU", ) else: # openvino runtime = OpenVINOWhisperRuntime( model_path=model_path, - cache_dir=self.config.cache_dir or self.config.model_path / "cache", + cache_dir=self.config.cache_dir_resolved, device="NPU", max_memory_gb=self.config.max_memory_gb, ) @@ -532,7 +537,7 @@ async def unload_all(self) -> None: def discover_models(self) -> None: """Refresh the list of 
available models""" logger.info(f"Discovering models in {self.config.model_path}") - self.available_models = ModelDiscovery.find_models(self.config.model_path) + self.available_models = ModelDiscovery.find_models(self.config.model_path_resolved) logger.info(f"Available models: {self.available_models}") def get_status(self) -> dict[str, Any]: diff --git a/src/fluid_server/models/openai.py b/src/fluid_server/models/openai.py index 35f3b6b..d36d16a 100644 --- a/src/fluid_server/models/openai.py +++ b/src/fluid_server/models/openai.py @@ -126,3 +126,131 @@ class HealthStatus(BaseModel): memory_usage_gb: float | None = Field(None, description="Current memory usage") warmup_status: dict[str, Any] | None = Field(None, description="Model warm-up status") version: str = Field(..., description="Server version") + + +# ============== Embedding Models ============== +class EmbeddingRequest(BaseModel): + """OpenAI-compatible embedding request""" + + input: str | list[str] = Field(..., description="Text input(s) to embed") + model: str = Field(..., description="ID of the model to use") + encoding_format: str | None = Field("float", description="Format to return embeddings in") + dimensions: int | None = Field(None, description="Number of dimensions for embedding") + user: str | None = Field(None, description="Unique identifier for the end-user") + + +class EmbeddingData(BaseModel): + """Single embedding data point""" + + object: str = "embedding" + embedding: list[float] = Field(..., description="The embedding vector") + index: int = Field(..., description="Index of the embedding in the input list") + + +class EmbeddingUsage(BaseModel): + """Usage information for embedding request""" + + prompt_tokens: int = Field(..., description="Number of tokens in the input") + total_tokens: int = Field(..., description="Total number of tokens used") + + +class EmbeddingResponse(BaseModel): + """OpenAI-compatible embedding response""" + + object: str = "list" + data: list[EmbeddingData] = 
Field(..., description="List of embeddings") + model: str = Field(..., description="Model used for embeddings") + usage: EmbeddingUsage = Field(..., description="Usage statistics") + + +# ============== Multimodal Embedding Models ============== +class MultimodalEmbeddingRequest(BaseModel): + """Request for multimodal embeddings (text, image, audio)""" + + input: str | dict[str, Any] = Field(..., description="Input data (text, image bytes, or audio bytes)") + input_type: str = Field(..., description="Type of input: text, image, or audio") + model: str = Field(..., description="ID of the model to use") + encoding_format: str | None = Field("float", description="Format to return embeddings in") + dimensions: int | None = Field(None, description="Number of dimensions for embedding") + user: str | None = Field(None, description="Unique identifier for the end-user") + + +# ============== Vector Store Models ============== +class VectorStoreDocument(BaseModel): + """Document for vector storage""" + + id: str = Field(..., description="Unique document identifier") + content: str = Field(..., description="Document content") + metadata: dict[str, Any] | None = Field(None, description="Additional metadata") + content_type: str = Field("text", description="Type of content") + + +class VectorStoreInsertRequest(BaseModel): + """Request to insert documents into vector store""" + + collection: str = Field(..., description="Collection name") + documents: list[VectorStoreDocument] = Field(..., description="Documents to insert") + model: str | None = Field(None, description="Embedding model to use") + + +class VectorStoreInsertResponse(BaseModel): + """Response for vector store insertion""" + + inserted_count: int = Field(..., description="Number of documents inserted") + collection: str = Field(..., description="Collection name") + ids: list[str] = Field(..., description="IDs of inserted documents") + + +class VectorStoreSearchRequest(BaseModel): + """Request to search vector 
store""" + + collection: str = Field(..., description="Collection name") + query: str | bytes = Field(..., description="Query text, image, or audio data") + query_type: str = Field("text", description="Type of query: text, image, or audio") + limit: int = Field(10, ge=1, le=100, description="Maximum number of results") + filter: str | None = Field(None, description="Optional filter condition") + model: str | None = Field(None, description="Embedding model to use for query") + + +class VectorStoreSearchResult(BaseModel): + """Single search result""" + + id: str = Field(..., description="Document ID") + content: str = Field(..., description="Document content") + metadata: dict[str, Any] | None = Field(None, description="Document metadata") + similarity_score: float = Field(..., description="Similarity score (0-1)") + content_type: str = Field(..., description="Type of content") + + +class VectorStoreSearchResponse(BaseModel): + """Response for vector store search""" + + results: list[VectorStoreSearchResult] = Field(..., description="Search results") + collection: str = Field(..., description="Collection name") + query_type: str = Field(..., description="Type of query used") + total_results: int = Field(..., description="Total number of results found") + + +class CollectionInfo(BaseModel): + """Information about a vector store collection""" + + name: str = Field(..., description="Collection name") + num_documents: int = Field(..., description="Number of documents in collection") + embedding_dimension: int | None = Field(None, description="Embedding vector dimension") + content_types: list[str] = Field(default_factory=list, description="Types of content stored") + + +class CollectionListResponse(BaseModel): + """Response listing all collections""" + + collections: list[CollectionInfo] = Field(..., description="List of collections") + total_collections: int = Field(..., description="Total number of collections") + + +class CreateCollectionRequest(BaseModel): + 
"""Request to create a new collection""" + + name: str = Field(..., description="Collection name") + dimension: int = Field(..., description="Embedding vector dimension") + content_type: str = Field("text", description="Primary content type for this collection") + overwrite: bool = Field(False, description="Whether to overwrite existing collection") diff --git a/src/fluid_server/runtimes/base_embedding.py b/src/fluid_server/runtimes/base_embedding.py new file mode 100644 index 0000000..9785a32 --- /dev/null +++ b/src/fluid_server/runtimes/base_embedding.py @@ -0,0 +1,95 @@ +""" +Base embedding runtime class for all embedding model backends +""" + +import time +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, List, Union +from enum import Enum + + +class EmbeddingType(Enum): + """Supported embedding types""" + TEXT = "text" + IMAGE = "image" + AUDIO = "audio" + + +class BaseEmbeddingRuntime(ABC): + """Base class for all embedding model runtimes""" + + def __init__(self, model_path: Path, cache_dir: Path, device: str) -> None: + """ + Initialize embedding runtime + + Args: + model_path: Path to the model directory + cache_dir: Path to cache compiled models + device: Device to run on (CPU, GPU, NPU) + """ + self.model_path = model_path + self.cache_dir = cache_dir + self.device = device + self.is_loaded = False + self.last_used = time.time() + + @abstractmethod + async def load(self) -> None: + """Load the embedding model into memory""" + pass + + @abstractmethod + async def unload(self) -> None: + """Unload the model to free memory""" + pass + + @abstractmethod + def get_info(self) -> dict[str, Any]: + """Get runtime and model information""" + pass + + @abstractmethod + async def embed( + self, + inputs: Union[str, List[str], bytes], + embedding_type: EmbeddingType + ) -> List[List[float]]: + """ + Generate embeddings for the given inputs + + Args: + inputs: Text string(s), image bytes, or audio bytes + embedding_type: Type of 
embedding to generate + + Returns: + List of embedding vectors + """ + pass + + @abstractmethod + def get_embedding_dimension(self) -> int: + """Get the dimension of embeddings produced by this model""" + pass + + @abstractmethod + def get_supported_types(self) -> List[EmbeddingType]: + """Get list of supported embedding types for this runtime""" + pass + + def update_last_used(self) -> None: + """Update the last used timestamp""" + self.last_used = time.time() + + def get_idle_time(self) -> float: + """Get how long the runtime has been idle in seconds""" + return time.time() - self.last_used + + @property + def model_name(self) -> str: + """Get the model name from path""" + return self.model_path.name + + def supports_type(self, embedding_type: EmbeddingType) -> bool: + """Check if this runtime supports a specific embedding type""" + return embedding_type in self.get_supported_types() \ No newline at end of file diff --git a/src/fluid_server/runtimes/openvino_embedding.py b/src/fluid_server/runtimes/openvino_embedding.py new file mode 100644 index 0000000..d5efb61 --- /dev/null +++ b/src/fluid_server/runtimes/openvino_embedding.py @@ -0,0 +1,279 @@ +""" +OpenVINO runtime for text and image embedding models +""" + +import asyncio +import logging +import time +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from threading import Lock +from typing import Any, List, Union, Optional +from PIL import Image +import io +import numpy as np + +from .base_embedding import BaseEmbeddingRuntime, EmbeddingType + +logger = logging.getLogger(__name__) + + +class OpenVINOEmbeddingRuntime(BaseEmbeddingRuntime): + """OpenVINO runtime for text and image embedding models""" + + # Class-level dedicated thread pool for embedding operations + _embedding_executor: ThreadPoolExecutor | None = None + + @classmethod + def get_executor(cls) -> ThreadPoolExecutor: + """Get or create dedicated embedding thread pool""" + if cls._embedding_executor is None: + 
cls._embedding_executor = ThreadPoolExecutor( + max_workers=2, + thread_name_prefix="Embedding" + ) + return cls._embedding_executor + + def __init__( + self, + model_path: Path, + cache_dir: Path, + device: str, + model_type: str = "text" # "text", "clip", "multimodal" + ) -> None: + super().__init__(model_path, cache_dir, device) + self.model_type = model_type + self.model: Any | None = None + self.tokenizer: Any | None = None + self.processor: Any | None = None + self.model_lock = Lock() + self.last_used = time.time() + self._embedding_dim: Optional[int] = None + + async def load(self) -> None: + """Load the embedding model""" + if self.is_loaded: + logger.debug(f"Embedding model {self.model_name} already loaded") + return + + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor(self.get_executor(), self._load_model) + self.is_loaded = True + logger.info(f"Embedding model {self.model_name} loaded successfully") + except Exception as e: + logger.error(f"Failed to load embedding model {self.model_name}: {e}") + self.is_loaded = False + raise + + def _load_model(self) -> None: + """Load model in thread executor""" + try: + # Lazy imports + import openvino as ov + from sentence_transformers import SentenceTransformer + from transformers import AutoTokenizer, AutoModel + import torch + + self.ov = ov + + # Use model-specific cache subdirectory + model_cache = self.cache_dir / "embeddings" / self.model_name + model_cache.mkdir(parents=True, exist_ok=True) + + logger.info(f"Loading embedding model '{self.model_name}' from {self.model_path}") + logger.info(f"Using cache at {model_cache}") + logger.info(f"Device: {self.device}") + logger.info(f"Model type: {self.model_type}") + + start_time = time.time() + + # Load based on model type + if self.model_type == "text": + self._load_text_model() + elif self.model_type in ["clip", "multimodal"]: + self._load_multimodal_model() + else: + raise ValueError(f"Unsupported model type: {self.model_type}") + + 
load_time = time.time() - start_time + logger.info(f"Embedding model loaded in {load_time:.2f}s") + + except Exception as e: + logger.error(f"Error loading embedding model: {e}") + raise + + def _load_text_model(self) -> None: + """Load text-only embedding model""" + try: + from sentence_transformers import SentenceTransformer + + # Try to load as SentenceTransformer first + self.model = SentenceTransformer(str(self.model_path), device='cpu') + + # Get embedding dimension + test_embedding = self.model.encode(["test"]) + self._embedding_dim = len(test_embedding[0]) + + except Exception as e: + logger.warning(f"Failed to load as SentenceTransformer: {e}") + # Fallback to transformers + from transformers import AutoTokenizer, AutoModel + import torch + + self.tokenizer = AutoTokenizer.from_pretrained(str(self.model_path)) + self.model = AutoModel.from_pretrained(str(self.model_path)) + + # Get embedding dimension from model config + self._embedding_dim = self.model.config.hidden_size + + def _load_multimodal_model(self) -> None: + """Load multimodal (CLIP) embedding model""" + try: + from transformers import CLIPProcessor, CLIPModel + + self.processor = CLIPProcessor.from_pretrained(str(self.model_path)) + self.model = CLIPModel.from_pretrained(str(self.model_path)) + + # Get embedding dimension + self._embedding_dim = self.model.config.text_config.hidden_size + + except Exception as e: + logger.error(f"Failed to load CLIP model: {e}") + raise + + async def unload(self) -> None: + """Unload the model to free memory""" + if not self.is_loaded: + logger.debug(f"Embedding model {self.model_name} not loaded") + return + + try: + with self.model_lock: + self.model = None + self.tokenizer = None + self.processor = None + self._embedding_dim = None + self.is_loaded = False + + logger.info(f"Embedding model {self.model_name} unloaded") + except Exception as e: + logger.error(f"Error unloading embedding model: {e}") + raise + + async def embed( + self, + inputs: Union[str, 
List[str], bytes], + embedding_type: EmbeddingType + ) -> List[List[float]]: + """Generate embeddings for inputs""" + if not self.is_loaded: + raise RuntimeError(f"Model {self.model_name} not loaded") + + if not self.supports_type(embedding_type): + raise ValueError(f"Model {self.model_name} does not support {embedding_type.value} embeddings") + + self.update_last_used() + + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self.get_executor(), + self._embed_sync, + inputs, + embedding_type + ) + except Exception as e: + logger.error(f"Error generating embeddings: {e}") + raise + + def _embed_sync( + self, + inputs: Union[str, List[str], bytes], + embedding_type: EmbeddingType + ) -> List[List[float]]: + """Synchronous embedding generation""" + with self.model_lock: + if embedding_type == EmbeddingType.TEXT: + return self._embed_text(inputs) + elif embedding_type == EmbeddingType.IMAGE: + return self._embed_image(inputs) + else: + raise ValueError(f"Unsupported embedding type: {embedding_type}") + + def _embed_text(self, inputs: Union[str, List[str]]) -> List[List[float]]: + """Generate text embeddings""" + if isinstance(inputs, str): + inputs = [inputs] + + if hasattr(self.model, 'encode'): + # SentenceTransformer model + embeddings = self.model.encode(inputs, convert_to_numpy=True) + return embeddings.tolist() + else: + # Transformers model + import torch + + tokens = self.tokenizer(inputs, padding=True, truncation=True, return_tensors="pt") + + with torch.no_grad(): + outputs = self.model(**tokens) + # Mean pooling + embeddings = outputs.last_hidden_state.mean(dim=1) + + return embeddings.cpu().numpy().tolist() + + def _embed_image(self, inputs: bytes) -> List[List[float]]: + """Generate image embeddings using CLIP""" + if self.model_type not in ["clip", "multimodal"]: + raise ValueError("Image embeddings require CLIP/multimodal model") + + try: + # Convert bytes to PIL Image + image = Image.open(io.BytesIO(inputs)) + if image.mode != 
'RGB': + image = image.convert('RGB') + + # Process image + inputs_processed = self.processor(images=image, return_tensors="pt") + + import torch + with torch.no_grad(): + embeddings = self.model.get_image_features(**inputs_processed) + # Normalize embeddings + embeddings = embeddings / embeddings.norm(p=2, dim=-1, keepdim=True) + + return embeddings.cpu().numpy().tolist() + + except Exception as e: + logger.error(f"Error processing image: {e}") + raise + + def get_embedding_dimension(self) -> int: + """Get the dimension of embeddings""" + if self._embedding_dim is None: + raise RuntimeError("Model not loaded or dimension not determined") + return self._embedding_dim + + def get_supported_types(self) -> List[EmbeddingType]: + """Get supported embedding types""" + if self.model_type == "text": + return [EmbeddingType.TEXT] + elif self.model_type in ["clip", "multimodal"]: + return [EmbeddingType.TEXT, EmbeddingType.IMAGE] + else: + return [] + + def get_info(self) -> dict[str, Any]: + """Get runtime information""" + return { + "runtime_type": "openvino_embedding", + "model_name": self.model_name, + "model_path": str(self.model_path), + "model_type": self.model_type, + "device": self.device, + "is_loaded": self.is_loaded, + "supported_types": [t.value for t in self.get_supported_types()], + "embedding_dimension": self._embedding_dim, + "idle_time": self.get_idle_time() + } \ No newline at end of file diff --git a/src/fluid_server/runtimes/whisper_embedding.py b/src/fluid_server/runtimes/whisper_embedding.py new file mode 100644 index 0000000..a51061b --- /dev/null +++ b/src/fluid_server/runtimes/whisper_embedding.py @@ -0,0 +1,264 @@ +""" +Whisper encoder runtime for audio embeddings +""" + +import asyncio +import logging +import time +import numpy as np +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any, List, Union +from threading import Lock + +from .base_embedding import BaseEmbeddingRuntime, EmbeddingType + 
+logger = logging.getLogger(__name__) + + +class WhisperEmbeddingRuntime(BaseEmbeddingRuntime): + """Whisper encoder runtime for audio embeddings""" + + # Class-level dedicated thread pool for audio embedding operations + _audio_embedding_executor: ThreadPoolExecutor | None = None + + @classmethod + def get_executor(cls) -> ThreadPoolExecutor: + """Get or create dedicated audio embedding thread pool""" + if cls._audio_embedding_executor is None: + cls._audio_embedding_executor = ThreadPoolExecutor( + max_workers=2, + thread_name_prefix="AudioEmbedding" + ) + return cls._audio_embedding_executor + + def __init__( + self, + model_path: Path, + cache_dir: Path, + device: str, + max_memory_gb: float = 4.0 + ) -> None: + super().__init__(model_path, cache_dir, device) + self.model: Any | None = None + self.processor: Any | None = None + self.model_lock = Lock() + self.last_used = time.time() + self.max_memory_gb = max_memory_gb + self._embedding_dim = 1024 # Whisper encoder dimension + self._load_lock = asyncio.Lock() + + async def load(self) -> None: + """Load the Whisper encoder model""" + async with self._load_lock: + if self.is_loaded: + logger.debug(f"Audio embedding model {self.model_name} already loaded") + return + + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor(self.get_executor(), self._load_model) + self.is_loaded = True + logger.info(f"Audio embedding model {self.model_name} loaded successfully") + except Exception as e: + logger.error(f"Failed to load audio embedding model {self.model_name}: {e}") + self.is_loaded = False + raise + + def _load_model(self) -> None: + """Load model in thread executor""" + try: + # Lazy imports + import whisper + from transformers import WhisperProcessor, WhisperModel + + # Use model-specific cache subdirectory + model_cache = self.cache_dir / "audio_embeddings" / self.model_name + model_cache.mkdir(parents=True, exist_ok=True) + + logger.info(f"Loading audio embedding model '{self.model_name}' from 
{self.model_path}") + logger.info(f"Using cache at {model_cache}") + logger.info(f"Device: {self.device}") + + start_time = time.time() + + try: + # Try to load using transformers first (for fine-tuned models) + self.processor = WhisperProcessor.from_pretrained(str(self.model_path)) + self.model = WhisperModel.from_pretrained(str(self.model_path)) + + # Get embedding dimension from model config + self._embedding_dim = self.model.config.d_model + + logger.info("Loaded Whisper model using transformers") + + except Exception as e: + logger.warning(f"Failed to load with transformers: {e}") + + # Fallback to whisper library + try: + # Map model path to whisper model name if it's a standard model + model_name = self._get_whisper_model_name() + self.model = whisper.load_model(model_name, device="cpu") + + logger.info(f"Loaded Whisper model '{model_name}' using whisper library") + + except Exception as e2: + logger.error(f"Failed to load with whisper library: {e2}") + raise + + load_time = time.time() - start_time + logger.info(f"Audio embedding model loaded in {load_time:.2f}s") + + except Exception as e: + logger.error(f"Error loading audio embedding model: {e}") + raise + + def _get_whisper_model_name(self) -> str: + """Map model path to whisper model name""" + model_name_lower = self.model_name.lower() + + if "tiny" in model_name_lower: + return "tiny" + elif "base" in model_name_lower: + return "base" + elif "small" in model_name_lower: + return "small" + elif "medium" in model_name_lower: + return "medium" + elif "large" in model_name_lower: + if "v3" in model_name_lower: + return "large-v3" + elif "v2" in model_name_lower: + return "large-v2" + else: + return "large" + else: + # Default to base model + return "base" + + async def unload(self) -> None: + """Unload the model to free memory""" + if not self.is_loaded: + logger.debug(f"Audio embedding model {self.model_name} not loaded") + return + + try: + with self.model_lock: + self.model = None + self.processor = 
None + self.is_loaded = False + + logger.info(f"Audio embedding model {self.model_name} unloaded") + except Exception as e: + logger.error(f"Error unloading audio embedding model: {e}") + raise + + async def embed( + self, + inputs: Union[str, List[str], bytes], + embedding_type: EmbeddingType + ) -> List[List[float]]: + """Generate embeddings for audio inputs""" + if not self.is_loaded: + raise RuntimeError(f"Model {self.model_name} not loaded") + + if embedding_type != EmbeddingType.AUDIO: + raise ValueError(f"Whisper embedding model only supports audio inputs") + + self.update_last_used() + + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self.get_executor(), + self._embed_sync, + inputs + ) + except Exception as e: + logger.error(f"Error generating audio embeddings: {e}") + raise + + def _embed_sync(self, audio_bytes: bytes) -> List[List[float]]: + """Synchronous audio embedding generation""" + with self.model_lock: + return self._embed_audio(audio_bytes) + + def _embed_audio(self, audio_bytes: bytes) -> List[List[float]]: + """Generate audio embeddings using Whisper encoder""" + try: + import librosa + import torch + import io + + # Load audio from bytes + audio_data, sr = librosa.load(io.BytesIO(audio_bytes), sr=16000) + + if hasattr(self, 'processor') and self.processor is not None: + # Using transformers model + inputs = self.processor( + audio_data, + sampling_rate=16000, + return_tensors="pt" + ) + + with torch.no_grad(): + # Get encoder outputs (audio embeddings) + encoder_outputs = self.model.encoder( + inputs["input_features"], + output_hidden_states=True + ) + + # Use the mean of the last hidden state as the audio embedding + embeddings = encoder_outputs.last_hidden_state.mean(dim=1) + + return embeddings.cpu().numpy().tolist() + + else: + # Using whisper library + # Pad or truncate audio to 30 seconds (Whisper's expected length) + target_length = 16000 * 30 # 30 seconds at 16kHz + if len(audio_data) < target_length: + 
audio_data = np.pad(audio_data, (0, target_length - len(audio_data))) + else: + audio_data = audio_data[:target_length] + + # Import whisper for log mel spectrogram + import whisper + + # Convert to log-mel spectrogram + mel = whisper.log_mel_spectrogram(audio_data) + + # Get encoder features + with torch.no_grad(): + encoder_output = self.model.encoder(mel.unsqueeze(0)) + # Mean pool across time dimension to get fixed-size embedding + embeddings = encoder_output.mean(dim=1) + + return embeddings.cpu().numpy().tolist() + + except Exception as e: + logger.error(f"Error processing audio: {e}") + raise + + def get_embedding_dimension(self) -> int: + """Get the dimension of embeddings""" + return self._embedding_dim + + def get_supported_types(self) -> List[EmbeddingType]: + """Get supported embedding types""" + return [EmbeddingType.AUDIO] + + def get_info(self) -> dict[str, Any]: + """Get runtime information""" + return { + "runtime_type": "whisper_embedding", + "model_name": self.model_name, + "model_path": str(self.model_path), + "device": self.device, + "is_loaded": self.is_loaded, + "supported_types": [t.value for t in self.get_supported_types()], + "embedding_dimension": self._embedding_dim, + "idle_time": self.get_idle_time(), + "max_memory_gb": self.max_memory_gb + } \ No newline at end of file diff --git a/src/fluid_server/storage/__init__.py b/src/fluid_server/storage/__init__.py new file mode 100644 index 0000000..15d8aac --- /dev/null +++ b/src/fluid_server/storage/__init__.py @@ -0,0 +1,7 @@ +""" +Storage module for vector databases and data persistence +""" + +from .lancedb_client import LanceDBClient, VectorDocument + +__all__ = ["LanceDBClient", "VectorDocument"] \ No newline at end of file diff --git a/src/fluid_server/storage/lancedb_client.py b/src/fluid_server/storage/lancedb_client.py new file mode 100644 index 0000000..a1314de --- /dev/null +++ b/src/fluid_server/storage/lancedb_client.py @@ -0,0 +1,455 @@ +""" +LanceDB client for vector storage 
and retrieval +""" + +import logging +from pathlib import Path +from typing import Any, Dict, List, Optional, Union +import asyncio +from concurrent.futures import ThreadPoolExecutor + +try: + import lancedb + from lancedb.pydantic import LanceModel, Vector + from lancedb.embeddings import get_registry + LANCEDB_AVAILABLE = True +except ImportError: + LANCEDB_AVAILABLE = False + lancedb = None # type: ignore + LanceModel = None # type: ignore + Vector = None # type: ignore + get_registry = None # type: ignore + +logger = logging.getLogger(__name__) + + +class VectorDocument: + """Base document class for vector storage""" + + def __init__( + self, + id: str, + content: str, + vector: List[float], + metadata: Optional[Dict[str, Any]] = None, + content_type: str = "text" + ): + self.id = id + self.content = content + self.vector = vector + self.metadata = metadata or {} + self.content_type = content_type + + def to_dict(self) -> Dict[str, Any]: + """Convert document to dictionary for storage""" + return { + "id": self.id, + "content": self.content, + "vector": self.vector, + "metadata": self.metadata, + "content_type": self.content_type + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "VectorDocument": + """Create document from dictionary""" + return cls( + id=data["id"], + content=data["content"], + vector=data["vector"], + metadata=data.get("metadata", {}), + content_type=data.get("content_type", "text") + ) + + +class LanceDBClient: + """Client for LanceDB vector database operations""" + + # Class-level thread pool for database operations + _db_executor: ThreadPoolExecutor | None = None + + @classmethod + def get_executor(cls) -> ThreadPoolExecutor: + """Get or create dedicated database thread pool""" + if cls._db_executor is None: + cls._db_executor = ThreadPoolExecutor( + max_workers=2, + thread_name_prefix="LanceDB" + ) + return cls._db_executor + + def __init__(self, db_path: Path, db_name: str = "embeddings"): + """ + Initialize LanceDB 
client + + Args: + db_path: Path to database directory + db_name: Name of the database + """ + if not LANCEDB_AVAILABLE: + raise ImportError("LanceDB is not available. Install with: pip install lancedb") + + self.db_path = db_path + self.db_name = db_name + self.db: Any = None + self._tables: Dict[str, Any] = {} + + # Ensure database directory exists + self.db_path.mkdir(parents=True, exist_ok=True) + + async def initialize(self) -> None: + """Initialize database connection""" + try: + loop = asyncio.get_event_loop() + self.db = await loop.run_in_executor( + self.get_executor(), + self._initialize_sync + ) + logger.info(f"LanceDB initialized at {self.db_path}") + except Exception as e: + logger.error(f"Failed to initialize LanceDB: {e}") + raise + + def _initialize_sync(self) -> Any: + """Synchronous database initialization""" + db_uri = str(self.db_path / self.db_name) + return lancedb.connect(db_uri) + + async def create_collection( + self, + collection_name: str, + dimension: int, + content_type: str = "text", + overwrite: bool = False + ) -> None: + """ + Create a new collection/table + + Args: + collection_name: Name of the collection + dimension: Vector dimension + content_type: Type of content stored + overwrite: Whether to overwrite existing collection + """ + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor( + self.get_executor(), + self._create_collection_sync, + collection_name, + dimension, + content_type, + overwrite + ) + logger.info(f"Created collection '{collection_name}' with dimension {dimension}") + except Exception as e: + logger.error(f"Failed to create collection '{collection_name}': {e}") + raise + + def _create_collection_sync( + self, + collection_name: str, + dimension: int, + content_type: str, + overwrite: bool + ) -> None: + """Synchronous collection creation""" + # For now, use simple dict-based schema instead of dynamic Pydantic model + # This avoids complex type checking issues with dynamic class creation + + # 
Check if collection exists + existing_tables = self.db.table_names() + + if collection_name in existing_tables: + if overwrite: + # Drop existing table + self.db.drop_table(collection_name) + logger.info(f"Dropped existing collection '{collection_name}'") + else: + # Load existing table + self._tables[collection_name] = self.db.open_table(collection_name) + return + + # Create new table with simple schema + # Start with empty data that matches the schema + initial_data = [{ + "id": "placeholder", + "content": "placeholder", + "vector": [0.0] * dimension, + "metadata": {}, + "content_type": content_type + }] + + table = self.db.create_table( + collection_name, + data=initial_data, + mode="create" + ) + + # Remove placeholder record + table.delete("id = 'placeholder'") + + self._tables[collection_name] = table + + async def list_collections(self) -> List[str]: + """List all collections in the database""" + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self.get_executor(), + self._list_collections_sync + ) + except Exception as e: + logger.error(f"Failed to list collections: {e}") + raise + + def _list_collections_sync(self) -> List[str]: + """Synchronous collection listing""" + return self.db.table_names() + + async def insert_documents( + self, + collection_name: str, + documents: List[VectorDocument] + ) -> None: + """ + Insert documents into a collection + + Args: + collection_name: Name of the collection + documents: List of documents to insert + """ + try: + loop = asyncio.get_event_loop() + await loop.run_in_executor( + self.get_executor(), + self._insert_documents_sync, + collection_name, + documents + ) + logger.info(f"Inserted {len(documents)} documents into '{collection_name}'") + except Exception as e: + logger.error(f"Failed to insert documents into '{collection_name}': {e}") + raise + + def _insert_documents_sync( + self, + collection_name: str, + documents: List[VectorDocument] + ) -> None: + """Synchronous document 
insertion""" + if collection_name not in self._tables: + table = self.db.open_table(collection_name) + self._tables[collection_name] = table + else: + table = self._tables[collection_name] + + # Convert documents to dictionaries + data = [doc.to_dict() for doc in documents] + + # Insert data + table.add(data) + + async def search_vectors( + self, + collection_name: str, + query_vector: List[float], + limit: int = 10, + filter_condition: Optional[str] = None + ) -> List[Dict[str, Any]]: + """ + Search for similar vectors + + Args: + collection_name: Name of the collection + query_vector: Query vector + limit: Number of results to return + filter_condition: Optional filter condition + + Returns: + List of similar documents with scores + """ + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self.get_executor(), + self._search_vectors_sync, + collection_name, + query_vector, + limit, + filter_condition + ) + except Exception as e: + logger.error(f"Failed to search vectors in '{collection_name}': {e}") + raise + + def _search_vectors_sync( + self, + collection_name: str, + query_vector: List[float], + limit: int, + filter_condition: Optional[str] + ) -> List[Dict[str, Any]]: + """Synchronous vector search""" + if collection_name not in self._tables: + table = self.db.open_table(collection_name) + self._tables[collection_name] = table + else: + table = self._tables[collection_name] + + # Build search query + search = table.search(query_vector).limit(limit) + + if filter_condition: + search = search.where(filter_condition) + + # Execute search and convert to list + results = search.to_list() + + # Add similarity scores (LanceDB returns _distance, we convert to similarity) + for result in results: + if '_distance' in result: + # Convert distance to similarity score (0-1, higher is more similar) + result['similarity_score'] = 1.0 / (1.0 + result['_distance']) + + return results + + async def get_document( + self, + collection_name: str, + 
document_id: str + ) -> Optional[VectorDocument]: + """ + Get a specific document by ID + + Args: + collection_name: Name of the collection + document_id: ID of the document + + Returns: + Document if found, None otherwise + """ + try: + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + self.get_executor(), + self._get_document_sync, + collection_name, + document_id + ) + + if result: + return VectorDocument.from_dict(result) + return None + + except Exception as e: + logger.error(f"Failed to get document '{document_id}' from '{collection_name}': {e}") + raise + + def _get_document_sync( + self, + collection_name: str, + document_id: str + ) -> Optional[Dict[str, Any]]: + """Synchronous document retrieval""" + if collection_name not in self._tables: + table = self.db.open_table(collection_name) + self._tables[collection_name] = table + else: + table = self._tables[collection_name] + + # Search for document by ID + results = table.search().where(f"id = '{document_id}'").to_list() + + return results[0] if results else None + + async def delete_document( + self, + collection_name: str, + document_id: str + ) -> bool: + """ + Delete a document by ID + + Args: + collection_name: Name of the collection + document_id: ID of the document to delete + + Returns: + True if deleted, False if not found + """ + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self.get_executor(), + self._delete_document_sync, + collection_name, + document_id + ) + except Exception as e: + logger.error(f"Failed to delete document '{document_id}' from '{collection_name}': {e}") + raise + + def _delete_document_sync( + self, + collection_name: str, + document_id: str + ) -> bool: + """Synchronous document deletion""" + if collection_name not in self._tables: + table = self.db.open_table(collection_name) + self._tables[collection_name] = table + else: + table = self._tables[collection_name] + + # Delete document + table.delete(f"id = 
'{document_id}'") + return True # LanceDB doesn't return deletion count + + async def get_collection_stats(self, collection_name: str) -> Dict[str, Any]: + """ + Get statistics for a collection + + Args: + collection_name: Name of the collection + + Returns: + Collection statistics + """ + try: + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + self.get_executor(), + self._get_collection_stats_sync, + collection_name + ) + except Exception as e: + logger.error(f"Failed to get stats for collection '{collection_name}': {e}") + raise + + def _get_collection_stats_sync(self, collection_name: str) -> Dict[str, Any]: + """Synchronous collection stats retrieval""" + if collection_name not in self._tables: + table = self.db.open_table(collection_name) + self._tables[collection_name] = table + else: + table = self._tables[collection_name] + + # Get table info + num_rows = table.count_rows() + schema = table.schema + + return { + "collection_name": collection_name, + "num_documents": num_rows, + "schema": str(schema) + } + + async def close(self) -> None: + """Close database connection and cleanup resources""" + try: + self._tables.clear() + self.db = None + logger.info("LanceDB connection closed") + except Exception as e: + logger.error(f"Error closing LanceDB: {e}") \ No newline at end of file