@@ -166,7 +166,8 @@ impl<'a> LocalSourceRowStateOperator<'a> {
     }
 }
 
-pub struct ProcessSourceKeyInput {
+pub struct ProcessSourceRowInput {
+    pub key: value::FullKeyValue,
     /// `key_aux_info` is not available for deletions. It must be provided if `data.value` is `None`.
     pub key_aux_info: Option<serde_json::Value>,
     pub data: interface::PartialSourceRowData,
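For orientation, here is how the two call sites later in this diff populate the new struct. This is a minimal sketch rather than a compilable excerpt; it assumes the crate-internal types from this change (`ProcessSourceRowInput`, `interface::PartialSourceRowData`, `interface::SourceValue`) and the loop variables `row`, `key`, and `source_ordinal` from the surrounding hunks are in scope:

```rust
// Upsert path (rows returned by `list()`): key, aux info, and whatever partial
// data the source already provided are forwarded as-is.
let upsert_input = ProcessSourceRowInput {
    key: row.key,
    key_aux_info: Some(row.key_aux_info),
    data: row.data,
};

// Deletion path (rows tracked locally but no longer listed by the source):
// `key_aux_info` is omitted, which is allowed here because `data.value` is
// explicitly `Some(SourceValue::NonExistence)` rather than `None`.
let deletion_input = ProcessSourceRowInput {
    key,
    key_aux_info: None,
    data: interface::PartialSourceRowData {
        ordinal: Some(source_ordinal),
        content_version_fp: None,
        value: Some(interface::SourceValue::NonExistence),
    },
};
```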
@@ -224,17 +225,16 @@ impl SourceIndexingContext {
         })
     }
 
-    pub async fn process_source_key<
+    pub async fn process_source_row<
         AckFut: Future<Output = Result<()>> + Send + 'static,
         AckFn: FnOnce() -> AckFut,
     >(
         self: Arc<Self>,
-        key: value::FullKeyValue,
+        row_input: ProcessSourceRowInput,
         update_stats: Arc<stats::UpdateStats>,
         _concur_permit: concur_control::CombinedConcurrencyControllerPermit,
         ack_fn: Option<AckFn>,
         pool: PgPool,
-        inputs: ProcessSourceKeyInput,
     ) {
         let process = async {
             let plan = self.flow.get_execution_plan().await?;
@@ -245,7 +245,7 @@ impl SourceIndexingContext {
                 plan: &plan,
                 import_op,
                 schema,
-                key: &key,
+                key: &row_input.key,
                 import_op_idx: self.source_idx,
             };
             let mut row_indexer = row_indexer::RowIndexer::new(
@@ -256,9 +256,9 @@ impl SourceIndexingContext {
             )?;
 
             let mut row_state_operator =
-                LocalSourceRowStateOperator::new(&key, &self.state, &update_stats);
+                LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
 
-            let source_data = inputs.data;
+            let source_data = row_input.data;
             if let Some(ordinal) = source_data.ordinal
                 && let Some(content_version_fp) = &source_data.content_version_fp
             {
@@ -295,22 +295,22 @@ impl SourceIndexingContext {
                 }
             }
 
-            let (ordinal, value, content_version_fp) =
+            let (ordinal, content_version_fp, value) =
                 match (source_data.ordinal, source_data.value) {
                     (Some(ordinal), Some(value)) => {
-                        (ordinal, value, source_data.content_version_fp)
+                        (ordinal, source_data.content_version_fp, value)
                     }
                     _ => {
                         let data = import_op
                             .executor
                             .get_value(
-                                &key,
-                                inputs.key_aux_info.as_ref().ok_or_else(|| {
+                                &row_input.key,
+                                row_input.key_aux_info.as_ref().ok_or_else(|| {
                                     anyhow::anyhow!(
                                         "`key_aux_info` must be provided when there's no `source_data`"
                                     )
                                 })?,
-                                &interface::SourceExecutorGetOptions {
+                                &interface::SourceExecutorReadOptions {
                                     include_value: true,
                                     include_ordinal: true,
                                     include_content_version_fp: true,
@@ -320,9 +320,9 @@ impl SourceIndexingContext {
                         (
                             data.ordinal
                                 .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
+                            data.content_version_fp,
                             data.value
                                 .ok_or_else(|| anyhow::anyhow!("value is not available"))?,
-                            data.content_version_fp,
                         )
                     }
                 };
@@ -356,7 +356,8 @@ impl SourceIndexingContext {
356356 "{:?}" ,
357357 e. context( format!(
358358 "Error in processing row from source `{source}` with key: {key}" ,
359- source = self . flow. flow_instance. import_ops[ self . source_idx] . name
359+ source = self . flow. flow_instance. import_ops[ self . source_idx] . name,
360+ key = row_input. key,
360361 ) )
361362 ) ;
362363 }
@@ -366,6 +367,7 @@ impl SourceIndexingContext {
         self: &Arc<Self>,
         pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
+        expect_little_diff: bool,
     ) -> Result<()> {
         let pending_update_fut = {
             let mut pending_update = self.pending_update.lock().unwrap();
@@ -382,7 +384,8 @@ impl SourceIndexingContext {
                     let mut pending_update = slf.pending_update.lock().unwrap();
                     *pending_update = None;
                 }
-                slf.update_once(&pool, &update_stats).await?;
+                slf.update_once(&pool, &update_stats, expect_little_diff)
+                    .await?;
             }
             anyhow::Ok(())
         });
@@ -405,24 +408,26 @@ impl SourceIndexingContext {
         self: &Arc<Self>,
         pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
+        expect_little_diff: bool,
     ) -> Result<()> {
         let plan = self.flow.get_execution_plan().await?;
         let import_op = &plan.import_ops[self.source_idx];
-        let rows_stream = import_op
-            .executor
-            .list(&interface::SourceExecutorListOptions {
-                include_ordinal: true,
-                include_content_version_fp: true,
-            })
-            .await?;
+        let read_options = interface::SourceExecutorReadOptions {
+            include_ordinal: true,
+            include_content_version_fp: true,
+            // When only a little diff is expected and the source provides ordinal, we don't fetch values
+            // during `list()` by default, as there's a high chance that we don't need the values at all.
+            include_value: !(expect_little_diff && import_op.executor.provides_ordinal()),
+        };
+        let rows_stream = import_op.executor.list(&read_options).await?;
         self.update_with_stream(import_op, rows_stream, pool, update_stats)
             .await
     }
 
     async fn update_with_stream(
         self: &Arc<Self>,
         import_op: &plan::AnalyzedImportOp,
-        mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRowMetadata>>>,
+        mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRow>>>,
         pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
     ) -> Result<()> {
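The new comment above pairs with the `get_value` fallback in `process_source_row` earlier in this diff: when the listing pass skips values, any row that still needs its content falls back to a per-row read. A rough sketch of the two read configurations, assuming only the `SourceExecutorReadOptions` fields shown in this change:

```rust
// Listing pass: values are skipped only when the caller expects few changes and
// the source can report ordinals, so unchanged rows can be filtered out without
// fetching their content.
let list_options = interface::SourceExecutorReadOptions {
    include_ordinal: true,
    include_content_version_fp: true,
    include_value: !(expect_little_diff && import_op.executor.provides_ordinal()),
};

// Per-row fallback (the `get_value` call in `process_source_row`): if a row's
// ordinal or value was not delivered by the listing pass, it is fetched here
// with everything enabled.
let per_row_options = interface::SourceExecutorReadOptions {
    include_value: true,
    include_ordinal: true,
    include_content_version_fp: true,
};
```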
@@ -435,7 +440,8 @@ impl SourceIndexingContext {
         while let Some(row) = rows_stream.next().await {
             for row in row? {
                 let source_version = SourceVersion::from_current_with_ordinal(
-                    row.ordinal
+                    row.data
+                        .ordinal
                         .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
                 );
                 {
@@ -454,20 +460,16 @@ impl SourceIndexingContext {
                     .concurrency_controller
                     .acquire(concur_control::BYTES_UNKNOWN_YET)
                     .await?;
-                join_set.spawn(self.clone().process_source_key(
-                    row.key,
+                join_set.spawn(self.clone().process_source_row(
+                    ProcessSourceRowInput {
+                        key: row.key,
+                        key_aux_info: Some(row.key_aux_info),
+                        data: row.data,
+                    },
                     update_stats.clone(),
                     concur_permit,
                     NO_ACK,
                     pool.clone(),
-                    ProcessSourceKeyInput {
-                        key_aux_info: Some(row.key_aux_info),
-                        data: interface::PartialSourceRowData {
-                            value: None,
-                            ordinal: Some(source_version.ordinal),
-                            content_version_fp: row.content_version_fp,
-                        },
-                    },
                 ));
             }
         }
@@ -491,20 +493,20 @@ impl SourceIndexingContext {
         };
         for (key, source_ordinal) in deleted_key_versions {
             let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
-            join_set.spawn(self.clone().process_source_key(
-                key,
-                update_stats.clone(),
-                concur_permit,
-                NO_ACK,
-                pool.clone(),
-                ProcessSourceKeyInput {
+            join_set.spawn(self.clone().process_source_row(
+                ProcessSourceRowInput {
+                    key,
                     key_aux_info: None,
                     data: interface::PartialSourceRowData {
-                        value: Some(interface::SourceValue::NonExistence),
                         ordinal: Some(source_ordinal),
                         content_version_fp: None,
+                        value: Some(interface::SourceValue::NonExistence),
                     },
                 },
+                update_stats.clone(),
+                concur_permit,
+                NO_ACK,
+                pool.clone(),
             ));
         }
         while let Some(result) = join_set.join_next().await {