11use std:: {
2- collections:: Bound ,
2+ collections:: {
3+ BTreeMap ,
4+ Bound ,
5+ } ,
36 future,
47 iter,
58 marker:: PhantomData ,
@@ -10,6 +13,8 @@ use std::{
1013} ;
1114
1215use anyhow:: Context ;
16+ #[ cfg( any( test, feature = "testing" ) ) ]
17+ use common:: pause:: PauseClient ;
1318use common:: {
1419 knobs:: {
1520 DATABASE_WORKERS_MAX_CHECKPOINT_AGE ,
@@ -38,6 +43,7 @@ use futures::{
3843} ;
3944use governor:: Quota ;
4045use keybroker:: Identity ;
46+ use search:: metrics:: SearchType ;
4147use storage:: Storage ;
4248use sync_types:: Timestamp ;
4349use tempfile:: TempDir ;
@@ -59,34 +65,53 @@ use crate::{
5965 SegmentType ,
6066 SnapshotData ,
6167 } ,
68+ writer:: {
69+ SearchIndexMetadataWriter ,
70+ SearchIndexWriteResult ,
71+ } ,
6272 BuildReason ,
6373 MultiSegmentBackfillResult ,
6474 } ,
75+ metrics:: {
76+ log_documents_per_new_search_segment,
77+ log_documents_per_search_index,
78+ log_documents_per_search_segment,
79+ log_non_deleted_documents_per_search_index,
80+ log_non_deleted_documents_per_search_segment,
81+ } ,
6582 Database ,
6683 IndexModel ,
6784 Token ,
6885} ;
6986
87+ #[ cfg( any( test, feature = "testing" ) ) ]
88+ pub ( crate ) const FLUSH_RUNNING_LABEL : & str = "flush_running" ;
89+
7090pub struct SearchFlusher < RT : Runtime , T : SearchIndexConfigParser > {
71- params : Params < RT > ,
91+ params : Params < RT , T :: IndexType > ,
92+ writer : SearchIndexMetadataWriter < RT , T :: IndexType > ,
7293 _config : PhantomData < T > ,
94+ #[ cfg( any( test, feature = "testing" ) ) ]
95+ pause_client : Option < PauseClient > ,
7396}
7497
7598impl < RT : Runtime , T : SearchIndexConfigParser > Deref for SearchFlusher < RT , T > {
76- type Target = Params < RT > ;
99+ type Target = Params < RT , T :: IndexType > ;
77100
78101 fn deref ( & self ) -> & Self :: Target {
79102 & self . params
80103 }
81104}
82105
83106#[ derive( Clone ) ]
84- pub struct Params < RT : Runtime > {
107+ pub struct Params < RT : Runtime , T : SearchIndex > {
85108 runtime : RT ,
86109 database : Database < RT > ,
87110 reader : Arc < dyn PersistenceReader > ,
88111 storage : Arc < dyn Storage > ,
89112 limits : SearchIndexLimits ,
113+ search_type : SearchType ,
114+ build_args : T :: BuildIndexArgs ,
90115}
91116
92117#[ derive( Clone ) ]
@@ -116,6 +141,10 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
116141 reader : Arc < dyn PersistenceReader > ,
117142 storage : Arc < dyn Storage > ,
118143 limits : SearchIndexLimits ,
144+ writer : SearchIndexMetadataWriter < RT , T :: IndexType > ,
145+ search_type : SearchType ,
146+ build_args : <T :: IndexType as SearchIndex >:: BuildIndexArgs ,
147+ #[ cfg( any( test, feature = "testing" ) ) ] pause_client : Option < PauseClient > ,
119148 ) -> Self {
120149 Self {
121150 params : Params {
@@ -124,9 +153,89 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
124153 reader,
125154 storage,
126155 limits,
156+ search_type,
157+ build_args,
127158 } ,
159+ writer,
128160 _config : PhantomData ,
161+ #[ cfg( any( test, feature = "testing" ) ) ]
162+ pause_client,
163+ }
164+ }
165+
166+ fn index_type_name ( & self ) -> & ' static str {
167+ match self . search_type {
168+ SearchType :: Vector => "vector" ,
169+ SearchType :: Text => "text" ,
170+ }
171+ }
172+
173+ pub async fn step ( & mut self ) -> anyhow:: Result < ( BTreeMap < TabletIndexName , u64 > , Token ) > {
174+ let mut metrics = BTreeMap :: new ( ) ;
175+
176+ let ( to_build, token) = self . needs_backfill ( ) . await ?;
177+ let num_to_build = to_build. len ( ) ;
178+ let index_type = self . index_type_name ( ) ;
179+ if num_to_build > 0 {
180+ tracing:: info!( "{num_to_build} {index_type} indexes to build" ) ;
181+ }
182+
183+ #[ cfg( any( test, feature = "testing" ) ) ]
184+ if let Some ( pause_client) = & mut self . pause_client {
185+ pause_client. wait ( FLUSH_RUNNING_LABEL ) . await ;
186+ }
187+
188+ for job in to_build {
189+ let index_name = job. index_name . clone ( ) ;
190+ let num_documents_indexed = self . build_one ( job, self . build_args . clone ( ) ) . await ?;
191+ metrics. insert ( index_name, num_documents_indexed) ;
192+ }
193+
194+ if num_to_build > 0 {
195+ tracing:: info!( "built {num_to_build} {index_type} indexes" ) ;
129196 }
197+
198+ Ok ( ( metrics, token) )
199+ }
200+
201+ pub ( crate ) async fn build_one (
202+ & self ,
203+ job : IndexBuild < T :: IndexType > ,
204+ build_args : <T :: IndexType as SearchIndex >:: BuildIndexArgs ,
205+ ) -> anyhow:: Result < u64 > {
206+ let timer = crate :: metrics:: vector:: build_one_timer ( ) ;
207+
208+ let result = self . build_multipart_segment ( & job, build_args) . await ?;
209+ tracing:: debug!(
210+ "Built a {} segment for: {result:#?}" ,
211+ self . index_type_name( )
212+ ) ;
213+
214+ let SearchIndexWriteResult {
215+ index_stats,
216+ new_segment_stats,
217+ per_segment_stats,
218+ } = self . writer . commit_flush ( & job, result) . await ?;
219+
220+ let new_segment_stats = new_segment_stats. unwrap_or_default ( ) ;
221+ log_documents_per_new_search_segment ( new_segment_stats. num_documents ( ) , self . search_type ) ;
222+
223+ per_segment_stats. into_iter ( ) . for_each ( |stats| {
224+ log_documents_per_search_segment ( stats. num_documents ( ) , self . search_type ) ;
225+ log_non_deleted_documents_per_search_segment (
226+ stats. num_non_deleted_documents ( ) ,
227+ self . search_type ,
228+ ) ;
229+ } ) ;
230+
231+ log_documents_per_search_index ( index_stats. num_documents ( ) , self . search_type ) ;
232+ log_non_deleted_documents_per_search_index (
233+ index_stats. num_non_deleted_documents ( ) ,
234+ self . search_type ,
235+ ) ;
236+ timer. finish ( ) ;
237+
238+ Ok ( new_segment_stats. num_documents ( ) )
130239 }
131240
132241 /// Compute the set of indexes that need to be backfilled.
@@ -351,7 +460,7 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
351460 }
352461
353462 async fn build_multipart_segment_on_thread (
354- params : Params < RT > ,
463+ params : Params < RT , T :: IndexType > ,
355464 rate_limit_pages_per_second : NonZeroU32 ,
356465 index_name : TabletIndexName ,
357466 by_id : IndexId ,
0 commit comments