Skip to content

Commit 6aabff9

Browse files
nnshah1 and claude committed
refactor: replace dynamic port allocation with environment variables in multimodal example
Align multimodal example port configuration with components/src/dynamo/vllm/args.py approach.

Changes:
- Replace allocate_and_reserve_port() with get_kv_port() using DYN_VLLM_KV_EVENT_PORT env var
- Update configure_ports() to be synchronous and environment-based
- Replace set_side_channel_host_and_port() with ensure_side_channel_host() (host only)
- Remove side_channel_port from Config class
- Update worker.py to use new configure_ports(config) signature

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent 209783d commit 6aabff9

File tree

2 files changed

+30
-83
lines changed

2 files changed

+30
-83
lines changed

examples/multimodal/components/worker.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -418,7 +418,7 @@ def signal_handler():
418418
args, config = VllmBaseWorker.parse_args()
419419

420420
# vLLM config overwrites
421-
await configure_ports(runtime, config)
421+
configure_ports(config)
422422
overwrite_args(config)
423423
await init(runtime, args, config)
424424

examples/multimodal/utils/args.py

Lines changed: 29 additions & 82 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,6 @@ class Config:
3030
component: str
3131
endpoint: str
3232
kv_port: Optional[int] = None
33-
side_channel_port: Optional[int] = None
3433

3534
# mirror vLLM
3635
model: str
@@ -115,76 +114,45 @@ def base_parse_args(
115114
return args, config
116115

117116

118-
async def allocate_and_reserve_port(
119-
runtime: DistributedRuntime,
120-
namespace: str,
121-
worker_id: str,
122-
reason: str,
123-
) -> int:
124-
"""
125-
Get an OS-assigned port and atomically reserve it.
126-
Retries until successful or internal max attempts reached.
127-
"""
128-
129-
context_json = {
130-
"worker_id": worker_id,
131-
"reason": reason,
132-
"reserved_at": time.time(),
133-
"pid": os.getpid(),
134-
"block_size": 1,
135-
}
117+
def get_kv_port() -> int:
118+
"""Get KV events port from environment or default."""
119+
return int(os.getenv("DYN_VLLM_KV_EVENT_PORT", "20080"))
136120

137-
# Any ephemeral port, equivalent to binding port 0
138-
port_range_min = 32_768
139-
port_range_max = 60_999
140-
allocated_ports = await runtime.allocate_port_block(
141-
namespace,
142-
port_range_min,
143-
port_range_max,
144-
1, # how many ports to allocate
145-
json.dumps(context_json),
146-
)
147-
if not allocated_ports:
148-
raise RuntimeError("allocate_port_block returned no ports")
149-
port = allocated_ports[0]
150-
logger.debug(f"Reserved OS-assigned port {port} for {worker_id}")
151-
return port
152121

122+
def ensure_side_channel_host():
123+
"""Ensure the NIXL side-channel host is available without overriding user settings."""
124+
existing_host = os.getenv("VLLM_NIXL_SIDE_CHANNEL_HOST")
125+
if existing_host:
126+
logger.debug(
127+
"Preserving existing VLLM_NIXL_SIDE_CHANNEL_HOST=%s", existing_host
128+
)
129+
return
153130

154-
async def configure_ports(runtime: DistributedRuntime, config: Config):
155-
"""Configure including port allocation and vLLM overrides."""
131+
try:
132+
host_name = socket.gethostname()
133+
host_ip = socket.gethostbyname(host_name)
134+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
135+
test_socket.bind((host_ip, 0))
136+
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = host_ip
137+
logger.debug("Set VLLM_NIXL_SIDE_CHANNEL_HOST to %s", host_ip)
138+
except (socket.error, socket.gaierror):
139+
logger.warning("Failed to get hostname, falling back to 127.0.0.1")
140+
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = "127.0.0.1"
156141

157-
# First, allocate ports
158-
dp_rank = config.engine_args.data_parallel_rank or 0
159-
worker_id = f"vllm-{config.component}-dp{dp_rank}"
160-
161-
# Allocate KV events port
162-
kv_port = await allocate_and_reserve_port(
163-
runtime=runtime,
164-
namespace=config.namespace,
165-
worker_id=f"{worker_id}",
166-
reason="zmq_kv_event_port",
167-
)
168142

169-
# Allocate side channel port
170-
side_channel_port = await allocate_and_reserve_port(
171-
runtime=runtime,
172-
namespace=config.namespace,
173-
worker_id=f"{worker_id}",
174-
reason="nixl_side_channel_port",
175-
)
143+
def configure_ports(config: Config):
144+
"""Configure port settings from dedicated environment overrides."""
145+
146+
if config.engine_args.enable_prefix_caching:
147+
config.kv_port = get_kv_port()
176148

177-
# Update config with allocated ports
178-
config.kv_port = kv_port
179-
config.side_channel_port = side_channel_port
149+
ensure_side_channel_host()
180150

181151

182152
def overwrite_args(config):
183153
"""Set vLLM defaults for Dynamo."""
184-
assert config.kv_port is not None, "Must set the kv_port, use configure_ports"
185-
assert (
186-
config.side_channel_port is not None
187-
), "Must set the side_channel_port, use configure_ports"
154+
if config.engine_args.enable_prefix_caching:
155+
assert config.kv_port is not None, "Must set the kv_port, use configure_ports"
188156

189157
dp_rank = config.engine_args.data_parallel_rank or 0
190158

@@ -206,7 +174,6 @@ def overwrite_args(config):
206174
),
207175
}
208176

209-
set_side_channel_host_and_port(config)
210177

211178
logger.debug("Setting Dynamo defaults for vLLM")
212179
for key, value in defaults.items():
@@ -217,23 +184,3 @@ def overwrite_args(config):
217184
raise ValueError(f"{key} not found in AsyncEngineArgs from vLLM.")
218185

219186

220-
def set_side_channel_host_and_port(config: Config, hostname: Optional[str] = None):
221-
"""vLLM V1 NixlConnector creates a side channel to exchange metadata with other NIXL connectors.
222-
This sets the port number for the side channel.
223-
"""
224-
if hostname is None:
225-
hostname = socket.gethostname()
226-
# Test if hostname is usable by attempting to bind to it
227-
try:
228-
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
229-
test_socket.bind((hostname, 0))
230-
except (socket.error, socket.gaierror):
231-
# If hostname is not usable, fall back to localhost
232-
logger.warning(
233-
f"Hostname '{hostname}' is not usable, falling back to '127.0.0.1'"
234-
)
235-
hostname = "127.0.0.1"
236-
237-
os.environ["VLLM_NIXL_SIDE_CHANNEL_HOST"] = hostname
238-
os.environ["VLLM_NIXL_SIDE_CHANNEL_PORT"] = str(config.side_channel_port)
239-
logger.debug(f"Set NIXL side channel to {hostname}:{config.side_channel_port}")

0 commit comments

Comments
 (0)