cpp/include/tensorrt_llm/batch_manager/llmRequest.h (10 additions, 0 deletions)

@@ -1667,6 +1667,16 @@ class GenericLlmRequest
[](auto reason) { return reason == executor::FinishReason::kLENGTH; });
}

[[nodiscard]] bool isFinishedNormal() const noexcept
Collaborator: Where is this used? Is this field (via the bindings) only accessed inside the flexKV implementation?

Author: Yes, currently it's only accessed inside the flexKV implementation, which can be found here. Is this acceptable?

Collaborator: Is there any way to infer that value from the other fields? We ideally don't include new fields in TRT-LLM that are exclusively accessed by a specific kv connector implementation.

Collaborator: What do you mean by "normal" here? Could you modify the function name to make it more expressive?

{
return std::all_of(mFinishReasons.begin(), mFinishReasons.end(),
[](auto reason)
{
return reason == executor::FinishReason::kEND_ID || reason == executor::FinishReason::kSTOP_WORDS
|| reason == executor::FinishReason::kLENGTH;
});
}

[[nodiscard]] bool isTimedOut() const
{
if (!mAllottedTimeMs.has_value())
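
The reviewer asks whether this predicate could be inferred from existing fields instead of adding a new one. As a hedged sketch only: the check could in principle be reproduced on the Python side from the per-beam finish reasons, assuming the executor bindings expose the FinishReason enum (mirroring the C++ kEND_ID/kSTOP_WORDS/kLENGTH values) and assuming some way to read the per-beam reasons from the request; the get_finish_reasons() accessor below is a hypothetical stand-in, not an existing binding.

```python
from tensorrt_llm.bindings.executor import FinishReason

# Finish reasons that count as a "normal" completion, matching the C++ check above.
_NORMAL_REASONS = (FinishReason.END_ID, FinishReason.STOP_WORDS, FinishReason.LENGTH)


def is_finished_normal(request) -> bool:
    # `get_finish_reasons()` is a hypothetical per-beam accessor used only for
    # illustration; the bindings in this PR expose is_finished and
    # is_finished_due_to_length, not necessarily the raw per-beam reasons.
    return all(reason in _NORMAL_REASONS for reason in request.get_finish_reasons())
```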

cpp/tensorrt_llm/nanobind/batch_manager/bindings.cpp (1 addition, 0 deletions)

@@ -161,6 +161,7 @@ void initBindings(nb::module_& m)
.def("set_finished_reason", &GenLlmReq::setFinishedReason, nb::arg("finish_reason"), nb::arg("beam"))
.def_prop_ro("is_finished", &GenLlmReq::isFinished)
.def_prop_ro("is_finished_due_to_length", &GenLlmReq::isFinishedDueToLength)
.def_prop_ro("is_finished_normal", &GenLlmReq::isFinishedNormal)
.def_prop_rw(
"context_current_position", &GenLlmReq::getContextCurrentPosition, &GenLlmReq::setContextCurrentPosition)
.def_prop_ro("prepopulated_prompt_len", &GenLlmReq::getPrepopulatedPromptLen)

cpp/tensorrt_llm/pybind/batch_manager/bindings.cpp (1 addition, 0 deletions)

@@ -165,6 +165,7 @@ void initBindings(pybind11::module_& m)
.def("set_finished_reason", &GenLlmReq::setFinishedReason, py::arg("finish_reason"), py::arg("beam"))
.def_property_readonly("is_finished", &GenLlmReq::isFinished)
.def_property_readonly("is_finished_due_to_length", &GenLlmReq::isFinishedDueToLength)
.def_property_readonly("is_finished_normal", &GenLlmReq::isFinishedNormal)
.def_property(
"context_current_position", &GenLlmReq::getContextCurrentPosition, &GenLlmReq::setContextCurrentPosition)
.def_property_readonly("prepopulated_prompt_len", &GenLlmReq::getPrepopulatedPromptLen)

tensorrt_llm/_torch/pyexecutor/kv_cache_connector.py (11 additions, 0 deletions)

@@ -234,6 +234,13 @@ def update_state_after_alloc(self, request: LlmRequest,
block_ids: The KV cache block IDs that were allocated.
"""

def wait_for_initialization(self):
Collaborator: As we have layer_pre_hook and layer_post_hook, can we have a plugin manager initialization hook?

"""
Some connectors need to wait for some resources to be initialized.
For example, FlexKV needs to wait for the FlexKV manager to be initialized.
"""
return


# An internal dataclass to handle async saving/loading requests.
@dataclass
@@ -570,3 +577,7 @@ def layer_pre_hook(self, module, *args):

def layer_post_hook(self, module, *args):
self.worker.save_kv_layer(module.layer_idx, torch.cuda.current_stream())

def wait_for_initialization(self):
if self.scheduler is not None:
self.scheduler.wait_for_initialization()
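
To illustrate what the new wait_for_initialization hook is meant to do, here is a minimal sketch of a connector-side scheduler that blocks until an external manager reports ready. The FlexKvLikeScheduler class, its event, and the timeout are illustrative assumptions, not the actual FlexKV implementation.

```python
import threading


class FlexKvLikeScheduler:
    """Illustrative connector scheduler; names and internals are assumptions."""

    def __init__(self):
        # Set by whatever background path brings the external KV manager up.
        self._ready = threading.Event()

    def _on_manager_ready(self):
        self._ready.set()

    def wait_for_initialization(self, timeout_s: float = 300.0):
        # Block until the external manager is initialized, so the executor does
        # not start scheduling against a connector that cannot serve yet.
        if not self._ready.wait(timeout=timeout_s):
            raise RuntimeError("KV connector did not initialize in time")
```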

tensorrt_llm/_torch/pyexecutor/py_executor.py (14 additions, 3 deletions)

@@ -358,6 +358,8 @@ def _maybe_init_kv_connector_manager(self):
module.register_forward_hook(
self.kv_connector_manager.layer_post_hook)

self.kv_connector_manager.wait_for_initialization()

def _event_loop_wrapper(self):
try:
with customized_gc_thresholds(
@@ -610,7 +612,7 @@ def profile_step():
if prev_device_step_time is None:
prev_device_step_time = "N/A" # Handle first iteration
else:
prev_device_step_time = f"{prev_device_step_time}ms"
prev_device_step_time = f"{prev_device_step_time} ms"
host_step_time = (end_time - start_time) * 1000 # milliseconds
formatted_timestamp = datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M:%S")
@@ -620,7 +622,7 @@ def profile_step():
f"rank = {self.dist.rank}, "
f"currank_total_requests = {self.executor_request_queue.num_fetch_requests_cur_rank}/"
f"{self.executor_request_queue.num_fetch_requests}, "
f"host_step_time = {host_step_time}ms, "
f"host_step_time = {host_step_time:.3f} ms, "
f"prev_device_step_time = {prev_device_step_time}, "
f"timestamp = {formatted_timestamp}, "
f"num_scheduled_requests: {self.num_scheduled_requests}, "
@@ -1302,7 +1304,6 @@ def _kv_connector_start_batch(self, scheduled_batch):
if self.kv_connector_manager:
self.kv_connector_manager.take_scheduled_requests_pending_load(
scheduled_batch)
self.kv_connector_manager.handle_metadata()
Collaborator: Could you explain why this was removed?

self.kv_connector_manager.worker.start_load_kv(
torch.cuda.current_stream())

@@ -1348,6 +1349,10 @@ def _executor_loop(self):
finished_requests = []

can_queue = self._can_queue(scheduled_batch)

if self.kv_connector_manager:
Collaborator: Why are we moving handle_metadata around? This will break all other implementations of the kv connector.

Collaborator: Can the unit tests already added to TRT-LLM detect and ensure that functionality is not broken?

self.kv_connector_manager.handle_metadata()

if can_queue:
if self.kv_cache_transceiver:
# For generation requests which have completed KV cache transfer
@@ -1578,6 +1583,9 @@ def _executor_loop_overlap(self):

self._pause_requests(scheduled_batch.paused_requests)

if self.kv_connector_manager:
self.kv_connector_manager.handle_metadata()

can_queue = self._can_queue(scheduled_batch)
if can_queue:
if self.kv_cache_transceiver:
@@ -2634,6 +2642,9 @@ def _handle_responses(self):
self.ctx_in_transmission_counter))
else:
requests_to_terminate.append(request)

if self.kv_connector_manager is not None:
self.resource_manager.free_slot_only(request)
else:
new_active_requests.append(request)

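
To summarize the ordering question raised above, here is a hedged sketch of the connector-related calls per executor iteration after this change; the stub manager exists only so the snippet runs on its own, and the real PyExecutor scheduling, forward pass, and response handling are elided.

```python
# Stand-in for the connector manager; only the calls that appear in this diff.
class _StubConnectorManager:

    def handle_metadata(self):
        print("handle_metadata: runs once per iteration, before the batch is launched")

    def take_scheduled_requests_pending_load(self, scheduled_batch):
        print("drop requests still waiting on async KV loads from the batch")


def run_iteration(manager, scheduled_batch, can_queue):
    # After this change, handle_metadata is called in the executor loops before
    # queuing the batch, instead of inside _kv_connector_start_batch.
    if manager:
        manager.handle_metadata()
    if can_queue and manager:
        manager.take_scheduled_requests_pending_load(scheduled_batch)
        # worker.start_load_kv(torch.cuda.current_stream()) would follow here.


run_iteration(_StubConnectorManager(), scheduled_batch=[], can_queue=True)
```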

tensorrt_llm/_torch/pyexecutor/py_executor_creator.py (1 addition, 0 deletions)

@@ -177,6 +177,7 @@ def _get_mapping(_mapping: Mapping) -> Mapping:
tp_size=tensorrt_llm.mpi_world_size(),
gpus_per_node=tensorrt_llm.default_gpus_per_node(),
rank=tensorrt_llm.mpi_rank())
executor_config.mapping = mapping
Collaborator: I think this code change should keep in mind that other types of connectors exist too. Overwriting executor_config.mapping here may not be what other connectors intend. Maybe you can guard this so it applies only to the FlexKV connector?

else:
mapping = copy.deepcopy(_mapping)
mapping.rank = tensorrt_llm.mpi_rank()
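
One way to act on the reviewer's suggestion, as a hedged sketch: override executor_config.mapping only when a FlexKV-style connector is configured. The kv_connector_config attribute and its connector_module field are assumptions for illustration (the real config schema may differ); executor_config and mapping are the names from the surrounding diff.

```python
# Hedged sketch of guarding the mapping override; attribute names are assumptions.
kv_connector_config = getattr(executor_config, "kv_connector_config", None)
connector_module = str(getattr(kv_connector_config, "connector_module", "") or "")
if "flexkv" in connector_module.lower():
    # Only the FlexKV connector relies on reading the mapping back from executor_config.
    executor_config.mapping = mapping
```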

tensorrt_llm/_torch/pyexecutor/resource_manager.py (10 additions, 0 deletions)

@@ -1441,6 +1441,16 @@ def reorder_pipeline(self,
for resource_manager in resource_manager_list:
self.resource_managers.move_to_end(resource_manager)

def free_slot_only(self, request: LlmRequest):
"""Only free the slot for the request, without freeing other resources.
This is used to release the slot early when decode finishes, before
the put task completes.
"""
seq_slot_manager = self.get_resource_manager(
ResourceManagerType.SEQ_SLOT_MANAGER)
if seq_slot_manager is not None:
seq_slot_manager.free_resources(request)


class PeftCacheManager(BaseResourceManager):
