@@ -192,87 +192,6 @@ impl<RT: Runtime> IndexWriter<RT> {
192192 }
193193 }
194194
195- /// Backfill in two steps: first a snapshot at the current time, and then
196- /// walking the log. After the current snapshot is backfilled, index
197- /// snapshot reads at >=ts are valid. The subsequent walking of the log
198- /// extends the earliest allowed snapshot into the past.
199- ///
200- /// The goal of this backfill is to make snapshot reads of `index_name`
201- /// valid between the range of [retention_cutoff, snapshot_ts].
202- /// To support:
203- /// 1. Latest documents written before retention_cutoff. They are still
204- /// latest at `ts = snapshot_ts`, so we compute and write index entries
205- /// when we walk the `snapshot_ts` snapshot.
206- /// 2. Document changes between retention_cutoff and `snapshot_ts`. These
207- /// are handled by walking the documents log for this range in reverse
208- /// and creating index entries. When walking the documents log we start
209- /// at `snapshot_ts`.
210- /// 3. Documents that were latest as of retention_cutoff but were
211- /// overwritten before `snapshot_ts`. These are handled when walking the
212- /// documents log and finding an overwrite.
213- /// 4. Document changes after `snapshot_ts`. These are handled by active
214- /// writes, assuming `snapshot_ts` is after the index was created. If
215- /// there are no active writes, then `backfill_forwards` must be called
216- /// with a timestamp <= `snapshot_ts`.
217- ///
218- /// Takes a an optional database to update progress on the index backfill
219- pub async fn backfill_from_retention_cutoff (
220- & self ,
221- snapshot_ts : RepeatableTimestamp ,
222- index_metadata : & IndexRegistry ,
223- index_selector : IndexSelector ,
224- concurrency : usize ,
225- ) -> anyhow:: Result < ( ) > {
226- let pause_client = self . runtime . pause_client ( ) ;
227- pause_client. wait ( PERFORM_BACKFILL_LABEL ) . await ;
228- // Backfill in two steps: first create index entries for all latest documents,
229- // then create index entries for all documents in the retention range.
230-
231- stream:: iter ( index_selector. iterate_tables ( ) . map ( Ok ) )
232- . try_for_each_concurrent ( concurrency, |table_id| {
233- let index_metadata = index_metadata. clone ( ) ;
234- let index_selector = index_selector. clone ( ) ;
235- let self_ = ( * self ) . clone ( ) ;
236- try_join ( "index_backfill_table_snapshot" , async move {
237- self_
238- . backfill_exact_snapshot_of_table (
239- snapshot_ts,
240- & index_selector,
241- & index_metadata,
242- table_id,
243- None ,
244- )
245- . await
246- . map ( |_docs_indexed| ( ) )
247- } )
248- } )
249- . await ?;
250-
251- let mut min_backfilled_ts = snapshot_ts;
252-
253- // Retry until min_snapshot_ts passes min_backfilled_ts, at which point we
254- // have backfilled the full range of snapshots within retention.
255- loop {
256- let min_snapshot_ts = self . retention_validator . min_snapshot_ts ( ) . await ?;
257- if min_snapshot_ts >= min_backfilled_ts {
258- break ;
259- }
260- // NOTE: ordering Desc is important, to keep the range of valid snapshots
261- // contiguous. If we backfilled in order Asc, then we might see a
262- // document creation before its tombstone, and that document would be
263- // visible at snapshots where it should be deleted.
264- min_backfilled_ts = self
265- . backfill_backwards (
266- min_backfilled_ts,
267- * min_snapshot_ts,
268- index_metadata,
269- & index_selector,
270- )
271- . await ?;
272- }
273- Ok ( ( ) )
274- }
275-
276195 /// Backfill indexes based on a snapshot at the current time. After the
277196 /// current snapshot is backfilled, index snapshot reads at >=ts are
278197 /// valid.
@@ -384,7 +303,6 @@ impl<RT: Runtime> IndexWriter<RT> {
384303 index_registry,
385304 ReceiverStream :: new ( index_update_rx) ,
386305 index_selector,
387- false , /* deduplicate */
388306 ) ;
389307 let ( docs_indexed, _) = future:: try_join ( producer, consumer) . await ?;
390308 Ok ( docs_indexed)
@@ -402,7 +320,7 @@ impl<RT: Runtime> IndexWriter<RT> {
402320 /// - `index_selector`: Subset of `index_registry` to backfill.
403321 ///
404322 /// Preconditions:
405- /// - The selected indexes are fully backfilled for all revisions less than
323+ /// - The selected indexes are fully backfilled for all revisions at
406324 /// `start_ts`.
407325 ///
408326 /// Postconditions:
@@ -439,7 +357,6 @@ impl<RT: Runtime> IndexWriter<RT> {
439357 index_registry,
440358 ReceiverStream :: new ( rx) ,
441359 index_selector,
442- false , /* deduplicate */
443360 ) ;
444361
445362 // Consider ourselves successful if both the producer and consumer exit
@@ -553,7 +470,6 @@ impl<RT: Runtime> IndexWriter<RT> {
553470 index_registry,
554471 ReceiverStream :: new ( rx) ,
555472 index_selector,
556- true , /* deduplicate */
557473 ) ;
558474
559475 // Consider ourselves successful if both the reader and writer exit
@@ -568,7 +484,6 @@ impl<RT: Runtime> IndexWriter<RT> {
568484 index_registry : & IndexRegistry ,
569485 revision_pairs : impl Stream < Item = RevisionPair > ,
570486 index_selector : & IndexSelector ,
571- deduplicate : bool ,
572487 ) -> anyhow:: Result < ( ) > {
573488 let mut last_logged = self . runtime . system_time ( ) ;
574489 let mut last_checkpointed = self . runtime . system_time ( ) ;
@@ -591,7 +506,7 @@ impl<RT: Runtime> IndexWriter<RT> {
591506 let cursor = should_send_progress
592507 . then ( || chunk. last ( ) . map ( |revision_pair| revision_pair. id ) )
593508 . flatten ( ) ;
594- let mut index_updates: Vec < PersistenceIndexEntry > = chunk
509+ let index_updates: Vec < PersistenceIndexEntry > = chunk
595510 . iter ( )
596511 . flat_map ( |revision_pair| {
597512 index_registry
@@ -618,12 +533,6 @@ impl<RT: Runtime> IndexWriter<RT> {
618533 let delay = not_until. wait_time_from ( self . runtime . monotonic_now ( ) . into ( ) ) ;
619534 self . runtime . wait ( delay) . await ;
620535 }
621- if deduplicate {
622- // There might be duplicates in the index entries from the
623- // backfill backwards algorithm that need to be deduped.
624- index_updates. sort_unstable ( ) ;
625- index_updates. dedup ( ) ;
626- }
627536 persistence
628537 . write ( & [ ] , & index_updates, ConflictStrategy :: Overwrite )
629538 . await ?;
0 commit comments