@@ -10,7 +10,6 @@ use tokio_stream::StreamExt;
1010
1111use super :: * ;
1212use crate :: Component ;
13- use llm_rs:: kv_router:: indexer:: KvIndexerInterface ;
1413use llm_rs:: kv_router:: protocols:: compute_block_hash_for_seq;
1514use rs:: pipeline:: { AsyncEngine , SingleIn } ;
1615use tracing;
@@ -678,186 +677,6 @@ impl Drop for RadixTree {
678677 }
679678}
680679
681- #[ pyclass]
682- pub ( crate ) struct KvIndexer {
683- inner : Arc < llm_rs:: kv_router:: indexer:: KvIndexer > ,
684- }
685-
686- #[ pymethods]
687- impl KvIndexer {
688- #[ new]
689- #[ pyo3( signature = ( component, kv_block_size, consumer_uuid=None ) ) ]
690- fn new (
691- component : Component ,
692- kv_block_size : usize ,
693- consumer_uuid : Option < String > ,
694- ) -> PyResult < Self > {
695- let runtime = pyo3_async_runtimes:: tokio:: get_runtime ( ) ;
696- runtime. block_on ( async {
697- let cancellation_token = component. inner . drt ( ) . runtime ( ) . child_token ( ) ;
698- let kv_indexer_metrics =
699- llm_rs:: kv_router:: indexer:: KvIndexerMetrics :: from_component ( & component. inner ) ;
700- let inner: Arc < llm_rs:: kv_router:: indexer:: KvIndexer > =
701- llm_rs:: kv_router:: indexer:: KvIndexer :: new (
702- cancellation_token. clone ( ) ,
703- kv_block_size as u32 ,
704- kv_indexer_metrics,
705- )
706- . into ( ) ;
707-
708- // Use the shared start_kv_router_background function for event consumption
709- // Pass None for snapshot_tx and get_workers_tx to skip snapshot handling in Python bindings
710- llm_rs:: kv_router:: subscriber:: start_kv_router_background (
711- component. inner . clone ( ) ,
712- consumer_uuid. unwrap_or_else ( || uuid:: Uuid :: new_v4 ( ) . to_string ( ) ) ,
713- inner. event_sender ( ) ,
714- inner. remove_worker_sender ( ) ,
715- None ,
716- None ,
717- cancellation_token,
718- None ,
719- true ,
720- )
721- . await
722- . map_err ( to_pyerr) ?;
723-
724- Ok ( Self { inner } )
725- } )
726- }
727-
728- fn block_size ( & self ) -> usize {
729- self . inner . block_size ( ) as usize
730- }
731-
732- fn find_matches < ' p > ( & self , py : Python < ' p > , sequence : Vec < u64 > ) -> PyResult < Bound < ' p , PyAny > > {
733- let indexer = self . inner . clone ( ) ;
734- pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
735- let local_block_hashes: Vec < llm_rs:: kv_router:: protocols:: LocalBlockHash > = sequence
736- . into_iter ( )
737- . map ( llm_rs:: kv_router:: protocols:: LocalBlockHash )
738- . collect ( ) ;
739-
740- let rs_overlap_scores = indexer
741- . find_matches ( local_block_hashes)
742- . await
743- . map_err ( to_pyerr) ?;
744- Ok ( OverlapScores {
745- inner : rs_overlap_scores,
746- } )
747- } )
748- }
749-
750- fn find_matches_for_request < ' p > (
751- & self ,
752- py : Python < ' p > ,
753- token_ids : Vec < u32 > ,
754- _lora_id : u64 ,
755- ) -> PyResult < Bound < ' p , PyAny > > {
756- let indexer = self . inner . clone ( ) ;
757- pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
758- let rs_overlap_scores = indexer
759- . find_matches_for_request ( token_ids. as_slice ( ) )
760- . await
761- . map_err ( to_pyerr) ?;
762- Ok ( OverlapScores {
763- inner : rs_overlap_scores,
764- } )
765- } )
766- }
767- }
768-
769- /// Bindings for the approximate KV indexer. This is a wrapper around KvIndexer
770- /// that uses TTL-based expiration and pruning instead of receiving KV events from workers.
771- #[ pyclass]
772- pub ( crate ) struct ApproxKvIndexer {
773- inner : Arc < llm_rs:: kv_router:: indexer:: KvIndexer > ,
774- }
775-
776- #[ pymethods]
777- impl ApproxKvIndexer {
778- #[ new]
779- #[ pyo3( signature = ( component, kv_block_size, router_ttl_secs=120.0 , router_max_tree_size=1048576 , router_prune_target_ratio=0.8 ) ) ]
780- fn new (
781- component : Component ,
782- kv_block_size : usize ,
783- router_ttl_secs : f64 ,
784- router_max_tree_size : usize ,
785- router_prune_target_ratio : f64 ,
786- ) -> PyResult < Self > {
787- let runtime = pyo3_async_runtimes:: tokio:: get_runtime ( ) ;
788- runtime. block_on ( async {
789- let cancellation_token = component. inner . drt ( ) . runtime ( ) . child_token ( ) ;
790- let kv_indexer_metrics =
791- llm_rs:: kv_router:: indexer:: KvIndexerMetrics :: from_component ( & component. inner ) ;
792-
793- // Build PruneConfig with the provided parameters
794- let prune_config = llm_rs:: kv_router:: approx:: PruneConfig {
795- ttl : std:: time:: Duration :: from_secs_f64 ( router_ttl_secs) ,
796- max_tree_size : router_max_tree_size,
797- prune_target_ratio : router_prune_target_ratio,
798- } ;
799-
800- // Create KvIndexer with pruning enabled, but DO NOT subscribe to events
801- let inner: Arc < llm_rs:: kv_router:: indexer:: KvIndexer > =
802- llm_rs:: kv_router:: indexer:: KvIndexer :: new_with_frequency (
803- cancellation_token. clone ( ) ,
804- None , // expiration_duration - not used with prune_config
805- kv_block_size as u32 ,
806- kv_indexer_metrics,
807- Some ( prune_config) ,
808- )
809- . into ( ) ;
810-
811- // Note: We deliberately do NOT call start_kv_router_background here
812- // because ApproxKvIndexer doesn't use KV events from workers
813-
814- Ok ( Self { inner } )
815- } )
816- }
817-
818- fn block_size ( & self ) -> usize {
819- self . inner . block_size ( ) as usize
820- }
821-
822- fn find_matches_for_request < ' p > (
823- & self ,
824- py : Python < ' p > ,
825- token_ids : Vec < u32 > ,
826- ) -> PyResult < Bound < ' p , PyAny > > {
827- let indexer = self . inner . clone ( ) ;
828- pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
829- let rs_overlap_scores = indexer
830- . find_matches_for_request ( token_ids. as_slice ( ) )
831- . await
832- . map_err ( to_pyerr) ?;
833- Ok ( OverlapScores {
834- inner : rs_overlap_scores,
835- } )
836- } )
837- }
838-
839- #[ pyo3( signature = ( tokens, worker_id, dp_rank=0 ) ) ]
840- fn process_routing_decision_for_request < ' p > (
841- & self ,
842- py : Python < ' p > ,
843- tokens : Vec < u32 > ,
844- worker_id : WorkerId ,
845- dp_rank : DpRank ,
846- ) -> PyResult < Bound < ' p , PyAny > > {
847- let indexer = self . inner . clone ( ) ;
848- let block_size = self . inner . block_size ( ) ;
849- pyo3_async_runtimes:: tokio:: future_into_py ( py, async move {
850- let worker = llm_rs:: kv_router:: protocols:: WorkerWithDpRank :: new ( worker_id, dp_rank) ;
851- let mut tokens_with_hashes = TokensWithHashes :: new ( tokens, block_size) ;
852- indexer
853- . process_routing_decision_for_request ( & mut tokens_with_hashes, worker)
854- . await
855- . map_err ( to_pyerr) ?;
856- Ok ( ( ) )
857- } )
858- }
859- }
860-
861680/// Helper function to create a KV router from an endpoint using the ModelManager
862681/// to ensure proper etcd registration.
863682/// Infers worker type using endpoint naming and router config:
0 commit comments