From 218a4aa98ca7ed8dbd8c483fb4f7a2b672902e8b Mon Sep 17 00:00:00 2001 From: Jiangzhou He Date: Mon, 29 Sep 2025 16:02:16 -0700 Subject: [PATCH] fix: make sure a row is reexported on schema change --- src/builder/analyzer.rs | 5 +++++ src/builder/exec_ctx.rs | 19 +++++++++++-------- src/builder/plan.rs | 2 ++ src/execution/row_indexer.rs | 7 ++++++- src/setup/states.rs | 4 ++-- 5 files changed, 26 insertions(+), 11 deletions(-) diff --git a/src/builder/analyzer.rs b/src/builder/analyzer.rs index d1c9defd6..57be0c16b 100644 --- a/src/builder/analyzer.rs +++ b/src/builder/analyzer.rs @@ -646,6 +646,7 @@ struct ExportDataFieldsInfo { primary_key_schema: Box<[FieldSchema]>, value_fields_idx: Vec, value_stable: bool, + output_value_fingerprinter: Fingerprinter, } impl AnalyzerContext { @@ -866,6 +867,8 @@ impl AnalyzerContext { .as_ref() .map(|uuid_idx| pk_fields_idx.contains(uuid_idx)) .unwrap_or(false); + let output_value_fingerprinter = + Fingerprinter::default().with(&value_fields_schema)?; ( value_fields_schema, ExportDataFieldsInfo { @@ -874,6 +877,7 @@ impl AnalyzerContext { primary_key_schema, value_fields_idx, value_stable, + output_value_fingerprinter, }, ) } @@ -937,6 +941,7 @@ impl AnalyzerContext { primary_key_schema: data_fields_info.primary_key_schema, value_fields: data_fields_info.value_fields_idx, value_stable: data_fields_info.value_stable, + output_value_fingerprinter: data_fields_info.output_value_fingerprinter, }) }) }) diff --git a/src/builder/exec_ctx.rs b/src/builder/exec_ctx.rs index fde184710..de9e6b99f 100644 --- a/src/builder/exec_ctx.rs +++ b/src/builder/exec_ctx.rs @@ -10,6 +10,7 @@ pub struct ImportOpExecutionContext { pub struct ExportOpExecutionContext { pub target_id: i32, + pub schema_version_id: usize, } pub struct FlowSetupExecutionContext { @@ -107,12 +108,12 @@ fn build_import_op_exec_ctx( Ok(ImportOpExecutionContext { source_id }) } -fn build_target_id( +fn build_export_op_exec_ctx( analyzed_target_ss: &AnalyzedTargetSetupState, existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>, metadata: &mut setup::FlowSetupMetadata, target_states: &mut IndexMap, -) -> Result { +) -> Result { let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?; let resource_id = setup::ResourceIdentifier { @@ -121,7 +122,7 @@ fn build_target_id( }; let existing_target_states = existing_target_states.get(&resource_id); let mut compatible_target_ids = HashSet::>::new(); - let mut reusable_schema_version_ids = HashSet::>::new(); + let mut reusable_schema_version_ids = HashSet::>::new(); for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) { let compatibility = if let Some(key_type) = &analyzed_target_ss.key_type && let Some(existing_key_type) = &existing_state.common.key_type @@ -196,7 +197,10 @@ fn build_target_id( }); } } - Ok(target_id) + Ok(ExportOpExecutionContext { + target_id, + schema_version_id, + }) } pub fn build_flow_setup_execution_context( @@ -276,18 +280,17 @@ pub fn build_flow_setup_execution_context( .targets .iter() .map(|analyzed_target_ss| { - let target_id = build_target_id( + build_export_op_exec_ctx( analyzed_target_ss, &target_states_by_name_type, &mut metadata, &mut target_states, - )?; - Ok(ExportOpExecutionContext { target_id }) + ) }) .collect::>>()?; for analyzed_target_ss in analyzed_ss.declarations.iter() { - build_target_id( + build_export_op_exec_ctx( analyzed_target_ss, &target_states_by_name_type, &mut metadata, diff --git a/src/builder/plan.rs b/src/builder/plan.rs index c929891f5..2aa878d50 100644 --- a/src/builder/plan.rs +++ b/src/builder/plan.rs @@ -111,6 +111,8 @@ pub struct AnalyzedExportOp { /// If true, value is never changed on the same primary key. /// This is guaranteed if the primary key contains auto-generated UUIDs. pub value_stable: bool, + /// Fingerprinter of the output value. + pub output_value_fingerprinter: Fingerprinter, } pub struct AnalyzedExportTargetOpGroup { diff --git a/src/execution/row_indexer.rs b/src/execution/row_indexer.rs index a1499ba38..d135ee7bb 100644 --- a/src/execution/row_indexer.rs +++ b/src/execution/row_indexer.rs @@ -599,6 +599,10 @@ impl<'a> RowIndexer<'a> { let mut keys_info = Vec::new(); let collected_values = &data.evaluate_output.collected_values[export_op.input.collector_idx as usize]; + let value_fingerprinter = export_op + .output_value_fingerprinter + .clone() + .with(&export_op_exec_ctx.schema_version_id)?; for value in collected_values.iter() { let primary_key = extract_primary_key_for_export(&export_op.primary_key_def, value)?; @@ -629,7 +633,8 @@ impl<'a> RowIndexer<'a> { let curr_fp = if !export_op.value_stable { Some( - Fingerprinter::default() + value_fingerprinter + .clone() .with(&field_values)? .into_fingerprint(), ) diff --git a/src/setup/states.rs b/src/setup/states.rs index ed79d7f5e..de7154814 100644 --- a/src/setup/states.rs +++ b/src/setup/states.rs @@ -176,8 +176,8 @@ impl Display for ResourceIdentifier { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct TargetSetupStateCommon { pub target_id: i32, - pub schema_version_id: i32, - pub max_schema_version_id: i32, + pub schema_version_id: usize, + pub max_schema_version_id: usize, #[serde(default)] pub setup_by_user: bool, #[serde(default)]