22# SPDX-License-Identifier: Apache-2.0
33
44import argparse
5- import json
65import logging
76import os
87import socket
98import sys
10- import time
119from typing import Callable , List , Optional , Tuple
1210
1311from vllm .config import KVTransferConfig
1412from vllm .distributed .kv_events import KVEventsConfig
1513from vllm .engine .arg_utils import AsyncEngineArgs
1614
17- from dynamo .runtime import DistributedRuntime
18-
1915logger = logging .getLogger (__name__ )
2016
2117DYN_NAMESPACE = os .environ .get ("DYN_NAMESPACE" , "dynamo" )
@@ -30,7 +26,6 @@ class Config:
3026 component : str
3127 endpoint : str
3228 kv_port : Optional [int ] = None
33- side_channel_port : Optional [int ] = None
3429
3530 # mirror vLLM
3631 model : str
@@ -115,76 +110,45 @@ def base_parse_args(
115110 return args , config
116111
117112
118- async def allocate_and_reserve_port (
119- runtime : DistributedRuntime ,
120- namespace : str ,
121- worker_id : str ,
122- reason : str ,
123- ) -> int :
124- """
125- Get an OS-assigned port and atomically reserve it.
126- Retries until successful or internal max attempts reached.
127- """
def get_kv_port() -> int:
    """Get KV events port from environment or default.

    Reads ``DYN_VLLM_KV_EVENT_PORT``. An unset, empty, or whitespace-only
    value selects the default port 20080.

    Returns:
        The port number as an int in the range 0-65535.

    Raises:
        ValueError: If the variable is set to something that is not an
            integer, or to an integer outside the valid TCP port range.
    """
    env_var = "DYN_VLLM_KV_EVENT_PORT"
    default_port = 20080

    raw_value = os.getenv(env_var, "").strip()
    if not raw_value:
        # An exported-but-empty variable is treated the same as an unset one
        # instead of failing inside int("").
        return default_port

    try:
        port = int(raw_value)
    except ValueError:
        # Re-raise with a message that names the offending variable.
        raise ValueError(
            f"{env_var} must be an integer port number, got {raw_value!r}"
        ) from None

    if not 0 <= port <= 65535:
        raise ValueError(f"{env_var} must be between 0 and 65535, got {port}")
    return port
128116
129- context_json = {
130- "worker_id" : worker_id ,
131- "reason" : reason ,
132- "reserved_at" : time .time (),
133- "pid" : os .getpid (),
134- "block_size" : 1 ,
135- }
136117
137- # Any ephemeral port, equivalent to binding port 0
138- port_range_min = 32_768
139- port_range_max = 60_999
140- allocated_ports = await runtime .allocate_port_block (
141- namespace ,
142- port_range_min ,
143- port_range_max ,
144- 1 , # how many ports to allocate
145- json .dumps (context_json ),
146- )
147- if not allocated_ports :
148- raise RuntimeError ("allocate_port_block returned no ports" )
149- port = allocated_ports [0 ]
150- logger .debug (f"Reserved OS-assigned port { port } for { worker_id } " )
151- return port
def ensure_side_channel_host():
    """Populate VLLM_NIXL_SIDE_CHANNEL_HOST unless it is already set.

    A non-empty existing value is left untouched. Otherwise the local
    hostname is resolved to an address, a throwaway TCP socket is bound to
    that address on an OS-chosen port to check it is usable, and the address
    is exported. If the lookup or the bind raises a socket error, 127.0.0.1
    is exported instead.
    """
    env_key = "VLLM_NIXL_SIDE_CHANNEL_HOST"

    preset = os.getenv(env_key)
    if preset:
        logger.debug("Preserving existing VLLM_NIXL_SIDE_CHANNEL_HOST=%s", preset)
        return

    try:
        resolved_ip = socket.gethostbyname(socket.gethostname())
        # Binding to port 0 only probes that the address is bindable; the
        # socket is closed as soon as the with-block exits.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind((resolved_ip, 0))
        os.environ[env_key] = resolved_ip
        logger.debug("Set VLLM_NIXL_SIDE_CHANNEL_HOST to %s", resolved_ip)
    except (socket.error, socket.gaierror):
        logger.warning("Failed to get hostname, falling back to 127.0.0.1")
        os.environ[env_key] = "127.0.0.1"
153137
154- async def configure_ports (runtime : DistributedRuntime , config : Config ):
155- """Configure including port allocation and vLLM overrides."""
156138
157- # First, allocate ports
158- dp_rank = config .engine_args .data_parallel_rank or 0
159- worker_id = f"vllm-{ config .component } -dp{ dp_rank } "
160-
161- # Allocate KV events port
162- kv_port = await allocate_and_reserve_port (
163- runtime = runtime ,
164- namespace = config .namespace ,
165- worker_id = f"{ worker_id } " ,
166- reason = "zmq_kv_event_port" ,
167- )
def configure_ports(config: Config):
    """Fill in the port-related settings on ``config``.

    When prefix caching is enabled on the engine args, ``config.kv_port`` is
    set from ``get_kv_port()``. ``ensure_side_channel_host()`` is then called
    unconditionally.
    """
    prefix_caching_enabled = config.engine_args.enable_prefix_caching
    if prefix_caching_enabled:
        kv_events_port = get_kv_port()
        config.kv_port = kv_events_port

    ensure_side_channel_host()
180146
181147
182148def overwrite_args (config ):
183149 """Set vLLM defaults for Dynamo."""
184- assert config .kv_port is not None , "Must set the kv_port, use configure_ports"
185- assert (
186- config .side_channel_port is not None
187- ), "Must set the side_channel_port, use configure_ports"
150+ if config .engine_args .enable_prefix_caching :
151+ assert config .kv_port is not None , "Must set the kv_port, use configure_ports"
188152
189153 dp_rank = config .engine_args .data_parallel_rank or 0
190154
@@ -206,34 +170,10 @@ def overwrite_args(config):
206170 ),
207171 }
208172
209- set_side_channel_host_and_port (config )
210-
211173 logger .debug ("Setting Dynamo defaults for vLLM" )
212174 for key , value in defaults .items ():
213175 if hasattr (config .engine_args , key ):
214176 setattr (config .engine_args , key , value )
215177 logger .debug (f" engine_args.{ key } = { value } " )
216178 else :
217179 raise ValueError (f"{ key } not found in AsyncEngineArgs from vLLM." )
218-
219-
220- def set_side_channel_host_and_port (config : Config , hostname : Optional [str ] = None ):
221- """vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
222- This sets the port number for the side channel.
223- """
224- if hostname is None :
225- hostname = socket .gethostname ()
226- # Test if hostname is usable by attempting to bind to it
227- try :
228- with socket .socket (socket .AF_INET , socket .SOCK_STREAM ) as test_socket :
229- test_socket .bind ((hostname , 0 ))
230- except (socket .error , socket .gaierror ):
231- # If hostname is not usable, fall back to localhost
232- logger .warning (
233- f"Hostname '{ hostname } ' is not usable, falling back to '127.0.0.1'"
234- )
235- hostname = "127.0.0.1"
236-
237- os .environ ["VLLM_NIXL_SIDE_CHANNEL_HOST" ] = hostname
238- os .environ ["VLLM_NIXL_SIDE_CHANNEL_PORT" ] = str (config .side_channel_port )
239- logger .debug (f"Set NIXL side channel to { hostname } :{ config .side_channel_port } " )
0 commit comments