Skip to content

Commit 6097426

Browse files
committed
WIP: Fix gpu leak
1 parent 1a9aeab commit 6097426

File tree

2 files changed: +51 −11 lines changed

lib/kvbm/src/block_manager/vllm/connector/leader.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -507,31 +507,33 @@ impl Leader for KvConnectorLeader {
507507
// grab the slot
508508
let shared_slot = self.slot_manager().get_slot(&request_id)?;
509509

510-
// mark the slot as finished
510+
// RACE FIX: Acquire lock BEFORE marking as finished
511+
// This ensures we check state and prevent new operations from being created
511512
let mut slot = shared_slot
512513
.lock()
513514
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;
514-
slot.mark_as_finished(self.iteration_counter)?;
515515

516-
// todo: allow the request to resolve when it should exit
517-
// the request may have some outstanding operations
518-
// we would like to inform it to shutdown, then have it signal to the work that is officially gone,
519-
// then we can remove the slot and trigger the worker to clean up as well.
516+
// Mark the slot as finished (sets state to Finishing if there are operations,
517+
// or Finished if all operations are complete)
518+
slot.mark_as_finished(self.iteration_counter)?;
520519

521520
// remove the request from the inflight requests
522521
self.inflight_requests.remove(&request_id);
523522

524-
// remove it from the manager as we will never use it again
525-
self.slot_manager().remove_slot(&request_id)?;
526-
527523
// if the slot has finished, we can return false to vllm, indicating all gpu blocks are free to be reused
528524
// otherwise, we return true, which means there are still outstanding operations on gpu blocks which
529525
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
530526
// of the connector api which will be used to inform vllm that the request is finished.
531527
if let SlotState::Finished = slot.state() {
528+
// All operations complete - safe to remove slot and tell vLLM blocks are free
529+
self.slot_manager().remove_slot(&request_id)?;
532530
Ok(false)
533531
} else {
534532
debug_assert!(matches!(slot.state(), SlotState::Finishing));
533+
// Still has pending operations - keep slot alive for worker to process
534+
// RACE FIX: Don't remove slot here! Worker needs it to process the finish event.
535+
// Worker will remove it after verifying all operations are complete.
536+
// The lock on the slot prevents new operations from being created in offload_blocks()
535537
Ok(true)
536538
}
537539
}

lib/kvbm/src/block_manager/vllm/connector/leader/slot.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,12 @@ impl VllmConnectorSlot {
989989
block_ids: &[BlockId],
990990
token_blocks: &[TokenBlock],
991991
) -> Result<(), SlotError> {
992+
// RACE FIX: Check if slot is in Finishing state before creating operations
993+
// If we're finishing, don't create new operations - they won't be processed
994+
if matches!(self.state, SlotState::Finishing | SlotState::Finished) {
995+
return Ok(());
996+
}
997+
992998
assert!(block_ids.len() == token_blocks.len());
993999
let operation_id = uuid::Uuid::new_v4();
9941000

@@ -1173,8 +1179,8 @@ impl LocalTransferEngine {
11731179
task_token: CancellationToken,
11741180
kvbm_metrics: KvbmMetrics,
11751181
) -> anyhow::Result<()> {
1176-
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel();
1177-
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel();
1182+
let (onboard_tx, mut onboard_rx) = mpsc::unbounded_channel::<LocalOnboardRequest>();
1183+
let (offload_tx, mut offload_rx) = mpsc::unbounded_channel::<LocalOffloadRequest>();
11781184

11791185
// Clone resources needed for tasks
11801186
let block_manager_offload = self.block_manager.clone();
@@ -1212,6 +1218,10 @@ impl LocalTransferEngine {
12121218
tracing::debug!("LocalOffloadTask: received cancellation signal");
12131219
break;
12141220
}
1221+
1222+
let request_id = req.request_id.clone();
1223+
let operation_id = req.operation_id;
1224+
12151225
if let Err(e) = process_offload_request(
12161226
req,
12171227
&block_manager_offload,
@@ -1221,6 +1231,34 @@ impl LocalTransferEngine {
12211231
.await
12221232
{
12231233
tracing::error!("LocalOffloadTask: error processing request: {:?}", e);
1234+
1235+
// FIX: Notify scheduler that this operation is "complete" (even though it failed)
1236+
// Create a fake/immediate transfer request that completes instantly
1237+
// This increments the workers' completed counter so they can progress
1238+
use dynamo_llm::block_manager::connector::protocol::{LeaderTransferRequest, RequestType};
1239+
use dynamo_llm::block_manager::distributed::{BlockTransferRequest, BlockTransferPool};
1240+
1241+
let fake_xfer = BlockTransferRequest {
1242+
from_pool: BlockTransferPool::Device, // Use valid Device->Host transfer type
1243+
to_pool: BlockTransferPool::Host, // (offload path, but no blocks)
1244+
blocks: vec![], // Empty - nothing to transfer
1245+
connector_req: Some(LeaderTransferRequest {
1246+
request_id: request_id.clone(),
1247+
uuid: operation_id,
1248+
requirement: None,
1249+
request_type: RequestType::Immediate, // Immediate = completes instantly
1250+
}),
1251+
};
1252+
1253+
match leader_offload.transfer_blocks_request(fake_xfer).await {
1254+
Ok(notify_receiver) => {
1255+
// Wait for the fake transfer to "complete" (should be instant)
1256+
let _ = notify_receiver.await;
1257+
}
1258+
Err(_xfer_err) => {
1259+
// Failed to create completion notification - error already logged above
1260+
}
1261+
}
12241262
}
12251263
}
12261264
Ok(())

Commit comments (0)