@@ -48,6 +48,12 @@ impl Default for SourceRowIndexingState {
4848struct SourceIndexingState {
4949 rows : HashMap < value:: KeyValue , SourceRowIndexingState > ,
5050 scan_generation : usize ,
51+
52+ // Set of rows to retry.
53+ // It's for sources that we don't proactively scan all input rows during refresh.
54+ // We need to maintain a list of row keys failed in last processing, to retry them later.
55+ // It's `None` if we don't need this mechanism for failure retry.
56+ rows_to_retry : Option < HashSet < value:: KeyValue > > ,
5157}
5258
5359pub struct SourceIndexingContext {
@@ -113,6 +119,10 @@ impl<'a> LocalSourceRowStateOperator<'a> {
113119 let mut state = self . indexing_state . lock ( ) . unwrap ( ) ;
114120 let touched_generation = state. scan_generation ;
115121
122+ if let Some ( rows_to_retry) = & mut state. rows_to_retry {
123+ rows_to_retry. remove ( self . key ) ;
124+ }
125+
116126 if self . last_source_version == Some ( source_version) {
117127 return Ok ( RowStateAdvanceOutcome :: Noop ) ;
118128 }
@@ -211,6 +221,9 @@ impl<'a> LocalSourceRowStateOperator<'a> {
211221 } else {
212222 indexing_state. rows . remove ( self . key ) ;
213223 }
224+ if let Some ( rows_to_retry) = & mut indexing_state. rows_to_retry {
225+ rows_to_retry. insert ( self . key . clone ( ) ) ;
226+ }
214227 }
215228}
216229
@@ -244,6 +257,7 @@ impl SourceIndexingContext {
244257 let import_op = & plan. import_ops [ source_idx] ;
245258 let mut list_state = db_tracking:: ListTrackedSourceKeyMetadataState :: new ( ) ;
246259 let mut rows = HashMap :: new ( ) ;
260+ let mut rows_to_retry: Option < HashSet < value:: KeyValue > > = None ;
247261 let scan_generation = 0 ;
248262 {
249263 let mut key_metadata_stream = list_state. list (
@@ -257,6 +271,11 @@ impl SourceIndexingContext {
257271 key_metadata. source_key ,
258272 & import_op. primary_key_schema ,
259273 ) ?;
274+ if let Some ( rows_to_retry) = & mut rows_to_retry {
275+ if key_metadata. max_process_ordinal > key_metadata. process_ordinal {
276+ rows_to_retry. insert ( source_pk. clone ( ) ) ;
277+ }
278+ }
260279 rows. insert (
261280 source_pk,
262281 SourceRowIndexingState {
@@ -280,6 +299,7 @@ impl SourceIndexingContext {
280299 state : Mutex :: new ( SourceIndexingState {
281300 rows,
282301 scan_generation,
302+ rows_to_retry,
283303 } ) ,
284304 pending_update : Mutex :: new ( None ) ,
285305 update_sem : Semaphore :: new ( 1 ) ,
0 commit comments