11use std:: {
22 collections:: BTreeMap ,
33 fmt:: Display ,
4- num:: NonZeroU32 ,
54 sync:: {
5+ atomic:: {
6+ AtomicU64 ,
7+ Ordering ,
8+ } ,
69 Arc ,
7- LazyLock ,
810 } ,
911 time:: Duration ,
1012} ;
1113
12- use cmd_util:: env:: env_config;
1314use common:: {
1415 self ,
1516 bootstrap_model:: index:: database_index:: IndexedFields ,
1617 knobs:: {
1718 INDEX_BACKFILL_CHUNK_RATE ,
1819 INDEX_BACKFILL_CHUNK_SIZE ,
20+ INDEX_BACKFILL_PROGRESS_INTERVAL ,
1921 INDEX_BACKFILL_WORKERS ,
2022 } ,
2123 persistence:: {
@@ -45,11 +47,10 @@ use common::{
4547 value:: TabletId ,
4648} ;
4749use futures:: {
50+ future,
4851 pin_mut,
49- stream:: {
50- self ,
51- FusedStream ,
52- } ,
52+ stream,
53+ Stream ,
5354 StreamExt ,
5455 TryStreamExt ,
5556} ;
@@ -68,18 +69,6 @@ use crate::{
6869
6970pub const PERFORM_BACKFILL_LABEL : & str = "perform_backfill" ;
7071
71- static ENTRIES_PER_SECOND : LazyLock < NonZeroU32 > = LazyLock :: new ( || {
72- NonZeroU32 :: new (
73- ( * INDEX_BACKFILL_CHUNK_RATE * * INDEX_BACKFILL_CHUNK_SIZE )
74- . try_into ( )
75- . unwrap ( ) ,
76- )
77- . unwrap ( )
78- } ) ;
79-
80- static INDEX_WORKER_SLEEP_TIME : LazyLock < Duration > =
81- LazyLock :: new ( || Duration :: from_millis ( env_config ( "INDEX_WORKER_SLEEP_TIME_MS" , 0 ) ) ) ;
82-
8372#[ derive( Clone ) ]
8473pub enum IndexSelector {
8574 All ( IndexRegistry ) ,
@@ -175,8 +164,10 @@ impl<RT: Runtime> IndexWriter<RT> {
175164 retention_validator : Arc < dyn RetentionValidator > ,
176165 runtime : RT ,
177166 ) -> Self {
167+ let entries_per_second =
168+ INDEX_BACKFILL_CHUNK_RATE . saturating_mul ( * INDEX_BACKFILL_CHUNK_SIZE ) ;
178169 debug_assert ! (
179- ENTRIES_PER_SECOND . get ( ) >= * INDEX_BACKFILL_CHUNK_SIZE as u32 ,
170+ entries_per_second >= * INDEX_BACKFILL_CHUNK_SIZE ,
180171 "Entries per second must be at least {}" ,
181172 * INDEX_BACKFILL_CHUNK_SIZE
182173 ) ;
@@ -186,7 +177,7 @@ impl<RT: Runtime> IndexWriter<RT> {
186177 retention_validator,
187178 rate_limiter : Arc :: new ( new_rate_limiter (
188179 runtime. clone ( ) ,
189- Quota :: per_second ( * ENTRIES_PER_SECOND ) ,
180+ Quota :: per_second ( entries_per_second ) ,
190181 ) ) ,
191182 runtime,
192183 }
@@ -296,74 +287,78 @@ impl<RT: Runtime> IndexWriter<RT> {
296287 snapshot_ts,
297288 self . reader . clone ( ) ,
298289 self . retention_validator . clone ( ) ,
299- * INDEX_BACKFILL_CHUNK_SIZE ,
290+ INDEX_BACKFILL_CHUNK_SIZE . get ( ) as usize ,
300291 ) ;
301292
302- let by_id = index_registry. must_get_by_id ( tablet_id) ?. id ( ) ;
303- let stream = table_iterator
304- . stream_documents_in_table ( tablet_id, by_id, None )
305- . fuse ( ) ;
306- pin_mut ! ( stream) ;
307- let mut index_updates_written = 0 ;
308- let mut last_logged = self . runtime . system_time ( ) ;
309- let mut last_logged_count = 0 ;
310- while !stream. is_done ( ) {
311- if !INDEX_WORKER_SLEEP_TIME . is_zero ( ) {
312- tokio:: time:: sleep ( * INDEX_WORKER_SLEEP_TIME ) . await ;
313- }
314- // Number of documents in the table that have been indexed in this iteration
315- let mut num_docs_indexed = 0u64 ;
316- let mut chunk = Vec :: new ( ) ;
317- while chunk. len ( ) < * INDEX_BACKFILL_CHUNK_SIZE {
293+ let ( index_update_tx, index_update_rx) = mpsc:: channel ( 32 ) ;
294+ let num_docs_indexed = AtomicU64 :: new ( 0 ) ;
295+ let producer = async {
296+ let by_id = index_registry. must_get_by_id ( tablet_id) ?. id ( ) ;
297+ let mut stream =
298+ std:: pin:: pin!( table_iterator. stream_documents_in_table( tablet_id, by_id, None ) ) ;
299+ while let Some ( item) = stream. try_next ( ) . await ? {
318300 let LatestDocument {
319301 ts,
320302 value : document,
321303 ..
322- } = match stream. try_next ( ) . await ? {
323- Some ( d) => d,
324- None => break ,
325- } ;
326- num_docs_indexed += 1 ;
327- let index_updates = index_registry. index_updates ( None , Some ( & document) ) ;
328- chunk. extend (
329- index_updates
330- . into_iter ( )
331- . filter ( |update| index_selector. filter_index_update ( update) )
332- . map ( |update| PersistenceIndexEntry :: from_index_update ( ts, & update) ) ,
304+ } = item;
305+ num_docs_indexed. store (
306+ num_docs_indexed. load ( Ordering :: Relaxed ) + 1 ,
307+ Ordering :: Relaxed ,
333308 ) ;
309+ let index_updates = index_registry. index_updates ( None , Some ( & document) ) ;
310+ for index_update in index_updates
311+ . into_iter ( )
312+ . filter ( |update| index_selector. filter_index_update ( update) )
313+ {
314+ _ = index_update_tx. send ( ( ts, index_update) ) . await ;
315+ }
334316 }
335- if !chunk. is_empty ( ) {
336- index_updates_written += chunk. len ( ) ;
337- self . persistence
338- . write ( & [ ] , & chunk, ConflictStrategy :: Overwrite )
339- . await ?;
340- if let Some ( db) = & database {
341- let mut tx = db. begin_system ( ) . await ?;
342- let mut model = IndexBackfillModel :: new ( & mut tx) ;
343- for index_id in index_selector. index_ids ( ) {
344- model
345- . update_index_backfill_progress ( index_id, tablet_id, num_docs_indexed)
317+ drop ( index_update_tx) ;
318+ Ok ( ( ) )
319+ } ;
320+
321+ let consumer = self . write_index_entries (
322+ format ! ( "for table {tablet_id} at snapshot {snapshot_ts}" ) ,
323+ ReceiverStream :: new ( index_update_rx) ,
324+ index_selector,
325+ false , /* deduplicate */
326+ ) ;
327+ let report_progress = async {
328+ if let Some ( db) = database {
329+ let mut last_num_docs_indexed = 0 ;
330+ loop {
331+ let num_docs_indexed = num_docs_indexed. load ( Ordering :: Relaxed ) ;
332+ if num_docs_indexed > last_num_docs_indexed {
333+ let mut tx = db. begin_system ( ) . await ?;
334+ let mut model = IndexBackfillModel :: new ( & mut tx) ;
335+ for index_id in index_selector. index_ids ( ) {
336+ model
337+ . update_index_backfill_progress (
338+ index_id,
339+ tablet_id,
340+ num_docs_indexed - last_num_docs_indexed,
341+ )
342+ . await ?;
343+ }
344+ db. commit_with_write_source ( tx, "index_worker_backfill_progress" )
346345 . await ?;
346+ last_num_docs_indexed = num_docs_indexed;
347347 }
348- db. commit_with_write_source ( tx, "index_worker_backfill_progress" )
349- . await ?;
348+ self . runtime . wait ( * INDEX_BACKFILL_PROGRESS_INTERVAL ) . await ;
350349 }
350+ } else {
351+ future:: pending :: < anyhow:: Result < !> > ( ) . await
351352 }
352- if last_logged. elapsed ( ) ? >= Duration :: from_secs ( 60 ) {
353- tracing:: info!(
354- "backfilled {index_updates_written} index rows for table {tablet_id} at \
355- snapshot {snapshot_ts} ({} rows/s)",
356- ( index_updates_written - last_logged_count) as f64
357- / last_logged. elapsed( ) ?. as_secs_f64( )
358- ) ;
359- last_logged = self . runtime . system_time ( ) ;
360- last_logged_count = index_updates_written;
353+ } ;
354+ tokio:: select! {
355+ r = future:: try_join( producer, consumer) => {
356+ let ( ( ) , ( ) ) = r?;
357+ } ,
358+ r = report_progress => {
359+ r?
361360 }
362361 }
363- tracing:: info!(
364- "backfilled {index_updates_written} index rows for table {tablet_id} at snapshot \
365- {snapshot_ts}"
366- ) ;
367362 Ok ( ( ) )
368363 }
369364
@@ -415,7 +410,12 @@ impl<RT: Runtime> IndexWriter<RT> {
415410 drop ( tx) ;
416411 Ok ( ( ) )
417412 } ;
418- let consumer = self . write_index_entries ( ReceiverStream :: new ( rx) . fuse ( ) , index_selector) ;
413+ let consumer = self . write_index_entries (
414+ format ! ( "going forward from {start_ts} to {end_ts}" ) ,
415+ ReceiverStream :: new ( rx) ,
416+ index_selector,
417+ false , /* deduplicate */
418+ ) ;
419419
420420 // Consider ourselves successful if both the producer and consumer exit
421421 // successfully.
@@ -515,7 +515,12 @@ impl<RT: Runtime> IndexWriter<RT> {
515515 Ok ( end_ts)
516516 } ;
517517
518- let consumer = self . write_index_entries ( ReceiverStream :: new ( rx) . fuse ( ) , index_selector) ;
518+ let consumer = self . write_index_entries (
519+ format ! ( "going backward from {start_ts} to {end_ts}" ) ,
520+ ReceiverStream :: new ( rx) ,
521+ index_selector,
522+ true , /* deduplicate */
523+ ) ;
519524
520525 // Consider ourselves successful if both the reader and writer exit
521526 // successfully.
@@ -525,12 +530,13 @@ impl<RT: Runtime> IndexWriter<RT> {
525530
526531 async fn write_index_entries (
527532 & self ,
528- updates : impl FusedStream < Item = ( Timestamp , DatabaseIndexUpdate ) > ,
533+ phase : String ,
534+ updates : impl Stream < Item = ( Timestamp , DatabaseIndexUpdate ) > ,
529535 index_selector : & IndexSelector ,
536+ deduplicate : bool ,
530537 ) -> anyhow:: Result < ( ) > {
531- futures:: pin_mut!( updates) ;
532-
533538 let mut last_logged = self . runtime . system_time ( ) ;
539+ let mut last_logged_entries_written = 0 ;
534540 let mut num_entries_written = 0 ;
535541
536542 let updates = updates
@@ -544,7 +550,7 @@ impl<RT: Runtime> IndexWriter<RT> {
544550 } ;
545551 futures:: future:: ready ( result)
546552 } )
547- . chunks ( * INDEX_BACKFILL_CHUNK_SIZE )
553+ . chunks ( INDEX_BACKFILL_CHUNK_SIZE . get ( ) as usize )
548554 . map ( |mut chunk| async move {
549555 let persistence = self . persistence . clone ( ) ;
550556 let rate_limiter = self . rate_limiter . clone ( ) ;
@@ -560,10 +566,12 @@ impl<RT: Runtime> IndexWriter<RT> {
560566 let delay = not_until. wait_time_from ( self . runtime . monotonic_now ( ) . into ( ) ) ;
561567 self . runtime . wait ( delay) . await ;
562568 }
563- // There might be duplicates in the index entries from the
564- // backfill backwards algorithm that need to be deduped.
565- chunk. sort_unstable ( ) ;
566- chunk. dedup ( ) ;
569+ if deduplicate {
570+ // There might be duplicates in the index entries from the
571+ // backfill backwards algorithm that need to be deduped.
572+ chunk. sort_unstable ( ) ;
573+ chunk. dedup ( ) ;
574+ }
567575 persistence
568576 . write ( & [ ] , & chunk, ConflictStrategy :: Overwrite )
569577 . await ?;
@@ -576,15 +584,21 @@ impl<RT: Runtime> IndexWriter<RT> {
576584 let entries_written = result?;
577585 num_entries_written += entries_written;
578586 if last_logged. elapsed ( ) ? >= Duration :: from_secs ( 60 ) {
587+ let now = self . runtime . system_time ( ) ;
579588 tracing:: info!(
580- "Backfilled {} index entries of index {}" ,
581- num_entries_written,
582- index_selector,
589+ "Backfilled {num_entries_written} rows of index {index_selector} {phase} ({} \
590+ rows/s)",
591+ ( num_entries_written - last_logged_entries_written) as f64
592+ / ( now. duration_since( last_logged) . unwrap_or_default( ) ) . as_secs_f64( ) ,
583593 ) ;
584- last_logged = self . runtime . system_time ( ) ;
594+ last_logged = now;
595+ last_logged_entries_written = num_entries_written;
585596 }
586597 }
587598
599+ tracing:: info!(
600+ "Done backfilling {num_entries_written} rows of index {index_selector} {phase}" ,
601+ ) ;
588602 Ok ( ( ) )
589603 }
590604
0 commit comments