[TRTLLM-8922][feat] gen-first disagg scheduling, part 2 #12239
reasonsolo wants to merge 4 commits into NVIDIA:main from
Conversation
📝 Walkthrough
This PR introduces generation-first scheduling support for disaggregated serving.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User as User/CLI
    participant Executor as Executor
    participant CacheXcv as Cache Transceiver
    participant ContextMgr as Context Manager
    participant Router as Router
    participant OpenAI as OpenAI Service
    User->>Executor: start with schedule_style=generation_first
    Executor->>CacheXcv: check_disagg_ctx_schedulable_status(new_requests)
    CacheXcv->>ContextMgr: prepare_context_requests()
    ContextMgr->>CacheXcv: TP consensus on ready IDs
    CacheXcv->>CacheXcv: if ctx_need_pp_sync, PP allgather & consensus
    CacheXcv-->>Executor: return prepared context requests
    Executor->>OpenAI: send generation request first
    OpenAI->>OpenAI: initiate streaming response pipeline
    Executor->>OpenAI: send context request (concurrent)
    OpenAI->>OpenAI: pipe context response via queue
    OpenAI->>Router: prepare_servers(ready_servers)
    Router->>Router: track prepared servers, skip re-preparation
    OpenAI-->>User: stream generation chunks as they arrive
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (2 warnings)
✅ Passed checks (1 passed)
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
tensorrt_llm/serve/router.py (1)
314-333: ⚠️ Potential issue | 🟠 Major
Don’t hold `_lock` across server preparation I/O.
`_prepare_server()` can run the preparation hook and fetch `_server_info`. Awaiting it inside `_monitor_servers()` blocks routing and load accounting on every new server until that network work finishes.

Suggested change

```diff
         async with self._lock:
             if final_servers != self._servers:
                 old_servers = self._servers.copy()
                 self._servers = final_servers
+                added_servers = [
+                    server for server in final_servers if server not in old_servers
+                ]
                 # Call handler for server list changes
                 self._on_servers_updated(old_servers, self._servers)
                 # Log removed servers
                 for server in old_servers:
                     if server not in final_servers:
                         self._prepared_ready_servers.discard(server)
                         self._server_info.pop(server, None)
                         logger.info(f"Server {server} is removed")
-                # Log added servers
-                for server in final_servers:
-                    if server not in old_servers:
-                        await self._prepare_server(server)
-                        logger.info(f"Server {server} is added")
             else:
+                added_servers = []
                 logger.debug(
                     f"No change in {self._server_role} server list: {len(self._servers)} servers"
                 )
+        for server in added_servers:
+            await self._prepare_server(server)
+            logger.info(f"Server {server} is added")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/serve/router.py` around lines 314-333: The code currently awaits self._prepare_server(...) while holding self._lock in _monitor_servers, which blocks routing; instead, after computing old_servers and setting self._servers and calling _on_servers_updated, collect the list of newly added servers (servers in final_servers not in old_servers), release the lock, then for each added server await self._prepare_server(server) and log the addition; finally, re-acquire the lock only to update shared state such as self._prepared_ready_servers and self._server_info for the prepared servers. Keep removals and the initial swap of self._servers/_on_servers_updated inside the locked section, but do all network I/O (calls to _prepare_server) outside the lock.

tensorrt_llm/llmapi/llm.py (1)
283-287: ⚠️ Potential issue | 🟠 Major
Don’t cache the empty-dict RPC fallback as a success value.
`GenerationExecutorProxy.get_disaggregated_params()` now returns `{}` both when there are no params and when the RPC path is temporarily unavailable. Storing that result on first access prevents any retry, so a startup race can leave `schedule_style`/ctx metadata empty for the whole lifetime of the `LLM`. Use a distinct failure sentinel, or only cache a confirmed successful fetch.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tensorrt_llm/llmapi/llm.py` around lines 283 - 287, The current disaggregated_params() caches an empty dict returned by self._executor.get_disaggregated_params(), which can represent an RPC failure and prevents retries; change the logic in disaggregated_params() so that you only assign to self._disaggregated_params when the fetch returns a confirmed successful non-empty result (i.e., if result is truthy) or use a distinct failure sentinel (e.g., a private object) instead of caching {} — leave self._disaggregated_params as None on empty/failure so future calls will retry the RPC, and only persist the value when the returned dict is non-empty (or wrapped with an explicit success flag).
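The retry-friendly caching the comment asks for can be sketched as follows; `ParamsCache` and its `fetch` callable are illustrative stand-ins, not the actual `LLM`/executor API:

```python
class ParamsCache:
    """Cache a fetched dict, but keep retrying while fetches fail or come back empty."""

    def __init__(self, fetch):
        self._fetch = fetch   # callable returning a dict; {} signals transient RPC failure
        self._cached = None   # None means "no confirmed successful fetch yet"

    def get(self):
        if self._cached is None:
            result = self._fetch()
            if result:        # only persist a confirmed non-empty result
                self._cached = result
            else:
                return {}     # transient failure: do not cache, so later calls retry
        return self._cached
```

With this shape, a startup race that makes the first RPC fail leaves the cache unset, and a later call still reaches the executor.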
🧹 Nitpick comments (1)
tests/integration/defs/disaggregated/test_disaggregated.py (1)
793-801: Reuse `setup_model_symlink` to avoid duplicated setup logic.
This block duplicates the helper used across adjacent tests, which increases drift risk.
♻️ Proposed refactor
```diff
-    src_dst_dict = {
-        llama_model_root:
-        f"{llm_venv.get_working_directory()}/TinyLlama/TinyLlama-1.1B-Chat-v1.0",
-    }
-    for src, dst in src_dst_dict.items():
-        if not os.path.islink(dst):
-            os.makedirs(os.path.dirname(dst), exist_ok=True)
-            os.symlink(src, dst, target_is_directory=True)
+    setup_model_symlink(llm_venv, llama_model_root,
+                        "TinyLlama/TinyLlama-1.1B-Chat-v1.0")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/defs/disaggregated/test_disaggregated.py` around lines 793 - 801, Replace the manual symlink creation loop with a call to the existing helper setup_model_symlink to remove duplication: identify the current src_dst_dict construction (using llama_model_root and f"{llm_venv.get_working_directory()}/TinyLlama/TinyLlama-1.1B-Chat-v1.0") and pass the same source and destination pair into setup_model_symlink so the helper performs the os.makedirs/os.symlink logic instead of duplicating it in the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tensorrt_llm/_torch/pyexecutor/py_executor.py`:
- Around line 2731-2736: The current computation of need_check_one uses a single
comprehension which can produce all([])==True when no requests match and also
uses a backslash continuation that triggers E131; change it to first build a
filtered list (e.g., non_gen_reqs = [req for req in self.active_requests if
req.py_disaggregated_params and req.py_disaggregated_params.schedule_style !=
DisaggScheduleStyle.GENERATION_FIRST]), then set need_check_one =
bool(non_gen_reqs) and need_check_one = need_check_one and
all(req.is_disagg_generation_transmission_in_progress for req in non_gen_reqs)
(or combine into a single boolean expression that checks non-emptiness before
all), and remove the backslash line continuation; ensure this logic is used
before computing at_least_num so at_least_num is not forced to 1 incorrectly.
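The underlying pitfall is that `all()` over an empty iterable returns `True`. A self-contained sketch of the filtered-list guard, with `Req` as an illustrative stand-in for the executor's request objects:

```python
from dataclasses import dataclass

GENERATION_FIRST = "generation_first"  # stand-in for DisaggScheduleStyle.GENERATION_FIRST

@dataclass
class Req:
    schedule_style: str
    transmission_in_progress: bool

def need_check_one(active_requests):
    # Filter first, then guard against emptiness: all([]) is True, which
    # would wrongly report "every non-gen-first request is in progress"
    # when there are none at all.
    non_gen_first = [r for r in active_requests
                     if r.schedule_style != GENERATION_FIRST]
    return bool(non_gen_first) and all(
        r.transmission_in_progress for r in non_gen_first)
```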
In `@tensorrt_llm/executor/proxy.py`:
- Around line 396-400: Replace the broad Exception handler in the block that
calls self.rpc_client.get_disaggregated_params().remote() so only RPC transport
errors are caught: import RPCError from .rpc.rpc_common and change the except to
"except RPCError as e" while preserving the current logger.warning fallback and
return behavior; do not swallow other exceptions so programming/serialization
errors will propagate.
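A minimal sketch of the narrowed handler; `RPCError` is stubbed here because the real class lives in `tensorrt_llm.executor.rpc.rpc_common`:

```python
class RPCError(Exception):
    """Stub standing in for tensorrt_llm.executor.rpc.rpc_common.RPCError."""

def fetch_disaggregated_params(remote_call):
    """Catch only RPC transport failures; let programming errors propagate."""
    try:
        return remote_call()
    except RPCError:
        # The real code logs a warning here; fall back to empty params.
        return {}
```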
In `@tensorrt_llm/llmapi/disagg_utils.py`:
- Around line 181-184: When parsing config (around the config.node_id = node_id
/ config.schedule_style = schedule_style assignments) validate inputs
immediately: if node_id is not None assert it is an int and >= 0 (otherwise
raise ValueError with a clear message), and if schedule_style is truthy validate
it against a small allowed set (add a constant like ALLOWED_SCHEDULE_STYLES or
reuse an existing enum) and raise ValueError if it’s not one of the allowed
values; only assign config.node_id and config.schedule_style after these checks
so invalid values fail fast with descriptive errors.
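The fail-fast validation could look like this sketch; the allowed style names are assumptions and may not match the PR's actual enum:

```python
ALLOWED_SCHEDULE_STYLES = {"context_first", "generation_first"}  # assumed values

def validate_disagg_args(node_id, schedule_style):
    """Validate inputs before assigning them onto the config object."""
    if node_id is not None:
        # bool is an int subclass, so exclude it explicitly.
        if not isinstance(node_id, int) or isinstance(node_id, bool) or node_id < 0:
            raise ValueError(f"node_id must be a non-negative int, got {node_id!r}")
    if schedule_style and schedule_style not in ALLOWED_SCHEDULE_STYLES:
        raise ValueError(
            f"schedule_style must be one of {sorted(ALLOWED_SCHEDULE_STYLES)}, "
            f"got {schedule_style!r}")
    return node_id, schedule_style
```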
In `@tensorrt_llm/serve/openai_client.py`:
- Line 130: The debug line in openai_client.py currently logs the full
request.disaggregated_params which can leak sensitive or large internal data;
update the logger.debug call in the context of the Send logic (where
logger.debug, self._role, request.disaggregated_params and url are referenced)
to avoid dumping the full payload — instead log stable, safe identifiers (e.g.,
request.id or request.request_id, model name/version, and the target url) plus a
small metadata summary such as the size or presence flag of disaggregated_params
(e.g., "has_disaggregated_params=True" and/or byte length) so identifiers are
recorded without exposing opaque_state, IPC handles, or endpoints.
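A log-safe summary along the lines the prompt suggests might look like this sketch (the field names are illustrative):

```python
def summarize_disagg_params(params):
    """Return a log-safe summary: presence and field names only, never the
    values, which may hold opaque_state blobs or IPC handles."""
    if not params:
        return "has_disaggregated_params=False"
    return ("has_disaggregated_params=True "
            f"fields={sorted(params.keys())}")
```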
In `@tensorrt_llm/serve/openai_disagg_service.py`:
- Around line 398-422: The background consumer task created as consume_task from
_consume_gen can leak if the caller stops iterating the returned async generator
from _yield_from_queue early; modify _yield_from_queue to ensure the
consume_task is cancelled and awaited when the generator is closed or finishes:
within _yield_from_queue wrap the loop in try/finally, and in the finally block
check consume_task (the task created from _consume_gen) and if not done call
consume_task.cancel() and await it (suppress CancelledError), so the background
work tied to gen_response and queue is properly cleaned up; reference
_consume_gen, consume_task, _yield_from_queue, gen_response, and queue when
making the change.
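The try/finally cleanup the prompt describes can be sketched as a standalone async generator; the `None` end-of-stream sentinel and the names here are assumptions, not the real `_yield_from_queue` signature:

```python
import asyncio

async def yield_from_queue(queue, consume_task):
    """Drain items from queue; guarantee the background consumer task is
    cancelled and awaited even if the caller stops iterating early."""
    try:
        while True:
            item = await queue.get()
            if item is None:  # assumed end-of-stream sentinel
                return
            yield item
    finally:
        # Runs on normal exhaustion and on early generator close (aclose).
        if not consume_task.done():
            consume_task.cancel()
        try:
            await consume_task
        except asyncio.CancelledError:
            pass
```

Calling `aclose()` on the generator (which FastAPI/starlette does when a client disconnects mid-stream) throws `GeneratorExit` at the yield point, so the finally block reaps the consumer instead of leaking it.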
In `@tensorrt_llm/serve/router.py`:
- Around line 195-209: Race between _prepare_server and remove_server can leave
stale entries in _server_info and _prepared_ready_servers after awaits; to fix,
before mutating state after the await (after calling _server_preparation_func
and _fetch_server_info) re-verify the server is still registered under the
router lock (or alternatively maintain an in-flight preparation set keyed by
server) and only then write self._server_info[server] and add to
self._prepared_ready_servers; use the existing router lock or a new per-server
in-flight set to prevent committing when remove_server has removed the server.
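The re-verification pattern can be sketched as follows; the class is a toy stand-in for the real router, showing only the lock discipline (I/O outside the lock, commit re-checked under it):

```python
import asyncio

class RouterSketch:
    def __init__(self):
        self._lock = asyncio.Lock()
        self._servers = set()
        self._server_info = {}
        self._prepared_ready_servers = set()

    async def prepare_server(self, server, fetch_info):
        info = await fetch_info(server)  # network I/O outside the lock
        async with self._lock:
            # Re-check registration: remove_server may have run during the await.
            if server not in self._servers:
                return False
            self._server_info[server] = info
            self._prepared_ready_servers.add(server)
            return True

    async def remove_server(self, server):
        async with self._lock:
            self._servers.discard(server)
            self._server_info.pop(server, None)
            self._prepared_ready_servers.discard(server)
```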
In `@tests/integration/defs/accuracy/test_disaggregated_serving.py`:
- Around line 1619-1624: The `@pytest.mark.parametrize` decorator block has
misaligned indentation for the ids list causing Flake8 E126; fix by aligning the
ids list and closing parentheses to use 4-space indentation consistent with the
opening line, e.g., ensure the "ids=[" line lines up under the parameter list
and the closing "])" aligns with the decorator expression; update the decorator
that declares "ctx_tp_pp" and its parameter tuples so the "ids" entries
("ctx_tp1pp1", ...) and the final bracket/parenthesis are indented using 4-space
blocks to match the rest of the decorator.
In `@tests/integration/defs/disaggregated/test_disaggregated.py`:
- Around line 153-156: The overlap-specific client validation condition that
currently only matches the descriptor names "overlap" and "trtllm_sampler" needs
to include the new descriptors "overlap_gen_first" and "overlap_gen_first_pp4"
(or be broadened to match any descriptor starting with "overlap") so that these
configs use the overlap client matrix and exercise chat/chat-streaming parity;
update the conditional in
tests/integration/defs/disaggregated/test_disaggregated.py where descriptor
names are checked to add these two identifiers (or replace the exact-name check
with a startswith("overlap") check) so overlap_gen_first* uses the correct
client set.
- Around line 789-790: The parametrized test
test_disaggregated_overlap_gen_first references missing config files
(disagg_config_overlap_gen_first.yaml and
disagg_config_overlap_gen_first_pp4.yaml) which causes run_disaggregated_test to
fail when loading configs for ctx_pp4; fix by either adding the two YAML files
into test_configs with the expected settings (matching other disaggregated test
config structure) or update the mapping used by
test_disaggregated_overlap_gen_first to point to existing config filenames;
ensure the filenames referenced where the test builds the config mapping (the
entries for ctx_pp variants) match files present under test_configs so
run_disaggregated_test can load them at runtime.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 7deb8312-4f52-42b8-9dff-e5d3c54493f2
📒 Files selected for processing (13)
- tensorrt_llm/_torch/disaggregation/native/py_cache_transceiver.py
- tensorrt_llm/_torch/pyexecutor/py_executor.py
- tensorrt_llm/commands/serve.py
- tensorrt_llm/disaggregated_params.py
- tensorrt_llm/executor/proxy.py
- tensorrt_llm/llmapi/disagg_utils.py
- tensorrt_llm/llmapi/llm.py
- tensorrt_llm/serve/openai_client.py
- tensorrt_llm/serve/openai_disagg_service.py
- tensorrt_llm/serve/router.py
- tests/integration/defs/accuracy/test_disaggregated_serving.py
- tests/integration/defs/disaggregated/test_disaggregated.py
- tests/unittest/disaggregated/test_py_cache_transceiver_mp.py
/bot run --disable-fail-fast

/bot run --disable-fail-fast

PR_Github #39590 [ run ] triggered by Bot. Commit:
PR_Github #39590 [ run ] completed with state

PR_Github #40000 [ run ] triggered by Bot. Commit:
PR_Github #40000 [ run ] completed with state

/bot run --disable-fail-fast

PR_Github #40017 [ run ] triggered by Bot. Commit:
PR_Github #40017 [ run ] completed with state
/bot run --disable-fail-fast

PR_Github #40041 [ run ] triggered by Bot. Commit:
PR_Github #40041 [ run ] completed with state

/bot run --disable-fail-fast

PR_Github #40287 [ run ] triggered by Bot. Commit:
PR_Github #40287 [ run ] completed with state
Squashed from 6 commits for clean rebase onto main. Signed-off-by: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com>
/bot run --disable-fail-fast

PR_Github #40477 [ run ] triggered by Bot. Commit:
PR_Github #40477 [ run ] completed with state
Add overlap gen-first and accuracy gen-first test entries to l0_dgx_h100.yml. Signed-off-by: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com>
/bot run --disable-fail-fast

PR_Github #40497 [ run ] triggered by Bot. Commit:

/bot run --disable-fail-fast

PR_Github #40503 [ run ] triggered by Bot. Commit:
PR_Github #40503 [ run ] completed with state
…isagg get_disaggregated_params() was wrapping ctx_info_endpoint in a list, but the protocol and dataclass both expect Optional[str]. This caused 400 Bad Request when the orchestrator forwarded the list value to the generation server's Pydantic model (extra="forbid"). Signed-off-by: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com>
In the overlap scheduler, _send_kv_async can both start and complete a KV transfer in the same iteration (before _handle_responses runs). When this happens, _end_transfer_and_maybe_terminate would set state to CONTEXT_COMPLETE and free resources before a response was created, leaving a zombie request that never completes. Fix: in _end_transfer_and_maybe_terminate, detect fast-transfer (request still in active_requests) and create the response while state is still TRANS_IN_PROGRESS (required by C++ createResult), then end transfer and terminate. Also fix the all_gen_first scheduling path to use non-blocking transfer status checks, preventing indefinite blocking when all active requests are gen-first and waiting for peer info. Signed-off-by: Lizhi Zhou <1432185+reasonsolo@users.noreply.github.com>
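The fast-transfer handling described in this commit message can be sketched as follows; `Req` and the function shape are illustrative stand-ins, and only the state names come from the commit message itself:

```python
TRANS_IN_PROGRESS = "TRANS_IN_PROGRESS"
CONTEXT_COMPLETE = "CONTEXT_COMPLETE"

class Req:
    """Illustrative stand-in for a disaggregated context request."""
    def __init__(self):
        self.state = TRANS_IN_PROGRESS
        self.response = None

def end_transfer_and_maybe_terminate(request, active_requests, create_response):
    # Fast transfer: the KV send both started and finished before the
    # response-handling pass ran, so the request is still active and has no
    # response yet. Create the response while the state is still
    # TRANS_IN_PROGRESS (mirroring the createResult requirement noted in
    # the commit message), then mark the transfer complete.
    if request in active_requests and request.response is None:
        request.response = create_response(request)
    request.state = CONTEXT_COMPLETE
    return request.response
```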
/bot run --disable-fail-fast

PR_Github #40541 [ run ] triggered by Bot. Commit:
PR_Github #40541 [ run ] completed with state
Summary by CodeRabbit
New Features
Improvements
Tests
Description
Test Coverage
PR Checklist
Please review the following before submitting your PR:
PR description clearly explains what and why. If using CodeRabbit's summary, please make sure it makes sense.
PR Follows TRT-LLM CODING GUIDELINES to the best of your knowledge.
Test cases are provided for new code paths (see test instructions)
Any new dependencies have been scanned for license and vulnerabilities
CODEOWNERS updated if ownership changes
Documentation updated as needed
Update tava architecture diagram if there is a significant design change in PR.
The reviewers assigned automatically/manually are appropriate for the PR.
Please check this after reviewing the above items as appropriate for this PR.
GitHub Bot Help
To see a list of available CI bot commands, please comment /bot help.