@@ -96,6 +96,7 @@ use crate::{
9696 IndexModel ,
9797 TableModel ,
9898 UserFacingModel ,
99+ VectorIndexFlusher ,
99100} ;
100101
101102const TABLE_NAME : & str = "test" ;
@@ -105,11 +106,6 @@ const INDEXED_FIELD: &str = "embedding";
105106const FILTER_FIELDS : & [ & str ] = & [ "A" , "B" , "C" , "D" ] ;
106107const TABLE_NAMESPACE : TableNamespace = TableNamespace :: test_user ( ) ;
107108
108- enum ScenarioIndexState {
109- None ,
110- Some ,
111- }
112-
113109struct Scenario < RT : Runtime > {
114110 rt : RT ,
115111 database : Database < RT > ,
@@ -119,7 +115,7 @@ struct Scenario<RT: Runtime> {
119115}
120116
121117impl < RT : Runtime > Scenario < RT > {
122- async fn new ( rt : RT , vector_index_state : ScenarioIndexState ) -> anyhow:: Result < Self > {
118+ async fn new ( rt : RT ) -> anyhow:: Result < Self > {
123119 let DbFixtures {
124120 tp,
125121 db,
@@ -145,12 +141,28 @@ impl<RT: Runtime> Scenario<RT> {
145141 searcher,
146142 } ;
147143
148- if let ScenarioIndexState :: Some = vector_index_state {
149- self_. add_vector_index ( true ) . await ?;
150- }
151144 Ok ( self_)
152145 }
153146
147+ async fn new_with_enabled_index ( rt : RT ) -> anyhow:: Result < Self > {
148+ let self_ = Scenario :: new ( rt) . await ?;
149+ self_. add_vector_index ( true ) . await ?;
150+ Ok ( self_)
151+ }
152+
153+ fn new_backfill_flusher ( & self , incremental_index_size : usize ) -> VectorIndexFlusher < RT > {
154+ new_vector_flusher_for_tests (
155+ self . rt . clone ( ) ,
156+ self . database . clone ( ) ,
157+ self . reader . clone ( ) ,
158+ self . search_storage . clone ( ) ,
159+ * VECTOR_INDEX_SIZE_SOFT_LIMIT ,
160+ * MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB ,
161+ incremental_index_size,
162+ FlusherType :: Backfill ,
163+ )
164+ }
165+
154166 async fn add_vector_index ( & self , should_backfill : bool ) -> anyhow:: Result < ( ) > {
155167 let table_name: TableName = TABLE_NAME . parse ( ) ?;
156168 let mut tx = self . database . begin ( Identity :: system ( ) ) . await ?;
@@ -357,7 +369,7 @@ struct RandomizedTest<RT: Runtime> {
357369impl < RT : Runtime > RandomizedTest < RT > {
358370 async fn new ( rt : RT ) -> anyhow:: Result < Self > {
359371 Ok ( Self {
360- scenario : Scenario :: new ( rt, ScenarioIndexState :: Some ) . await ?,
372+ scenario : Scenario :: new_with_enabled_index ( rt) . await ?,
361373 model : BTreeMap :: new ( ) ,
362374 } )
363375 }
@@ -469,7 +481,7 @@ impl<RT: Runtime> RandomizedTest<RT> {
469481#[ convex_macro:: test_runtime]
470482
471483async fn test_vector_search ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
472- let scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: Some ) . await ?;
484+ let scenario = Scenario :: new_with_enabled_index ( rt. clone ( ) ) . await ?;
473485
474486 let mut tx = scenario. database . begin ( Identity :: system ( ) ) . await ?;
475487
@@ -569,7 +581,7 @@ async fn test_vector_search(rt: TestRuntime) -> anyhow::Result<()> {
569581
570582#[ convex_macro:: test_runtime]
571583async fn test_vector_search_compaction ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
572- let mut scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: Some ) . await ?;
584+ let mut scenario = Scenario :: new_with_enabled_index ( rt. clone ( ) ) . await ?;
573585
574586 let mut ids = vec ! [ ] ;
575587
@@ -629,7 +641,7 @@ async fn test_vector_search_compaction(rt: TestRuntime) -> anyhow::Result<()> {
629641#[ ignore] // TODO(CX-5143): Re-enable this test after fixing the flake.
630642#[ convex_macro:: prod_rt_test]
631643async fn test_concurrent_index_version_searches ( rt : ProdRuntime ) -> anyhow:: Result < ( ) > {
632- let scenario = Arc :: new ( Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: Some ) . await ?) ;
644+ let scenario = Arc :: new ( Scenario :: new_with_enabled_index ( rt. clone ( ) ) . await ?) ;
633645
634646 let mut ids = vec ! [ ] ;
635647 let mut tx = scenario. database . begin ( Identity :: system ( ) ) . await ?;
@@ -708,7 +720,7 @@ async fn test_concurrent_index_version_searches(rt: ProdRuntime) -> anyhow::Resu
708720
709721#[ convex_macro:: test_runtime]
710722async fn test_vector_search_compaction_with_deletes ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
711- let mut scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: Some ) . await ?;
723+ let mut scenario = Scenario :: new_with_enabled_index ( rt. clone ( ) ) . await ?;
712724
713725 let mut ids = vec ! [ ] ;
714726
@@ -754,31 +766,19 @@ async fn test_vector_search_compaction_with_deletes(rt: TestRuntime) -> anyhow::
754766
755767#[ convex_macro:: test_runtime]
756768async fn test_index_backfill_is_incremental ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
757- let scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: None ) . await ?;
769+ let scenario = Scenario :: new ( rt. clone ( ) ) . await ?;
758770 let num_parts = 12 ;
759771 let vectors_per_part = 8 ;
760772 let incremental_index_size =
761773 ( DIMENSIONS * ( VECTOR_ELEMENT_SIZE as u32 ) * vectors_per_part) as usize ;
762774
763- // Add the vectors
764775 let ids = scenario
765776 . seed_table_with_vector_data ( num_parts * vectors_per_part)
766777 . await ?;
767778
768- // Add the index
769779 scenario. add_vector_index ( false ) . await ?;
770780
771- // Create flusher
772- let flusher = new_vector_flusher_for_tests (
773- rt. clone ( ) ,
774- scenario. database . clone ( ) ,
775- scenario. reader . clone ( ) ,
776- scenario. search_storage . clone ( ) ,
777- * VECTOR_INDEX_SIZE_SOFT_LIMIT ,
778- * MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB ,
779- incremental_index_size,
780- FlusherType :: Backfill ,
781- ) ;
781+ let flusher = scenario. new_backfill_flusher ( incremental_index_size) ;
782782
783783 let mut backfill_ts = None ;
784784 for i in 0 ..num_parts {
@@ -841,31 +841,19 @@ async fn test_index_backfill_is_incremental(rt: TestRuntime) -> anyhow::Result<(
841841
842842#[ convex_macro:: test_runtime]
843843async fn test_incremental_backfill_with_compaction ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
844- let mut scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: None ) . await ?;
844+ let mut scenario = Scenario :: new ( rt. clone ( ) ) . await ?;
845845 let num_parts = 3 ;
846846 let vectors_per_part = 8 ;
847847 let incremental_index_size =
848848 ( DIMENSIONS * ( VECTOR_ELEMENT_SIZE as u32 ) * vectors_per_part) as usize ;
849849
850- // Add the vectors
851850 let ids = scenario
852851 . seed_table_with_vector_data ( num_parts * vectors_per_part)
853852 . await ?;
854853
855- // Add the index
856854 scenario. add_vector_index ( false ) . await ?;
857855
858- // Create flusher
859- let flusher = new_vector_flusher_for_tests (
860- rt. clone ( ) ,
861- scenario. database . clone ( ) ,
862- scenario. reader . clone ( ) ,
863- scenario. search_storage . clone ( ) ,
864- * VECTOR_INDEX_SIZE_SOFT_LIMIT ,
865- * MULTI_SEGMENT_FULL_SCAN_THRESHOLD_KB ,
866- incremental_index_size,
867- FlusherType :: Backfill ,
868- ) ;
856+ let flusher = scenario. new_backfill_flusher ( incremental_index_size) ;
869857
870858 for _ in 0 ..num_parts {
871859 // Do a backfill iteration
@@ -908,7 +896,7 @@ async fn test_incremental_backfill_with_compaction(rt: TestRuntime) -> anyhow::R
908896
909897#[ convex_macro:: test_runtime]
910898async fn test_empty_multi_segment ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
911- let scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: Some ) . await ?;
899+ let scenario = Scenario :: new_with_enabled_index ( rt. clone ( ) ) . await ?;
912900 let query = random_vector ( & mut rt. rng ( ) ) ;
913901 let results = scenario
914902 . search_with_limit ( query, btreeset ! [ ] , Some ( 10 ) )
@@ -920,7 +908,7 @@ async fn test_empty_multi_segment(rt: TestRuntime) -> anyhow::Result<()> {
920908
921909#[ convex_macro:: test_runtime]
922910async fn test_recall_multi_segment ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
923- let scenario = Scenario :: new ( rt. clone ( ) , ScenarioIndexState :: Some ) . await ?;
911+ let scenario = Scenario :: new_with_enabled_index ( rt. clone ( ) ) . await ?;
924912 let mut tx = scenario. database . begin ( Identity :: system ( ) ) . await ?;
925913 let table_number = tx
926914 . table_mapping ( )
0 commit comments