@@ -20,18 +20,25 @@ use super::{
2020} ;
2121
2222use crate :: ops:: interface;
23- struct SourceRowIndexingState {
23+
24+ #[ derive( Default ) ]
25+ struct SourceRowVersionState {
2426 source_version : SourceVersion ,
2527 content_version_fp : Option < Vec < u8 > > ,
28+ }
29+ struct SourceRowIndexingState {
30+ version_state : SourceRowVersionState ,
2631 processing_sem : Arc < Semaphore > ,
2732 touched_generation : usize ,
2833}
2934
3035impl Default for SourceRowIndexingState {
3136 fn default ( ) -> Self {
3237 Self {
33- source_version : SourceVersion :: default ( ) ,
34- content_version_fp : None ,
38+ version_state : SourceRowVersionState {
39+ source_version : SourceVersion :: default ( ) ,
40+ content_version_fp : None ,
41+ } ,
3542 processing_sem : Arc :: new ( Semaphore :: new ( 1 ) ) ,
3643 touched_generation : 0 ,
3744 }
@@ -62,13 +69,19 @@ struct LocalSourceRowStateOperator<'a> {
6269 processing_sem : Option < Arc < Semaphore > > ,
6370 processing_sem_permit : Option < OwnedSemaphorePermit > ,
6471 last_source_version : Option < SourceVersion > ,
72+
73+ // `None` means no advance yet.
74+ // `Some(None)` means the state before advance is `None`.
75+ // `Some(Some(version_state))` means the state before advance is `Some(version_state)`.
76+ prev_version_state : Option < Option < SourceRowVersionState > > ,
77+
78+ to_remove_entry_on_success : bool ,
6579}
6680
6781enum RowStateAdvanceOutcome {
6882 Skipped ,
6983 Advanced {
70- prev_source_version : Option < SourceVersion > ,
71- prev_content_version_fp : Option < Vec < u8 > > ,
84+ prev_version_state : Option < SourceRowVersionState > ,
7285 } ,
7386 Noop ,
7487}
@@ -86,6 +99,8 @@ impl<'a> LocalSourceRowStateOperator<'a> {
8699 processing_sem : None ,
87100 processing_sem_permit : None ,
88101 last_source_version : None ,
102+ prev_version_state : None ,
103+ to_remove_entry_on_success : false ,
89104 }
90105 }
91106 async fn advance (
@@ -108,6 +123,7 @@ impl<'a> LocalSourceRowStateOperator<'a> {
108123 if !force_reload
109124 && entry
110125 . get ( )
126+ . version_state
111127 . source_version
112128 . should_skip ( & source_version, Some ( self . update_stats . as_ref ( ) ) )
113129 {
@@ -126,14 +142,20 @@ impl<'a> LocalSourceRowStateOperator<'a> {
126142
127143 let entry_mut = entry. get_mut ( ) ;
128144 let outcome = RowStateAdvanceOutcome :: Advanced {
129- prev_source_version : Some ( std:: mem:: take ( & mut entry_mut. source_version ) ) ,
130- prev_content_version_fp : entry_mut. content_version_fp . take ( ) ,
145+ prev_version_state : Some ( std:: mem:: take ( & mut entry_mut. version_state ) ) ,
131146 } ;
132147 if source_version. kind == row_indexer:: SourceVersionKind :: NonExistence {
133- entry. remove ( ) ;
134- } else {
135- entry_mut. source_version = source_version;
136- entry_mut. content_version_fp = content_version_fp. cloned ( ) ;
148+ self . to_remove_entry_on_success = true ;
149+ }
150+ let prev_version_state = std:: mem:: replace (
151+ & mut entry_mut. version_state ,
152+ SourceRowVersionState {
153+ source_version,
154+ content_version_fp : content_version_fp. cloned ( ) ,
155+ } ,
156+ ) ;
157+ if self . prev_version_state . is_none ( ) {
158+ self . prev_version_state = Some ( Some ( prev_version_state) ) ;
137159 }
138160 ( sem, outcome)
139161 }
@@ -143,18 +165,22 @@ impl<'a> LocalSourceRowStateOperator<'a> {
143165 return Ok ( RowStateAdvanceOutcome :: Skipped ) ;
144166 }
145167 let new_entry = SourceRowIndexingState {
146- source_version,
147- content_version_fp : content_version_fp. cloned ( ) ,
168+ version_state : SourceRowVersionState {
169+ source_version,
170+ content_version_fp : content_version_fp. cloned ( ) ,
171+ } ,
148172 touched_generation,
149173 ..Default :: default ( )
150174 } ;
151175 let sem = new_entry. processing_sem . clone ( ) ;
152176 entry. insert ( new_entry) ;
177+ if self . prev_version_state . is_none ( ) {
178+ self . prev_version_state = Some ( None ) ;
179+ }
153180 (
154181 Some ( sem) ,
155182 RowStateAdvanceOutcome :: Advanced {
156- prev_source_version : None ,
157- prev_content_version_fp : None ,
183+ prev_version_state : None ,
158184 } ,
159185 )
160186 }
@@ -166,6 +192,26 @@ impl<'a> LocalSourceRowStateOperator<'a> {
166192 }
167193 Ok ( outcome)
168194 }
195+
196+ fn commit ( self ) {
197+ if self . to_remove_entry_on_success {
198+ self . indexing_state . lock ( ) . unwrap ( ) . rows . remove ( self . key ) ;
199+ }
200+ }
201+
202+ fn rollback ( self ) {
203+ let Some ( prev_version_state) = self . prev_version_state else {
204+ return ;
205+ } ;
206+ let mut indexing_state = self . indexing_state . lock ( ) . unwrap ( ) ;
207+ if let Some ( prev_version_state) = prev_version_state {
208+ if let Some ( entry) = indexing_state. rows . get_mut ( self . key ) {
209+ entry. version_state = prev_version_state;
210+ }
211+ } else {
212+ indexing_state. rows . remove ( self . key ) ;
213+ }
214+ }
169215}
170216
171217#[ derive( Debug , Clone , Copy , PartialEq , Eq , Default ) ]
@@ -214,12 +260,14 @@ impl SourceIndexingContext {
214260 rows. insert (
215261 source_pk,
216262 SourceRowIndexingState {
217- source_version : SourceVersion :: from_stored (
218- key_metadata. processed_source_ordinal ,
219- & key_metadata. process_logic_fingerprint ,
220- plan. logic_fingerprint ,
221- ) ,
222- content_version_fp : key_metadata. processed_source_fp ,
263+ version_state : SourceRowVersionState {
264+ source_version : SourceVersion :: from_stored (
265+ key_metadata. processed_source_ordinal ,
266+ & key_metadata. process_logic_fingerprint ,
267+ plan. logic_fingerprint ,
268+ ) ,
269+ content_version_fp : key_metadata. processed_source_fp ,
270+ } ,
223271 processing_sem : Arc :: new ( Semaphore :: new ( 1 ) ) ,
224272 touched_generation : scan_generation,
225273 } ,
@@ -271,61 +319,65 @@ impl SourceIndexingContext {
271319 & pool,
272320 ) ?;
273321
322+ let source_data = row_input. data ;
274323 let mut row_state_operator =
275324 LocalSourceRowStateOperator :: new ( & row_input. key , & self . state , & update_stats) ;
276-
277- let source_data = row_input. data ;
278- if let Some ( ordinal) = source_data. ordinal
279- && let Some ( content_version_fp) = & source_data. content_version_fp
280- {
281- let version = SourceVersion :: from_current_with_ordinal ( ordinal) ;
282- match row_state_operator
283- . advance (
284- version,
285- Some ( content_version_fp) ,
286- /*force_reload=*/ mode == UpdateMode :: ReexportTargets ,
287- )
288- . await ?
289- {
290- RowStateAdvanceOutcome :: Skipped => {
291- return anyhow:: Ok ( ( ) ) ;
292- }
293- RowStateAdvanceOutcome :: Advanced {
294- prev_source_version : Some ( prev_source_version) ,
295- prev_content_version_fp : Some ( prev_content_version_fp) ,
296- } => {
297- // Fast path optimization: may collapse the row based on source version fingerprint.
298- // Still need to update the tracking table as the processed ordinal advanced.
299- if mode == UpdateMode :: Normal
325+ let result = {
326+ let row_state_operator = & mut row_state_operator;
327+ let row_key = & row_input. key ;
328+ async move {
329+ if let Some ( ordinal) = source_data. ordinal
330+ && let Some ( content_version_fp) = & source_data. content_version_fp
331+ {
332+ let version = SourceVersion :: from_current_with_ordinal ( ordinal) ;
333+ match row_state_operator
334+ . advance (
335+ version,
336+ Some ( content_version_fp) ,
337+ /*force_reload=*/ mode == UpdateMode :: ReexportTargets ,
338+ )
339+ . await ?
340+ {
341+ RowStateAdvanceOutcome :: Skipped => {
342+ return anyhow:: Ok ( ( ) ) ;
343+ }
344+ RowStateAdvanceOutcome :: Advanced {
345+ prev_version_state : Some ( prev_version_state) ,
346+ } => {
347+ // Fast path optimization: may collapse the row based on source version fingerprint.
348+ // Still need to update the tracking table as the processed ordinal advanced.
349+ if let Some ( prev_content_version_fp) =
350+ & prev_version_state. content_version_fp
351+ && mode == UpdateMode :: Normal
300352 && row_indexer
301353 . try_collapse (
302354 & version,
303355 content_version_fp. as_slice ( ) ,
304- & prev_source_version ,
356+ & prev_version_state . source_version ,
305357 ContentHashBasedCollapsingBaseline :: ProcessedSourceFingerprint (
306- & prev_content_version_fp,
358+ prev_content_version_fp,
307359 ) ,
308360 )
309361 . await ?
310362 . is_some ( )
311363 {
312364 return Ok ( ( ) ) ;
313365 }
366+ }
367+ _ => { }
368+ }
314369 }
315- _ => { }
316- }
317- }
318370
319- let ( ordinal, content_version_fp, value) =
320- match ( source_data. ordinal , source_data. value ) {
321- ( Some ( ordinal) , Some ( value) ) => {
322- ( ordinal, source_data. content_version_fp , value)
323- }
324- _ => {
325- let data = import_op
371+ let ( ordinal, content_version_fp, value) =
372+ match ( source_data. ordinal , source_data. value ) {
373+ ( Some ( ordinal) , Some ( value) ) => {
374+ ( ordinal, source_data. content_version_fp , value)
375+ }
376+ _ => {
377+ let data = import_op
326378 . executor
327379 . get_value (
328- & row_input . key ,
380+ row_key ,
329381 row_input. key_aux_info . as_ref ( ) . ok_or_else ( || {
330382 anyhow:: anyhow!(
331383 "`key_aux_info` must be provided when there's no `source_data`"
@@ -338,37 +390,47 @@ impl SourceIndexingContext {
338390 } ,
339391 )
340392 . await ?;
341- (
342- data. ordinal
343- . ok_or_else ( || anyhow:: anyhow!( "ordinal is not available" ) ) ?,
344- data. content_version_fp ,
345- data. value
346- . ok_or_else ( || anyhow:: anyhow!( "value is not available" ) ) ?,
393+ (
394+ data. ordinal . ok_or_else ( || {
395+ anyhow:: anyhow!( "ordinal is not available" )
396+ } ) ?,
397+ data. content_version_fp ,
398+ data. value
399+ . ok_or_else ( || anyhow:: anyhow!( "value is not available" ) ) ?,
400+ )
401+ }
402+ } ;
403+
404+ let source_version = SourceVersion :: from_current_data ( ordinal, & value) ;
405+ if let RowStateAdvanceOutcome :: Skipped = row_state_operator
406+ . advance (
407+ source_version,
408+ content_version_fp. as_ref ( ) ,
409+ /*force_reload=*/ mode == UpdateMode :: ReexportTargets ,
347410 )
411+ . await ?
412+ {
413+ return Ok ( ( ) ) ;
348414 }
349- } ;
350-
351- let source_version = SourceVersion :: from_current_data ( ordinal, & value) ;
352- if let RowStateAdvanceOutcome :: Skipped = row_state_operator
353- . advance (
354- source_version,
355- content_version_fp. as_ref ( ) ,
356- /*force_reload=*/ mode == UpdateMode :: ReexportTargets ,
357- )
358- . await ?
359- {
360- return Ok ( ( ) ) ;
361- }
362415
363- let result = row_indexer
364- . update_source_row ( & source_version, value, content_version_fp. clone ( ) )
365- . await ?;
366- if let SkippedOr :: Skipped ( version, fp) = result {
367- row_state_operator
368- . advance ( version, fp. as_ref ( ) , /*force_reload=*/ false )
369- . await ?;
416+ let result = row_indexer
417+ . update_source_row ( & source_version, value, content_version_fp. clone ( ) )
418+ . await ?;
419+ if let SkippedOr :: Skipped ( version, fp) = result {
420+ row_state_operator
421+ . advance ( version, fp. as_ref ( ) , /*force_reload=*/ false )
422+ . await ?;
423+ }
424+ Ok ( ( ) )
425+ }
426+ }
427+ . await ;
428+ if result. is_ok ( ) {
429+ row_state_operator. commit ( ) ;
430+ } else {
431+ row_state_operator. rollback ( ) ;
370432 }
371- Ok ( ( ) )
433+ result
372434 } ;
373435 let process_and_ack = async {
374436 process. await ?;
@@ -481,6 +543,7 @@ impl SourceIndexingContext {
481543 row_state. touched_generation = scan_generation;
482544 if update_options. mode == UpdateMode :: Normal
483545 && row_state
546+ . version_state
484547 . source_version
485548 . should_skip ( & source_version, Some ( update_stats. as_ref ( ) ) )
486549 {
@@ -518,7 +581,8 @@ impl SourceIndexingContext {
518581 let state = self . state . lock ( ) . unwrap ( ) ;
519582 for ( key, row_state) in state. rows . iter ( ) {
520583 if row_state. touched_generation < scan_generation {
521- deleted_key_versions. push ( ( key. clone ( ) , row_state. source_version . ordinal ) ) ;
584+ deleted_key_versions
585+ . push ( ( key. clone ( ) , row_state. version_state . source_version . ordinal ) ) ;
522586 }
523587 }
524588 deleted_key_versions
0 commit comments