@@ -17,7 +17,10 @@ use common::{
1717 IndexKeyBytes ,
1818 } ,
1919 interval:: Interval ,
20- knobs:: DOCUMENTS_IN_MEMORY ,
20+ knobs:: {
21+ DOCUMENTS_IN_MEMORY ,
22+ TABLE_ITERATOR_MAX_RETRIES ,
23+ } ,
2124 persistence:: {
2225 new_static_repeatable_recent,
2326 DocumentLogEntry ,
@@ -41,6 +44,7 @@ use common::{
4144 } ,
4245 value:: ResolvedDocumentId ,
4346} ;
47+ use errors:: ErrorMetadataAnyhowExt ;
4448use futures:: {
4549 pin_mut,
4650 stream,
@@ -563,27 +567,37 @@ impl<RT: Runtime> TableIteratorInner<RT> {
563567 tablet_id : TabletId ,
564568 cursor : & mut TableScanCursor ,
565569 ) -> anyhow:: Result < ( Vec < ( IndexKeyBytes , LatestDocument ) > , RepeatableTimestamp ) > {
566- let ts = self . new_ts ( ) . await ?;
567- let repeatable_persistence = RepeatablePersistence :: new (
568- self . persistence . clone ( ) ,
569- ts,
570- self . retention_validator . clone ( ) ,
571- ) ;
572- let reader = repeatable_persistence. read_snapshot ( ts) ?;
573- let stream = reader. index_scan (
574- index_id,
575- tablet_id,
576- & cursor. interval ( ) ,
577- Order :: Asc ,
578- self . page_size ,
579- ) ;
580- let documents_in_page: Vec < _ > = stream. take ( self . page_size ) . try_collect ( ) . await ?;
581- if documents_in_page. len ( ) < self . page_size {
582- cursor. advance ( CursorPosition :: End ) ?;
583- } else if let Some ( ( index_key, ..) ) = documents_in_page. last ( ) {
584- cursor. advance ( CursorPosition :: After ( index_key. clone ( ) ) ) ?;
570+ for attempt in 0 .. {
571+ let ts = self . new_ts ( ) . await ?;
572+ let repeatable_persistence = RepeatablePersistence :: new (
573+ self . persistence . clone ( ) ,
574+ ts,
575+ self . retention_validator . clone ( ) ,
576+ ) ;
577+ let reader = repeatable_persistence. read_snapshot ( ts) ?;
578+ let stream = reader. index_scan (
579+ index_id,
580+ tablet_id,
581+ & cursor. interval ( ) ,
582+ Order :: Asc ,
583+ self . page_size ,
584+ ) ;
585+ let documents_in_page: Vec < _ > = match stream. take ( self . page_size ) . try_collect ( ) . await {
586+ Ok ( docs) => docs,
587+ Err ( e) if attempt < * TABLE_ITERATOR_MAX_RETRIES && e. is_out_of_retention ( ) => {
588+ tracing:: warn!( "TableIterator hit out-of-retention error {e}, retrying..." ) ;
589+ continue ;
590+ } ,
591+ Err ( e) => return Err ( e) ,
592+ } ;
593+ if documents_in_page. len ( ) < self . page_size {
594+ cursor. advance ( CursorPosition :: End ) ?;
595+ } else if let Some ( ( index_key, ..) ) = documents_in_page. last ( ) {
596+ cursor. advance ( CursorPosition :: After ( index_key. clone ( ) ) ) ?;
597+ }
598+ return Ok ( ( documents_in_page, ts) ) ;
585599 }
586- Ok ( ( documents_in_page , ts ) )
600+ unreachable ! ( )
587601 }
588602
589603 /// Load the revisions of documents visible at `self.snapshot_ts`.
0 commit comments