diff --git a/src/execution/dumper.rs b/src/execution/dumper.rs
index 93f86b93b..5e9853cc3 100644
--- a/src/execution/dumper.rs
+++ b/src/execution/dumper.rs
@@ -12,7 +12,7 @@ use super::memoization::EvaluationMemoryOptions;
 use super::row_indexer;
 use crate::base::{schema, value};
 use crate::builder::plan::{AnalyzedImportOp, ExecutionPlan};
-use crate::ops::interface::SourceExecutorListOptions;
+use crate::ops::interface::SourceExecutorReadOptions;
 use crate::utils::yaml_ser::YamlSerializer;
 
 #[derive(Debug, Clone, Deserialize)]
@@ -193,9 +193,10 @@ impl<'a> Dumper<'a> {
         let mut rows_stream = import_op
             .executor
-            .list(&SourceExecutorListOptions {
+            .list(&SourceExecutorReadOptions {
                 include_ordinal: false,
                 include_content_version_fp: false,
+                include_value: false,
             })
             .await?;
         while let Some(rows) = rows_stream.next().await {
diff --git a/src/execution/indexing_status.rs b/src/execution/indexing_status.rs
index 354586393..f0ea402ff 100644
--- a/src/execution/indexing_status.rs
+++ b/src/execution/indexing_status.rs
@@ -38,7 +38,7 @@ pub async fn get_source_row_indexing_status(
     let current_fut = src_eval_ctx.import_op.executor.get_value(
         src_eval_ctx.key,
         key_aux_info,
-        &interface::SourceExecutorGetOptions {
+        &interface::SourceExecutorReadOptions {
             include_value: false,
             include_ordinal: true,
             include_content_version_fp: false,
diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs
index 13e3ff7d6..e306e7dc7 100644
--- a/src/execution/live_updater.rs
+++ b/src/execution/live_updater.rs
@@ -1,5 +1,5 @@
 use crate::{
-    execution::{source_indexer::ProcessSourceKeyInput, stats::UpdateStats},
+    execution::{source_indexer::ProcessSourceRowInput, stats::UpdateStats},
     prelude::*,
 };
 
@@ -192,18 +192,18 @@
                     .concurrency_controller
                     .acquire(concur_control::BYTES_UNKNOWN_YET)
                     .await?;
-                tokio::spawn(source_context.clone().process_source_key(
-                    change.key,
+                tokio::spawn(source_context.clone().process_source_row(
+                    ProcessSourceRowInput {
+                        key: change.key,
+                        key_aux_info: Some(change.key_aux_info),
+                        data: change.data,
+                    },
                     update_stats.clone(),
                     concur_permit,
                     Some(move || async move { SharedAckFn::ack(&shared_ack_fn).await }),
                     pool.clone(),
-                    ProcessSourceKeyInput {
-                        key_aux_info: Some(change.key_aux_info),
-                        data: change.data,
-                    },
                 ));
             }
         }
@@ -242,7 +242,9 @@
             let live_mode = self.options.live_mode;
             async move {
                 let update_stats = Arc::new(stats::UpdateStats::default());
-                source_context.update(&pool, &update_stats).await?;
+                source_context
+                    .update(&pool, &update_stats, /*expect_little_diff=*/ false)
+                    .await?;
                 if update_stats.has_any_change() {
                     status_tx.send_modify(|update| {
                         update.source_updates_num[source_idx] += 1;
@@ -260,7 +262,9 @@
                     interval.tick().await;
 
                     let update_stats = Arc::new(stats::UpdateStats::default());
-                    source_context.update(&pool, &update_stats).await?;
+                    source_context
+                        .update(&pool, &update_stats, /*expect_little_diff=*/ true)
+                        .await?;
                     if update_stats.has_any_change() {
                         status_tx.send_modify(|update| {
                             update.source_updates_num[source_idx] += 1;
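The rename to `process_source_row` above also moves the key into `ProcessSourceRowInput`. A hedged sketch of the two shapes this struct takes; the bindings (`change`, `deleted_key`, `source_ordinal`) are illustrative, mirroring the call sites in this patch:

// A change event from a source's change stream: the source supplies `key_aux_info`,
// and `data` may be partial; missing pieces are fetched later via `get_value()`.
let change_input = ProcessSourceRowInput {
    key: change.key,
    key_aux_info: Some(change.key_aux_info),
    data: change.data,
};

// A deletion discovered by diffing tracked keys (see source_indexer.rs below):
// `key_aux_info` is unavailable, so the tombstone value is passed directly and no
// `get_value()` round-trip is needed.
let deletion_input = ProcessSourceRowInput {
    key: deleted_key,
    key_aux_info: None,
    data: interface::PartialSourceRowData {
        ordinal: Some(source_ordinal),
        content_version_fp: None,
        value: Some(interface::SourceValue::NonExistence),
    },
};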
diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs
index eeb7edf75..eef1d4b21 100644
--- a/src/execution/row_indexer.rs
+++ b/src/execution/row_indexer.rs
@@ -16,7 +16,7 @@ use super::stats;
 use crate::base::value::{self, FieldValues, KeyValue};
 use crate::builder::plan::*;
 use crate::ops::interface::{
-    ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorGetOptions,
+    ExportTargetMutation, ExportTargetUpsertEntry, Ordinal, SourceExecutorReadOptions,
 };
 use crate::utils::db::WriteAction;
 use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
@@ -841,7 +841,7 @@ pub async fn evaluate_source_entry_with_memory(
         .get_value(
             src_eval_ctx.key,
             key_aux_info,
-            &SourceExecutorGetOptions {
+            &SourceExecutorReadOptions {
                 include_value: true,
                 include_ordinal: false,
                 include_content_version_fp: false,
diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs
index e2ad2b1fd..3cf50e5d9 100644
--- a/src/execution/source_indexer.rs
+++ b/src/execution/source_indexer.rs
@@ -166,7 +166,8 @@ impl<'a> LocalSourceRowStateOperator<'a> {
     }
 }
 
-pub struct ProcessSourceKeyInput {
+pub struct ProcessSourceRowInput {
+    pub key: value::FullKeyValue,
     /// `key_aux_info` is not available for deletions. It must be provided if `data.value` is `None`.
     pub key_aux_info: Option<serde_json::Value>,
     pub data: interface::PartialSourceRowData,
 }
@@ -224,17 +225,16 @@
         })
     }
 
-    pub async fn process_source_key<
+    pub async fn process_source_row<
         AckFut: Future<Output = Result<()>> + Send + 'static,
         AckFn: FnOnce() -> AckFut,
     >(
         self: Arc<Self>,
-        key: value::FullKeyValue,
+        row_input: ProcessSourceRowInput,
        update_stats: Arc<stats::UpdateStats>,
         _concur_permit: concur_control::CombinedConcurrencyControllerPermit,
         ack_fn: Option<AckFn>,
         pool: PgPool,
-        inputs: ProcessSourceKeyInput,
     ) {
         let process = async {
             let plan = self.flow.get_execution_plan().await?;
@@ -245,7 +245,7 @@
                 plan: &plan,
                 import_op,
                 schema,
-                key: &key,
+                key: &row_input.key,
                 import_op_idx: self.source_idx,
             };
             let mut row_indexer = row_indexer::RowIndexer::new(
@@ -256,9 +256,9 @@
             )?;
 
             let mut row_state_operator =
-                LocalSourceRowStateOperator::new(&key, &self.state, &update_stats);
+                LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
 
-            let source_data = inputs.data;
+            let source_data = row_input.data;
             if let Some(ordinal) = source_data.ordinal
                 && let Some(content_version_fp) = &source_data.content_version_fp
             {
@@ -295,22 +295,22 @@
                 }
             }
 
-            let (ordinal, value, content_version_fp) =
+            let (ordinal, content_version_fp, value) =
                 match (source_data.ordinal, source_data.value) {
                     (Some(ordinal), Some(value)) => {
-                        (ordinal, value, source_data.content_version_fp)
+                        (ordinal, source_data.content_version_fp, value)
                     }
                     _ => {
                         let data = import_op
                             .executor
                             .get_value(
-                                &key,
-                                inputs.key_aux_info.as_ref().ok_or_else(|| {
+                                &row_input.key,
+                                row_input.key_aux_info.as_ref().ok_or_else(|| {
                                     anyhow::anyhow!(
                                         "`key_aux_info` must be provided when there's no `source_data`"
                                     )
                                 })?,
-                                &interface::SourceExecutorGetOptions {
+                                &interface::SourceExecutorReadOptions {
                                     include_value: true,
                                     include_ordinal: true,
                                     include_content_version_fp: true,
@@ -320,9 +320,9 @@
                         (
                             data.ordinal
                                 .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
+                            data.content_version_fp,
                             data.value
                                 .ok_or_else(|| anyhow::anyhow!("value is not available"))?,
-                            data.content_version_fp,
                         )
                     }
                 };
@@ -356,7 +356,8 @@
                 "{:?}",
                 e.context(format!(
                     "Error in processing row from source `{source}` with key: {key}",
-                    source = self.flow.flow_instance.import_ops[self.source_idx].name
+                    source = self.flow.flow_instance.import_ops[self.source_idx].name,
+                    key = row_input.key,
                 ))
             );
         }
@@ -366,6 +367,7 @@
         self: &Arc<Self>,
         pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
+        expect_little_diff: bool,
     ) -> Result<()> {
         let pending_update_fut = {
             let mut pending_update = self.pending_update.lock().unwrap();
@@ -382,7 +384,8 @@
                     let mut pending_update = slf.pending_update.lock().unwrap();
                     *pending_update = None;
                 }
-                slf.update_once(&pool, &update_stats).await?;
+                slf.update_once(&pool, &update_stats, expect_little_diff)
+                    .await?;
             }
             anyhow::Ok(())
         });
@@ -405,16 +408,18 @@
         self: &Arc<Self>,
         pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
+        expect_little_diff: bool,
     ) -> Result<()> {
         let plan = self.flow.get_execution_plan().await?;
         let import_op = &plan.import_ops[self.source_idx];
-        let rows_stream = import_op
-            .executor
-            .list(&interface::SourceExecutorListOptions {
-                include_ordinal: true,
-                include_content_version_fp: true,
-            })
-            .await?;
+        let read_options = interface::SourceExecutorReadOptions {
+            include_ordinal: true,
+            include_content_version_fp: true,
+            // When only a small diff is expected and the source provides ordinals, skip
+            // fetching values during `list()`: there's a high chance they won't be needed.
+            include_value: !(expect_little_diff && import_op.executor.provides_ordinal()),
+        };
+        let rows_stream = import_op.executor.list(&read_options).await?;
         self.update_with_stream(import_op, rows_stream, pool, update_stats)
             .await
     }
@@ -422,7 +427,7 @@
     async fn update_with_stream(
         self: &Arc<Self>,
         import_op: &plan::AnalyzedImportOp,
-        mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRowMetadata>>>,
+        mut rows_stream: BoxStream<'_, Result<Vec<interface::PartialSourceRow>>>,
         pool: &PgPool,
         update_stats: &Arc<stats::UpdateStats>,
     ) -> Result<()> {
@@ -435,7 +440,8 @@
         while let Some(row) = rows_stream.next().await {
             for row in row? {
                 let source_version = SourceVersion::from_current_with_ordinal(
-                    row.ordinal
+                    row.data
+                        .ordinal
                         .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?,
                 );
                 {
@@ -454,20 +460,16 @@
                     .concurrency_controller
                     .acquire(concur_control::BYTES_UNKNOWN_YET)
                     .await?;
-                join_set.spawn(self.clone().process_source_key(
-                    row.key,
+                join_set.spawn(self.clone().process_source_row(
+                    ProcessSourceRowInput {
+                        key: row.key,
+                        key_aux_info: Some(row.key_aux_info),
+                        data: row.data,
+                    },
                     update_stats.clone(),
                     concur_permit,
                     NO_ACK,
                     pool.clone(),
-                    ProcessSourceKeyInput {
-                        key_aux_info: Some(row.key_aux_info),
-                        data: interface::PartialSourceRowData {
-                            value: None,
-                            ordinal: Some(source_version.ordinal),
-                            content_version_fp: row.content_version_fp,
-                        },
-                    },
                 ));
             }
         }
@@ -491,20 +493,20 @@
         };
         for (key, source_ordinal) in deleted_key_versions {
             let concur_permit = import_op.concurrency_controller.acquire(Some(|| 0)).await?;
-            join_set.spawn(self.clone().process_source_key(
-                key,
-                update_stats.clone(),
-                concur_permit,
-                NO_ACK,
-                pool.clone(),
-                ProcessSourceKeyInput {
+            join_set.spawn(self.clone().process_source_row(
+                ProcessSourceRowInput {
+                    key,
                     key_aux_info: None,
                     data: interface::PartialSourceRowData {
-                        value: Some(interface::SourceValue::NonExistence),
                         ordinal: Some(source_ordinal),
                         content_version_fp: None,
+                        value: Some(interface::SourceValue::NonExistence),
                     },
                 },
+                update_stats.clone(),
+                concur_permit,
+                NO_ACK,
+                pool.clone(),
             ));
         }
         while let Some(result) = join_set.join_next().await {
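The new `expect_little_diff` flag only controls how eagerly values are fetched during a full listing. A minimal sketch of the decision, lifted out of `update_once` above for clarity (the helper name is hypothetical, not part of this patch):

// Fetch values eagerly during `list()` unless we expect few changes AND the source
// can report ordinals cheaply; in that case most rows are skipped by the version
// check in `update_with_stream`, and eagerly fetched values would be wasted.
fn list_should_include_value(expect_little_diff: bool, source_provides_ordinal: bool) -> bool {
    !(expect_little_diff && source_provides_ordinal)
}

Rows listed without values arrive with `data.value == None`, and `process_source_row` then falls back to a `get_value()` call with all three flags set, as in the match arm above.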
diff --git a/src/ops/interface.rs b/src/ops/interface.rs
index f592247b5..ab4478770 100644
--- a/src/ops/interface.rs
+++ b/src/ops/interface.rs
@@ -48,13 +48,14 @@ impl TryFrom<DateTime<Utc>> for Ordinal {
     }
 }
 
-pub struct PartialSourceRowMetadata {
-    pub key: FullKeyValue,
-    /// Auxiliary information for the source row, to be used when reading the content.
-    /// e.g. it can be used to uniquely identify the version of the row.
-    /// Use serde_json::Value::Null to represent no auxiliary information.
-    pub key_aux_info: serde_json::Value,
+#[derive(Debug)]
+pub enum SourceValue {
+    Existence(FieldValues),
+    NonExistence,
+}
 
+#[derive(Debug, Default)]
+pub struct PartialSourceRowData {
     pub ordinal: Option<Ordinal>,
 
     /// A content version fingerprint can be anything that changes when the content of the row changes.
@@ -64,12 +65,18 @@ pub struct PartialSourceRowMetadata {
     /// It's optional. The source shouldn't use a generic way to compute it, e.g. computing a hash of the content.
     /// The framework will do so. If there's no fast way to get it from the source, leave it as `None`.
     pub content_version_fp: Option<Vec<u8>>,
+
+    pub value: Option<SourceValue>,
 }
 
-#[derive(Debug)]
-pub enum SourceValue {
-    Existence(FieldValues),
-    NonExistence,
+pub struct PartialSourceRow {
+    pub key: FullKeyValue,
+    /// Auxiliary information for the source row, to be used when reading the content.
+    /// e.g. it can be used to uniquely identify the version of the row.
+    /// Use serde_json::Value::Null to represent no auxiliary information.
+    pub key_aux_info: serde_json::Value,
+
+    pub data: PartialSourceRowData,
 }
 
 impl SourceValue {
@@ -108,23 +115,22 @@ pub struct SourceChangeMessage {
 }
 
 #[derive(Debug, Default)]
-pub struct SourceExecutorListOptions {
+pub struct SourceExecutorReadOptions {
+    /// When set to true, the implementation must return a non-None `ordinal`.
     pub include_ordinal: bool,
-    pub include_content_version_fp: bool,
-}
 
-#[derive(Debug, Default)]
-pub struct SourceExecutorGetOptions {
-    pub include_ordinal: bool,
-    pub include_value: bool,
+    /// When set to true, the implementation may decide whether to return a non-None `content_version_fp`.
+    /// The guideline is to return it only if it's very efficient to get.
+    /// If it's returned in `list()`, it must also be returned in `get_value()`.
     pub include_content_version_fp: bool,
-}
 
-#[derive(Debug, Default)]
-pub struct PartialSourceRowData {
-    pub value: Option<SourceValue>,
-    pub ordinal: Option<Ordinal>,
-    pub content_version_fp: Option<Vec<u8>>,
+    /// For `get_value()` calls, when set to true, the implementation must return a non-None `value`.
+    ///
+    /// For `list()` calls, when set to true, the implementation may decide whether to include it.
+    /// The guideline is to include it only when a single "list() with content" call is significantly
+    /// more efficient than "list() without content" followed by a series of `get_value()` calls.
+    ///
+    /// Even if `list()` already returns `value` when this is true, `get_value()` must still return `value`.
+    pub include_value: bool,
 }
 
 #[async_trait]
@@ -132,15 +138,15 @@ pub trait SourceExecutor: Send + Sync {
     /// Get the list of keys for the source.
     async fn list(
         &self,
-        options: &SourceExecutorListOptions,
-    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>>;
+        options: &SourceExecutorReadOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>>;
 
     // Get the value for the given key.
     async fn get_value(
         &self,
         key: &FullKeyValue,
         key_aux_info: &serde_json::Value,
-        options: &SourceExecutorGetOptions,
+        options: &SourceExecutorReadOptions,
     ) -> Result<PartialSourceRowData>;
 
@@ -148,6 +154,8 @@ pub trait SourceExecutor: Send + Sync {
     async fn change_stream(
         &self,
     ) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
        Ok(None)
     }
+
+    fn provides_ordinal(&self) -> bool;
 }
 
 #[async_trait]
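To make the unified contract concrete, here is a minimal hypothetical implementor sketch. It is not part of this patch: `StaticSource` and its `rows` field are invented, and `FieldValues: Clone` plus `str_value()` yielding something `AsRef<str>` are assumptions (the file-based sources below use it that way). It follows the documented rules: `ordinal` is non-None whenever requested (`Ordinal::unavailable()` is a valid answer, as the Postgres source shows), `content_version_fp` is omitted since nothing cheap is available, and `value` is included on `list()` only because the data is already in memory:

struct StaticSource {
    rows: Vec<(String, FieldValues)>, // hypothetical in-memory rows
}

#[async_trait]
impl SourceExecutor for StaticSource {
    async fn list(
        &self,
        options: &SourceExecutorReadOptions,
    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
        let batch = self
            .rows
            .iter()
            .map(|(key, value)| PartialSourceRow {
                key: FullKeyValue::from_single_part(key.clone()),
                key_aux_info: serde_json::Value::Null,
                data: PartialSourceRowData {
                    // Must be non-None when requested, even if only "unavailable".
                    ordinal: options.include_ordinal.then(Ordinal::unavailable),
                    content_version_fp: None, // discretionary; nothing cheap to return
                    // Discretionary for list(); cheap here, so honor the request.
                    value: options
                        .include_value
                        .then(|| SourceValue::Existence(value.clone())),
                },
            })
            .collect::<Vec<_>>();
        // `futures::StreamExt` provides `.boxed()`.
        Ok(futures::stream::iter([Ok(batch)]).boxed())
    }

    async fn get_value(
        &self,
        key: &FullKeyValue,
        _key_aux_info: &serde_json::Value,
        options: &SourceExecutorReadOptions,
    ) -> Result<PartialSourceRowData> {
        let key_str = key.single_part()?.str_value()?;
        let found = self.rows.iter().find(|(k, _)| k.as_str() == key_str.as_ref());
        Ok(PartialSourceRowData {
            ordinal: options.include_ordinal.then(Ordinal::unavailable),
            content_version_fp: None,
            // Mandatory for get_value(): report NonExistence rather than None.
            value: options.include_value.then(|| match found {
                Some((_, v)) => SourceValue::Existence(v.clone()),
                None => SourceValue::NonExistence,
            }),
        })
    }

    fn provides_ordinal(&self) -> bool {
        false // no cheap per-row ordinal, so periodic refreshes keep fetching values
    }
}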
diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs
index cc132c2db..466d14557 100644
--- a/src/ops/sources/amazon_s3.rs
+++ b/src/ops/sources/amazon_s3.rs
@@ -63,8 +63,8 @@ fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal {
 impl SourceExecutor for Executor {
     async fn list(
         &self,
-        _options: &SourceExecutorListOptions,
-    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
+        _options: &SourceExecutorReadOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
         let stream = try_stream! {
             let mut continuation_token = None;
             loop {
@@ -85,11 +85,14 @@
                     // Only include files (not folders)
                     if key.ends_with('/') { continue; }
                     if self.pattern_matcher.is_file_included(key) {
-                        batch.push(PartialSourceRowMetadata {
+                        batch.push(PartialSourceRow {
                             key: FullKeyValue::from_single_part(key.to_string()),
                             key_aux_info: serde_json::Value::Null,
-                            ordinal: obj.last_modified().map(datetime_to_ordinal),
-                            content_version_fp: None,
+                            data: PartialSourceRowData {
+                                ordinal: obj.last_modified().map(datetime_to_ordinal),
+                                content_version_fp: None,
+                                value: None,
+                            },
                         });
                     }
                 }
@@ -112,7 +115,7 @@
         &self,
         key: &FullKeyValue,
         _key_aux_info: &serde_json::Value,
-        options: &SourceExecutorGetOptions,
+        options: &SourceExecutorReadOptions,
     ) -> Result<PartialSourceRowData> {
         let key_str = key.single_part()?.str_value()?;
         if !self.pattern_matcher.is_file_included(key_str) {
@@ -185,6 +188,10 @@
         };
         Ok(Some(stream.boxed()))
     }
+
+    fn provides_ordinal(&self) -> bool {
+        true
+    }
 }
 
 #[derive(Debug, Deserialize)]
diff --git a/src/ops/sources/azure_blob.rs b/src/ops/sources/azure_blob.rs
index c6ee5ebe7..1ab4e9200 100644
--- a/src/ops/sources/azure_blob.rs
+++ b/src/ops/sources/azure_blob.rs
@@ -42,8 +42,8 @@ fn datetime_to_ordinal(dt: &time::OffsetDateTime) -> Ordinal {
 impl SourceExecutor for Executor {
     async fn list(
         &self,
-        _options: &SourceExecutorListOptions,
-    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
+        _options: &SourceExecutorReadOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
         let stream = try_stream! {
             let mut continuation_token: Option<String> = None;
             loop {
@@ -75,11 +75,14 @@
                     if self.pattern_matcher.is_file_included(key) {
                         let ordinal = Some(datetime_to_ordinal(&blob.properties.last_modified));
-                        batch.push(PartialSourceRowMetadata {
+                        batch.push(PartialSourceRow {
                             key: FullKeyValue::from_single_part(key.clone()),
                             key_aux_info: serde_json::Value::Null,
-                            ordinal,
-                            content_version_fp: None,
+                            data: PartialSourceRowData {
+                                ordinal,
+                                content_version_fp: None,
+                                value: None,
+                            },
                         });
                     }
                 }
@@ -101,7 +104,7 @@
         &self,
         key: &FullKeyValue,
         _key_aux_info: &serde_json::Value,
-        options: &SourceExecutorGetOptions,
+        options: &SourceExecutorReadOptions,
     ) -> Result<PartialSourceRowData> {
         let key_str = key.single_part()?.str_value()?;
         if !self.pattern_matcher.is_file_included(key_str) {
@@ -163,6 +166,10 @@
         // Azure Blob Storage doesn't have built-in change notifications like S3+SQS
         Ok(None)
     }
+
+    fn provides_ordinal(&self) -> bool {
+        true
+    }
 }
 
 pub struct Factory;
diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs
index 28c2cbb09..b96f5515c 100644
--- a/src/ops/sources/google_drive.rs
+++ b/src/ops/sources/google_drive.rs
@@ -115,7 +115,7 @@ impl Executor {
         file: File,
         new_folder_ids: &mut Vec<Arc<str>>,
         seen_ids: &mut HashSet<Arc<str>>,
-    ) -> Result<Option<PartialSourceRowMetadata>> {
+    ) -> Result<Option<PartialSourceRow>> {
         if file.trashed == Some(true) {
             return Ok(None);
         }
@@ -133,11 +133,14 @@
             new_folder_ids.push(id);
             None
         } else if is_supported_file_type(&mime_type) {
-            Some(PartialSourceRowMetadata {
+            Some(PartialSourceRow {
                 key: FullKeyValue::from_single_part(id),
                 key_aux_info: serde_json::Value::Null,
-                ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
-                content_version_fp: None,
+                data: PartialSourceRowData {
+                    ordinal: file.modified_time.map(|t| t.try_into()).transpose()?,
+                    content_version_fp: None,
+                    value: None,
+                },
             })
         } else {
             None
@@ -290,8 +293,8 @@ fn optional_modified_time(include_ordinal: bool) -> &'static str {
 impl SourceExecutor for Executor {
     async fn list(
         &self,
-        options: &SourceExecutorListOptions,
-    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
+        options: &SourceExecutorReadOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
         let mut seen_ids = HashSet::new();
         let mut folder_ids = self.root_folder_ids.clone();
         let fields = format!(
@@ -327,7 +330,7 @@
         &self,
         key: &FullKeyValue,
         _key_aux_info: &serde_json::Value,
-        options: &SourceExecutorGetOptions,
+        options: &SourceExecutorReadOptions,
     ) -> Result<PartialSourceRowData> {
         let file_id = key.single_part()?.str_value()?;
         let fields = format!(
@@ -432,6 +435,10 @@
         };
         Ok(Some(stream.boxed()))
     }
+
+    fn provides_ordinal(&self) -> bool {
+        true
+    }
 }
 
 pub struct Factory;
diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs
index 3e12064dd..514386bdb 100644
--- a/src/ops/sources/local_file.rs
+++ b/src/ops/sources/local_file.rs
@@ -25,8 +25,8 @@ struct Executor {
 impl SourceExecutor for Executor {
     async fn list(
         &self,
-        options: &SourceExecutorListOptions,
-    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
+        options: &SourceExecutorReadOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
         let root_component_size = self.root_path.components().count();
         let mut dirs = Vec::new();
         dirs.push(Cow::Borrowed(&self.root_path));
@@ -54,11 +54,14 @@
                 } else {
                     None
                 };
-                yield vec![PartialSourceRowMetadata {
+                yield vec![PartialSourceRow {
                     key: FullKeyValue::from_single_part(relative_path.to_string()),
                     key_aux_info: serde_json::Value::Null,
-                    ordinal,
-                    content_version_fp: None,
+                    data: PartialSourceRowData {
+                        ordinal,
+                        content_version_fp: None,
+                        value: None,
+                    },
                 }];
             }
         }
@@ -72,7 +75,7 @@
         &self,
         key: &FullKeyValue,
         _key_aux_info: &serde_json::Value,
-        options: &SourceExecutorGetOptions,
+        options: &SourceExecutorReadOptions,
     ) -> Result<PartialSourceRowData> {
         let path = key.single_part()?.str_value()?.as_ref();
         if !self.pattern_matcher.is_file_included(path) {
@@ -112,6 +115,10 @@
             content_version_fp: None,
         })
     }
+
+    fn provides_ordinal(&self) -> bool {
+        true
+    }
 }
 
 pub struct Factory;
diff --git a/src/ops/sources/postgres.rs b/src/ops/sources/postgres.rs
index 342998cb4..6fe32275d 100644
--- a/src/ops/sources/postgres.rs
+++ b/src/ops/sources/postgres.rs
@@ -39,6 +39,80 @@ struct Executor {
     table_schema: PostgresTableSchema,
 }
 
+impl Executor {
+    /// Append value and ordinal columns to the provided columns vector.
+    /// Returns the optional index of the ordinal column in the final selection.
+    fn build_selected_columns(
+        &self,
+        columns: &mut Vec<String>,
+        options: &SourceExecutorReadOptions,
+    ) -> Option<usize> {
+        let base_len = columns.len();
+        if options.include_value {
+            columns.extend(
+                self.table_schema
+                    .value_columns
+                    .iter()
+                    .map(|col| format!("\"{}\"", col.schema.name)),
+            );
+        }
+
+        if options.include_ordinal {
+            if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
+                if options.include_value {
+                    if let Some(val_idx) = self.table_schema.ordinal_field_idx {
+                        return Some(base_len + val_idx);
+                    }
+                }
+                columns.push(format!("\"{}\"", ord_schema.schema.name));
+                return Some(columns.len() - 1);
+            }
+        }
+
+        None
+    }
+
+    /// Decode all value columns from a row, starting at the given index offset.
+    fn decode_row_data(
+        &self,
+        row: &sqlx::postgres::PgRow,
+        options: &SourceExecutorReadOptions,
+        ordinal_col_index: Option<usize>,
+        value_start_idx: usize,
+    ) -> Result<PartialSourceRowData> {
+        let value = if options.include_value {
+            let mut fields = Vec::with_capacity(self.table_schema.value_columns.len());
+            for (i, info) in self.table_schema.value_columns.iter().enumerate() {
+                let value = (info.decoder)(row, value_start_idx + i)?;
+                fields.push(value);
+            }
+            Some(SourceValue::Existence(FieldValues { fields }))
+        } else {
+            None
+        };
+
+        let ordinal = if options.include_ordinal {
+            if let (Some(idx), Some(ord_schema)) = (
+                ordinal_col_index,
+                self.table_schema.ordinal_field_schema.as_ref(),
+            ) {
+                let val = (ord_schema.decoder)(row, idx)?;
+                Some(value_to_ordinal(&val))
+            } else {
+                Some(Ordinal::unavailable())
+            }
+        } else {
+            None
+        };
+
+        Ok(PartialSourceRowData {
+            value,
+            ordinal,
+            content_version_fp: None,
+        })
+    }
+}
+
 /// Map PostgreSQL data types to CocoIndex BasicValueType and a decoder function
 fn map_postgres_type_to_cocoindex_and_decoder(pg_type: &str) -> (BasicValueType, PgValueDecoder) {
     match pg_type {
@@ -306,32 +380,21 @@ fn value_to_ordinal(value: &Value) -> Ordinal {
 impl SourceExecutor for Executor {
     async fn list(
         &self,
-        options: &SourceExecutorListOptions,
-    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRowMetadata>>>> {
+        options: &SourceExecutorReadOptions,
+    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>> {
         let stream = try_stream! {
-            // Build query to select primary key columns
+            // Build selection including PKs (for keys), and optionally values and ordinal
             let pk_columns: Vec<String> = self
                 .table_schema
                 .primary_key_columns
                 .iter()
                 .map(|col| format!("\"{}\"", col.schema.name))
                 .collect();
+            let pk_count = pk_columns.len();
+            let mut select_parts = pk_columns;
+            let ordinal_col_index = self.build_selected_columns(&mut select_parts, options);
 
-            let mut select_parts = pk_columns.clone();
-            let mut ordinal_col_index: Option<usize> = None;
-            if options.include_ordinal
-                && let Some(ord_schema) = &self.table_schema.ordinal_field_schema
-            {
-                // Only append ordinal column if present.
-                select_parts.push(format!("\"{}\"", ord_schema.schema.name));
-                ordinal_col_index = Some(select_parts.len() - 1);
-            }
-
-            let mut query = format!(
-                "SELECT {} FROM \"{}\"",
-                select_parts.join(", "),
-                self.table_name
-            );
+            let mut query = format!("SELECT {} FROM \"{}\"", select_parts.join(", "), self.table_name);
 
             // Add ordering by ordinal column if specified
             if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
@@ -340,38 +403,21 @@
             let mut rows = sqlx::query(&query).fetch(&self.db_pool);
             while let Some(row) = rows.try_next().await? {
-                let parts = self
-                    .table_schema
-                    .primary_key_columns
+                // Decode key from PKs (selected first)
+                let parts = self.table_schema.primary_key_columns
                     .iter()
                     .enumerate()
                     .map(|(i, info)| (info.decoder)(&row, i)?.into_key())
                    .collect::<Result<Vec<_>>>()?;
                 let key = FullKeyValue(parts);
 
-                // Compute ordinal if requested
-                let ordinal = if options.include_ordinal {
-                    if let (Some(col_idx), Some(_ord_schema)) = (
-                        ordinal_col_index,
-                        self.table_schema.ordinal_field_schema.as_ref(),
-                    ) {
-                        let val = match self.table_schema.ordinal_field_idx {
-                            Some(idx) => (self.table_schema.value_columns[idx].decoder)(&row, col_idx)?,
-                            None => (self.table_schema.ordinal_field_schema.as_ref().unwrap().decoder)(&row, col_idx)?,
-                        };
-                        Some(value_to_ordinal(&val))
-                    } else {
-                        Some(Ordinal::unavailable())
-                    }
-                } else {
-                    None
-                };
+                // Decode value and ordinal
+                let data = self.decode_row_data(&row, options, ordinal_col_index, pk_count)?;
 
-                yield vec![PartialSourceRowMetadata {
+                yield vec![PartialSourceRow {
                     key,
                     key_aux_info: serde_json::Value::Null,
-                    ordinal,
-                    content_version_fp: None,
+                    data,
                 }];
             }
         };
@@ -382,29 +428,11 @@
         &self,
         key: &FullKeyValue,
         _key_aux_info: &serde_json::Value,
-        options: &SourceExecutorGetOptions,
+        options: &SourceExecutorReadOptions,
     ) -> Result<PartialSourceRowData> {
         let mut qb = sqlx::QueryBuilder::new("SELECT ");
         let mut selected_columns: Vec<String> = Vec::new();
-
-        if options.include_value {
-            selected_columns.extend(
-                self.table_schema
-                    .value_columns
-                    .iter()
-                    .map(|col| format!("\"{}\"", col.schema.name)),
-            );
-        }
-
-        if options.include_ordinal {
-            if let Some(ord_schema) = &self.table_schema.ordinal_field_schema {
-                // Append ordinal column if not already provided by included value columns,
-                // or when value columns are not selected at all
-                if self.table_schema.ordinal_field_idx.is_none() || !options.include_value {
-                    selected_columns.push(format!("\"{}\"", ord_schema.schema.name));
-                }
-            }
-        }
+        let ordinal_col_index = self.build_selected_columns(&mut selected_columns, options);
 
         if selected_columns.is_empty() {
             qb.push("1");
@@ -440,50 +468,20 @@
         }
 
         let row_opt = qb.build().fetch_optional(&self.db_pool).await?;
-
-        let value = if options.include_value {
-            match &row_opt {
-                Some(row) => {
-                    let mut fields = Vec::with_capacity(self.table_schema.value_columns.len());
-                    for (i, info) in self.table_schema.value_columns.iter().enumerate() {
-                        let value = (info.decoder)(&row, i)?;
-                        fields.push(value);
-                    }
-                    Some(SourceValue::Existence(FieldValues { fields }))
-                }
-                None => Some(SourceValue::NonExistence),
-            }
-        } else {
-            None
-        };
+        let data = match &row_opt {
+            Some(row) => self.decode_row_data(&row, options, ordinal_col_index, 0)?,
+            None => PartialSourceRowData {
+                value: Some(SourceValue::NonExistence),
+                ordinal: Some(Ordinal::unavailable()),
+                content_version_fp: None,
+            },
+        };
 
-        let ordinal = if options.include_ordinal {
-            match (&row_opt, &self.table_schema.ordinal_field_schema) {
-                (Some(row), Some(ord_schema)) => {
-                    // Determine index without scanning the row metadata.
-                    let col_index = if options.include_value {
-                        match self.table_schema.ordinal_field_idx {
-                            Some(idx) => idx,
-                            None => self.table_schema.value_columns.len(),
-                        }
-                    } else {
-                        // Only ordinal was selected
-                        0
-                    };
-                    let val = (ord_schema.decoder)(&row, col_index)?;
-                    Some(value_to_ordinal(&val))
-                }
-                _ => Some(Ordinal::unavailable()),
-            }
-        } else {
-            None
-        };
+        Ok(data)
+    }
 
-        Ok(PartialSourceRowData {
-            value,
-            ordinal,
-            content_version_fp: None,
-        })
+    fn provides_ordinal(&self) -> bool {
+        self.table_schema.ordinal_field_schema.is_some()
     }
 }
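A worked example of the column layout the two helpers above agree on, for a hypothetical table with primary key `id`, value columns `title`, `body`, `updated_at`, and `ordinal_field_idx = Some(2)` (the ordinal lives in value column `updated_at`):

// list() with include_value && include_ordinal (base_len = 1 PK column):
//   SELECT "id", "title", "body", "updated_at"
//   ordinal_col_index = Some(base_len + val_idx) = Some(1 + 2) = Some(3)
//   decode_row_data(.., value_start_idx = pk_count = 1) reads values at columns 1..=3.
//
// list() with include_ordinal only:
//   SELECT "id", "updated_at"
//   ordinal_col_index = Some(columns.len() - 1) = Some(1)
//
// get_value() with include_value && include_ordinal (base_len = 0; PKs go into WHERE):
//   SELECT "title", "body", "updated_at"
//   ordinal_col_index = Some(0 + 2) = Some(2); values are decoded starting at column 0.

Note also that `provides_ordinal()` is configuration-dependent here, unlike in the file-based sources: without an ordinal column, periodic refreshes cannot rely on cheap ordinals, so `update_once` keeps fetching values during `list()`.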
diff --git a/src/service/flows.rs b/src/service/flows.rs
index dbb87b16e..4e58ee2e4 100644
--- a/src/service/flows.rs
+++ b/src/service/flows.rs
@@ -2,7 +2,7 @@ use crate::prelude::*;
 
 use crate::execution::{evaluator, indexing_status, memoization, row_indexer, stats};
 use crate::lib_context::LibContext;
-use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorListOptions};
+use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorReadOptions};
 use axum::{
     Json,
     extract::{Path, State},
@@ -101,9 +101,10 @@ pub async fn get_keys(
     let mut rows_stream = import_op
         .executor
-        .list(&SourceExecutorListOptions {
+        .list(&SourceExecutorReadOptions {
             include_ordinal: false,
             include_content_version_fp: false,
+            include_value: false,
         })
         .await?;
     let mut keys = Vec::new();
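For callers that only need keys, such as `get_keys` here and the dumper earlier in this patch, the unified struct makes the intent explicit. A small usage sketch (assuming some `executor: &dyn SourceExecutor` and `futures::StreamExt` in scope):

let mut rows_stream = executor
    .list(&SourceExecutorReadOptions {
        include_ordinal: false,
        include_content_version_fp: false,
        include_value: false,
    })
    .await?;
while let Some(batch) = rows_stream.next().await {
    for row in batch? {
        // Only `row.key` / `row.key_aux_info` carry data; every field of `row.data`
        // is expected to be None since nothing was requested.
        println!("{}", row.key);
    }
}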