Add coordinator module and corresponding unit test #1465
NumberWan wants to merge 6 commits into vllm-project:main
Conversation
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1dd914a75a
status_str = event.get("status")
if status_str is not None:
    info.status = StageStatus(status_str)
Promote timed-out instance to UP on next heartbeat
When _check_heartbeat_timeouts() marks an instance ERROR, later heartbeat messages from OmniCoordClientForStage do not carry a status field, and _update_instance_info_locked() only updates info.status if status is present. That means a worker that briefly misses heartbeats can remain permanently ERROR (and excluded from published active instances) even after heartbeats resume, unless some separate explicit update with status='up' is sent.
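One way to address this (a sketch only; `StageStatus`, `InstanceInfo`, and the update function below are simplified stand-ins for the PR's actual types) is to treat a bare heartbeat as proof of life and promote an ERROR instance back to UP when no explicit status is attached:

```python
import enum
from dataclasses import dataclass


class StageStatus(enum.Enum):
    UP = "up"
    ERROR = "error"
    DOWN = "down"


@dataclass
class InstanceInfo:
    zmq_addr: str
    status: StageStatus = StageStatus.UP


def update_instance_info(info: InstanceInfo, event: dict) -> None:
    """Apply a heartbeat/update event to a registry entry."""
    status_str = event.get("status")
    if status_str is not None:
        info.status = StageStatus(status_str)
    elif info.status is StageStatus.ERROR:
        # A heartbeat without a status field still proves the worker is
        # alive, so recover from a transient heartbeat timeout.
        info.status = StageStatus.UP
```

Explicit statuses (e.g. a `down` event) still win; only the ERROR-from-timeout case is auto-recovered.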
def _handle_event(self, event: dict[str, Any]) -> None:
    """Dispatch an incoming event to the appropriate handler."""
    try:
        zmq_addr = event.get("zmq_addr")
Validate event shape before calling .get in coordinator
_recv_loop() forwards any JSON value to _handle_event(), but _handle_event() assumes event is a mapping and immediately calls event.get(...). If a peer sends valid JSON that is not an object (for example [] or "oops"), this raises AttributeError, which is not caught by the current except (KeyError, ValueError, TypeError) block, so the recv thread exits and the coordinator stops processing all future stage events.
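A minimal shape check at the top of the handler would keep the recv thread alive. The sketch below is a standalone illustration (the function name and return value are illustrative, not the PR's actual signature): drop anything that is not a JSON object before touching `.get(...)`:

```python
import logging
from typing import Any

logger = logging.getLogger(__name__)


def handle_event(event: Any) -> bool:
    """Validate an incoming event; return True only if it is dispatchable.

    Rejecting non-dict payloads up front means a malformed message is
    logged and dropped instead of raising AttributeError in the recv loop.
    """
    if not isinstance(event, dict):
        logger.warning("Dropping malformed event (expected object): %r", event)
        return False
    zmq_addr = event.get("zmq_addr")
    if not isinstance(zmq_addr, str):
        logger.warning("Dropping event without zmq_addr: %r", event)
        return False
    return True
```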
instances_payload = payload.get("instances", [])
instances: list[InstanceInfo] = []
for inst in instances_payload:
    status_str = inst.get("status", StageStatus.UP.value)
Guard hub decoder against non-dict instance entries
The hub client assumes every element in payload["instances"] is a dict and directly calls inst.get(...). A malformed message containing a non-dict entry (e.g., a string) will raise AttributeError, and _recv_loop() does not catch that exception type, so the background subscriber thread can die and leave get_instance_list() permanently stale.
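A defensive decode loop could skip malformed entries instead of dying. This sketch uses plain dicts rather than the PR's `InstanceInfo` type, and the function name is illustrative:

```python
def decode_instances(payload: dict) -> list[dict]:
    """Decode a published instance list, skipping malformed entries."""
    instances: list[dict] = []
    raw = payload.get("instances", [])
    if not isinstance(raw, list):
        return instances
    for inst in raw:
        if not isinstance(inst, dict):
            continue  # a stray string or number must not kill the recv loop
        instances.append({
            "zmq_addr": inst.get("zmq_addr"),
            "status": inst.get("status", "up"),
        })
    return instances
```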
Signed-off-by: Jeff Wan <wantszkin2003@gmail.com>
lishunyang12 left a comment
Left a few comments. Main concern is the TOCTOU race in _handle_event — the check-then-act on self._instances without holding the lock can cause duplicate registrations under concurrent events. The rest are smaller cleanups (license headers, type annotations, hardcoded test ports).
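For the TOCTOU point: the check and the insert need to happen under the same lock. A minimal sketch (class and method names here are illustrative, not the PR's actual API):

```python
import threading


class Registry:
    """Minimal sketch of a registry whose registration is atomic."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._instances: dict[str, dict] = {}

    def register(self, zmq_addr: str, info: dict) -> bool:
        # Holding the lock across the check AND the insert closes the
        # TOCTOU window: two concurrent register events for the same
        # address cannot both observe "not present" and both insert.
        with self._lock:
            if zmq_addr in self._instances:
                return False  # duplicate registration
            self._instances[zmq_addr] = info
            return True
```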
…types, dynamic ports Signed-off-by: Jeff Wan <wantszkin2003@gmail.com>
hsliuustc0106 left a comment
PR #1465 Review: Add coordinator module and corresponding unit test
Overview
This PR implements the OmniCoordinator module for distributed stage instance coordination (Major PR 1 from #984). The implementation closely follows the design document with a few important deviations.
Critical Issues: 4 found
1. Instance Registry Never Removes DOWN Instances (Design Deviation)
File: omni_coordinator.py:271-276
The design document states instances should be "removed from the internal registry" when DOWN, but the implementation only marks them as DOWN and keeps them forever:
def _remove_instance_locked(self, event: InstanceEvent) -> None:
    info.status = StageStatus.DOWN  # Only marks, never removes

Impact: memory leak; the registry grows without bound over time.
2. ZMQ Context Singleton Causes Test/Production Issues
Files: omni_coordinator.py:51, omni_coord_client_for_stage.py:27, omni_coord_client_for_hub.py:28
Using zmq.Context.instance() shares a global context across all components:
self._ctx = zmq.Context.instance()  # Global singleton

Impact: tests interfere with each other, closing one coordinator can break others, and multiple coordinators cannot run in the same process.
Recommendation: Use per-coordinator context: self._ctx = zmq.Context()
3. No Connection State Machine / Reconnection Logic
Files: omni_coord_client_for_stage.py, omni_coord_client_for_hub.py
The design mentions reliability with retry mechanisms, but there's no reconnection logic if the ZMQ connection drops. Network blips will cause silent failures with no recovery path.
Recommendation: Add connection state machine with exponential backoff reconnection.
4. Scalability Limit Not Documented
The single-threaded coordinator has inherent scalability limits:
| Instances | Messages/sec | Status |
|---|---|---|
| < 500 | < 150 | ✅ OK |
| 500-2000 | 150-600 | |
| > 2000 | > 600 | 🔴 Bottleneck risk |
Recommendation: Add max_instances parameter or document expected limits.
Strengths
- ✅ Good test coverage matching design spec
- ✅ Message schemas match design document exactly
- ✅ Clean separation of concerns (messages, coordinator, clients)
- ✅ Thread-safe with proper locking
- ✅ ZMQ best practices (RCVTIMEO, NOBLOCK)
Recommendation
Address issues #1 and #2 before merge. Issue #3 (reconnection) could be documented as Phase 2 work if needed.
| zmq_addr = event.zmq_addr | ||
| info = self._instances.get(zmq_addr) | ||
| if info is None: | ||
| return |
Design deviation: Instance never removed from registry
The design document states: "The OmniCoordinator internal registry entry for this instance has status StageStatus.DOWN and remove it from the internal registry"
But this implementation only marks as DOWN, never removes:
info.status = StageStatus.DOWN  # Only marks, keeps in registry

Impact: memory leak over time; instances accumulate indefinitely.
Recommendation: Either remove from self._instances or add a cleanup mechanism with TTL for DOWN instances.
self._heartbeat_timeout = heartbeat_timeout

self._ctx = zmq.Context.instance()
self._router = self._ctx.socket(zmq.ROUTER)
ZMQ Context Singleton Issue
Using zmq.Context.instance() creates a global shared context:
self._ctx = zmq.Context.instance()

Problems:
- Tests interfere with each other
- Closing one coordinator can break others sharing the same context
- Cannot run multiple coordinators in the same process
Recommendation: Use a dedicated context per coordinator:
self._ctx = zmq.Context()  # Not .instance()

And call self._ctx.term() in close().
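Concretely, the dedicated-context pattern looks roughly like this (a sketch, not the PR's actual class; `bind_to_random_port` is used here only so the example is self-contained):

```python
import zmq


class Coordinator:
    """Sketch: each coordinator owns its ZMQ context and tears it down."""

    def __init__(self) -> None:
        self._ctx = zmq.Context()  # dedicated, not zmq.Context.instance()
        self._router = self._ctx.socket(zmq.ROUTER)
        self.port = self._router.bind_to_random_port("tcp://127.0.0.1")

    def close(self) -> None:
        self._router.close(linger=0)
        self._ctx.term()  # safe: no other component shares this context
```

With per-instance contexts, closing one coordinator cannot tear sockets out from under another one in the same process, which is exactly the failure mode tests hit with the singleton.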
self._instance_zmq_addr = instance_zmq_addr
self._stage_id = stage_id

self._ctx = zmq.Context.instance()
No reconnection logic
If the connection to the coordinator drops, this client has no way to recover:
self._socket.connect(self._coord_zmq_addr)  # No error recovery

The design mentions reliability with retry mechanisms, but there is no:
- Connection state tracking
- Automatic reconnection on failure
- Re-registration after reconnect
Recommendation: Add a connection state machine with reconnection logic, or document this as Phase 2 work.
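The backoff part of such a state machine can be sketched transport-agnostically (the function and its parameters are illustrative; `connect` stands in for the socket reconnect plus re-registration step):

```python
import time


def connect_with_backoff(connect, max_attempts: int = 5,
                         base_delay_s: float = 0.5,
                         max_delay_s: float = 8.0,
                         sleep=time.sleep) -> bool:
    """Retry connect() with exponential backoff.

    connect should raise on failure. After a successful reconnect the
    real client would also re-register itself with the coordinator.
    Returns True once connected, False after exhausting attempts.
    """
    delay = base_delay_s
    for attempt in range(max_attempts):
        try:
            connect()
            return True
        except Exception:
            if attempt == max_attempts - 1:
                return False
            sleep(delay)
            delay = min(delay * 2, max_delay_s)  # exponential, capped
    return False
```

The `sleep` parameter is injected so the loop is testable without real delays.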
Summary
Add OmniCoordinator module for distributed stage instance coordination.
Related
Part of #984 - this PR implements the coordinator module.