diff --git a/src/base/schema.rs b/src/base/schema.rs index c1fb24c5a..b45887988 100644 --- a/src/base/schema.rs +++ b/src/base/schema.rs @@ -359,7 +359,7 @@ impl std::fmt::Display for FieldSchema { pub struct CollectorSchema { pub fields: Vec, /// If specified, the collector will have an automatically generated UUID field with the given index. - pub auto_uuid_field_idx: Option, + pub auto_uuid_field_idx: Option, } impl std::fmt::Display for CollectorSchema { diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index 60d489a08..b27c00d9e 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -971,6 +971,12 @@ impl AnalyzerContext<'_> { }) .transpose()?; + let value_stable = collector_schema + .auto_uuid_field_idx + .map(|uuid_idx| match &primary_key_def { + AnalyzedPrimaryKeyDef::Fields(fields) => fields.contains(&uuid_idx), + }) + .unwrap_or(false); Ok(async move { trace!("Start building executor for export op `{}`", export_op.name); let (executor, query_target) = executor_fut @@ -990,6 +996,7 @@ impl AnalyzerContext<'_> { primary_key_def, primary_key_type, value_fields: value_fields_idx, + value_stable, }) }) } diff --git a/src/builder/plan.rs b/src/builder/plan.rs index 8b7802210..43306077b 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -109,6 +109,9 @@ pub struct AnalyzedExportOp { pub primary_key_type: ValueType, /// idx for value fields - excluding the primary key field. pub value_fields: Vec, + /// If true, value is never changed on the same primary key. + /// This is guaranteed if the primary key contains auto-generated UUIDs. + pub value_stable: bool, } pub enum AnalyzedReactiveOp { diff --git a/src/execution/indexer.rs b/src/execution/indexer.rs index ef42bc400..f144e2de2 100644 --- a/src/execution/indexer.rs +++ b/src/execution/indexer.rs @@ -226,17 +226,20 @@ async fn precommit_source_tracking_info( .fields .push(value.fields[*field as usize].clone()); } - let curr_fp = Some( - Fingerprinter::default() - .with(&field_values)? - .into_fingerprint(), - ); - let existing_target_keys = target_info.existing_keys_info.remove(&primary_key_json); let existing_staging_target_keys = target_info .existing_staging_keys_info .remove(&primary_key_json); + let curr_fp = if !export_op.value_stable { + Some( + Fingerprinter::default() + .with(&field_values)? + .into_fingerprint(), + ) + } else { + None + }; if existing_target_keys .as_ref() .map(|keys| !keys.is_empty() && keys.iter().all(|(_, fp)| fp == &curr_fp))