Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 58 additions & 1 deletion components/src/dynamo/vllm/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ class Config:
enable_multimodal: bool = False
multimodal_encode_prefill_worker: bool = False
mm_prompt_template: str = "USER: <image>\n<prompt> ASSISTANT:"

# vLLM-native encoder worker (ECConnector mode)
vllm_native_encoder_worker: bool = False
ec_connector_backend: Optional[str] = "ECExampleConnector"
ec_storage_path: Optional[str] = None
ec_extra_config: Optional[str] = None
ec_consumer_mode: bool = False # For PD workers

# dump config to file
dump_config_to: Optional[str] = None

Expand Down Expand Up @@ -191,6 +199,34 @@ def parse_args() -> Config:
"'USER: <image> please describe the image ASSISTANT:'."
),
)
parser.add_argument(
"--vllm-native-encoder-worker",
action="store_true",
help="Run as vLLM-native encoder worker using ECConnector for encoder disaggregation (requires shared storage)",
)
parser.add_argument(
"--ec-connector-backend",
type=str,
default="ECExampleConnector",
help="ECConnector implementation class for encoder disaggregation. Default: ECExampleConnector (disk-based)",
)
parser.add_argument(
"--ec-storage-path",
type=str,
default=None,
help="Storage path for ECConnector (required for ECExampleConnector, optional for other backends)",
)
parser.add_argument(
"--ec-extra-config",
type=str,
default=None,
help="Additional ECConnector configuration as JSON string",
)
parser.add_argument(
"--ec-consumer-mode",
action="store_true",
help="Configure as ECConnector consumer for receiving encoder embeddings (for PD workers)",
)
parser.add_argument(
"--store-kv",
type=str,
Expand Down Expand Up @@ -275,19 +311,35 @@ def parse_args() -> Config:
+ int(bool(args.multimodal_worker))
+ int(bool(args.multimodal_decode_worker))
+ int(bool(args.multimodal_encode_prefill_worker))
+ int(bool(args.vllm_native_encoder_worker))
)
if mm_flags > 1:
raise ValueError(
"Use only one of --multimodal-processor, --multimodal-encode-worker, --multimodal-worker, --multimodal-decode-worker, or --multimodal-encode-prefill-worker"
"Use only one of --multimodal-processor, --multimodal-encode-worker, --multimodal-worker, "
"--multimodal-decode-worker, --multimodal-encode-prefill-worker, or --vllm-native-encoder-worker"
)

if mm_flags == 1 and not args.enable_multimodal:
raise ValueError("Use --enable-multimodal to enable multimodal processing")

# Validate vLLM-native encoder worker config
if args.vllm_native_encoder_worker:
if (
args.ec_connector_backend == "ECExampleConnector"
and not args.ec_storage_path
):
raise ValueError(
"--ec-storage-path is required when using ECExampleConnector backend. "
"Specify a shared storage path for encoder cache."
)

# Set component and endpoint based on worker type
if args.multimodal_processor:
config.component = "processor"
config.endpoint = "generate"
elif args.vllm_native_encoder_worker:
config.component = "encoder"
config.endpoint = "generate"
elif args.multimodal_encode_worker:
config.component = "encoder"
config.endpoint = "generate"
Expand Down Expand Up @@ -325,6 +377,11 @@ def parse_args() -> Config:
config.multimodal_encode_prefill_worker = args.multimodal_encode_prefill_worker
config.enable_multimodal = args.enable_multimodal
config.mm_prompt_template = args.mm_prompt_template
config.vllm_native_encoder_worker = args.vllm_native_encoder_worker
config.ec_connector_backend = args.ec_connector_backend
config.ec_storage_path = args.ec_storage_path
config.ec_extra_config = args.ec_extra_config
config.ec_consumer_mode = args.ec_consumer_mode
config.store_kv = args.store_kv
config.request_plane = args.request_plane
config.enable_local_indexer = args.enable_local_indexer
Expand Down
115 changes: 115 additions & 0 deletions components/src/dynamo/vllm/ec_transfer_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Utilities for configuring vLLM's ECTransferConfig for encoder disaggregation.

ECTransferConfig enables encoder/consumer separation where:
- Producer (encoder worker): Executes multimodal encoder and saves to storage
- Consumer (PD worker): Loads encoder outputs from storage

Supports multiple storage backends: disk (ECExampleConnector), Redis, S3, etc.
"""

import json
import logging
from typing import Any, Dict, Optional

from vllm.config import ECTransferConfig

logger = logging.getLogger(__name__)


def create_ec_transfer_config(
engine_id: str,
ec_role: str,
ec_connector_backend: str = "ECExampleConnector",
ec_storage_path: Optional[str] = None,
ec_extra_config: Optional[str] = None,
) -> ECTransferConfig:
"""
Create ECTransferConfig for vLLM encoder disaggregation.

Args:
engine_id: Unique identifier for this engine instance
ec_role: Role of this instance - "ec_producer" (encoder) or "ec_consumer" (PD worker)
ec_connector_backend: ECConnector implementation class name
ec_storage_path: Storage path for disk-based connectors
ec_extra_config: Additional connector config as JSON string

Returns:
ECTransferConfig configured for the specified role

Raises:
ValueError: If required config is missing
"""
# Parse extra config if provided
extra_config: Dict[str, Any] = {}
if ec_extra_config:
try:
extra_config = json.loads(ec_extra_config)
logger.debug(f"Parsed ec_extra_config: {extra_config}")
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in --ec-extra-config: {e}")

# Add storage path to config if provided
if ec_storage_path:
extra_config["storage_path"] = ec_storage_path

# Validate required fields
if ec_connector_backend == "ECExampleConnector" and "storage_path" not in extra_config:
raise ValueError(
"ECExampleConnector requires 'storage_path' in config. "
"Provide via --ec-storage-path or --ec-extra-config"
)

logger.info(
f"Creating ECTransferConfig: engine_id={engine_id}, role={ec_role}, "
f"backend={ec_connector_backend}, config={extra_config}"
)

return ECTransferConfig(
engine_id=engine_id,
ec_role=ec_role,
ec_connector=ec_connector_backend,
ec_connector_extra_config=extra_config,
)


def get_encoder_engine_id(namespace: str, component: str, instance_id: int) -> str:
"""
Generate unique engine_id for encoder worker instance.

Format: {namespace}.{component}.encoder.{instance_id}

Args:
namespace: Dynamo namespace
component: Component name (typically "encoder")
instance_id: Instance ID from runtime

Returns:
Unique engine ID string
"""
engine_id = f"{namespace}.{component}.encoder.{instance_id}"
logger.debug(f"Generated encoder engine_id: {engine_id}")
return engine_id


def get_pd_engine_id(namespace: str, component: str, instance_id: int) -> str:
"""
Generate unique engine_id for PD worker instance.

Format: {namespace}.{component}.pd.{instance_id}

Args:
namespace: Dynamo namespace
component: Component name (typically "backend" or "decoder")
instance_id: Instance ID from runtime

Returns:
Unique engine ID string
"""
engine_id = f"{namespace}.{component}.pd.{instance_id}"
logger.debug(f"Generated PD engine_id: {engine_id}")
return engine_id

100 changes: 98 additions & 2 deletions components/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,17 @@
)
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.vllm.ec_transfer_utils import (
create_ec_transfer_config,
get_encoder_engine_id,
get_pd_engine_id,
)
from dynamo.vllm.multimodal_handlers import (
EncodeWorkerHandler,
MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler,
ProcessorHandler,
VLLMNativeEncoderWorkerHandler,
)

from .args import Config, overwrite_args, parse_args
Expand Down Expand Up @@ -85,7 +91,10 @@ def signal_handler():
config.model = config.engine_args.model = await fetch_llm(config.model)

# Route to appropriate initialization based on config flags
if config.multimodal_processor:
if config.vllm_native_encoder_worker:
await init_vllm_native_encoder(runtime, config)
logger.debug("init_vllm_native_encoder completed")
elif config.multimodal_processor:
await init_multimodal_processor(runtime, config)
logger.debug("init_multimodal_processor completed")
elif config.multimodal_encode_worker:
Expand Down Expand Up @@ -717,7 +726,71 @@ async def init_multimodal_encode_worker(runtime: DistributedRuntime, config: Con
),
)
except Exception as e:
logger.error(f"Failed to serve endpoints: {e}")
logger.error(f"Failed to serve encode worker endpoint: {e}")
raise
finally:
handler.cleanup()


async def init_vllm_native_encoder(runtime: DistributedRuntime, config: Config):
"""
Initialize vLLM-native encoder worker component (ECConnector mode).

This encoder uses vLLM's built-in encoder disaggregation with ECConnector.
vLLM handles encoder execution, caching, and storage automatically.
"""
# 1. Create component and endpoint
component = runtime.namespace(config.namespace).component(config.component)
generate_endpoint = component.endpoint(config.endpoint)

# 2. Configure ECTransferConfig for producer role
engine_id = get_encoder_engine_id(
config.namespace, config.component, component.instance_id
)

ec_transfer_config = create_ec_transfer_config(
engine_id=engine_id,
ec_role="ec_producer", # This instance produces encoder outputs
ec_connector_backend=config.ec_connector_backend,
ec_storage_path=config.ec_storage_path,
ec_extra_config=config.ec_extra_config,
)

# Set ECTransferConfig on engine args
config.engine_args.ec_transfer_config = ec_transfer_config
logger.info(f"Configured as ECConnector producer with engine_id={engine_id}")

# 3. Setup vLLM engine
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
) = setup_vllm_engine(config)

# 4. Create handler (no PD client needed for ECConnector mode)
handler = VLLMNativeEncoderWorkerHandler(
runtime,
component,
engine_client,
config,
)
handler.add_temp_dir(prometheus_temp_dir)

# 5. No async init needed - vLLM handles everything
# await handler.async_init(runtime) # Not needed for ECConnector mode

logger.info("Starting to serve vLLM-native encoder endpoint...")

# 6. Serve endpoint
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
),
)
except Exception as e:
logger.error(f"Failed to serve vLLM-native encoder endpoint: {e}")
raise
finally:
handler.cleanup()
Expand All @@ -732,12 +805,35 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config):
2. --multimodal-encode-prefill-worker: Handles inline encoding (e.g., Llama 4)

Both can operate in aggregated (P+D) or disaggregated (P→D) mode.

When --ec-consumer-mode is enabled, configures as ECConnector consumer
to load encoder embeddings from shared storage.
"""
component = runtime.namespace(config.namespace).component(config.component)

generate_endpoint = component.endpoint(config.endpoint)
clear_endpoint = component.endpoint("clear_kv_blocks")

# Configure ECConnector consumer mode if enabled
if config.ec_consumer_mode:
logger.info("Configuring as ECConnector consumer for encoder embeddings")

engine_id = get_pd_engine_id(
config.namespace, config.component, component.instance_id
)

ec_transfer_config = create_ec_transfer_config(
engine_id=engine_id,
ec_role="ec_consumer", # This instance consumes encoder outputs
ec_connector_backend=config.ec_connector_backend,
ec_storage_path=config.ec_storage_path,
ec_extra_config=config.ec_extra_config,
)

# Set ECTransferConfig on engine args
config.engine_args.ec_transfer_config = ec_transfer_config
logger.info(f"Configured as ECConnector consumer with engine_id={engine_id}")

(
engine_client,
vllm_config,
Expand Down
4 changes: 4 additions & 0 deletions components/src/dynamo/vllm/multimodal_handlers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

from dynamo.vllm.multimodal_handlers.encode_worker_handler import EncodeWorkerHandler
from dynamo.vllm.multimodal_handlers.processor_handler import ProcessorHandler
from dynamo.vllm.multimodal_handlers.vllm_native_encoder_handler import (
VLLMNativeEncoderWorkerHandler,
)
from dynamo.vllm.multimodal_handlers.worker_handler import (
MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler,
Expand All @@ -13,4 +16,5 @@
"ProcessorHandler",
"MultimodalPDWorkerHandler",
"MultimodalDecodeWorkerHandler",
"VLLMNativeEncoderWorkerHandler",
]
Loading
Loading