30 changes: 18 additions & 12 deletions vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -205,13 +205,13 @@
"""
For remote prefill, pull all prompt blocks from remote
asynchronously relative to engine execution.
Args:
request (Request): the request object.
num_computed_tokens (int): the number of locally
computed tokens for this request.
Returns:
* the number of tokens that can be loaded from the
external KV cache beyond what is already computed.
* true if the external KV cache tokens will be loaded
asynchronously (between scheduler steps).
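
For orientation, a minimal caller-side sketch of how the two return values might be consumed (the method and helper names here are assumptions for illustration, not confirmed by this diff):

# Hypothetical caller: the connector reports how many extra tokens the
# remote KV cache can supply and whether they arrive asynchronously.
num_external_tokens, load_async = connector.get_num_new_matched_tokens(
    request, num_computed_tokens)
if load_async:
    # KV blocks land between scheduler steps; hold the request until then.
    mark_waiting_for_remote_kvs(request)  # hypothetical helper
else:
    num_computed_tokens += num_external_tokens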
@@ -687,14 +687,14 @@
blocks from remote.
In particular, handle both homogeneous and heterogeneous TP. The former
requires local rank_i to read from remote rank_i.
The latter, assuming D.world_size > P.world_size, requires that two or
more local TP workers share the xfer from a single TP worker.
Here's an example:
rank_offset   p_remote_tp_rank
(kv split no)
--------------------------------
      0               0         Worker0  ---- 1st half of KV ---->  Worker0  [ KV Cache ]
                                        /
@@ -707,14 +707,14 @@
        Decoder TP workers                    Prefix TP workers
          (world_size=4)                        (world_size=2)
tp_ratio = 4 // 2 = 2
Considering the KV Caches, if P-Worker_i has cache size [2, num_blocksP, kv_heads, block_size, head_dim]
then D-Worker_j has [2, num_blocksD, kv_heads//tp_ratio, block_size, head_dim]. Mind the "HND" layout format.
Assuming num_blocksD >= num_blocksP, D-Worker0 reads from P-Worker0 by preparing the kv_heads//tp_ratio
first heads from all the slots of all the blocks. D-Worker1 will do the same, but reading the second split
along the kv_heads dimension, and so forth until "tp_ratio" D TP workers have pulled from P-Worker0.
Note that the above will also hold true for the homogeneous TP case, where tp_ratio evaluates to 1.
In the MLA case, the cache is replicated across TP workers, so the rank_offset will always be 0.
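
A small sketch of the split arithmetic described above (a standalone illustration; the variable names are ours, not the implementation's):

# Illustrative: how a decode (D) TP rank maps onto a prefill (P) TP rank
# and onto a kv_heads split under the "HND" layout described above.
d_world_size, p_world_size = 4, 2
tp_ratio = d_world_size // p_world_size       # 2 D workers per P worker
d_rank = 3                                    # example decode TP rank
p_remote_tp_rank = d_rank // tp_ratio         # D-Worker3 pulls from P-Worker1
rank_offset = d_rank % tp_ratio               # kv split no (second split)
kv_heads = 8                                  # kv heads per P worker
heads_per_d_worker = kv_heads // tp_ratio     # each D worker reads 4 heads
# D-Worker3 reads heads [4, 8) from all slots of all of P-Worker1's blocks:
head_slice = slice(rank_offset * heads_per_d_worker,
                   (rank_offset + 1) * heads_per_d_worker)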
@@ -904,8 +904,14 @@
for handle, _xfer_stime in handles:
xfer_state = self.nixl_wrapper.check_xfer_state(handle)
if xfer_state == "DONE":
logger.info(
"Transfer KVCache for req %s has completed with handle %s",

Check failure on line 908 (GitHub Actions / pre-commit, Ruff E501):
vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py:908:81: E501 Line too long (83 > 80)
req_id, handle)
Comment on lines +907 to +909
Contributor

high

This informational log is inside a loop that iterates over transfer handles. In a production environment with many transfers, this could lead to excessive logging and performance degradation. Please consider changing this log to the DEBUG level.

Suggested change
- logger.info(
-     "Transfer KVCache for req %s has completed with handle %s",
-     req_id, handle)
+ logger.debug(
+     "Transfer KVCache for req %s has completed with handle %s",
+     req_id, handle)

self.nixl_wrapper.release_xfer_handle(handle)
elif xfer_state == "PROC":
logger.info(
"Transfer KVCache for req %s still in progress with handle %s",

Check failure on line 913 (GitHub Actions / pre-commit, Ruff E501):
vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py:913:81: E501 Line too long (87 > 80)
req_id, handle)
Comment on lines +912 to +914
Contributor

high

This informational log is inside a loop that iterates over transfer handles. In a production environment with many transfers, this could lead to excessive logging and performance degradation. Please consider changing this log to the DEBUG level.

Suggested change
- logger.info(
-     "Transfer KVCache for req %s still in progress with handle %s",
-     req_id, handle)
+ logger.debug(
+     "Transfer KVCache for req %s still in progress with handle %s",
+     req_id, handle)
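
Beyond dropping to DEBUG, a rate limiter is another common way to bound per-handle progress logs; a minimal sketch of that alternative (hypothetical, not part of this PR or of vLLM's logging API):

import time

_LOG_INTERVAL_S = 5.0      # emit at most one progress log per interval
_last_progress_log = 0.0   # timestamp of the last emission

def maybe_log_progress(logger, req_id, handle):
    # Suppress the log entirely while inside the quiet window.
    global _last_progress_log
    now = time.monotonic()
    if now - _last_progress_log >= _LOG_INTERVAL_S:
        _last_progress_log = now
        logger.debug(
            "Transfer KVCache for req %s still in progress with handle %s",
            req_id, handle)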

in_progress = True
continue
else:
@@ -923,7 +929,7 @@
"""
for req_id, meta in metadata.requests.items():
remote_engine_id = meta.remote_engine_id
- logger.debug(
+ logger.info(
Contributor

high

Changing this log from DEBUG to INFO could result in verbose logging in production, as start_load_kv is called frequently. This might impact performance. It's recommended to revert this to DEBUG level.

Suggested change
- logger.info(
+ logger.debug(

"start_load_kv for request %s from remote engine %s. "
"Num local_block_ids: %s. Num remote_block_ids: %s. ", req_id,
remote_engine_id, len(meta.local_block_ids),
6 changes: 4 additions & 2 deletions vllm/v1/core/sched/scheduler.py
@@ -343,10 +343,12 @@
is_ready = self._update_waiting_for_remote_kv(request)
if is_ready:
request.status = RequestStatus.WAITING
logger.info("[wxl debug] %s has already received kvcache and enter into waiting status.",
request.request_id)

Check failure on line 347 (GitHub Actions / pre-commit, Ruff E501):
vllm/v1/core/sched/scheduler.py:347:81: E501 Line too long (105 > 80)
Comment on lines +346 to +347
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.

else:
- logger.debug(
-     "%s is still in WAITING_FOR_REMOTE_KVS state.",
+ logger.info(
+     "[wxl debug] %s is still in WAITING_FOR_REMOTE_KVS state.",
request.request_id)

Check failure on line 351 (GitHub Actions / pre-commit, Ruff E501):
vllm/v1/core/sched/scheduler.py:351:81: E501 Line too long (87 > 80)
Comment on lines +349 to 351
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix, and has been elevated from DEBUG to INFO level. This change could lead to excessive logging in production. It should be reverted to the original DEBUG level and the debug prefix should be removed.

Suggested change
- logger.info(
-     "[wxl debug] %s is still in WAITING_FOR_REMOTE_KVS state.",
-     request.request_id)
+ logger.debug(
+     "%s is still in WAITING_FOR_REMOTE_KVS state.",
+     request.request_id)

self.waiting.pop_request()
skipped_waiting_requests.prepend_request(request)
continue
3 changes: 2 additions & 1 deletion vllm/v1/engine/async_llm.py
@@ -224,7 +224,7 @@ async def add_request(
data_parallel_rank: Optional[int] = None,
) -> RequestOutputCollector:
"""Add new request to the AsyncLLM."""

logger.info("[wxl debug] Request %s start add_request.", request_id)
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.
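
Where a trace point like this is still useful during bring-up, one option is to gate it behind an opt-in environment variable instead of an INFO-level prefix; a hypothetical sketch (VLLM_TRACE_REQUESTS is an invented flag, not a real vLLM option):

import os

# Invented opt-in flag: bring-up tracing stays silent unless the
# environment explicitly enables it.
_TRACE_REQUESTS = os.environ.get("VLLM_TRACE_REQUESTS", "0") == "1"

if _TRACE_REQUESTS:
    logger.debug("Request %s start add_request.", request_id)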

if self.errored:
raise EngineDeadError()

@@ -301,6 +301,7 @@ async def generate(
"""

try:
logger.info("[wxl debug] Request %s start generate.", request_id)
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.

# We start the output_handler on the first call to generate() so
# we can call __init__ before the event loop, which enables us
# to handle startup failure gracefully in the OpenAI server.
3 changes: 1 addition & 2 deletions vllm/v1/engine/core.py
@@ -195,7 +195,6 @@ def add_request(self, request: EngineCoreRequest):
not self.scheduler.get_kv_connector()):
logger.warning("Got kv_transfer_params, but no KVConnector found. "
"Disabling KVTransfer for this request.")

self.scheduler.add_request(req)

def abort_requests(self, request_ids: list[str]):
@@ -638,7 +637,7 @@ def _process_engine_step(self) -> bool:
def _handle_client_request(self, request_type: EngineCoreRequestType,
request: Any) -> None:
"""Dispatch request from client."""

logger.info("[wxl debug] EngineCore handling client request %s.", request.request_id)
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.

if request_type == EngineCoreRequestType.ADD:
self.add_request(request)
elif request_type == EngineCoreRequestType.ABORT:
Comment on lines 637 to 643


P0: Abort requests crash when logging request_id

The new info log dereferences request.request_id before inspecting the request type. When an abort is submitted the request argument is a list[str] of request IDs (see the ABORT branch immediately below), so this line raises AttributeError and prevents _handle_client_request from reaching self.abort_requests. Any client attempting to cancel a request will now crash the engine core loop. Guard the log by checking the request type or by logging the value without assuming a request_id attribute.

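A minimal sketch of one possible guard along the lines suggested above (only the branching is new; the names come from the surrounding code):

# Only dereference request_id when the payload is a request object;
# ABORT payloads are a list[str] of request IDs.
if request_type == EngineCoreRequestType.ADD:
    logger.debug("EngineCore handling ADD for request %s.",
                 request.request_id)
else:
    logger.debug("EngineCore handling %s request: %s.",
                 request_type, request)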

11 changes: 7 additions & 4 deletions vllm/v1/engine/output_processor.py
@@ -19,7 +19,8 @@
from vllm.v1.engine.parallel_sampling import ParentRequest
from vllm.v1.metrics.stats import (IterationStats, LoRARequestStates,
RequestStateStats)

from vllm.logger import init_logger
logger = init_logger(__name__)

class RequestOutputCollector:
"""
@@ -323,6 +324,7 @@
request_index: int = 0,
queue: Optional[RequestOutputCollector] = None,
) -> None:
logger.info("[wxl debug] Request %s start outputprocessor add_request.", request.request_id)
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.

request_id = request.request_id
if request_id in self.request_states:
raise ValueError(f"Request id {request_id} already running.")
@@ -351,17 +353,17 @@
1) Compute stats for logging
2) Detokenize
3) Create and handle RequestOutput objects:
* If there is a queue (for usage with AsyncLLM),
put the RequestOutput objects into the queue for
handling by the per-request generate() tasks.

* If there is no queue (for usage with LLMEngine),
return a list of RequestOutput objects.

NOTE FOR DEVELOPERS

vLLM V1 minimizes the number of python loops over the full
batch to ensure system overheads are minimized. This is the
only function that should loop over EngineCoreOutputs.

If you need to touch every element of the batch, do it from
@@ -433,11 +435,12 @@
self._update_stats_from_finished(req_state, finish_reason,
iteration_stats)

logger.info("[wxl debug] Request %s outputprocessor processed output and will return to client.", req_id)
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.

self.lora_states.update_iteration_stats(iteration_stats)

return OutputProcessorOutput(
request_outputs=request_outputs,
reqs_to_abort=reqs_to_abort,

Check failure on line 443 (GitHub Actions / pre-commit, Ruff E501):
vllm/v1/engine/output_processor.py:443:81: E501 Line too long (101 > 80)
)

def _update_stats_from_output(self, req_state: RequestState,
4 changes: 3 additions & 1 deletion vllm/v1/engine/processor.py
@@ -25,6 +25,8 @@
validate_guidance_grammar)
from vllm.v1.structured_output.backend_xgrammar import (
validate_xgrammar_grammar)
from vllm.logger import init_logger
logger = init_logger(__name__)


class Processor:
@@ -225,7 +227,7 @@ def process_inputs(
priority: int = 0,
data_parallel_rank: Optional[int] = None,
) -> tuple[Optional[str], EngineCoreRequest]:

logger.info("[wxl debug] Request %s start process_inputs.", request_id)
Contributor

critical

This log message appears to be for debugging purposes, as indicated by the [wxl debug] prefix. Such temporary debugging code should be removed before merging into the main branch to avoid cluttering production logs.

# TODO(woosuk): Support pooling models.
# TODO(woosuk): Support encoder-decoder models.
self._validate_lora(lora_request)