diff --git a/lib/kvbm/src/block_manager/vllm/connector/leader.rs b/lib/kvbm/src/block_manager/vllm/connector/leader.rs
index 363be2ace5..95207b5bb5 100644
--- a/lib/kvbm/src/block_manager/vllm/connector/leader.rs
+++ b/lib/kvbm/src/block_manager/vllm/connector/leader.rs
@@ -507,31 +507,33 @@ impl Leader for KvConnectorLeader {
         // grab the slot
         let shared_slot = self.slot_manager().get_slot(&request_id)?;
 
-        // mark the slot as finished
+        // Acquire lock BEFORE marking as finished
+        // This ensures we check state and prevent new operations from being created
         let mut slot = shared_slot
             .lock()
             .map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
 
-        slot.mark_as_finished(self.iteration_counter)?;
-        // todo: allow the request to resolve when it should exit
-        // the request may have some outstanding operations
-        // we would like to inform it to shutdown, then have it signal to the work that is officially gone,
-        // then we can remove the slot and trigger the worker to clean up as well.
+        // Mark the slot as finished (sets state to Finishing if there are operations,
+        // or Finished if all operations are complete)
+        slot.mark_as_finished(self.iteration_counter)?;
 
         // remove the request from the inflight requests
         self.inflight_requests.remove(&request_id);
 
-        // remove it from the manager as we will never use it again
-        self.slot_manager().remove_slot(&request_id)?;
-
         // if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
         // otherwise, we return true, which means there are still outstanding operations on gpu blocks which
         // must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
         // of the connector api which will be used to inform vllm that the request is finished.
         if let SlotState::Finished = slot.state() {
+            // All operations complete - safe to remove slot and tell vLLM blocks are free
+            self.slot_manager().remove_slot(&request_id)?;
             Ok(false)
         } else {
            debug_assert!(matches!(slot.state(), SlotState::Finishing));
+            // Still has pending operations - keep slot alive for worker to process
+            // Don't remove slot here. Worker needs it to process the finish event.
+            // Worker will remove it after verifying all operations are complete.
+            // The lock on the slot prevents new operations from being created in offload_blocks()
             Ok(true)
         }
     }
diff --git a/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
index c633f74d7c..1ddc70584a 100644
--- a/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
+++ b/lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
@@ -712,14 +712,35 @@ impl Slot for VllmConnectorSlot {
     }
 
     fn mark_as_finished(&mut self, _iteration: u64) -> Result<(), SlotError> {
-        self.state = SlotState::Finishing;
-        tracing::info!(
-            request_id = %self.request_id,
-            "request set to finish: cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
-            self.tokens_cached_from_device,
-            self.tokens_cached_from_host,
-            self.tokens_cached_from_disk
-        );
+        // Check if there are any pending operations
+        let has_pending_ops = self
+            .pending_operations
+            .as_ref()
+            .map(|ops| !ops.is_empty())
+            .unwrap_or(false);
+
+        if has_pending_ops {
+            // There are pending operations - need to wait for them to complete
+            self.state = SlotState::Finishing;
+            tracing::debug!(
+                request_id = %self.request_id,
+                pending_operations = self.pending_operations.as_ref().unwrap().len(),
+                "request set to finish (with pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
+                self.tokens_cached_from_device,
+                self.tokens_cached_from_host,
+                self.tokens_cached_from_disk
+            );
+        } else {
+            // No pending operations - can immediately mark as finished
+            self.state = SlotState::Finished;
+            tracing::debug!(
+                request_id = %self.request_id,
+                "request set to finished (no pending operations): cached_gpu_tokens: {}; cached_host_tokens: {}; cached_disk_tokens: {}",
+                self.tokens_cached_from_device,
+                self.tokens_cached_from_host,
+                self.tokens_cached_from_disk
+            );
+        }
 
         Ok(())
     }
 
@@ -989,6 +1010,12 @@ impl VllmConnectorSlot {
         block_ids: &[BlockId],
         token_blocks: &[TokenBlock],
     ) -> Result<(), SlotError> {
+        // Check if the slot is in a Finishing or Finished state before creating operations.
+        // If we're finishing, don't create new operations.
+        if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
+            return Ok(());
+        }
+
         assert!(block_ids.len() == token_blocks.len());
 
         let operation_id = uuid::Uuid::new_v4();
@@ -1173,8 +1200,8 @@ impl LocalTransferEngine {
         task_token: CancellationToken,
         kvbm_metrics: KvbmMetrics,
     ) -> anyhow::Result<()> {
-        let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
-        let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::<OnboardRequest>();
+        let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel::<OnboardRequest>();
+        let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::<OffloadRequest>();
 
         // Clone resources needed for tasks
         let block_manager_offload = self.block_manager.clone();
@@ -1212,6 +1239,10 @@
                     tracing::debug!("LocalOffloadTask: received cancellation signal");
                     break;
                 }
+
+                let request_id = req.request_id.clone();
+                let operation_id = req.operation_id;
+
                 if let Err(e) = process_offload_request(
                     req,
                     &block_manager_offload,
@@ -1221,6 +1252,30 @@
                 .await
                 {
                     tracing::error!("LocalOffloadTask: error processing request: {:?}", e);
+
+                    // Create a fake/immediate transfer request that completes instantly.
+                    // Otherwise, the worker side might get stuck and leak memory.
+                    let fake_xfer = BlockTransferRequest {
+                        from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
+                        to_pool: BlockTransferPool::Host,     // (offload path, but no blocks)
+                        blocks: vec![],                       // Empty - nothing to transfer
+                        connector_req: Some(LeaderTransferRequest {
+                            request_id: request_id.clone(),
+                            uuid: operation_id,
+                            requirement: None,
+                            request_type: RequestType::Immediate, // Immediate = completes instantly
+                        }),
+                    };
+
+                    match leader_offload.transfer_blocks_request(fake_xfer).await {
+                        Ok(notify_receiver) => {
+                            // Wait for the fake transfer to "complete" (should be instant)
+                            let _ = notify_receiver.await;
+                        }
+                        Err(_xfer_err) => {
+                            // Failed to create completion notification - error already logged above
+                        }
+                    }
                 }
             }
             Ok(())
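
Taken together, the leader- and slot-side changes implement a small state machine: finish requests park in `Finishing` while transfers are in flight, the offload path refuses new work once finishing, and only `Finished` permits removing the slot. The sketch below isolates that logic so it can be read (and run) on its own. Everything here is a simplified stand-in — `Slot`, `complete_operation`, and the `u64` operation ids are not the real KVBM types or signatures:

```rust
use std::collections::HashSet;

// Simplified stand-in for KVBM's SlotState.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SlotState {
    Active,
    Finishing, // finish requested, but transfers are still in flight
    Finished,  // all transfers drained; GPU blocks are safe to reuse
}

struct Slot {
    state: SlotState,
    pending_operations: HashSet<u64>, // in-flight offload/onboard operation ids
}

impl Slot {
    fn new() -> Self {
        Self { state: SlotState::Active, pending_operations: HashSet::new() }
    }

    // Mirrors the patched mark_as_finished: go straight to Finished when
    // nothing is in flight, otherwise park in Finishing.
    fn mark_as_finished(&mut self) {
        self.state = if self.pending_operations.is_empty() {
            SlotState::Finished
        } else {
            SlotState::Finishing
        };
    }

    // Mirrors the guard added to offload_blocks: refuse new work once
    // finish has been requested.
    fn offload_blocks(&mut self, operation_id: u64) -> bool {
        if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
            return false; // no new operations after finish was requested
        }
        self.pending_operations.insert(operation_id);
        true
    }

    // Hypothetical worker-side callback: an operation completed.
    fn complete_operation(&mut self, operation_id: u64) {
        self.pending_operations.remove(&operation_id);
        if self.state == SlotState::Finishing && self.pending_operations.is_empty() {
            self.state = SlotState::Finished; // worker may now remove the slot
        }
    }
}

fn main() {
    let mut slot = Slot::new();
    assert!(slot.offload_blocks(1)); // operation accepted while Active

    slot.mark_as_finished();
    assert_eq!(slot.state, SlotState::Finishing); // op 1 still pending
    assert!(!slot.offload_blocks(2)); // guard rejects new work

    slot.complete_operation(1);
    assert_eq!(slot.state, SlotState::Finished); // safe to free GPU blocks
}
```

This is why the leader's `request_finished` path may only call `remove_slot` in the `Finished` branch: in `Finishing`, the worker still needs the slot to observe the remaining completions.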
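The error-path change in the offload task follows the same underlying rule: whoever awaits a completion notification must always receive one, even when processing fails — otherwise the waiter hangs and its resources leak. Below is a minimal tokio sketch of that pattern with hypothetical names (`process`, the oneshot channel); it is not the actual `transfer_blocks_request` machinery, just the shape of the fix:

```rust
use tokio::sync::oneshot;

// Hypothetical stand-in for the real offload processing.
async fn process(succeed: bool) -> Result<Vec<u8>, &'static str> {
    if succeed { Ok(vec![1, 2, 3]) } else { Err("transfer failed") }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();

    // Leader side: on failure, still send a no-op completion so the
    // worker's `rx.await` cannot hang forever — analogous to the empty
    // Immediate transfer request created in the patch.
    tokio::spawn(async move {
        match process(false).await {
            Ok(_blocks) => {
                let _ = tx.send(());
            }
            Err(e) => {
                eprintln!("processing failed: {e}; sending no-op completion");
                let _ = tx.send(());
            }
        }
    });

    // Worker side: always receives a completion, success or failure.
    rx.await.expect("completion signal");
    println!("worker unblocked");
}
```

The design choice in the patch is to reuse the existing transfer-completion channel (an empty `Immediate` Device->Host request) rather than invent a separate error-signaling path, so the worker's bookkeeping for the operation id is cleaned up by the same code that handles successful transfers.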