Commit 734a34e

Revert "feat: (cherrypick) Support Dynamo KVBM with TRTLLM Disagg (#3527) (#5525)"
This reverts commit 2a315ec.
1 parent: 2745242

7 files changed: +125 −166 lines


components/src/dynamo/trtllm/main.py

Lines changed: 0 additions & 23 deletions
@@ -28,7 +28,6 @@
     SchedulerConfig,
 )
 from tensorrt_llm.llmapi.llm import SamplingParams
-from tensorrt_llm.llmapi.llm_args import KvCacheConnectorConfig
 from tensorrt_llm.llmapi.llm_utils import update_llm_args_with_extra_options
 from tensorrt_llm.llmapi.tokenizer import tokenizer_factory
 from tensorrt_llm.metrics import MetricsCollector
@@ -110,22 +109,6 @@ async def get_engine_runtime_config(
     return runtime_config


-def build_kv_connector_config(config: Config):
-    if config.connector is not None:
-        if config.connector == "kvbm":
-            return KvCacheConnectorConfig(
-                connector_module="kvbm.trtllm_integration.connector",
-                connector_scheduler_class="DynamoKVBMConnectorLeader",
-                connector_worker_class="DynamoKVBMConnectorWorker",
-            )
-        elif config.connector == "none":
-            return None
-        else:
-            logging.error(f"Invalid connector: {config.connector}")
-            sys.exit(1)
-    return None
-
-
 async def worker():
     config = cmd_line_args()

@@ -185,9 +168,6 @@ async def init(runtime: DistributedRuntime, config: Config):
         free_gpu_memory_fraction=config.free_gpu_memory_fraction
     )

-    if config.connector is not None and "kvbm" in config.connector:
-        kv_cache_config.enable_partial_reuse = False
-
     dynamic_batch_config = DynamicBatchConfig(
         enable_batch_size_tuning=True,
         enable_max_num_tokens_tuning=False,
@@ -197,8 +177,6 @@ async def init(runtime: DistributedRuntime, config: Config):
         capacity_scheduler_policy=CapacitySchedulerPolicy.GUARANTEED_NO_EVICT,
         dynamic_batch_config=dynamic_batch_config,
     )
-    kv_connector_config = build_kv_connector_config(config)
-
     modality = getattr(config, "modality", None) or "text"
     arg_map = {
         "model": model_path,
@@ -214,7 +192,6 @@ async def init(runtime: DistributedRuntime, config: Config):
         "max_beam_width": config.max_beam_width,
         "max_batch_size": config.max_batch_size,
         "return_perf_metrics": config.publish_events_and_metrics,
-        "kv_connector_config": kv_connector_config,
     }

     if config.extra_engine_args != "":

components/src/dynamo/trtllm/utils/trtllm_utils.py

Lines changed: 0 additions & 8 deletions
@@ -281,13 +281,6 @@ def cmd_line_args():
         choices=get_reasoning_parser_names(),
         help="Reasoning parser name for the model. If not specified, no reasoning parsing is performed.",
     )
-    parser.add_argument(
-        "--connector",
-        type=str,
-        default="none",
-        choices=["none", "kvbm"],
-        help="Connector to use for the model.",
-    )
     add_config_dump_args(parser)
     parser.add_argument(
         "--custom-jinja-template",
@@ -387,7 +380,6 @@ def cmd_line_args():
     config.enable_local_indexer = str(args.enable_local_indexer).lower() == "true"
     # Derive use_kv_events from publish_events_and_metrics
     config.use_kv_events = config.publish_events_and_metrics
-    config.connector = args.connector

     # Handle custom jinja template path expansion (environment variables and home directory)
     if args.custom_jinja_template:

docs/kvbm/trtllm-setup.md

Lines changed: 1 addition & 11 deletions
@@ -25,7 +25,7 @@ To learn what KVBM is, please check [here](kvbm_architecture.md)
 > - Ensure that `etcd` and `nats` are running before starting.
 > - KVBM only supports TensorRT-LLM’s PyTorch backend.
 > - Disable partial reuse `enable_partial_reuse: false` in the LLM API config’s `kv_connector_config` to increase offloading cache hits.
-> - KVBM requires TensorRT-LLM v1.2.0rc2 or newer.
+> - KVBM requires TensorRT-LLM v1.1.0rc5 or newer.
 > - Enabling KVBM metrics with TensorRT-LLM is still a work in progress.

 ## Quick Start
@@ -106,16 +106,6 @@ curl localhost:8000/v1/chat/completions -H "Content-Type: application/json"

 ```

-KVBM is also supported on the prefill worker of disaggregated serving. To launch the prefill worker, run:
-```bash
-# [DYNAMO] To serve an LLM model with dynamo
-python3 -m dynamo.trtllm \
-  --model-path Qwen/Qwen3-0.6B \
-  --served-model-name Qwen/Qwen3-0.6B \
-  --extra-engine-args /tmp/kvbm_llm_api_config.yaml
-  --disaggregation-mode prefill &
-```
-
 Alternatively, can use "trtllm-serve" with KVBM by replacing the above two [DYNAMO] cmds with below:
 ```bash
 trtllm-serve Qwen/Qwen3-0.6B --host localhost --port 8000 --backend pytorch --extra_llm_api_options /tmp/kvbm_llm_api_config.yaml
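
The hunk context above shows the request being sent with curl to an OpenAI-compatible endpoint. A minimal client sketch of that same call, assuming the `requests` package is installed and the server launched above is reachable on localhost:8000 with the standard chat-completions schema:

```python
# Minimal sketch of the curl call above against an OpenAI-compatible
# /v1/chat/completions endpoint (endpoint and model name taken from the docs;
# request body assumes the standard schema).
import requests

resp = requests.post(
    "http://localhost:8000/v1/chat/completions",
    headers={"Content-Type": "application/json"},
    json={
        "model": "Qwen/Qwen3-0.6B",
        "messages": [{"role": "user", "content": "Say hello in one sentence."}],
        "max_tokens": 32,
    },
    timeout=60,
)
print(resp.json()["choices"][0]["message"]["content"])
```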

lib/bindings/kvbm/python/kvbm/trtllm_integration/connector/kvbm_connector_leader.py

Lines changed: 0 additions & 15 deletions
@@ -5,7 +5,6 @@
 import os
 from typing import List, Optional

-import tensorrt_llm
 from kvbm import KvbmLeader
 from kvbm.trtllm_integration.consolidator_config import is_truthy
 from kvbm.trtllm_integration.rust import KvbmRequest
@@ -119,12 +118,6 @@ def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes:
         output = RustSchedulerOutput()

         for req in scheduler_output.new_requests:
-            if not hasattr(req, "num_scheduled_tokens"):
-                raise ValueError(
-                    f"""num_scheduled_tokens is not found in the SchedulerOutput!
-                    You're currently using TRTLLM {tensorrt_llm.__version__}
-                    The mimimum supported version is 1.2.0rc2"""
-                )
             output.add_new_request(
                 str(req.request_id),
                 req.new_tokens,
@@ -142,14 +135,6 @@ def build_connector_meta(self, scheduler_output: SchedulerOutput) -> bytes:
                 req.computed_position,
             )

-        output.add_num_scheduled_tokens(
-            {
-                str(req.request_id): req.num_scheduled_tokens
-                for req in scheduler_output.new_requests
-                + scheduler_output.cached_requests
-            }
-        )
-
         return self._connector.build_connector_metadata(output)

     def get_num_new_matched_tokens(
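
The guard removed above enforced a TensorRT-LLM floor indirectly, by probing the scheduler output for `num_scheduled_tokens` and quoting 1.2.0rc2 in the error message. Purely as an illustration, and assuming the third-party `packaging` library is available (it is not part of this diff), the same floor could be stated explicitly:

```python
# Hypothetical, explicit version floor equivalent to the removed hasattr() guard.
# The 1.2.0rc2 floor is the one quoted in the removed error message.
from packaging.version import Version

import tensorrt_llm

if Version(tensorrt_llm.__version__) < Version("1.2.0rc2"):
    raise ValueError(
        f"num_scheduled_tokens requires TensorRT-LLM >= 1.2.0rc2, "
        f"found {tensorrt_llm.__version__}"
    )
```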

lib/bindings/kvbm/src/block_manager/vllm/connector/leader/slot.rs

Lines changed: 117 additions & 0 deletions
@@ -110,6 +110,18 @@ pub trait Slot: std::fmt::Debug {
         num_scheduled_tokens: usize,
     ) -> Result<(), SlotError>;

+    // TRT-LLM does not include scheduled tokens in the scheduler output.
+    // Ideally, we should have a dedicated implementation for the TRT-LLM slot.
+    // However, since only this single function needs to be rewritten for now,
+    // we keep it as a separate function in Slot.
+    fn apply_scheduler_output_with_computed_position(
+        &mut self,
+        tokens: &[u32],
+        block_ids: &[usize],
+        computed_position: usize,
+        is_new_request: bool,
+    ) -> Result<(), SlotError>;
+
     fn record_start_iteration(&mut self, iteration: u64) -> Result<(), SlotError>;

     fn mark_as_prefilling(&mut self, iteration: u64) -> Result<(), SlotError>;
@@ -630,6 +642,111 @@ impl Slot for VllmConnectorSlot {
         Ok(())
     }

+    #[tracing::instrument(level = "debug", skip_all, fields(request_id = self.request_id.as_str()))]
+    fn apply_scheduler_output_with_computed_position(
+        &mut self,
+        tokens: &[u32],
+        block_ids: &[usize],
+        computed_position: usize,
+        is_new_request: bool,
+    ) -> Result<(), SlotError> {
+        // TRTLLM's KV Connector Manager will have (computed_position - external matches)
+        // in onborading case
+        if computed_position < self.current_position {
+            tracing::debug!(
+                "computed_position={} < current_position={}, so we are onboarding during prefilling phase",
+                computed_position,
+                self.current_position
+            );
+            return Ok(());
+        }
+
+        // now we decide what we should do for the new computed tokens
+        tracing::debug!(
+            "applying scheduler output, computed_position={}, sequence_total_tokens={}",
+            computed_position,
+            self.sequence.total_tokens()
+        );
+
+        if computed_position < self.sequence.total_tokens() {
+            // no need to apply new tokens, since it's applied when created the slot during prefilling
+            self.state = SlotState::Prefilling;
+        } else {
+            tracing::debug!(
+                "appending {} newly decoded tokens to sequence",
+                tokens.len()
+            );
+            self.sequence.extend(tokens.into()).unwrap();
+            self.state = SlotState::Decoding;
+        }
+
+        // apply new block_ids, this should be applied for both prefilling and decoding
+        // because this is unknown when creating the slot
+        if !block_ids.is_empty() {
+            tracing::debug!("assigning {} new device blocks slot", block_ids.len());
+            self.device_blocks.extend(block_ids);
+        }
+
+        // This approach is fragile, but it’s the only way currently to skip evaluating
+        // the device matched blocks and to avoid offloading them again.
+        // TODO: Consider adding an indicator in the scheduler output to distinguish between
+        // matched and unmatched device blocks/tokens from the scheduler.
+        let maybe_have_device_matched_blocks =
+            is_new_request && computed_position > 0 && self.evaluated_blocks == 0;
+
+        if maybe_have_device_matched_blocks {
+            self.evaluated_blocks = (computed_position + 1) / self.block_size;
+        }
+
+        let num_candidate_blocks =
+            ((computed_position + 1) / self.block_size).saturating_sub(self.evaluated_blocks);
+
+        if num_candidate_blocks > 0 {
+            // do we have a mechanism for skipping gpu cache hit blocks? not sure yet.
+            // for now, offload all the blocks to the host
+            let offload_block_ids: Vec<usize> = self
+                .device_blocks
+                .iter()
+                .skip(self.evaluated_blocks)
+                .take(num_candidate_blocks)
+                .copied()
+                .collect::<Vec<_>>();
+
+            assert_eq!(
+                offload_block_ids.len(),
+                num_candidate_blocks,
+                "device block overflow - candidate blocks exceed block count at offset {}",
+                self.evaluated_blocks
+            );
+
+            let offload_token_blocks: Vec<TokenBlock> = self
+                .sequence
+                .blocks()
+                .iter()
+                .skip(self.evaluated_blocks)
+                .take(num_candidate_blocks)
+                .cloned()
+                .collect::<Vec<_>>();
+
+            self.offload_blocks(&offload_block_ids, &offload_token_blocks)
+                .expect("failed to offload blocks");
+
+            self.evaluated_blocks += num_candidate_blocks;
+        }
+
+        // done applying policy
+        tracing::debug!(
+            "done applying kv cache policy at current_position: {}; computed_position: {}",
+            self.current_position,
+            computed_position,
+        );
+
+        // advance current position to computed position
+        self.current_position = computed_position;
+
+        Ok(())
+    }
+
     fn record_start_iteration(&mut self, iteration: u64) -> Result<(), SlotError> {
         if self.iteration_first_scheduled.is_none() {
             self.iteration_first_scheduled = Some(iteration);
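
The offload window in the new `apply_scheduler_output_with_computed_position` is driven only by `computed_position`, `block_size`, and `evaluated_blocks`. A small worked example of that arithmetic, transliterated to Python for illustration (not code from this repository):

```python
# Python transliteration of the candidate-block arithmetic above:
# ((computed_position + 1) / block_size).saturating_sub(evaluated_blocks)
def num_candidate_blocks(computed_position: int, block_size: int, evaluated_blocks: int) -> int:
    return max((computed_position + 1) // block_size - evaluated_blocks, 0)

# With block_size=16: positions 0..47 mean 48 computed tokens, i.e. 3 full blocks.
# If 1 block was already evaluated, 2 more become candidates for host offload.
assert num_candidate_blocks(47, 16, 1) == 2
# Fewer computed tokens than one full block: nothing to offload yet.
assert num_candidate_blocks(10, 16, 0) == 0
```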

lib/bindings/kvbm/src/block_manager/vllm/connector/trtllm_leader.rs

Lines changed: 4 additions & 14 deletions
@@ -351,16 +351,11 @@ impl Leader for KvConnectorLeader {
             slot.state()
         );

-        let scheduled_tokens = *scheduler_output
-            .num_scheduled_tokens
-            .get(request_id)
-            .unwrap_or(&0);
-
-        slot.apply_scheduler_output(
+        slot.apply_scheduler_output_with_computed_position(
             &new_req.prompt_token_ids,
             &new_req.block_ids,
             new_req.num_computed_tokens,
-            scheduled_tokens,
+            true,
         )?;

         if let Some(pending_ops) = slot.take_pending_operations() {
@@ -387,16 +382,11 @@ impl Leader for KvConnectorLeader {
             .lock()
             .map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

-        let scheduled_tokens = *scheduler_output
-            .num_scheduled_tokens
-            .get(request_id)
-            .unwrap_or(&0);
-
-        slot.apply_scheduler_output(
+        slot.apply_scheduler_output_with_computed_position(
             &cached_req.new_token_ids,
             &cached_req.new_block_ids,
             cached_req.num_computed_tokens,
-            scheduled_tokens,
+            false,
         )?;

         if let Some(pending_ops) = slot.take_pending_operations() {
