44
55import logging
66import os
7+ import socket
78from typing import Any , Dict , Optional
89
910from vllm .config import KVTransferConfig
1314
1415from dynamo ._core import get_reasoning_parser_names , get_tool_parser_names
1516from dynamo .common .config_dump import add_config_dump_args , register_encoder
16- from dynamo .runtime import DistributedRuntime
17-
18- from . import __version__
19- from .ports import (
20- DEFAULT_DYNAMO_PORT_MAX ,
21- DEFAULT_DYNAMO_PORT_MIN ,
22- DynamoPortRange ,
23- PortAllocationRequest ,
24- PortMetadata ,
25- allocate_and_reserve_port ,
26- allocate_and_reserve_port_block ,
27- get_host_ip ,
28- )
17+
18+ from . import __version__ , envs
2919
3020logger = logging .getLogger (__name__ )
3121
3222DEFAULT_MODEL = "Qwen/Qwen3-0.6B"
33-
3423VALID_CONNECTORS = {"nixl" , "lmcache" , "kvbm" , "null" , "none" }
3524
3625# Global LMCache configuration - initialize once on module import
@@ -48,7 +37,6 @@ class Config:
4837 is_decode_worker : bool
4938 migration_limit : int = 0
5039 kv_port : Optional [int ] = None
51- port_range : DynamoPortRange
5240 custom_jinja_template : Optional [str ] = None
5341
5442 # mirror vLLM
@@ -115,18 +103,6 @@ def parse_args() -> Config:
115103 default = 0 ,
116104 help = "Maximum number of times a request may be migrated to a different engine worker. The number may be overridden by the engine." ,
117105 )
118- parser .add_argument (
119- "--dynamo-port-min" ,
120- type = int ,
121- default = DEFAULT_DYNAMO_PORT_MIN ,
122- help = f"Minimum port number for Dynamo services (default: { DEFAULT_DYNAMO_PORT_MIN } ). Must be in registered ports range (1024-49151)." ,
123- )
124- parser .add_argument (
125- "--dynamo-port-max" ,
126- type = int ,
127- default = DEFAULT_DYNAMO_PORT_MAX ,
128- help = f"Maximum port number for Dynamo services (default: { DEFAULT_DYNAMO_PORT_MAX } ). Must be in registered ports range (1024-49151)." ,
129- )
130106 parser .add_argument (
131107 "--connector" ,
132108 nargs = "*" ,
@@ -249,9 +225,6 @@ def parse_args() -> Config:
249225 config .is_prefill_worker = args .is_prefill_worker
250226 config .is_decode_worker = args .is_decode_worker
251227 config .migration_limit = args .migration_limit
252- config .port_range = DynamoPortRange (
253- min = args .dynamo_port_min , max = args .dynamo_port_max
254- )
255228 config .tool_call_parser = args .dyn_tool_call_parser
256229 config .reasoning_parser = args .dyn_reasoning_parser
257230 config .custom_jinja_template = args .custom_jinja_template
@@ -315,67 +288,14 @@ def parse_args() -> Config:
315288 return config
316289
317290
318- async def configure_ports (runtime : DistributedRuntime , config : Config ):
319- """Configure including port allocation and vLLM overrides."""
320-
321- dp_rank = config .engine_args .data_parallel_rank or 0
322- worker_id = f"vllm-{ config .component } -dp{ dp_rank } "
291+ async def configure_ports (config : Config ):
292+ """Configure port settings from dedicated environment overrides."""
323293
324- # Allocate KV events port
325294 if config .engine_args .enable_prefix_caching :
326- kv_metadata = PortMetadata (worker_id = worker_id , reason = "zmq_kv_event_port" )
327- kv_port = await allocate_and_reserve_port (
328- runtime = runtime ,
329- namespace = config .namespace ,
330- metadata = kv_metadata ,
331- port_range = config .port_range ,
332- )
333- config .kv_port = kv_port
334- logger .info (f"Allocated ZMQ KV events port: { kv_port } (worker_id={ worker_id } )" )
335-
336- # Check if NIXL is needed based on connector list
337- needs_nixl = config .has_connector ("nixl" )
338-
339- if needs_nixl :
340- # Allocate side channel ports
341- # https://github.com/vllm-project/vllm/blob/releases/v0.10.0/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py#L372
342- # NIXL calculates ports as: base_port + (dp_rank * tp_size) + tp_rank
343- # For dp_rank, we need to reserve tp_size consecutive ports
344- tp_size = config .engine_args .tensor_parallel_size or 1
345-
346- # The first port for this dp_rank will be at: base_port + (dp_rank * tp_size)
347- # We need to allocate tp_size consecutive ports starting from there
348- nixl_metadata = PortMetadata (
349- worker_id = worker_id , reason = "nixl_side_channel_port"
350- )
351- nixl_request = PortAllocationRequest (
352- metadata = nixl_metadata ,
353- port_range = config .port_range ,
354- block_size = tp_size ,
355- )
356- allocated_ports = await allocate_and_reserve_port_block (
357- runtime , config .namespace , nixl_request
358- )
359- first_port_for_dp_rank = allocated_ports [0 ]
360-
361- # Calculate the base port that NIXL expects
362- # base_port = first_port_for_dp_rank - (dp_rank * tp_size)
363- nixl_offset = dp_rank * tp_size
364- base_side_channel_port = first_port_for_dp_rank - nixl_offset
295+ config .kv_port = envs .DYN_VLLM_KV_EVENT_PORT
365296
366- if base_side_channel_port < 0 :
367- raise ValueError (
368- f"NIXL base port calculation resulted in negative port: "
369- f"first_allocated_port={ first_port_for_dp_rank } , offset={ nixl_offset } , "
370- f"base_port={ base_side_channel_port } . Current range: { config .port_range .min } -{ config .port_range .max } . "
371- f"Consider using a higher port range."
372- )
373-
374- logger .info (
375- f"Allocated NIXL side channel ports: base={ base_side_channel_port } , "
376- f"allocated_ports={ allocated_ports } (worker_id={ worker_id } , dp_rank={ dp_rank } , tp_size={ tp_size } )"
377- )
378- set_side_channel_host_and_port (base_side_channel_port )
297+ if config .has_connector ("nixl" ):
298+ ensure_side_channel_host ()
379299
380300
381301def create_kv_events_config (config : Config ) -> Optional [KVEventsConfig ]:
@@ -385,18 +305,18 @@ def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]:
385305 return None
386306
387307 # If user provided their own config, use that
388- if getattr (config .engine_args , "kv_events_config" ):
389- logger .info ("Using user-provided kv_events_config" )
308+ if c := getattr (config .engine_args , "kv_events_config" ):
309+ logger .info (f "Using user-provided kv_events_config { c } " )
390310 return None
391311
392312 # Create default events config for prefix caching
393- logger .info ("Creating Dynamo default kv_events_config for prefix caching" )
394313 if config .kv_port is None :
395314 raise ValueError (
396315 "config.kv_port is not set; call configure_ports(...) before overwrite_args "
397316 "or provide --kv-event-config to supply an explicit endpoint."
398317 )
399318 dp_rank = config .engine_args .data_parallel_rank or 0
319+
400320 return KVEventsConfig (
401321 enable_kv_cache_events = True ,
402322 publisher = "zmq" ,
@@ -472,6 +392,10 @@ def overwrite_args(config):
472392 defaults ["kv_transfer_config" ] = kv_transfer_config
473393
474394 kv_events_config = create_kv_events_config (config )
395+ logger .info (
396+ f"Using Dynamo default kv_events_config for publishing kv events over zmq: { kv_events_config } "
397+ )
398+
475399 if kv_events_config :
476400 defaults ["kv_events_config" ] = kv_events_config
477401
@@ -484,11 +408,45 @@ def overwrite_args(config):
484408 raise ValueError (f"{ key } not found in AsyncEngineArgs from vLLM." )
485409
486410
487- def set_side_channel_host_and_port (side_channel_port : int ):
488- """vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
489- This sets the port number for the side channel.
490- """
411+ def get_host_ip () -> str :
412+ """Get the IP address of the host for side-channel coordination."""
413+ try :
414+ host_name = socket .gethostname ()
415+ except socket .error as exc :
416+ logger .warning ("Failed to get hostname: %s, falling back to 127.0.0.1" , exc )
417+ return "127.0.0.1"
418+
419+ try :
420+ host_ip = socket .gethostbyname (host_name )
421+ with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as test_socket :
422+ test_socket .bind ((host_ip , 0 ))
423+ return host_ip
424+ except socket .gaierror as exc :
425+ logger .warning (
426+ "Hostname %s cannot be resolved: %s, falling back to 127.0.0.1" ,
427+ host_name ,
428+ exc ,
429+ )
430+ return "127.0.0.1"
431+ except socket .error as exc :
432+ logger .warning (
433+ "Hostname %s is not usable for binding: %s, falling back to 127.0.0.1" ,
434+ host_name ,
435+ exc ,
436+ )
437+ return "127.0.0.1"
438+
439+
440+ def ensure_side_channel_host ():
441+ """Ensure the NIXL side-channel host is available without overriding user settings."""
442+
443+ existing_host = os .getenv ("VLLM_NIXL_SIDE_CHANNEL_HOST" )
444+ if existing_host :
445+ logger .debug (
446+ "Preserving existing VLLM_NIXL_SIDE_CHANNEL_HOST=%s" , existing_host
447+ )
448+ return
449+
491450 host_ip = get_host_ip ()
492451 os .environ ["VLLM_NIXL_SIDE_CHANNEL_HOST" ] = host_ip
493- os .environ ["VLLM_NIXL_SIDE_CHANNEL_PORT" ] = str (side_channel_port )
494- logger .debug (f"Set NIXL side channel to { host_ip } :{ side_channel_port } " )
452+ logger .debug ("Set VLLM_NIXL_SIDE_CHANNEL_HOST to %s" , host_ip )
0 commit comments