diff --git a/components/src/dynamo/vllm/handlers.py b/components/src/dynamo/vllm/handlers.py
index 19f34b1ebb..6d02e0da53 100644
--- a/components/src/dynamo/vllm/handlers.py
+++ b/components/src/dynamo/vllm/handlers.py
@@ -6,7 +6,7 @@
 import os
 from abc import ABC, abstractmethod
 from contextlib import asynccontextmanager
-from typing import Any, AsyncGenerator, Dict
+from typing import Any, AsyncGenerator, Dict, Final
 
 from vllm.inputs import TokensPrompt
 from vllm.sampling_params import SamplingParams
@@ -16,6 +16,13 @@
 from dynamo.runtime.logging import configure_dynamo_logging
 
 from .engine_monitor import VllmEngineMonitor
+from .multimodal_utils.image_loader import ImageLoader
+
+# Multimodal data dictionary keys
+IMAGE_URL_KEY: Final = "image_url"
+VIDEO_URL_KEY: Final = "video_url"
+URL_VARIANT_KEY: Final = "Url"
+DECODED_VARIANT_KEY: Final = "Decoded"
 
 configure_dynamo_logging()
 logger = logging.getLogger(__name__)
@@ -65,6 +72,7 @@ def __init__(self, runtime, component, engine, default_sampling_params):
         self.default_sampling_params = default_sampling_params
         self.kv_publishers: list[ZmqKvEventPublisher] | None = None
         self.engine_monitor = VllmEngineMonitor(runtime, engine)
+        self.image_loader = ImageLoader()
 
     @abstractmethod
     async def generate(self, request, context) -> AsyncGenerator[dict, None]:
@@ -111,6 +119,50 @@ def cleanup(self):
         """Override in subclasses if cleanup is needed."""
         pass
 
+    async def _extract_multimodal_data(
+        self, request: Dict[str, Any]
+    ) -> Dict[str, Any] | None:
+        """
+        Extract and decode multimodal data from PreprocessedRequest.
+        """
+        if "multi_modal_data" not in request or request["multi_modal_data"] is None:
+            return None
+
+        mm_map = request["multi_modal_data"]
+        vllm_mm_data = {}
+
+        # Process image_url entries
+        images = []
+        for item in mm_map.get(IMAGE_URL_KEY, []):
+            if isinstance(item, dict) and URL_VARIANT_KEY in item:
+                url = item[URL_VARIANT_KEY]
+                try:
+                    # ImageLoader supports both data: and http(s): URLs with caching
+                    image = await self.image_loader.load_image(url)
+                    images.append(image)
+                    logger.debug(f"Loaded image from URL: {url[:80]}...")
+                except Exception:
+                    logger.exception(f"Failed to load image from {url[:80]}...")
+                    raise
+            elif isinstance(item, dict) and DECODED_VARIANT_KEY in item:
+                # Decoded support from PRs #3971/#3988 (frontend decoding + NIXL transfer)
+                # Will contain NIXL metadata for direct memory access
+                # TODO: Implement NIXL read when PRs merge
+                logger.warning(
+                    "Decoded multimodal data not yet supported in standard worker"
+                )
+
+        if images:
+            # vLLM expects single image or list
+            vllm_mm_data["image"] = images[0] if len(images) == 1 else images
+            logger.debug(f"Extracted {len(images)} image(s) for multimodal processing")
+
+        # Handle video_url entries (future expansion)
+        if VIDEO_URL_KEY in mm_map:
+            logger.warning("Video multimodal data not yet supported in standard worker")
+
+        return vllm_mm_data if vllm_mm_data else None
+
     async def generate_tokens(
         self, prompt, sampling_params, request_id, data_parallel_rank=None
     ):
@@ -168,7 +220,12 @@ async def generate(self, request, context):
         request_id = context.id()
         logger.debug(f"Decode Request ID: {request_id}")
 
-        prompt = TokensPrompt(prompt_token_ids=request["token_ids"])
+        # Extract and decode multimodal data if present
+        multi_modal_data = await self._extract_multimodal_data(request)
+
+        prompt = TokensPrompt(
+            prompt_token_ids=request["token_ids"], multi_modal_data=multi_modal_data
+        )
 
         # Build sampling params from request
         sampling_params = build_sampling_params(request, self.default_sampling_params)
@@ -210,8 +267,13 @@ async def generate(self, request, context):
         request_id = context.id()
         logger.debug(f"Prefill Request ID: {request_id}")
 
+        # Extract and decode multimodal data if present
+        multi_modal_data = await self._extract_multimodal_data(request)
+
         token_ids = request["token_ids"]
-        prompt = TokensPrompt(prompt_token_ids=token_ids)
+        prompt = TokensPrompt(
+            prompt_token_ids=token_ids, multi_modal_data=multi_modal_data
+        )
 
         # Build sampling params from request using shared utility
         sampling_params = build_sampling_params(request, self.default_sampling_params)
diff --git a/examples/backends/vllm/launch/agg_multimodal.sh b/examples/backends/vllm/launch/agg_multimodal.sh
index 5eccaf11e4..c1a667b686 100755
--- a/examples/backends/vllm/launch/agg_multimodal.sh
+++ b/examples/backends/vllm/launch/agg_multimodal.sh
@@ -1,13 +1,21 @@
 #!/bin/bash
 # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 # SPDX-License-Identifier: Apache-2.0
+#
+# Aggregated multimodal serving with standard Dynamo preprocessing
+#
+# Architecture: Single-worker PD (Prefill-Decode)
+# - Frontend: Rust OpenAIPreprocessor handles image URLs (HTTP and data:// base64)
+# - Worker: Standard vLLM worker with vision model support
+#
+# For EPD (Encode-Prefill-Decode) architecture with dedicated encoding worker,
+# see agg_multimodal_epd.sh
+
 set -e
 trap 'echo Cleaning up...; kill 0' EXIT
 
 # Default values
-MODEL_NAME="llava-hf/llava-1.5-7b-hf"
-PROMPT_TEMPLATE="USER: <image>\n<prompt> ASSISTANT:"
-PROVIDED_PROMPT_TEMPLATE=""
+MODEL_NAME="Qwen/Qwen2.5-VL-7B-Instruct"
 
 # Parse command line arguments
 while [[ $# -gt 0 ]]; do
@@ -16,15 +24,10 @@ while [[ $# -gt 0 ]]; do
             MODEL_NAME=$2
             shift 2
             ;;
-        --prompt-template)
-            PROVIDED_PROMPT_TEMPLATE=$2
-            shift 2
-            ;;
         -h|--help)
             echo "Usage: $0 [OPTIONS]"
             echo "Options:"
-            echo "  --model              Specify the model to use (default: $MODEL_NAME)"
-            echo "  --prompt-template