@@ -263,46 +263,58 @@ impl KvEventPublisher {
263263 #[ pyo3( signature = ( event_id, token_ids, num_block_tokens, block_hashes, lora_id, parent_hash=None ) ) ]
264264 fn publish_stored (
265265 & mut self ,
266- _py : Python ,
266+ py : Python ,
267267 event_id : u64 ,
268268 token_ids : Vec < u32 > ,
269269 num_block_tokens : Vec < u64 > ,
270270 block_hashes : Vec < i64 > ,
271271 lora_id : u64 ,
272272 parent_hash : Option < i64 > ,
273273 ) -> PyResult < ( ) > {
274- let block_hashes_u64: Vec < u64 > = block_hashes. iter ( ) . map ( |& h| h as u64 ) . collect ( ) ;
275- let event = KvCacheEvent {
276- event_id,
277- data : KvCacheEventData :: Stored ( KvCacheStoreData {
278- parent_hash : parent_hash. map ( ExternalSequenceBlockHash :: from) ,
279- blocks : create_stored_blocks (
280- self . kv_block_size as u32 ,
281- & token_ids,
282- & num_block_tokens,
283- & block_hashes_u64,
284- lora_id,
285- & self . warning_count ,
286- ) ,
287- } ) ,
288- dp_rank : self . dp_rank ,
289- } ;
274+ let kv_block_size = self . kv_block_size as u32 ;
275+ let dp_rank = self . dp_rank ;
276+ let warning_count = self . warning_count . clone ( ) ;
277+ let inner = self . inner . clone ( ) ;
278+
279+ py. allow_threads ( || {
280+ let block_hashes_u64: Vec < u64 > = block_hashes. iter ( ) . map ( |& h| h as u64 ) . collect ( ) ;
281+ let event = KvCacheEvent {
282+ event_id,
283+ data : KvCacheEventData :: Stored ( KvCacheStoreData {
284+ parent_hash : parent_hash. map ( ExternalSequenceBlockHash :: from) ,
285+ blocks : create_stored_blocks (
286+ kv_block_size,
287+ & token_ids,
288+ & num_block_tokens,
289+ & block_hashes_u64,
290+ lora_id,
291+ & warning_count,
292+ ) ,
293+ } ) ,
294+ dp_rank,
295+ } ;
290296
291- self . inner . publish ( event) . map_err ( to_pyerr)
297+ inner. publish ( event) . map_err ( to_pyerr)
298+ } )
292299 }
293300
294- fn publish_removed ( & self , _py : Python , event_id : u64 , block_hashes : Vec < i64 > ) -> PyResult < ( ) > {
295- let block_hashes: Vec < ExternalSequenceBlockHash > = block_hashes
296- . into_iter ( )
297- . map ( ExternalSequenceBlockHash :: from)
298- . collect ( ) ;
299- let event = KvCacheEvent {
300- event_id,
301- data : KvCacheEventData :: Removed ( KvCacheRemoveData { block_hashes } ) ,
302- dp_rank : self . dp_rank ,
303- } ;
301+ fn publish_removed ( & self , py : Python , event_id : u64 , block_hashes : Vec < i64 > ) -> PyResult < ( ) > {
302+ let dp_rank = self . dp_rank ;
303+ let inner = self . inner . clone ( ) ;
304+
305+ py. allow_threads ( || {
306+ let block_hashes: Vec < ExternalSequenceBlockHash > = block_hashes
307+ . into_iter ( )
308+ . map ( ExternalSequenceBlockHash :: from)
309+ . collect ( ) ;
310+ let event = KvCacheEvent {
311+ event_id,
312+ data : KvCacheEventData :: Removed ( KvCacheRemoveData { block_hashes } ) ,
313+ dp_rank,
314+ } ;
304315
305- self . inner . publish ( event) . map_err ( to_pyerr)
316+ inner. publish ( event) . map_err ( to_pyerr)
317+ } )
306318 }
307319}
308320
0 commit comments