Skip to content

Commit 3992615

Browse files
nnshah1claude
andcommitted
refactor: align multimodal example port allocation with vLLM components
Replace dynamic port allocation with environment variable-based configuration to match the approach used in components/src/dynamo/vllm/args.py. Changes: - Remove runtime.allocate_port_block() in favor of DYN_VLLM_KV_EVENT_PORT env var - Add port validation with registered port range (1024-49151) - Replace set_side_channel_host_and_port() with ensure_side_channel_host() - Update configure_ports() to be synchronous and environment-based - Add create_kv_events_config() function matching vLLM components pattern - Remove side_channel_port from Config class - Simplify overwrite_args() to follow vLLM components structure 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent 209783d commit 3992615

File tree

2 files changed

+141
-97
lines changed

2 files changed

+141
-97
lines changed

examples/multimodal/components/worker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ def signal_handler():
418418
args, config = VllmBaseWorker.parse_args()
419419

420420
# vLLM config overwrites
421-
await configure_ports(runtime, config)
421+
configure_ports(config)
422422
overwrite_args(config)
423423
await init(runtime, args, config)
424424

examples/multimodal/utils/args.py

Lines changed: 140 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@
22
# SPDX-License-Identifier: Apache-2.0
33

44
import argparse
5-
import json
65
import logging
76
import os
87
import socket
98
import sys
10-
import time
119
from typing import Callable, List, Optional, Tuple
1210

1311
from vllm.config import KVTransferConfig
1412
from vllm.distributed.kv_events import KVEventsConfig
1513
from vllm.engine.arg_utils import AsyncEngineArgs
1614

17-
from dynamo.runtime import DistributedRuntime
18-
1915
logger = logging.getLogger(__name__)
2016

2117
DYN_NAMESPACE = os.environ.get("DYN_NAMESPACE", "dynamo")
@@ -30,7 +26,6 @@ class Config:
3026
component: str
3127
endpoint: str
3228
kv_port: Optional[int] = None
33-
side_channel_port: Optional[int] = None
3429

3530
# mirror vLLM
3631
model: str
@@ -115,98 +110,167 @@ def base_parse_args(
115110
return args, config
116111

117112

118-
async def allocate_and_reserve_port(
119-
runtime: DistributedRuntime,
120-
namespace: str,
121-
worker_id: str,
122-
reason: str,
123-
) -> int:
124-
"""
125-
Get an OS-assigned port and atomically reserve it.
126-
Retries until successful or internal max attempts reached.
113+
# Port range constants
114+
REGISTERED_PORT_MIN = 1024
115+
REGISTERED_PORT_MAX = 49151
116+
117+
118+
def _resolve_port(env_var: str, default_port: int) -> int:
127119
"""
120+
Resolve port from environment variable with validation.
128121
129-
context_json = {
130-
"worker_id": worker_id,
131-
"reason": reason,
132-
"reserved_at": time.time(),
133-
"pid": os.getpid(),
134-
"block_size": 1,
135-
}
122+
Args:
123+
env_var: Environment variable name
124+
default_port: Default port if env var not set
125+
126+
Returns:
127+
Validated port number
128+
129+
Raises:
130+
ValueError: If port is invalid or out of range
131+
"""
132+
env_value = os.getenv(env_var)
133+
if env_value is None:
134+
port = default_port
135+
else:
136+
try:
137+
port = int(env_value)
138+
except ValueError as exc:
139+
raise ValueError(
140+
f"{env_var} must be an integer port number, got {env_value!r}."
141+
) from exc
142+
143+
if not (REGISTERED_PORT_MIN <= port <= REGISTERED_PORT_MAX):
144+
raise ValueError(
145+
f"{env_var} port {port} is outside of the registered port range "
146+
f"({REGISTERED_PORT_MIN}-{REGISTERED_PORT_MAX})."
147+
)
136148

137-
# Any ephemeral port, equivalent to binding port 0
138-
port_range_min = 32_768
139-
port_range_max = 60_999
140-
allocated_ports = await runtime.allocate_port_block(
141-
namespace,
142-
port_range_min,
143-
port_range_max,
144-
1, # how many ports to allocate
145-
json.dumps(context_json),
146-
)
147-
if not allocated_ports:
148-
raise RuntimeError("allocate_port_block returned no ports")
149-
port = allocated_ports[0]
150-
logger.debug(f"Reserved OS-assigned port {port} for {worker_id}")
151149
return port
152150

153151

154-
async def configure_ports(runtime: DistributedRuntime, config: Config):
155-
"""Configure including port allocation and vLLM overrides."""
152+
# Environment variables configuration
153+
environment_variables = {
154+
# Port used for KV events publishing to the frontend
155+
# Note: This env variable is ignored if explicitly using --kv-events-config ''
156+
"DYN_VLLM_KV_EVENT_PORT": lambda: _resolve_port("DYN_VLLM_KV_EVENT_PORT", 20080),
157+
}
156158

157-
# First, allocate ports
158-
dp_rank = config.engine_args.data_parallel_rank or 0
159-
worker_id = f"vllm-{config.component}-dp{dp_rank}"
160-
161-
# Allocate KV events port
162-
kv_port = await allocate_and_reserve_port(
163-
runtime=runtime,
164-
namespace=config.namespace,
165-
worker_id=f"{worker_id}",
166-
reason="zmq_kv_event_port",
167-
)
168159

169-
# Allocate side channel port
170-
side_channel_port = await allocate_and_reserve_port(
171-
runtime=runtime,
172-
namespace=config.namespace,
173-
worker_id=f"{worker_id}",
174-
reason="nixl_side_channel_port",
175-
)
160+
def __getattr__(name: str):
161+
"""
162+
Gets environment variables lazily.
163+
"""
164+
if name in environment_variables:
165+
return environment_variables[name]()
166+
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
167+
168+
169+
def get_host_ip() -> str:
170+
"""Get the IP address of the host for side-channel coordination."""
171+
try:
172+
host_name = socket.gethostname()
173+
except socket.error as exc:
174+
logger.warning("Failed to get hostname: %s, falling back to 127.0.0.1", exc)
175+
return "127.0.0.1"
176+
177+
try:
178+
host_ip = socket.gethostbyname(host_name)
179+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
180+
test_socket.bind((host_ip, 0))
181+
return host_ip
182+
except socket.gaierror as exc:
183+
logger.warning(
184+
"Hostname %s cannot be resolved: %s, falling back to 127.0.0.1",
185+
host_name,
186+
exc,
187+
)
188+
return "127.0.0.1"
189+
except socket.error as exc:
190+
logger.warning(
191+
"Hostname %s is not usable for binding: %s, falling back to 127.0.0.1",
192+
host_name,
193+
exc,
194+
)
195+
return "127.0.0.1"
176196

177-
# Update config with allocated ports
178-
config.kv_port = kv_port
179-
config.side_channel_port = side_channel_port
180197

198+
def ensure_side_channel_host():
199+
"""Ensure the NIXL side-channel host is available without overriding user settings."""
181200

182-
def overwrite_args(config):
183-
"""Set vLLM defaults for Dynamo."""
184-
assert config.kv_port is not None, "Must set the kv_port, use configure_ports"
185-
assert (
186-
config.side_channel_port is not None
187-
), "Must set the side_channel_port, use configure_ports"
201+
existing_host = os.getenv("VLLM_NIXL_SIDE_CHANNEL_HOST")
202+
if existing_host:
203+
logger.debug(
204+
"Preserving existing VLLM_NIXL_SIDE_CHANNEL_HOST=%s", existing_host
205+
)
206+
return
207+
208+
host_ip = get_host_ip()
209+
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = host_ip
210+
logger.debug("Set VLLM_NIXL_SIDE_CHANNEL_HOST to %s", host_ip)
211+
212+
213+
def configure_ports(config: Config):
214+
"""Configure port settings from dedicated environment overrides."""
215+
216+
if config.engine_args.enable_prefix_caching:
217+
config.kv_port = globals()["DYN_VLLM_KV_EVENT_PORT"]
188218

219+
# Always ensure side channel host for NIXL connector
220+
ensure_side_channel_host()
221+
222+
223+
def create_kv_events_config(config: Config) -> Optional[KVEventsConfig]:
224+
"""Create KVEventsConfig for prefix caching if needed."""
225+
# If prefix caching is not enabled, no events config needed
226+
if not config.engine_args.enable_prefix_caching:
227+
return None
228+
229+
# If user provided their own config, use that
230+
if c := getattr(config.engine_args, "kv_events_config", None):
231+
logger.info(f"Using user-provided kv_events_config {c}")
232+
return None
233+
234+
# Create default events config for prefix caching
235+
if config.kv_port is None:
236+
raise ValueError(
237+
"config.kv_port is not set; call configure_ports(...) before overwrite_args "
238+
"or provide --kv-event-config to supply an explicit endpoint."
239+
)
189240
dp_rank = config.engine_args.data_parallel_rank or 0
190241

242+
return KVEventsConfig(
243+
enable_kv_cache_events=True,
244+
publisher="zmq",
245+
endpoint=f"tcp://*:{config.kv_port - dp_rank}", # vLLM will iterate dp_rank for us, so we need to subtract it out TODO: fix in vLLM
246+
)
247+
248+
249+
def overwrite_args(config):
250+
"""Set vLLM defaults for Dynamo."""
191251
defaults = {
192252
"task": "generate",
253+
# As of vLLM >=0.10.0 the engine unconditionally calls
254+
# `sampling_params.update_from_tokenizer(...)`, so we can no longer
255+
# skip tokenizer initialisation. Setting this to **False** avoids
256+
# a NoneType error when the processor accesses the tokenizer.
193257
"skip_tokenizer_init": False,
194258
"disable_log_requests": True,
195-
"enable_prefix_caching": True,
196-
# KV routing relies on logging KV metrics
197259
"disable_log_stats": False,
198-
# Always setting up kv transfer for disagg
199-
"kv_transfer_config": KVTransferConfig(
200-
kv_connector="NixlConnector", kv_role="kv_both"
201-
),
202-
"kv_events_config": KVEventsConfig(
203-
enable_kv_cache_events=True,
204-
publisher="zmq",
205-
endpoint=f"tcp://*:{config.kv_port - dp_rank}", # vLLM will iterate dp_rank for us, so we need to subtract it out TODO: fix in vLLM
206-
),
207260
}
208261

209-
set_side_channel_host_and_port(config)
262+
# Set KV transfer config for NIXL connector
263+
defaults["kv_transfer_config"] = KVTransferConfig(
264+
kv_connector="NixlConnector", kv_role="kv_both"
265+
)
266+
267+
kv_events_config = create_kv_events_config(config)
268+
logger.info(
269+
f"Using Dynamo default kv_events_config for publishing kv events over zmq: {kv_events_config}"
270+
)
271+
272+
if kv_events_config:
273+
defaults["kv_events_config"] = kv_events_config
210274

211275
logger.debug("Setting Dynamo defaults for vLLM")
212276
for key, value in defaults.items():
@@ -217,23 +281,3 @@ def overwrite_args(config):
217281
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
218282

219283

220-
def set_side_channel_host_and_port(config: Config, hostname: Optional[str] = None):
221-
"""vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
222-
This sets the port number for the side channel.
223-
"""
224-
if hostname is None:
225-
hostname = socket.gethostname()
226-
# Test if hostname is usable by attempting to bind to it
227-
try:
228-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
229-
test_socket.bind((hostname, 0))
230-
except (socket.error, socket.gaierror):
231-
# If hostname is not usable, fall back to localhost
232-
logger.warning(
233-
f"Hostname '{hostname}' is not usable, falling back to '127.0.0.1'"
234-
)
235-
hostname = "127.0.0.1"
236-
237-
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = hostname
238-
os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(config.side_channel_port)
239-
logger.debug(f"Set NIXL side channel to {hostname}:{config.side_channel_port}")

0 commit comments

Comments
 (0)