Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 74 additions & 7 deletions components/src/dynamo/vllm/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,23 @@ class Config:

# multimodal options
multimodal_processor: bool = False
# Emebdding Cache Processor is different from the regular processor
# TODO: Have a single processor for all cases and adopting rust based processor
ec_processor: bool = False
multimodal_encode_worker: bool = False
multimodal_worker: bool = False
multimodal_decode_worker: bool = False
enable_multimodal: bool = False
multimodal_encode_prefill_worker: bool = False
mm_prompt_template: str = "USER: <image>\n<prompt> ASSISTANT:"

# vLLM-native encoder worker (ECConnector mode)
vllm_native_encoder_worker: bool = False
ec_connector_backend: Optional[str] = "ECExampleConnector"
ec_storage_path: Optional[str] = None
ec_extra_config: Optional[str] = None
ec_consumer_mode: bool = False

# dump config to file
dump_config_to: Optional[str] = None

Expand Down Expand Up @@ -153,6 +164,11 @@ def parse_args() -> Config:
action="store_true",
help="Run as multimodal processor component for handling multimodal requests",
)
parser.add_argument(
"--ec-processor",
action="store_true",
help="Run as ECConnector processor (routes multimodal requests to encoder then PD workers)",
)
parser.add_argument(
"--multimodal-encode-worker",
action="store_true",
Expand Down Expand Up @@ -191,6 +207,34 @@ def parse_args() -> Config:
"'USER: <image> please describe the image ASSISTANT:'."
),
)
parser.add_argument(
"--vllm-native-encoder-worker",
action="store_true",
help="Run as vLLM-native encoder worker using ECConnector for encoder disaggregation (requires shared storage)",
)
parser.add_argument(
"--ec-connector-backend",
type=str,
default="ECExampleConnector",
help="ECConnector implementation class for encoder disaggregation. Default: ECExampleConnector (disk-based)",
)
parser.add_argument(
"--ec-storage-path",
type=str,
default=None,
help="Storage path for ECConnector (required for ECExampleConnector, optional for other backends)",
)
parser.add_argument(
"--ec-extra-config",
type=str,
default=None,
help="Additional ECConnector configuration as JSON string",
)
parser.add_argument(
"--ec-consumer-mode",
action="store_true",
help="Configure as ECConnector consumer for receiving encoder embeddings (for PD workers)",
)
parser.add_argument(
"--store-kv",
type=str,
Expand Down Expand Up @@ -271,27 +315,42 @@ def parse_args() -> Config:
# Check multimodal role exclusivity
mm_flags = (
int(bool(args.multimodal_processor))
+ int(bool(args.ec_processor))
+ int(bool(args.multimodal_encode_worker))
+ int(bool(args.multimodal_worker))
+ int(bool(args.multimodal_decode_worker))
+ int(bool(args.multimodal_encode_prefill_worker))
+ int(bool(args.vllm_native_encoder_worker))
)
if mm_flags > 1:
raise ValueError(
"Use only one of --multimodal-processor, --multimodal-encode-worker, --multimodal-worker, --multimodal-decode-worker, or --multimodal-encode-prefill-worker"
"Use only one of --multimodal-processor, --ec-processor, --multimodal-encode-worker, --multimodal-worker, "
"--multimodal-decode-worker, --multimodal-encode-prefill-worker, or --vllm-native-encoder-worker"
)

if mm_flags == 1 and not args.enable_multimodal:
raise ValueError("Use --enable-multimodal to enable multimodal processing")

# Validate vLLM-native encoder worker config
if args.vllm_native_encoder_worker:
if (
args.ec_connector_backend == "ECExampleConnector"
and not args.ec_storage_path
):
raise ValueError(
"--ec-storage-path is required when using ECExampleConnector backend. "
"Specify a shared storage path for encoder cache."
)

# Set component and endpoint based on worker type
if args.multimodal_processor:
if args.multimodal_processor or args.ec_processor:
config.component = "processor"
config.endpoint = "generate"
elif args.multimodal_encode_worker:
config.component = "encoder"
config.endpoint = "generate"
elif args.multimodal_encode_prefill_worker:
elif (
args.vllm_native_encoder_worker
or args.multimodal_encode_worker
or args.multimodal_encode_prefill_worker
):
config.component = "encoder"
config.endpoint = "generate"
elif args.multimodal_decode_worker:
Expand Down Expand Up @@ -319,12 +378,18 @@ def parse_args() -> Config:
config.custom_jinja_template = args.custom_jinja_template
config.dyn_endpoint_types = args.dyn_endpoint_types
config.multimodal_processor = args.multimodal_processor
config.ec_processor = args.ec_processor
config.multimodal_encode_worker = args.multimodal_encode_worker
config.multimodal_worker = args.multimodal_worker
config.multimodal_decode_worker = args.multimodal_decode_worker
config.multimodal_encode_prefill_worker = args.multimodal_encode_prefill_worker
config.enable_multimodal = args.enable_multimodal
config.mm_prompt_template = args.mm_prompt_template
config.vllm_native_encoder_worker = args.vllm_native_encoder_worker
config.ec_connector_backend = args.ec_connector_backend
config.ec_storage_path = args.ec_storage_path
config.ec_extra_config = args.ec_extra_config
config.ec_consumer_mode = args.ec_consumer_mode
config.store_kv = args.store_kv
config.request_plane = args.request_plane
config.enable_local_indexer = args.enable_local_indexer
Expand Down Expand Up @@ -509,7 +574,9 @@ def overwrite_args(config):
setattr(config.engine_args, key, value)
logger.debug(f" engine_args.{key} = {value}")
else:
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
logger.debug(
f" Skipping engine_args.{key} (not available in this vLLM version)"
)


def get_host_ip() -> str:
Expand Down
166 changes: 164 additions & 2 deletions components/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,14 @@
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.vllm.multimodal_handlers import (
ECProcessorHandler,
EncodeWorkerHandler,
MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler,
ProcessorHandler,
VLLMEncodeWorkerHandler,
)
from dynamo.vllm.multimodal_utils.encode_utils import create_ec_transfer_config

from .args import Config, overwrite_args, parse_args
from .handlers import DecodeWorkerHandler, PrefillWorkerHandler
Expand Down Expand Up @@ -85,7 +88,13 @@ def signal_handler():
config.model = config.engine_args.model = await fetch_llm(config.model)

# Route to appropriate initialization based on config flags
if config.multimodal_processor:
if config.vllm_native_encoder_worker:
await init_vllm_native_encoder(runtime, config)
logger.debug("init_vllm_native_encoder completed")
elif config.ec_processor:
await init_ec_processor(runtime, config)
logger.debug("init_ec_processor completed")
elif config.multimodal_processor:
await init_multimodal_processor(runtime, config)
logger.debug("init_multimodal_processor completed")
elif config.multimodal_encode_worker:
Expand Down Expand Up @@ -717,7 +726,138 @@ async def init_multimodal_encode_worker(runtime: DistributedRuntime, config: Con
),
)
except Exception as e:
logger.error(f"Failed to serve endpoints: {e}")
logger.error(f"Failed to serve encode worker endpoint: {e}")
raise
finally:
handler.cleanup()


async def init_vllm_native_encoder(runtime: DistributedRuntime, config: Config):
"""
Initialize vLLM-native encoder worker component (ECConnector mode).
In this mode, vLLM handles encoder execution, caching, and storage automatically.
"""
# Create component and endpoint
component = runtime.namespace(config.namespace).component(config.component)
generate_endpoint = component.endpoint(config.endpoint)

# Configure ECTransferConfig for producer role
instance_id = 0
engine_id = f"{config.namespace}.{config.component}.encoder.{instance_id}"

# Configure encoder with producer role, it will be responsible for creating embeddings and storing them in the shared storage
ec_transfer_config = create_ec_transfer_config(
engine_id=engine_id,
ec_role="ec_producer",
ec_connector_backend=config.ec_connector_backend,
ec_storage_path=config.ec_storage_path,
ec_extra_config=config.ec_extra_config,
)

# Set ECTransferConfig on engine args
config.engine_args.ec_transfer_config = ec_transfer_config

# Setup vLLM engine
(
engine_client,
vllm_config,
default_sampling_params,
prometheus_temp_dir,
) = setup_vllm_engine(config)

# Initialize vLLM Native Encoder Worker Handler
handler = VLLMEncodeWorkerHandler(
runtime,
component,
engine_client,
config,
)
handler.add_temp_dir(prometheus_temp_dir)

# 5. No async init needed - vLLM handles everything
# await handler.async_init(runtime) # Not needed for ECConnector mode

logger.info("Starting to serve vLLM-native encoder endpoint...")

# 6. Serve endpoint
try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
),
)
except Exception as e:
logger.error(f"Failed to serve vLLM-native encoder endpoint: {e}")
raise
finally:
handler.cleanup()


async def init_ec_processor(runtime: DistributedRuntime, config: Config):
"""
Initialize ECConnector processor component.

Simple processor that routes multimodal requests using ECConnector pattern:
1. Preprocess request (same as regular processor)
2. Send multimodal items to encoder workers (stores to shared storage)
3. Forward preprocessed request to PD worker (loads from shared storage)
4. Stream response back to client
"""
# Create component and endpoint
component = runtime.namespace(config.namespace).component(config.component)
generate_endpoint = component.endpoint(config.endpoint)

# Get encoder worker client
encoder_client = (
await runtime.namespace(config.namespace)
.component("encoder")
.endpoint("generate")
.client()
)

# Get PD worker client
pd_client = (
await runtime.namespace(config.namespace)
.component("backend")
.endpoint("generate")
.client()
)

# Get prompt template from args (must be passed via environment or command line)
mm_prompt_template = config.mm_prompt_template

# Create EC processor handler (with preprocessing like regular processor)
handler = ECProcessorHandler(
config.engine_args,
encoder_worker_client=encoder_client,
pd_worker_client=pd_client,
prompt_template=mm_prompt_template,
)

logger.info("Waiting for encoder and PD worker instances...")
await encoder_client.wait_for_instances()
await pd_client.wait_for_instances()

# Register the endpoint as entrypoint to a model (same as regular processor)
await register_llm(
ModelInput.Text, # Custom processor is used and this type bypasses SDK processor
ModelType.Chat,
generate_endpoint,
config.model,
config.served_model_name,
kv_cache_block_size=config.engine_args.block_size,
)

logger.info("Starting to serve EC processor endpoint...")

try:
await asyncio.gather(
generate_endpoint.serve_endpoint(
handler.generate, metrics_labels=[("model", config.model)]
),
)
except Exception as e:
logger.error(f"Failed to serve EC processor endpoint: {e}")
raise
finally:
handler.cleanup()
Expand All @@ -732,12 +872,34 @@ async def init_multimodal_worker(runtime: DistributedRuntime, config: Config):
2. --multimodal-encode-prefill-worker: Handles inline encoding (e.g., Llama 4)

Both can operate in aggregated (P+D) or disaggregated (P→D) mode.

When --ec-consumer-mode is enabled, configures as ECConnector consumer
to load encoder embeddings from shared storage.
"""
component = runtime.namespace(config.namespace).component(config.component)

generate_endpoint = component.endpoint(config.endpoint)
clear_endpoint = component.endpoint("clear_kv_blocks")

# Configure ECConnector consumer mode if enabled
if config.ec_consumer_mode:
logger.info("Configuring as ECConnector consumer for encoder embeddings")
instance_id = 0
engine_id = f"{config.namespace}.{config.component}.backend.{instance_id}"

# The PD Worker just load the embeddings from the shared storage, so it is a consumer
ec_transfer_config = create_ec_transfer_config(
engine_id=engine_id,
ec_role="ec_consumer",
ec_connector_backend=config.ec_connector_backend,
ec_storage_path=config.ec_storage_path,
ec_extra_config=config.ec_extra_config,
)

# Set ECTransferConfig on engine args
config.engine_args.ec_transfer_config = ec_transfer_config
logger.info(f"Configured as ECConnector consumer with engine_id={engine_id}")

(
engine_client,
vllm_config,
Expand Down
12 changes: 10 additions & 2 deletions components/src/dynamo/vllm/multimodal_handlers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,24 @@
# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from dynamo.vllm.multimodal_handlers.encode_worker_handler import EncodeWorkerHandler
from dynamo.vllm.multimodal_handlers.processor_handler import ProcessorHandler
from dynamo.vllm.multimodal_handlers.encode_worker_handler import (
EncodeWorkerHandler,
VLLMEncodeWorkerHandler,
)
from dynamo.vllm.multimodal_handlers.processor_handler import (
ECProcessorHandler,
ProcessorHandler,
)
from dynamo.vllm.multimodal_handlers.worker_handler import (
MultimodalDecodeWorkerHandler,
MultimodalPDWorkerHandler,
)

__all__ = [
"EncodeWorkerHandler",
"VLLMEncodeWorkerHandler",
"ProcessorHandler",
"MultimodalPDWorkerHandler",
"MultimodalDecodeWorkerHandler",
"ECProcessorHandler",
]
Loading
Loading