diff --git a/src/execution/indexing_status.rs b/src/execution/indexing_status.rs index f6db8c21f..22eaeffca 100644 --- a/src/execution/indexing_status.rs +++ b/src/execution/indexing_status.rs @@ -6,14 +6,14 @@ use futures::try_join; #[derive(Debug, Serialize)] pub struct SourceRowLastProcessedInfo { - pub source_ordinal: Option, + pub source_ordinal: interface::Ordinal, pub processing_time: Option>, pub is_logic_current: bool, } #[derive(Debug, Serialize)] pub struct SourceRowInfo { - pub ordinal: Option, + pub ordinal: interface::Ordinal, } #[derive(Debug, Serialize)] @@ -43,7 +43,7 @@ pub async fn get_source_row_indexing_status( let (last_processed, current) = try_join!(last_processed_fut, current_fut)?; let last_processed = last_processed.map(|l| SourceRowLastProcessedInfo { - source_ordinal: l.processed_source_ordinal.map(interface::Ordinal), + source_ordinal: interface::Ordinal(l.processed_source_ordinal), processing_time: l .process_time_micros .map(chrono::DateTime::::from_timestamp_micros) @@ -51,9 +51,13 @@ pub async fn get_source_row_indexing_status( is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice()) == l.process_logic_fingerprint.as_ref().map(|b| b.as_slice()), }); - let current = current.map(|c| SourceRowInfo { ordinal: c.ordinal }); + let current = SourceRowInfo { + ordinal: current + .ordinal + .ok_or(anyhow::anyhow!("Ordinal is unavailable for the source"))?, + }; Ok(SourceRowIndexingStatus { last_processed, - current, + current: Some(current), }) } diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 3d825db1b..1e6632a4a 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -93,9 +93,12 @@ async fn update_source( async move { let mut change_stream = change_stream; while let Some(change) = change_stream.next().await { - source_context - .process_change(change, &pool, &source_update_stats) - .map(tokio::spawn); + tokio::spawn(source_context.clone().process_source_key( + change.key, + change.data, + source_update_stats.clone(), + pool.clone(), + )); } Ok(()) } diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index afb258bfe..079a21289 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -43,15 +43,15 @@ pub fn extract_primary_key( #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default)] pub enum SourceVersionKind { #[default] - NonExistent, + UnknownLogic, DifferentLogic, CurrentLogic, - Deleted, + NonExistence, } #[derive(Debug, Clone, Default)] pub struct SourceVersion { - pub ordinal: Option, + pub ordinal: Ordinal, pub kind: SourceVersionKind, } @@ -62,7 +62,7 @@ impl SourceVersion { curr_fp: Fingerprint, ) -> Self { Self { - ordinal: stored_ordinal.map(Ordinal), + ordinal: Ordinal(stored_ordinal), kind: match &stored_fp { Some(stored_fp) => { if stored_fp.as_slice() == curr_fp.0.as_slice() { @@ -71,22 +71,26 @@ impl SourceVersion { SourceVersionKind::DifferentLogic } } - None => SourceVersionKind::NonExistent, + None => SourceVersionKind::UnknownLogic, }, } } - pub fn from_current(ordinal: Option) -> Self { + pub fn from_current(ordinal: Ordinal) -> Self { Self { ordinal, kind: SourceVersionKind::CurrentLogic, } } - pub fn for_deletion(&self) -> Self { + pub fn from_current_data(data: &interface::SourceData) -> Self { + let kind = match &data.value { + interface::SourceValue::Existence(_) => SourceVersionKind::CurrentLogic, + interface::SourceValue::NonExistence => SourceVersionKind::NonExistence, + }; Self { - ordinal: self.ordinal, - kind: SourceVersionKind::Deleted, + ordinal: data.ordinal, + kind, } } @@ -95,7 +99,7 @@ impl SourceVersion { target: &SourceVersion, update_stats: Option<&stats::UpdateStats>, ) -> bool { - let should_skip = match (self.ordinal, target.ordinal) { + let should_skip = match (self.ordinal.0, target.ordinal.0) { (Some(orginal), Some(target_ordinal)) => { orginal > target_ordinal || (orginal == target_ordinal && self.kind >= target.kind) } @@ -418,7 +422,7 @@ async fn commit_source_tracking_info( source_id, source_key_json, cleaned_staging_target_keys, - source_version.ordinal.map(|o| o.into()), + source_version.ordinal.into(), logic_fingerprint, precommit_metadata.process_ordinal, process_timestamp.timestamp_micros(), @@ -460,7 +464,7 @@ pub async fn evaluate_source_entry_with_memory( None }; let memory = EvaluationMemory::new(chrono::Utc::now(), stored_info, options); - let source_value = match src_eval_ctx + let source_value = src_eval_ctx .import_op .executor .get_value( @@ -471,19 +475,20 @@ pub async fn evaluate_source_entry_with_memory( }, ) .await? - { - Some(d) => d - .value - .ok_or_else(|| anyhow::anyhow!("value not returned"))?, - None => return Ok(None), + .value + .ok_or_else(|| anyhow::anyhow!("value not returned"))?; + let output = match source_value { + interface::SourceValue::Existence(source_value) => { + Some(evaluate_source_entry(src_eval_ctx, source_value, &memory).await?) + } + interface::SourceValue::NonExistence => None, }; - let output = evaluate_source_entry(src_eval_ctx, source_value, &memory).await?; - Ok(Some(output)) + Ok(output) } pub async fn update_source_row( src_eval_ctx: &SourceRowEvaluationContext<'_>, - source_value: Option, + source_value: interface::SourceValue, source_version: &SourceVersion, pool: &PgPool, update_stats: &stats::UpdateStats, @@ -517,7 +522,7 @@ pub async fn update_source_row( None => Default::default(), }; let (output, stored_mem_info) = match source_value { - Some(source_value) => { + interface::SourceValue::Existence(source_value) => { let evaluation_memory = EvaluationMemory::new( process_time, memoization_info, @@ -530,7 +535,7 @@ pub async fn update_source_row( evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?; (Some(output), evaluation_memory.into_stored()?) } - None => Default::default(), + interface::SourceValue::NonExistence => Default::default(), }; // Phase 2 (precommit): Update with the memoization info and stage target keys. @@ -601,7 +606,7 @@ pub async fn update_source_row( if let Some(existing_version) = existing_version { if output.is_some() { - if source_version.ordinal.is_none() + if !source_version.ordinal.is_available() || source_version.ordinal != existing_version.ordinal { update_stats.num_updates.inc(1); diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 671e0756e..4f646fac0 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -1,4 +1,4 @@ -use crate::{ops::interface::SourceValueChange, prelude::*}; +use crate::prelude::*; use sqlx::PgPool; use std::collections::{hash_map, HashMap}; @@ -7,7 +7,7 @@ use tokio::{sync::Semaphore, task::JoinSet}; use super::{ db_tracking, evaluator::SourceRowEvaluationContext, - row_indexer::{self, SkippedOr, SourceVersion, SourceVersionKind}, + row_indexer::{self, SkippedOr, SourceVersion}, stats, }; struct SourceRowIndexingState { @@ -79,43 +79,47 @@ impl SourceIndexingContext { }) } - async fn process_source_key( + pub async fn process_source_key( self: Arc, key: value::KeyValue, - source_version: SourceVersion, - value: Option, + source_data: Option, update_stats: Arc, - processing_sem: Arc, pool: PgPool, ) { let process = async { - let permit = processing_sem.acquire().await?; let plan = self.flow.get_execution_plan().await?; let import_op = &plan.import_ops[self.source_idx]; - let source_value = if source_version.kind == row_indexer::SourceVersionKind::Deleted { - None - } else if let Some(value) = value { - Some(value) - } else { - // Even if the source version kind is not Deleted, the source value might be gone when polling. - // In this case, we still use the current source version even if it's already stale - actually this version skew - // also happens for update cases and there's no way to keep them always in sync for many sources. - // - // We only need source version <= actual version for value. - import_op + let schema = &self.flow.data_schema; + let source_data = match source_data { + Some(source_data) => source_data, + None => import_op .executor .get_value( &key, &interface::SourceExecutorGetOptions { include_value: true, - include_ordinal: false, + include_ordinal: true, }, ) .await? - .map(|v| v.value) - .flatten() + .try_into()?, }; - let schema = &self.flow.data_schema; + + let source_version = SourceVersion::from_current_data(&source_data); + let processing_sem = { + let mut state = self.state.lock().unwrap(); + let row_state = state.rows.entry(key.clone()).or_default(); + if row_state + .source_version + .should_skip(&source_version, Some(update_stats.as_ref())) + { + return anyhow::Ok(()); + } + row_state.source_version = source_version.clone(); + row_state.processing_sem.clone() + }; + + let permit = processing_sem.acquire().await?; let result = row_indexer::update_source_row( &SourceRowEvaluationContext { plan: &plan, @@ -123,7 +127,7 @@ impl SourceIndexingContext { schema, key: &key, }, - source_value, + source_data.value, &source_version, &pool, &update_stats, @@ -132,7 +136,7 @@ impl SourceIndexingContext { let target_source_version = if let SkippedOr::Skipped(existing_source_version) = result { Some(existing_source_version) - } else if source_version.kind == row_indexer::SourceVersionKind::Deleted { + } else if source_version.kind == row_indexer::SourceVersionKind::NonExistence { Some(source_version) } else { None @@ -148,7 +152,8 @@ impl SourceIndexingContext { .source_version .should_skip(&target_source_version, None) { - if target_source_version.kind == row_indexer::SourceVersionKind::Deleted + if target_source_version.kind + == row_indexer::SourceVersionKind::NonExistence { entry.remove(); } else { @@ -176,36 +181,30 @@ impl SourceIndexingContext { } } + // Expected to be called during scan, which has no value. fn process_source_key_if_newer( self: &Arc, key: value::KeyValue, source_version: SourceVersion, - value: Option, update_stats: &Arc, pool: &PgPool, ) -> Option + Send + 'static> { - let processing_sem = { + { let mut state = self.state.lock().unwrap(); let scan_generation = state.scan_generation; let row_state = state.rows.entry(key.clone()).or_default(); row_state.touched_generation = scan_generation; if row_state .source_version - .should_skip(&source_version, Some(update_stats)) + .should_skip(&source_version, Some(update_stats.as_ref())) { return None; } - row_state.source_version = source_version.clone(); - row_state.processing_sem.clone() - }; - Some(self.clone().process_source_key( - key, - source_version, - value, - update_stats.clone(), - processing_sem, - pool.clone(), - )) + } + Some( + self.clone() + .process_source_key(key, None, update_stats.clone(), pool.clone()), + ) } pub async fn update( @@ -230,8 +229,10 @@ impl SourceIndexingContext { for row in row? { self.process_source_key_if_newer( row.key, - SourceVersion::from_current(row.ordinal), - None, + SourceVersion::from_current( + row.ordinal + .ok_or_else(|| anyhow::anyhow!("ordinal is not available"))?, + ), update_stats, pool, ) @@ -251,22 +252,23 @@ impl SourceIndexingContext { let mut state = self.state.lock().unwrap(); for (key, row_state) in state.rows.iter_mut() { if row_state.touched_generation < scan_generation { - deleted_key_versions.push(( - key.clone(), - row_state.source_version.for_deletion(), - row_state.processing_sem.clone(), - )); + deleted_key_versions.push((key.clone(), row_state.source_version.ordinal)); } } deleted_key_versions }; - for (key, source_version, processing_sem) in deleted_key_versions { + for (key, source_ordinal) in deleted_key_versions { + // If the source ordinal is unavailable, call without source ordinal so that another polling will be triggered to avoid out-of-order. + let source_data = source_ordinal + .is_available() + .then(|| interface::SourceData { + value: interface::SourceValue::NonExistence, + ordinal: source_ordinal, + }); join_set.spawn(self.clone().process_source_key( key, - source_version, - None, + source_data, update_stats.clone(), - processing_sem, pool.clone(), )); } @@ -280,26 +282,4 @@ impl SourceIndexingContext { Ok(()) } - - pub fn process_change( - self: &Arc, - change: interface::SourceChange, - pool: &PgPool, - update_stats: &Arc, - ) -> Option + Send + 'static> { - let (source_version_kind, value) = match change.value { - SourceValueChange::Upsert(value) => (SourceVersionKind::CurrentLogic, value), - SourceValueChange::Delete => (SourceVersionKind::Deleted, None), - }; - self.process_source_key_if_newer( - change.key, - SourceVersion { - ordinal: change.ordinal, - kind: source_version_kind, - }, - value, - update_stats, - pool, - ) - } } diff --git a/src/ops/interface.rs b/src/ops/interface.rs index a4102e5af..93022664a 100644 --- a/src/ops/interface.rs +++ b/src/ops/interface.rs @@ -16,10 +16,20 @@ pub struct FlowInstanceContext { pub py_exec_ctx: Option>, } -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)] -pub struct Ordinal(pub i64); +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Default)] +pub struct Ordinal(pub Option); -impl From for i64 { +impl Ordinal { + pub fn unavailable() -> Self { + Self(None) + } + + pub fn is_available(&self) -> bool { + self.0.is_some() + } +} + +impl From for Option { fn from(val: Ordinal) -> Self { val.0 } @@ -30,7 +40,7 @@ impl TryFrom for Ordinal { fn try_from(time: SystemTime) -> Result { let duration = time.duration_since(std::time::UNIX_EPOCH)?; - Ok(duration.as_micros().try_into().map(Ordinal)?) + Ok(Ordinal(Some(duration.as_micros().try_into()?))) } } @@ -38,27 +48,51 @@ impl TryFrom> for Ordinal { type Error = anyhow::Error; fn try_from(time: chrono::DateTime) -> Result { - Ok(Ordinal(time.timestamp_micros())) + Ok(Ordinal(Some(time.timestamp_micros()))) } } -pub struct SourceRowMetadata { +pub struct PartialSourceRowMetadata { pub key: KeyValue, - /// None means the ordinal is unavailable. pub ordinal: Option, } -pub enum SourceValueChange { - /// None means value unavailable in this change - needs a separate poll by get_value() API. - Upsert(Option), - Delete, +#[derive(Debug)] +pub enum SourceValue { + Existence(FieldValues), + NonExistence, +} + +impl SourceValue { + pub fn is_existent(&self) -> bool { + matches!(self, Self::Existence(_)) + } + + pub fn as_optional(&self) -> Option<&FieldValues> { + match self { + Self::Existence(value) => Some(value), + Self::NonExistence => None, + } + } + + pub fn into_optional(self) -> Option { + match self { + Self::Existence(value) => Some(value), + Self::NonExistence => None, + } + } +} + +pub struct SourceData { + pub value: SourceValue, + pub ordinal: Ordinal, } pub struct SourceChange { - /// Last update/deletion ordinal. None means unavailable. - pub ordinal: Option, pub key: KeyValue, - pub value: SourceValueChange, + + /// If None, the engine will poll to get the latest existence state and value. + pub data: Option, } #[derive(Debug, Default)] @@ -73,27 +107,39 @@ pub struct SourceExecutorGetOptions { } #[derive(Debug)] -pub struct SourceValue { - // None if not included in the option. - pub value: Option, - // None if unavailable, or not included in the option. +pub struct PartialSourceRowData { + pub value: Option, pub ordinal: Option, } +impl TryFrom for SourceData { + type Error = anyhow::Error; + + fn try_from(data: PartialSourceRowData) -> Result { + Ok(Self { + value: data + .value + .ok_or_else(|| anyhow::anyhow!("value is missing"))?, + ordinal: data + .ordinal + .ok_or_else(|| anyhow::anyhow!("ordinal is missing"))?, + }) + } +} #[async_trait] pub trait SourceExecutor: Send + Sync { /// Get the list of keys for the source. fn list<'a>( &'a self, options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>>; + ) -> BoxStream<'a, Result>>; // Get the value for the given key. async fn get_value( &self, key: &KeyValue, options: &SourceExecutorGetOptions, - ) -> Result>; + ) -> Result; async fn change_stream(&self) -> Result>> { Ok(None) diff --git a/src/ops/sources/amazon_s3.rs b/src/ops/sources/amazon_s3.rs index 5056ea1bf..da37f193e 100644 --- a/src/ops/sources/amazon_s3.rs +++ b/src/ops/sources/amazon_s3.rs @@ -49,7 +49,7 @@ impl Executor { } fn datetime_to_ordinal(dt: &aws_sdk_s3::primitives::DateTime) -> Ordinal { - Ordinal((dt.as_nanos() / 1000) as i64) + Ordinal(Some((dt.as_nanos() / 1000) as i64)) } #[async_trait] @@ -57,7 +57,7 @@ impl SourceExecutor for Executor { fn list<'a>( &'a self, _options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { + ) -> BoxStream<'a, Result>> { try_stream! { let mut continuation_token = None; loop { @@ -86,7 +86,7 @@ impl SourceExecutor for Executor { .map(|gs| gs.is_match(key)) .unwrap_or(false); if include && !exclude { - batch.push(SourceRowMetadata { + batch.push(PartialSourceRowMetadata { key: KeyValue::Str(key.to_string().into()), ordinal: obj.last_modified().map(datetime_to_ordinal), }); @@ -110,10 +110,13 @@ impl SourceExecutor for Executor { &self, key: &KeyValue, options: &SourceExecutorGetOptions, - ) -> Result> { + ) -> Result { let key_str = key.str_value()?; if !self.is_file_included(key_str) { - return Ok(None); + return Ok(PartialSourceRowData { + value: Some(SourceValue::NonExistence), + ordinal: Some(Ordinal::unavailable()), + }); } let resp = self .client @@ -123,11 +126,13 @@ impl SourceExecutor for Executor { .send() .await; let obj = match resp { - Ok(o) => o, - Err(e) => { - warn!("Failed to fetch S3 object {}: {}", key_str, e); - return Ok(None); + Err(e) if e.as_service_error().map_or(false, |e| e.is_no_such_key()) => { + return Ok(PartialSourceRowData { + value: Some(SourceValue::NonExistence), + ordinal: Some(Ordinal::unavailable()), + }); } + r => r?, }; let ordinal = if options.include_ordinal { obj.last_modified().map(datetime_to_ordinal) @@ -136,21 +141,15 @@ impl SourceExecutor for Executor { }; let value = if options.include_value { let bytes = obj.body.collect().await?.into_bytes(); - Some(if self.binary { + Some(SourceValue::Existence(if self.binary { fields_value!(bytes.to_vec()) } else { - match String::from_utf8(bytes.to_vec()) { - Ok(s) => fields_value!(s), - Err(e) => { - warn!("Failed to decode S3 object {} as UTF-8: {}", key_str, e); - return Ok(None); - } - } - }) + fields_value!(String::from_utf8_lossy(&bytes).to_string()) + })) } else { None }; - Ok(Some(SourceValue { value, ordinal })) + Ok(PartialSourceRowData { value, ordinal }) } async fn change_stream(&self) -> Result>> { @@ -236,17 +235,12 @@ impl Executor { { continue; } - if record.event_name.starts_with("ObjectCreated:") { - changes.push(SourceChange { - key: KeyValue::Str(record.s3.object.key.into()), - ordinal: None, - value: SourceValueChange::Upsert(None), - }); - } else if record.event_name.starts_with("ObjectDeleted:") { + if record.event_name.starts_with("ObjectCreated:") + || record.event_name.starts_with("ObjectDeleted:") + { changes.push(SourceChange { key: KeyValue::Str(record.s3.object.key.into()), - ordinal: None, - value: SourceValueChange::Delete, + data: None, }); } } diff --git a/src/ops/sources/google_drive.rs b/src/ops/sources/google_drive.rs index ce13e3577..2d901d518 100644 --- a/src/ops/sources/google_drive.rs +++ b/src/ops/sources/google_drive.rs @@ -126,7 +126,7 @@ impl Executor { file: File, new_folder_ids: &mut Vec>, seen_ids: &mut HashSet>, - ) -> Result> { + ) -> Result> { if file.trashed == Some(true) { return Ok(None); } @@ -144,7 +144,7 @@ impl Executor { new_folder_ids.push(id); None } else if is_supported_file_type(&mime_type) { - Some(SourceRowMetadata { + Some(PartialSourceRowMetadata { key: KeyValue::Str(id), ordinal: file.modified_time.map(|t| t.try_into()).transpose()?, }) @@ -220,13 +220,8 @@ impl Executor { let file_id = file.id.ok_or_else(|| anyhow!("File has no id"))?; if self.is_file_covered(&file_id).await? { changes.push(SourceChange { - ordinal: Some(modified_time.try_into()?), key: KeyValue::Str(Arc::from(file_id)), - value: if file.trashed == Some(true) { - SourceValueChange::Delete - } else { - SourceValueChange::Upsert(None) - }, + data: None, }); } } @@ -305,7 +300,7 @@ impl SourceExecutor for Executor { fn list<'a>( &'a self, options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { + ) -> BoxStream<'a, Result>> { let mut seen_ids = HashSet::new(); let mut folder_ids = self.root_folder_ids.clone(); let fields = format!( @@ -341,7 +336,7 @@ impl SourceExecutor for Executor { &self, key: &KeyValue, options: &SourceExecutorGetOptions, - ) -> Result> { + ) -> Result { let file_id = key.str_value()?; let fields = format!( "id,name,mimeType,trashed{}", @@ -359,7 +354,10 @@ impl SourceExecutor for Executor { let file = match resp { Some((_, file)) if file.trashed != Some(true) => file, _ => { - return Ok(None); + return Ok(PartialSourceRowData { + value: Some(SourceValue::NonExistence), + ordinal: Some(Ordinal::unavailable()), + }); } }; let ordinal = if options.include_ordinal { @@ -411,11 +409,11 @@ impl SourceExecutor for Executor { .into() }, ]; - Some(FieldValues { fields }) + Some(SourceValue::Existence(FieldValues { fields })) } None => None, }; - Ok(Some(SourceValue { value, ordinal })) + Ok(PartialSourceRowData { value, ordinal }) } async fn change_stream(&self) -> Result>> { diff --git a/src/ops/sources/local_file.rs b/src/ops/sources/local_file.rs index acdc61dd2..2ac642507 100644 --- a/src/ops/sources/local_file.rs +++ b/src/ops/sources/local_file.rs @@ -43,7 +43,7 @@ impl SourceExecutor for Executor { fn list<'a>( &'a self, options: &'a SourceExecutorListOptions, - ) -> BoxStream<'a, Result>> { + ) -> BoxStream<'a, Result>> { let root_component_size = self.root_path.components().count(); let mut dirs = Vec::new(); dirs.push(Cow::Borrowed(&self.root_path)); @@ -69,7 +69,7 @@ impl SourceExecutor for Executor { None }; if let Some(relative_path) = relative_path.to_str() { - yield vec![SourceRowMetadata { + yield vec![PartialSourceRowMetadata { key: KeyValue::Str(relative_path.into()), ordinal, }]; @@ -88,9 +88,12 @@ impl SourceExecutor for Executor { &self, key: &KeyValue, options: &SourceExecutorGetOptions, - ) -> Result> { + ) -> Result { if !self.is_file_included(key.str_value()?.as_ref()) { - return Ok(None); + return Ok(PartialSourceRowData { + value: Some(SourceValue::NonExistence), + ordinal: Some(Ordinal::unavailable()), + }); } let path = self.root_path.join(key.str_value()?.as_ref()); let ordinal = if options.include_ordinal { @@ -106,15 +109,17 @@ impl SourceExecutor for Executor { } else { fields_value!(String::from_utf8_lossy(&content).to_string()) }; - Some(content) + Some(SourceValue::Existence(content)) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + Some(SourceValue::NonExistence) } - Err(e) if e.kind() == std::io::ErrorKind::NotFound => None, Err(e) => Err(e)?, } } else { None }; - Ok(Some(SourceValue { value, ordinal })) + Ok(PartialSourceRowData { value, ordinal }) } }