@@ -989,6 +989,12 @@ impl VllmConnectorSlot {
989989 block_ids : & [ BlockId ] ,
990990 token_blocks : & [ TokenBlock ] ,
991991 ) -> Result < ( ) , SlotError > {
992+ // RACE FIX: Check whether the slot is already in the Finishing or Finished state before creating operations.
993+ // If it is, return early without creating new operations - they won't be processed.
994+ if matches ! ( self . state, SlotState :: Finishing | SlotState :: Finished ) {
995+ return Ok ( ( ) ) ;
996+ }
997+
992998 assert ! ( block_ids. len( ) == token_blocks. len( ) ) ;
993999 let operation_id = uuid:: Uuid :: new_v4 ( ) ;
9941000
@@ -1173,8 +1179,8 @@ impl LocalTransferEngine {
11731179 task_token : CancellationToken ,
11741180 kvbm_metrics : KvbmMetrics ,
11751181 ) -> anyhow:: Result < ( ) > {
1176- let ( onboard_tx, mut onboard_rx) = mpsc:: unbounded_channel ( ) ;
1177- let ( offload_tx, mut offload_rx) = mpsc:: unbounded_channel ( ) ;
1182+ let ( onboard_tx, mut onboard_rx) = mpsc:: unbounded_channel :: < LocalOnboardRequest > ( ) ;
1183+ let ( offload_tx, mut offload_rx) = mpsc:: unbounded_channel :: < LocalOffloadRequest > ( ) ;
11781184
11791185 // Clone resources needed for tasks
11801186 let block_manager_offload = self . block_manager . clone ( ) ;
@@ -1212,6 +1218,10 @@ impl LocalTransferEngine {
12121218 tracing:: debug!( "LocalOffloadTask: received cancellation signal" ) ;
12131219 break ;
12141220 }
1221+
1222+ let request_id = req. request_id . clone ( ) ;
1223+ let operation_id = req. operation_id ;
1224+
12151225 if let Err ( e) = process_offload_request (
12161226 req,
12171227 & block_manager_offload,
@@ -1221,6 +1231,31 @@ impl LocalTransferEngine {
12211231 . await
12221232 {
12231233 tracing:: error!( "LocalOffloadTask: error processing request: {:?}" , e) ;
1234+
1235+ // FIX: Notify scheduler that this operation is "complete" (even though it failed)
1236+ // Create a fake/immediate transfer request that completes instantly
1237+ // This increments the workers' completed counter so they can progress
1238+ let fake_xfer = BlockTransferRequest {
1239+ from_pool : BlockTransferPool :: Device , // Use valid Device->Host transfer type
1240+ to_pool : BlockTransferPool :: Host , // (offload path, but no blocks)
1241+ blocks : vec ! [ ] , // Empty - nothing to transfer
1242+ connector_req : Some ( LeaderTransferRequest {
1243+ request_id : request_id. clone ( ) ,
1244+ uuid : operation_id,
1245+ requirement : None ,
1246+ request_type : RequestType :: Immediate , // Immediate = completes instantly
1247+ } ) ,
1248+ } ;
1249+
1250+ match leader_offload. transfer_blocks_request ( fake_xfer) . await {
1251+ Ok ( notify_receiver) => {
1252+ // Wait for the fake transfer to "complete" (should be instant)
1253+ let _ = notify_receiver. await ;
1254+ }
1255+ Err ( _xfer_err) => {
1256+ // Failed to create completion notification. Only the original processing error was logged above; this transfer error is intentionally dropped.
1257+ }
1258+ }
12241259 }
12251260 }
12261261 Ok ( ( ) )
0 commit comments