@@ -35,8 +35,6 @@ use common::{
3535 RetentionValidator ,
3636 TimestampRange ,
3737 } ,
38- persistence_helpers:: stream_revision_pairs,
39- query:: Order ,
4038 runtime:: {
4139 Runtime ,
4240 RuntimeInstant ,
@@ -80,7 +78,6 @@ use futures::{
8078use indexing:: index_registry:: IndexRegistry ;
8179use parking_lot:: Mutex ;
8280use prometheus:: VMHistogram ;
83- use search:: SearchIndexManager ;
8481use usage_tracking:: {
8582 DocInVectorIndex ,
8683 FunctionUsageTracker ,
@@ -94,16 +91,23 @@ use value::{
9491 TableMapping ,
9592 TableName ,
9693} ;
97- use vector:: VectorIndexManager ;
9894
9995use crate :: {
10096 bootstrap_model:: defaults:: BootstrapTableIds ,
10197 database:: {
10298 ConflictingReadWithWriteSource ,
10399 ShutdownSignal ,
104100 } ,
105- metrics,
101+ metrics:: {
102+ self ,
103+ bootstrap_update_timer,
104+ finish_bootstrap_update,
105+ } ,
106106 reads:: ReadSet ,
107+ search_and_vector_bootstrap:: {
108+ stream_revision_pairs_for_indexes,
109+ BootstrappedSearchAndVectorIndexes ,
110+ } ,
107111 snapshot_manager:: SnapshotManager ,
108112 transaction:: FinalTransaction ,
109113 write_log:: {
@@ -270,12 +274,12 @@ impl<RT: Runtime> Committer<RT> {
270274 self . bump_max_repeatable_ts( result) ;
271275 } ,
272276 Some ( CommitterMessage :: FinishSearchAndVectorBootstrap {
273- search_index_manager,
274- vector_index_manager, bootstrap_ts, result,
277+ bootstrapped_indexes,
278+ bootstrap_ts,
279+ result,
275280 } ) => {
276281 self . finish_search_and_vector_bootstrap(
277- search_index_manager,
278- vector_index_manager,
282+ bootstrapped_indexes,
279283 bootstrap_ts,
280284 result
281285 ) . await ;
@@ -293,12 +297,16 @@ impl<RT: Runtime> Committer<RT> {
293297 }
294298
295299 async fn update_indexes_since_bootstrap (
296- search_index_manager : & mut SearchIndexManager ,
297- vector_index_manager : & mut VectorIndexManager ,
300+ BootstrappedSearchAndVectorIndexes {
301+ search_index_manager,
302+ vector_index_manager,
303+ tables_with_indexes,
304+ } : & mut BootstrappedSearchAndVectorIndexes ,
298305 bootstrap_ts : Timestamp ,
299306 persistence : RepeatablePersistence ,
300307 registry : & IndexRegistry ,
301308 ) -> anyhow:: Result < ( ) > {
309+ let _timer = bootstrap_update_timer ( ) ;
302310 anyhow:: ensure!(
303311 !search_index_manager. is_bootstrapping( ) ,
304312 "Trying to update search index while it's still bootstrapping"
@@ -307,13 +315,17 @@ impl<RT: Runtime> Committer<RT> {
307315 !vector_index_manager. is_bootstrapping( ) ,
308316 "Trying to update vector index while it's still bootstrapping"
309317 ) ;
310- let range = ( Bound :: Excluded ( bootstrap_ts) , Bound :: Unbounded ) ;
318+ let range = TimestampRange :: new ( ( Bound :: Excluded ( bootstrap_ts) , Bound :: Unbounded ) ) ? ;
311319
312- let document_stream = persistence . load_documents ( TimestampRange :: new ( range ) ? , Order :: Asc ) ;
313- let revision_stream = stream_revision_pairs ( document_stream , & persistence) ;
320+ let revision_stream =
321+ stream_revision_pairs_for_indexes ( tables_with_indexes , & persistence, range ) ;
314322 futures:: pin_mut!( revision_stream) ;
315323
324+ let mut num_revisions = 0 ;
325+ let mut total_size = 0 ;
316326 while let Some ( revision_pair) = revision_stream. try_next ( ) . await ? {
327+ num_revisions += 1 ;
328+ total_size += revision_pair. document ( ) . map ( |d| d. size ( ) ) . unwrap_or ( 0 ) ;
317329 search_index_manager. update (
318330 registry,
319331 revision_pair. prev_document ( ) ,
@@ -327,13 +339,13 @@ impl<RT: Runtime> Committer<RT> {
327339 WriteTimestamp :: Committed ( revision_pair. ts ( ) ) ,
328340 ) ?;
329341 }
342+ finish_bootstrap_update ( num_revisions, total_size) ;
330343 Ok ( ( ) )
331344 }
332345
333346 async fn finish_search_and_vector_bootstrap (
334347 & mut self ,
335- mut search_index_manager : SearchIndexManager ,
336- mut vector_index_manager : VectorIndexManager ,
348+ mut bootstrapped_indexes : BootstrappedSearchAndVectorIndexes ,
337349 bootstrap_ts : RepeatableTimestamp ,
338350 result : oneshot:: Sender < anyhow:: Result < ( ) > > ,
339351 ) {
@@ -352,8 +364,7 @@ impl<RT: Runtime> Committer<RT> {
352364 ) ;
353365
354366 let res = Self :: update_indexes_since_bootstrap (
355- & mut search_index_manager,
356- & mut vector_index_manager,
367+ & mut bootstrapped_indexes,
357368 * bootstrap_ts,
358369 repeatable_persistence,
359370 & last_snapshot. index_registry ,
@@ -371,8 +382,8 @@ impl<RT: Runtime> Committer<RT> {
371382 panic ! ( "Snapshots were changed concurrently during commit?" ) ;
372383 }
373384 snapshot_manager. overwrite_last_snapshot_search_and_vector_indexes (
374- search_index_manager,
375- vector_index_manager,
385+ bootstrapped_indexes . search_index_manager ,
386+ bootstrapped_indexes . vector_index_manager ,
376387 ) ;
377388 tracing:: info!( "Committed backfilled vector indexes" ) ;
378389 let _ = result. send ( Ok ( ( ) ) ) ;
@@ -814,14 +825,12 @@ impl<RT: Runtime> Clone for CommitterClient<RT> {
814825impl < RT : Runtime > CommitterClient < RT > {
815826 pub async fn finish_search_and_vector_bootstrap (
816827 & self ,
817- search_index_manager : SearchIndexManager ,
818- vector_index_manager : VectorIndexManager ,
828+ bootstrapped_indexes : BootstrappedSearchAndVectorIndexes ,
819829 bootstrap_ts : RepeatableTimestamp ,
820830 ) -> anyhow:: Result < ( ) > {
821831 let ( tx, rx) = oneshot:: channel ( ) ;
822832 let message = CommitterMessage :: FinishSearchAndVectorBootstrap {
823- search_index_manager,
824- vector_index_manager,
833+ bootstrapped_indexes,
825834 bootstrap_ts,
826835 result : tx,
827836 } ;
@@ -977,8 +986,7 @@ enum CommitterMessage {
977986 result : oneshot:: Sender < anyhow:: Result < ( ) > > ,
978987 } ,
979988 FinishSearchAndVectorBootstrap {
980- search_index_manager : SearchIndexManager ,
981- vector_index_manager : VectorIndexManager ,
989+ bootstrapped_indexes : BootstrappedSearchAndVectorIndexes ,
982990 bootstrap_ts : RepeatableTimestamp ,
983991 result : oneshot:: Sender < anyhow:: Result < ( ) > > ,
984992 } ,
0 commit comments