Skip to content

Commit f6c4544

Browse files
committed
bugfix coordinator
Signed-off-by: Jeff Wan <wantszkin2003@gmail.com>
1 parent b9c9d89 commit f6c4544

File tree

2 files changed

+9
-39
lines changed

2 files changed

+9
-39
lines changed

tests/distributed/omni_coordinator/test_omni_coordinator.py

Lines changed: 9 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -42,11 +42,11 @@ def test_omni_coordinator_registration_broadcast():
4242
OmniCoordinator publishes an InstanceList containing all registered instances.
4343
"""
4444
coordinator = OmniCoordinator(
45-
router_zmq_addr="tcp://127.0.0.1:0",
46-
pub_zmq_addr="tcp://127.0.0.1:0",
45+
router_zmq_addr="tcp://127.0.0.1:15555",
46+
pub_zmq_addr="tcp://127.0.0.1:15556",
4747
heartbeat_timeout=1000.0,
4848
)
49-
router_addr, pub_addr = coordinator.get_endpoints()
49+
router_addr, pub_addr = "tcp://127.0.0.1:15555", "tcp://127.0.0.1:15556"
5050

5151
sub_ctx = zmq.Context.instance()
5252
sub = sub_ctx.socket(zmq.SUB)
@@ -85,11 +85,11 @@ def test_omni_coordinator_heartbeat_timeout_handling():
8585
OmniCoordinator marks it as unhealthy and excludes it from the active list.
8686
"""
8787
coordinator = OmniCoordinator(
88-
router_zmq_addr="tcp://127.0.0.1:0",
89-
pub_zmq_addr="tcp://127.0.0.1:0",
88+
router_zmq_addr="tcp://127.0.0.1:25555",
89+
pub_zmq_addr="tcp://127.0.0.1:25556",
9090
heartbeat_timeout=5.0,
9191
)
92-
router_addr, pub_addr = coordinator.get_endpoints()
92+
router_addr, pub_addr = "tcp://127.0.0.1:25555", "tcp://127.0.0.1:25556"
9393

9494
sub_ctx = zmq.Context.instance()
9595
sub = sub_ctx.socket(zmq.SUB)
@@ -132,10 +132,6 @@ def test_omni_coordinator_heartbeat_timeout_handling():
132132
assert "tcp://stage:b" in addrs
133133
assert "tcp://stage:c" not in addrs
134134

135-
info_c = coordinator.get_instance("tcp://stage:c")
136-
assert info_c is not None
137-
assert info_c.status == StageStatus.ERROR
138-
139135
client_a.close()
140136
client_b.close()
141137
dealer_c.close(0)
@@ -150,11 +146,11 @@ def test_omni_coordinator_instance_shutdown_handling():
150146
OmniCoordinator removes it from the active list and broadcasts an updated list.
151147
"""
152148
coordinator = OmniCoordinator(
153-
router_zmq_addr="tcp://127.0.0.1:0",
154-
pub_zmq_addr="tcp://127.0.0.1:0",
149+
router_zmq_addr="tcp://127.0.0.1:35555",
150+
pub_zmq_addr="tcp://127.0.0.1:35556",
155151
heartbeat_timeout=1000.0,
156152
)
157-
router_addr, pub_addr = coordinator.get_endpoints()
153+
router_addr, pub_addr = "tcp://127.0.0.1:35555", "tcp://127.0.0.1:35556"
158154

159155
sub_ctx = zmq.Context.instance()
160156
sub = sub_ctx.socket(zmq.SUB)
@@ -178,10 +174,6 @@ def test_omni_coordinator_instance_shutdown_handling():
178174
assert msg is not None
179175
assert len(msg["instances"]) == 0
180176

181-
info = coordinator.get_instance("tcp://stage:shutdown")
182-
assert info is not None
183-
assert info.status == StageStatus.DOWN
184-
185177
client.close()
186178
coordinator.close()
187179
sub.close(0)

vllm_omni/distributed/omni_coordinator/omni_coordinator.py

Lines changed: 0 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -70,34 +70,12 @@ def __init__(
7070
self._heartbeat_thread = threading.Thread(target=self._heartbeat_loop, daemon=True)
7171
self._heartbeat_thread.start()
7272

73-
def get_endpoints(self) -> tuple[str, str]:
74-
"""Return (router_addr, pub_addr) as actually bound.
75-
76-
Useful when binding to tcp://127.0.0.1:0 to obtain the assigned port.
77-
"""
78-
router_ep = self._router.getsockopt(zmq.LAST_ENDPOINT).decode("ascii")
79-
pub_ep = self._pub.getsockopt(zmq.LAST_ENDPOINT).decode("ascii")
80-
return (router_ep, pub_ep)
81-
82-
83-
def get_all_instances(self) -> InstanceList:
84-
"""Return an :class:`InstanceList` of all known instances."""
85-
with self._lock:
86-
instances = list(self._instances.values())
87-
return InstanceList(instances=instances, timestamp=time())
88-
8973
def get_active_instances(self) -> InstanceList:
9074
"""Return an :class:`InstanceList` of active (UP) instances only."""
9175
with self._lock:
9276
active = [inst for inst in self._instances.values() if inst.status == StageStatus.UP]
9377
return InstanceList(instances=active, timestamp=time())
9478

95-
def get_instance(self, zmq_addr: str) -> InstanceInfo | None:
96-
"""Return the :class:`InstanceInfo` for a specific instance."""
97-
with self._lock:
98-
return self._instances.get(zmq_addr)
99-
100-
10179
def add_new_instance(self, event: InstanceEvent) -> None:
10280
"""Add a new instance based on an incoming event."""
10381
with self._lock:

0 commit comments

Comments
 (0)