Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/base/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ impl std::fmt::Display for FieldSchema {
pub struct CollectorSchema {
pub fields: Vec<FieldSchema>,
/// If specified, the collector will have an automatically generated UUID field with the given index.
pub auto_uuid_field_idx: Option<usize>,
pub auto_uuid_field_idx: Option<u32>,
}

impl std::fmt::Display for CollectorSchema {
Expand Down
7 changes: 7 additions & 0 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,12 @@ impl AnalyzerContext<'_> {
})
.transpose()?;

let value_stable = collector_schema
.auto_uuid_field_idx
.map(|uuid_idx| match &primary_key_def {
AnalyzedPrimaryKeyDef::Fields(fields) => fields.contains(&uuid_idx),
})
.unwrap_or(false);
Ok(async move {
trace!("Start building executor for export op `{}`", export_op.name);
let (executor, query_target) = executor_fut
Expand All @@ -990,6 +996,7 @@ impl AnalyzerContext<'_> {
primary_key_def,
primary_key_type,
value_fields: value_fields_idx,
value_stable,
})
})
}
Expand Down
3 changes: 3 additions & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ pub struct AnalyzedExportOp {
pub primary_key_type: ValueType,
/// idx for value fields - excluding the primary key field.
pub value_fields: Vec<u32>,
/// If true, value is never changed on the same primary key.
/// This is guaranteed if the primary key contains auto-generated UUIDs.
pub value_stable: bool,
}

pub enum AnalyzedReactiveOp {
Expand Down
15 changes: 9 additions & 6 deletions src/execution/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,20 @@ async fn precommit_source_tracking_info(
.fields
.push(value.fields[*field as usize].clone());
}
let curr_fp = Some(
Fingerprinter::default()
.with(&field_values)?
.into_fingerprint(),
);

let existing_target_keys = target_info.existing_keys_info.remove(&primary_key_json);
let existing_staging_target_keys = target_info
.existing_staging_keys_info
.remove(&primary_key_json);

let curr_fp = if !export_op.value_stable {
Some(
Fingerprinter::default()
.with(&field_values)?
.into_fingerprint(),
)
} else {
None
};
if existing_target_keys
.as_ref()
.map(|keys| !keys.is_empty() && keys.iter().all(|(_, fp)| fp == &curr_fp))
Expand Down