Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions lib/kvbm/src/block_manager/vllm/connector/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,31 +507,33 @@ impl Leader for KvConnectorLeader {
// grab the slot
let shared_slot = self.slot_manager().get_slot(&request_id)?;

// mark the slot as finished
// RACE FIX: Acquire lock BEFORE marking as finished
// This ensures we check state and prevent new operations from being created
let mut slot = shared_slot
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
slot.mark_as_finished(self.iteration_counter)?;

// todo: allow the request to resolve when it should exit
// the request may have some outstanding operations
// we would like to inform it to shutdown, then have it signal to the work that is officially gone,
// then we can remove the slot and trigger the worker to clean up as well.
// Mark the slot as finished (sets state to Finishing if there are operations,
// or Finished if all operations are complete)
slot.mark_as_finished(self.iteration_counter)?;

// remove the request from the inflight requests
self.inflight_requests.remove(&request_id);

// remove it from the manager as we will never use it again
self.slot_manager().remove_slot(&request_id)?;

// if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
// otherwise, we return true, which means there are still outstanding operations on gpu blocks which
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
// of the connector api which will be used to inform vllm that the request is finished.
if let SlotState::Finished = slot.state() {
// All operations complete - safe to remove slot and tell vLLM blocks are free
self.slot_manager().remove_slot(&request_id)?;
Ok(false)
} else {
debug_assert!(matches!(slot.state(), SlotState::Finishing));
// Still has pending operations - keep slot alive for worker to process
// RACE FIX: Don't remove slot here! Worker needs it to process the finish event.
// Worker will remove it after verifying all operations are complete.
// The lock on the slot prevents new operations from being created in offload_blocks()
Ok(true)
}
}
Expand Down
39 changes: 37 additions & 2 deletions lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,12 @@ impl VllmConnectorSlot {
block_ids: &[BlockId],
token_blocks: &[TokenBlock],
) -> Result<(), SlotError> {
// RACE FIX: Check if slot is in Finishing state before creating operations
// If we're finishing, don't create new operations - they won't be processed
if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
return Ok(());
}

assert!(block_ids.len() == token_blocks.len());
let operation_id = uuid::Uuid::new_v4();

Expand Down Expand Up @@ -1173,8 +1179,8 @@ impl LocalTransferEngine {
task_token: CancellationToken,
kvbm_metrics: KvbmMetrics,
) -> anyhow::Result<()> {
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel();
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel::<LocalOnboardRequest>();
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::<LocalOffloadRequest>();

// Clone resources needed for tasks
let block_manager_offload = self.block_manager.clone();
Expand Down Expand Up @@ -1212,6 +1218,10 @@ impl LocalTransferEngine {
tracing::debug!("LocalOffloadTask: received cancellation signal");
break;
}

let request_id = req.request_id.clone();
let operation_id = req.operation_id;

if let Err(e) = process_offload_request(
req,
&block_manager_offload,
Expand All @@ -1221,6 +1231,31 @@ impl LocalTransferEngine {
.await
{
tracing::error!("LocalOffloadTask: error processing request: {:?}", e);

// FIX: Notify scheduler that this operation is "complete" (even though it failed)
// Create a fake/immediate transfer request that completes instantly
// This increments the workers' completed counter so they can progress
let fake_xfer = BlockTransferRequest {
from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
to_pool: BlockTransferPool::Host, // (offload path, but no blocks)
blocks: vec![], // Empty - nothing to transfer
connector_req: Some(LeaderTransferRequest {
request_id: request_id.clone(),
uuid: operation_id,
requirement: None,
request_type: RequestType::Immediate, // Immediate = completes instantly
}),
};

match leader_offload.transfer_blocks_request(fake_xfer).await {
Ok(notify_receiver) => {
// Wait for the fake transfer to "complete" (should be instant)
let _ = notify_receiver.await;
}
Err(_xfer_err) => {
// Failed to create completion notification - error already logged above
}
}
}
}
Ok(())
Expand Down
Loading