diff --git a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
index 56ae1acf8571..a5157d6671b1 100644
--- a/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
+++ b/vllm/distributed/kv_transfer/kv_connector/v1/nixl_connector.py
@@ -205,13 +205,13 @@ def get_num_new_matched_tokens(
         """
         For remote prefill, pull all prompt blocks from remote
         asynchronously relative to engine execution.
-        
+
         Args:
             request (Request): the request object.
             num_computed_tokens (int): the number of locally
                 computed tokens for this request
         Returns:
-            * the number of tokens that can be loaded from the 
+            * the number of tokens that can be loaded from the
               external KV cache beyond what is already computed.
             * true if the external KV cache tokens will be loaded
               asynchronously (between scheduler steps).
@@ -687,14 +687,14 @@ def add_remote_agent(self,
         blocks from remote.
 
         In particular, handle both homogeneous and heterogeneous TP. The former
-        requires local rank_i to read from remote rank_i. 
-        The latter, assuming D.world_size > P.world_size, requires that two or 
+        requires local rank_i to read from remote rank_i.
+        The latter, assuming D.world_size > P.world_size, requires that two or
         more local TP worker share the xfer from a single TP worker.
 
         Here's an example:
 
         rank_offset     p_remote_tp_rank
-        (kv split no)   
+        (kv split no)
         --------------------------------
             0                 0      Worker0  ---- 1st half of KV ----> Worker0  [ KV Cache ]
                                                                        /
@@ -707,14 +707,14 @@ def add_remote_agent(self,
 
                     Decoder TP workers                                  Prefix TP workers
                      (world_size=4)                                      (world_size=2)
-                                     tp_ratio = 4 // 2 = 2  
-        
-        Considering the KV Caches, if P-Worker_i has cache size [2, num_blocksP, kv_heads, block_size, head_dim] 
+                                     tp_ratio = 4 // 2 = 2
+
+        Considering the KV Caches, if P-Worker_i has cache size [2, num_blocksP, kv_heads, block_size, head_dim]
         then D-Worker_j has [2, num_blocksD, kv_heads//tp_ratio, block_size, head_dim]. Mind the "HND" layout format.
-        Assuming num_blocksD >= num_blocksP, D-Worker0 reads from P-Worker0 by preparing the kv_heads//tp_ratio 
+        Assuming num_blocksD >= num_blocksP, D-Worker0 reads from P-Worker0 by preparing the kv_heads//tp_ratio
         first heads from all the slots of all the blocks. D-Worker1 will do the same, but reading the second split
-        along the kv_heads dimension, and so forth until "tp_ratio" D TP workers have pulled from P-Worker0. 
-        
+        along the kv_heads dimension, and so forth until "tp_ratio" D TP workers have pulled from P-Worker0.
+
         Note that the above will also hold true for the homogeneous TP case, where tp_ratio evaluates to 1.
 
         Regarding MLA case, the cache is replicated across TP workers so the rank_offset will just always be 0
@@ -904,8 +904,14 @@ def _pop_done_transfers(
             for handle, _xfer_stime in handles:
                 xfer_state = self.nixl_wrapper.check_xfer_state(handle)
                 if xfer_state == "DONE":
+                    logger.info(
+                        "KV cache transfer for req %s completed (handle %s)",
+                        req_id, handle)
                     self.nixl_wrapper.release_xfer_handle(handle)
                 elif xfer_state == "PROC":
+                    logger.info(
+                        "KV cache transfer for req %s still in progress (handle %s)",
+                        req_id, handle)
                     in_progress = True
                     continue
                 else:
@@ -923,7 +929,7 @@ def start_load_kv(self, metadata: NixlConnectorMetadata):
         """
         for req_id, meta in metadata.requests.items():
             remote_engine_id = meta.remote_engine_id
-            logger.debug(
+            logger.info(
                 "start_load_kv for request %s from remote engine %s. "
                 "Num local_block_ids: %s. Num remote_block_ids: %s. ",
", req_id, remote_engine_id, len(meta.local_block_ids), diff --git a/vllm/v1/core/sched/scheduler.py b/vllm/v1/core/sched/scheduler.py index fe552db74e2f..e720f8ef30b6 100644 --- a/vllm/v1/core/sched/scheduler.py +++ b/vllm/v1/core/sched/scheduler.py @@ -343,9 +343,11 @@ def schedule(self) -> SchedulerOutput: is_ready = self._update_waiting_for_remote_kv(request) if is_ready: request.status = RequestStatus.WAITING + logger.info("[wxl debug] %s has already received kvcache and enter into waiting status.", + request.request_id) else: - logger.debug( - "%s is still in WAITING_FOR_REMOTE_KVS state.", + logger.info( + "[wxl debug] %s is still in WAITING_FOR_REMOTE_KVS state.", request.request_id) self.waiting.pop_request() skipped_waiting_requests.prepend_request(request) diff --git a/vllm/v1/engine/async_llm.py b/vllm/v1/engine/async_llm.py index 3754570dfaaa..792e46bd762c 100644 --- a/vllm/v1/engine/async_llm.py +++ b/vllm/v1/engine/async_llm.py @@ -224,7 +224,7 @@ async def add_request( data_parallel_rank: Optional[int] = None, ) -> RequestOutputCollector: """Add new request to the AsyncLLM.""" - + logger.info("[wxl debug] Request %s start add_request.", request_id) if self.errored: raise EngineDeadError() @@ -301,6 +301,7 @@ async def generate( """ try: + logger.info("[wxl debug] Request %s start generate.", request_id) # We start the output_handler on the first call to generate() so # we can call __init__ before the event loop, which enables us # to handle startup failure gracefully in the OpenAI server. diff --git a/vllm/v1/engine/core.py b/vllm/v1/engine/core.py index e2fdf6f8a11c..00180c30c0e1 100644 --- a/vllm/v1/engine/core.py +++ b/vllm/v1/engine/core.py @@ -195,7 +195,6 @@ def add_request(self, request: EngineCoreRequest): not self.scheduler.get_kv_connector()): logger.warning("Got kv_transfer_params, but no KVConnector found. " "Disabling KVTransfer for this request.") - self.scheduler.add_request(req) def abort_requests(self, request_ids: list[str]): @@ -638,7 +637,7 @@ def _process_engine_step(self) -> bool: def _handle_client_request(self, request_type: EngineCoreRequestType, request: Any) -> None: """Dispatch request from client.""" - + logger.info("[wxl debug] EngineCore handling client request %s.", request.request_id) if request_type == EngineCoreRequestType.ADD: self.add_request(request) elif request_type == EngineCoreRequestType.ABORT: diff --git a/vllm/v1/engine/output_processor.py b/vllm/v1/engine/output_processor.py index 2bcd61d1f0aa..6f8939756ea4 100644 --- a/vllm/v1/engine/output_processor.py +++ b/vllm/v1/engine/output_processor.py @@ -19,7 +19,8 @@ from vllm.v1.engine.parallel_sampling import ParentRequest from vllm.v1.metrics.stats import (IterationStats, LoRARequestStates, RequestStateStats) - +from vllm.logger import init_logger +logger = init_logger(__name__) class RequestOutputCollector: """ @@ -323,6 +324,7 @@ def add_request( request_index: int = 0, queue: Optional[RequestOutputCollector] = None, ) -> None: + logger.info("[wxl debug] Request %s start outputprocessor add_request.", request.request_id) request_id = request.request_id if request_id in self.request_states: raise ValueError(f"Request id {request_id} already running.") @@ -351,17 +353,17 @@ def process_outputs( 1) Compute stats for logging 2) Detokenize 3) Create and handle RequestOutput objects: - * If there is a queue (for usage with AsyncLLM), + * If there is a queue (for usage with AsyncLLM), put the RequestOutput objects into the queue for handling by the per-request generate() tasks. 
-            * If there is no queue (for usage with LLMEngine), 
+            * If there is no queue (for usage with LLMEngine),
               return a list of RequestOutput objects.
 
         NOTE FOR DEVELOPERS
 
         vLLM V1 minimizes the number of python loops over the full
-        batch to ensure system overheads are minimized. This is the 
+        batch to ensure system overheads are minimized. This is the
         only function that should loop over EngineCoreOutputs.
 
         If you need to touch every element of the batch, do it from
@@ -433,6 +435,7 @@ def process_outputs(
             self._update_stats_from_finished(req_state, finish_reason,
                                              iteration_stats)
 
+            logger.info("[wxl debug] OutputProcessor finished output for request %s; returning to client.", req_id)
         self.lora_states.update_iteration_stats(iteration_stats)
 
         return OutputProcessorOutput(
diff --git a/vllm/v1/engine/processor.py b/vllm/v1/engine/processor.py
index 9fc52543efde..669cddee38a6 100644
--- a/vllm/v1/engine/processor.py
+++ b/vllm/v1/engine/processor.py
@@ -25,6 +25,8 @@
     validate_guidance_grammar)
 from vllm.v1.structured_output.backend_xgrammar import (
     validate_xgrammar_grammar)
+from vllm.logger import init_logger
+logger = init_logger(__name__)
 
 
 class Processor:
@@ -225,7 +227,7 @@ def process_inputs(
         priority: int = 0,
         data_parallel_rank: Optional[int] = None,
     ) -> tuple[Optional[str], EngineCoreRequest]:
-
+        logger.info("[wxl debug] Request %s starting process_inputs.", request_id)
         # TODO(woosuk): Support pooling models.
         # TODO(woosuk): Support encoder-decoder models.
         self._validate_lora(lora_request)
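
Side note on the heterogeneous-TP mapping that the add_remote_agent() docstring above describes: the sketch below is illustrative only and is not part of this patch. It assumes the 4-decode / 2-prefill example from the docstring, an "HND" cache layout, and no MLA; every name in it (remote_rank_and_head_slice, decode_tp_rank, and so on) is made up for the example rather than taken from the vLLM implementation.

# Illustrative sketch, not vLLM code: reproduces the decode -> prefill mapping
# sketched in the add_remote_agent() docstring (the homogeneous case falls out
# with tp_ratio == 1). All names here are hypothetical.

def remote_rank_and_head_slice(decode_tp_rank: int,
                               decode_world_size: int,
                               prefill_world_size: int,
                               total_kv_heads: int) -> tuple[int, slice]:
    """Which prefill TP rank a decode worker pulls from, and which slice of
    that worker's KV heads it reads (assuming an "HND" layout)."""
    assert decode_world_size % prefill_world_size == 0
    tp_ratio = decode_world_size // prefill_world_size   # e.g. 4 // 2 = 2
    p_remote_tp_rank = decode_tp_rank // tp_ratio         # owning P worker
    kv_split_no = decode_tp_rank % tp_ratio               # which head split
    heads_per_decode_worker = total_kv_heads // tp_ratio
    head_slice = slice(kv_split_no * heads_per_decode_worker,
                       (kv_split_no + 1) * heads_per_decode_worker)
    return p_remote_tp_rank, head_slice


if __name__ == "__main__":
    # 4 decode workers, 2 prefill workers, 8 KV heads on the prefill side.
    for d_rank in range(4):
        print(d_rank, remote_rank_and_head_slice(d_rank, 4, 2, 8))
    # 0 (0, slice(0, 4))  -> 1st half of KV from P-Worker0
    # 1 (0, slice(4, 8))  -> 2nd half of KV from P-Worker0
    # 2 (1, slice(0, 4))  -> 1st half of KV from P-Worker1
    # 3 (1, slice(4, 8))  -> 2nd half of KV from P-Worker1

Running it prints the same assignment as the ASCII diagram in the docstring: decode workers 0 and 1 both pull from prefill worker 0, reading the first and second halves of the KV heads respectively, and workers 2 and 3 do the same against prefill worker 1.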