@@ -4,6 +4,7 @@ use std::{
         BTreeMap,
         BTreeSet,
     },
+    ops::Deref,
     sync::Arc,
 };
 
@@ -16,6 +17,7 @@ use common::{
         IndexKeyBytes,
     },
     interval::Interval,
+    knobs::DOCUMENTS_IN_MEMORY,
     pause::PauseClient,
     persistence::{
         new_static_repeatable_recent,
@@ -50,6 +52,7 @@ use futures::{
 use futures_async_stream::try_stream;
 use value::{
     InternalDocumentId,
+    InternalId,
     TableId,
 };
 
@@ -175,7 +178,7 @@ impl<RT: Runtime> TableIterator<RT> {
         // (b) have key > cursor.
         // We insert skipped documents into future pages of the index walk when we get
         // to them.
-        let mut skipped_keys = BTreeMap::new();
+        let mut skipped_keys = IterationDocuments::default();
 
         loop {
             self.pause_client.wait("before_index_page").await;
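
This hunk swaps the plain BTreeMap for IterationDocuments, the bounded container defined at the bottom of this diff: it holds full documents only up to the DOCUMENTS_IN_MEMORY knob and degrades to storing bare IDs past that. A minimal sketch of the idea, with illustrative names (not the commit's code):

    use std::collections::BTreeMap;

    // Keep at most `cap` full values in memory; past the cap, store only an
    // id that can be refetched later.
    enum Entry<V, Id> {
        Full(V),
        Id(Id),
    }

    struct BoundedDocs<K: Ord, V, Id> {
        cap: usize,
        full: usize,
        map: BTreeMap<K, Entry<V, Id>>,
    }

    impl<K: Ord, V, Id> BoundedDocs<K, V, Id> {
        fn insert(&mut self, key: K, value: V, id: Id) {
            if self.full < self.cap {
                self.full += 1;
                self.map.insert(key, Entry::Full(value));
            } else {
                self.map.insert(key, Entry::Id(id));
            }
        }
    }
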
@@ -196,25 +199,23 @@ impl<RT: Runtime> TableIterator<RT> {
             let page: BTreeMap<_, _> = page
                 .into_iter()
                 .filter(|(_, ts, _)| *ts <= *self.snapshot_ts)
-                .map(|(index_key, ts, doc)| (index_key, (ts, doc)))
+                .map(|(index_key, ts, doc)| (index_key, (ts, IterationDocument::Full(doc))))
                 .collect();
 
             // 2. Find any keys for documents that were skipped by this
             // page or will be skipped by future pages.
             // These documents are returned with index keys and revisions as
             // they existed at snapshot_ts.
-            skipped_keys.extend(
-                self.fetch_skipped_keys(
-                    table_id,
-                    &indexed_fields,
-                    page_start.as_ref(),
-                    *end_ts,
-                    new_end_ts,
-                    rate_limiter,
-                )
-                .await?
-                .into_iter(),
-            );
+            self.fetch_skipped_keys(
+                table_id,
+                &indexed_fields,
+                page_start.as_ref(),
+                *end_ts,
+                new_end_ts,
+                rate_limiter,
+                &mut skipped_keys,
+            )
+            .await?;
             if let Some((first_skipped_key, _)) = skipped_keys.iter().next() {
                 // Check all skipped ids are after the old cursor,
                 // which ensures the yielded output is in index key order.
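
The check that follows this hunk (all skipped ids must be after the old cursor) is what keeps the merged output in index-key order. A self-contained illustration; cursor_has_walked here is a hypothetical stand-in for the real helper, read as "this key falls inside the prefix the walk has already covered":

    // Hypothetical stand-in: true when `key` is within the walked prefix.
    fn cursor_has_walked(cursor: Option<&[u8]>, key: &[u8]) -> bool {
        match cursor {
            None => false,
            Some(walked_up_to) => key <= walked_up_to,
        }
    }

    fn main() {
        // The walk has already yielded everything up to key "b".
        let cursor = Some(b"b".as_slice());
        // A skipped document whose snapshot key sorts before the cursor was
        // already passed; emitting it now would break index-key order.
        assert!(cursor_has_walked(cursor, b"a5"));
        // A key past the cursor is still ahead of the walk, safe to emit later.
        assert!(!cursor_has_walked(cursor, b"c"));
    }
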
@@ -225,59 +226,44 @@ impl<RT: Runtime> TableIterator<RT> {
             // the current page.
             let page_skipped_keys = {
                 let mut page_skipped_keys = BTreeMap::new();
-                while let Some(first_skipped_key) = skipped_keys.first_entry()
-                    && cursor_has_walked(Some(page_end), first_skipped_key.key())
+                while let Some(first_skipped_key) = skipped_keys.keys().next()
+                    && cursor_has_walked(Some(page_end), first_skipped_key)
                 {
-                    let (key, value) = first_skipped_key.remove_entry();
+                    let (key, value) = skipped_keys
+                        .remove(&first_skipped_key.clone())
+                        .context("skipped_keys should be nonempty")?;
                     page_skipped_keys.insert(key, value);
                 }
                 page_skipped_keys
             };
-            // Note we already fetched these documents earlier when calculating
-            // skipped_keys, but we would rather not hold them in memory. Since
-            // skipped documents are relatively rare, an extra fetch
-            // occasionally is worth it for the memory savings.
-            let page_skipped_docs: BTreeMap<_, _> = self
-                .load_revisions_at_snapshot_ts(stream::iter(
-                    page_skipped_keys.into_values().map(Ok),
-                ))
-                .map_ok(|(doc, ts)| {
-                    (
-                        doc.index_key(&indexed_fields, self.persistence.version())
-                            .into_bytes(),
-                        (ts, doc),
-                    )
-                })
-                .try_collect()
-                .await?;
             // Merge index walk and skipped keys into BTreeMap, which sorts by index key.
-            let merged_page: BTreeMap<_, _> = page.into_iter().chain(page_skipped_docs).collect();
+            let merged_page =
+                IterationDocuments::new(page.into_iter().chain(page_skipped_keys).collect());
 
             // Sanity check output.
             let all_ids: BTreeSet<_> = merged_page
-                .iter()
-                .map(|(_, (_, doc))| doc.id().internal_id())
+                .values()
+                .map(|(_, doc)| doc.internal_id())
                 .collect();
             anyhow::ensure!(
                 all_ids.len() == merged_page.len(),
                 "duplicate id in table iterator {merged_page:?}"
             );
             anyhow::ensure!(
-                merged_page
-                    .iter()
-                    .all(|(_, (ts, _))| *ts <= *self.snapshot_ts),
+                merged_page.values().all(|(ts, _)| *ts <= *self.snapshot_ts),
                 "document after snapshot in table iterator {merged_page:?}"
             );
             anyhow::ensure!(
-                merged_page.iter().all(|(key, _)| {
+                merged_page.keys().all(|key| {
                     !cursor_has_walked(page_start.as_ref(), key)
                         && cursor_has_walked(Some(page_end), key)
                 }),
                 "document outside page in table iterator {merged_page:?}"
             );
 
-            for (key, (revision_ts, revision)) in merged_page {
-                yield (key, revision_ts, revision);
+            let mut merged_page_docs = self.reload_revisions_at_snapshot_ts(merged_page);
+            while let Some((key, ts, doc)) = merged_page_docs.try_next().await? {
+                yield (key, ts, doc);
             }
             if matches!(page_end, CursorPosition::End) {
                 // If we are done, all skipped_keys would be put in this final page.
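
The yield loop now streams the merged page through reload_revisions_at_snapshot_ts (added further down), which passes through entries that still carry a full document and refetches the evicted, id-only entries in contiguous batches. A synchronous sketch of that batch-and-flush pattern, with illustrative types and a caller-supplied resolve closure standing in for the persistence fetch:

    // Entries either carry the document or just an id to refetch.
    enum Entry {
        Full(String),
        Id(u64),
    }

    // Pass full documents straight through; resolve queued ids in batches
    // first so output order matches input order.
    fn drain(entries: Vec<Entry>, resolve: impl Fn(&[u64]) -> Vec<String>) -> Vec<String> {
        let mut out = Vec::new();
        let mut batch: Vec<u64> = Vec::new();
        for entry in entries {
            match entry {
                Entry::Full(doc) => {
                    out.extend(resolve(&batch));
                    batch.clear();
                    out.push(doc);
                },
                Entry::Id(id) => batch.push(id),
            }
        }
        out.extend(resolve(&batch));
        out
    }

    fn main() {
        let entries = vec![Entry::Id(1), Entry::Full("b".into()), Entry::Id(3)];
        let docs = drain(entries, |ids| ids.iter().map(|id| format!("doc{id}")).collect());
        assert_eq!(docs, vec!["doc1", "b", "doc3"]);
    }
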
@@ -301,22 +287,22 @@ impl<RT: Runtime> TableIterator<RT> {
         start_ts: Timestamp,
         end_ts: RepeatableTimestamp,
         rate_limiter: &RateLimiter<RT>,
-    ) -> anyhow::Result<BTreeMap<IndexKeyBytes, InternalDocumentId>> {
+        output: &mut IterationDocuments,
+    ) -> anyhow::Result<()> {
         let reader = self.persistence.clone();
         let persistence_version = reader.version();
         let skipped_revs = self.walk_document_log(table_id, start_ts, end_ts, rate_limiter);
         let revisions_at_snapshot = self.load_revisions_at_snapshot_ts(skipped_revs);
         pin_mut!(revisions_at_snapshot);
-        let mut skipped_keys = BTreeMap::new();
-        while let Some((doc, _)) = revisions_at_snapshot.try_next().await? {
+        while let Some((doc, ts)) = revisions_at_snapshot.try_next().await? {
             let index_key = doc
                 .index_key(indexed_fields, persistence_version)
                 .into_bytes();
             if !cursor_has_walked(lower_bound, &index_key) {
-                skipped_keys.insert(index_key, doc.id_with_table_id());
+                output.insert(index_key, ts, doc);
             }
         }
-        Ok(skipped_keys)
+        Ok(())
     }
 
     #[try_stream(ok = InternalDocumentId, error = anyhow::Error)]
@@ -403,8 +389,9 @@ impl<RT: Runtime> TableIterator<RT> {
         Ok((documents_in_page, ts))
     }
 
-    // Load the revisions of documents visible at `self.snapshot_ts`, skipping
-    // documents that don't exist then.
+    /// Load the revisions of documents visible at `self.snapshot_ts`.
+    /// Documents are yielded in the same order as input, skipping duplicates
+    /// and documents that didn't exist at the snapshot.
     #[try_stream(ok = (ResolvedDocument, Timestamp), error = anyhow::Error)]
     async fn load_revisions_at_snapshot_ts<'a>(
         &'a self,
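
The hunk below implements the ordering promise in this doc comment: previous_revisions returns a map, so the loop walks the input chunk in order and removes each id from the map. That restores input order and drops duplicates for free, since a second remove of the same id finds nothing. A minimal sketch of the pattern:

    use std::collections::BTreeMap;

    // `found` is keyed like the map a batched fetch returns; walking `ids`
    // and removing restores input order, and duplicates vanish on the
    // second lookup.
    fn in_input_order(ids: Vec<u64>, mut found: BTreeMap<u64, String>) -> Vec<String> {
        let mut out = Vec::new();
        for id in ids {
            if let Some(doc) = found.remove(&id) {
                out.push(doc);
            }
        }
        out
    }

    fn main() {
        let found = BTreeMap::from([(2, "two".to_string()), (7, "seven".to_string())]);
        // id 5 is missing and id 7 repeats; both are skipped, order preserved.
        assert_eq!(in_input_order(vec![7, 5, 2, 7], found), vec!["seven", "two"]);
    }
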
@@ -425,16 +412,138 @@ impl<RT: Runtime> TableIterator<RT> {
         pin_mut!(id_chunks);
 
         while let Some(chunk) = id_chunks.try_next().await? {
-            let ids_to_load = chunk.into_iter().map(|id| (id, ts_succ)).collect();
-            let old_revisions = repeatable_persistence
+            let ids_to_load = chunk.iter().map(|id| (*id, ts_succ)).collect();
+            let mut old_revisions = repeatable_persistence
                 .previous_revisions(ids_to_load)
                 .await?;
-            for (_, (revision_ts, revision)) in old_revisions.into_iter() {
-                if let Some(revision) = revision {
+            // Yield in the same order as the input, skipping duplicates and
+            // missing documents.
+            for id in chunk {
+                if let Some((revision_ts, Some(revision))) = old_revisions.remove(&(id, ts_succ)) {
                     yield (revision, revision_ts);
-                }
+                };
+            }
+        }
+    }
+
+    #[try_stream(boxed, ok = (IndexKeyBytes, Timestamp, ResolvedDocument), error = anyhow::Error)]
+    async fn load_index_entries_at_snapshot_ts(
+        &self,
+        entries: Vec<(InternalDocumentId, IndexKeyBytes)>,
+    ) {
+        let ids: Vec<_> = entries.iter().map(|(id, _)| *id).collect();
+        let mut key_by_id: BTreeMap<_, _> = entries.into_iter().collect();
+        let revisions = self.load_revisions_at_snapshot_ts(stream::iter(ids.into_iter().map(Ok)));
+        pin_mut!(revisions);
+        while let Some((doc, ts)) = revisions.try_next().await? {
+            let key = key_by_id
+                .remove(&doc.id_with_table_id())
+                .context("key_by_id missing")?;
+            yield (key, ts, doc);
+        }
+    }
+
+    /// Like `load_revisions_at_snapshot_ts` but doesn't need to fetch
+    /// if the IterationDocument has the Full document.
+    #[try_stream(boxed, ok = (IndexKeyBytes, Timestamp, ResolvedDocument), error = anyhow::Error)]
+    async fn reload_revisions_at_snapshot_ts(&self, documents: IterationDocuments) {
+        let mut current_batch = Vec::new();
+        for (key, (ts, doc)) in documents.into_iter() {
+            match doc {
+                IterationDocument::Full(doc) => {
+                    let mut flush = self.load_index_entries_at_snapshot_ts(current_batch);
+                    while let Some((key, ts, doc)) = flush.try_next().await? {
+                        yield (key, ts, doc);
+                    }
+                    current_batch = Vec::new();
+                    yield (key, ts, doc);
+                },
+                IterationDocument::Id(id) => {
+                    current_batch.push((id, key));
+                },
             }
         }
+        let mut flush = self.load_index_entries_at_snapshot_ts(current_batch);
+        while let Some((key, ts, doc)) = flush.try_next().await? {
+            yield (key, ts, doc);
+        }
+    }
+}
+
+#[derive(Debug)]
+enum IterationDocument {
+    Full(ResolvedDocument),
+    Id(InternalDocumentId),
+}
+
+impl IterationDocument {
+    fn internal_id(&self) -> InternalId {
+        match self {
+            Self::Full(doc) => doc.internal_id(),
+            Self::Id(id) => id.internal_id(),
+        }
+    }
+}
+
+/// To avoid storing too many documents in memory, we evict the document values,
+/// leaving only the IDs.
+#[derive(Default, Debug)]
+struct IterationDocuments {
+    count_full: usize,
+    docs: BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>,
+}
+
+impl IterationDocuments {
+    fn new(docs: BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>) -> Self {
+        Self {
+            count_full: docs
+                .values()
+                .filter(|(_, doc)| matches!(doc, IterationDocument::Full(_)))
+                .count(),
+            docs,
+        }
+    }
+
+    fn insert(&mut self, index_key: IndexKeyBytes, ts: Timestamp, doc: ResolvedDocument) {
+        if self.count_full < *DOCUMENTS_IN_MEMORY {
+            self.docs
+                .insert(index_key, (ts, IterationDocument::Full(doc)));
+            self.count_full += 1;
+        } else {
+            self.docs.insert(
+                index_key,
+                (ts, IterationDocument::Id(doc.id_with_table_id())),
+            );
+        }
+    }
+
+    fn remove(
+        &mut self,
+        index_key: &IndexKeyBytes,
+    ) -> Option<(IndexKeyBytes, (Timestamp, IterationDocument))> {
+        let removed = self.docs.remove_entry(index_key);
+        if let Some((_, (_, IterationDocument::Full(_)))) = &removed {
+            self.count_full -= 1;
+        }
+        removed
+    }
+}
+
+impl IntoIterator for IterationDocuments {
+    type IntoIter =
+        <BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)> as IntoIterator>::IntoIter;
+    type Item = (IndexKeyBytes, (Timestamp, IterationDocument));
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.docs.into_iter()
+    }
+}
+
+impl Deref for IterationDocuments {
+    type Target = BTreeMap<IndexKeyBytes, (Timestamp, IterationDocument)>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.docs
     }
 }
 
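A note on the Deref impl above: deref-ing to the inner BTreeMap gives callers the read-only map API (len, keys, iter) while funneling all mutation through insert and remove, which keep count_full consistent. A minimal illustration of that pattern, with placeholder types:

    use std::{
        collections::BTreeMap,
        ops::Deref,
    };

    struct Docs {
        count_full: usize,
        map: BTreeMap<u64, String>,
    }

    impl Docs {
        // Mutation goes through methods that maintain the counter.
        fn insert_full(&mut self, key: u64, doc: String) {
            self.map.insert(key, doc);
            self.count_full += 1;
        }
    }

    impl Deref for Docs {
        type Target = BTreeMap<u64, String>;

        fn deref(&self) -> &Self::Target {
            &self.map
        }
    }

    fn main() {
        let mut docs = Docs { count_full: 0, map: BTreeMap::new() };
        docs.insert_full(1, "doc".to_string());
        // `len` and `keys` resolve through Deref to the inner map.
        assert_eq!(docs.len(), 1);
        assert_eq!(docs.keys().next(), Some(&1));
    }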