Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ struct ExportDataFieldsInfo {
primary_key_schema: Box<[FieldSchema]>,
value_fields_idx: Vec<u32>,
value_stable: bool,
output_value_fingerprinter: Fingerprinter,
}

impl AnalyzerContext {
Expand Down Expand Up @@ -866,6 +867,8 @@ impl AnalyzerContext {
.as_ref()
.map(|uuid_idx| pk_fields_idx.contains(uuid_idx))
.unwrap_or(false);
let output_value_fingerprinter =
Fingerprinter::default().with(&value_fields_schema)?;
(
value_fields_schema,
ExportDataFieldsInfo {
Expand All @@ -874,6 +877,7 @@ impl AnalyzerContext {
primary_key_schema,
value_fields_idx,
value_stable,
output_value_fingerprinter,
},
)
}
Expand Down Expand Up @@ -937,6 +941,7 @@ impl AnalyzerContext {
primary_key_schema: data_fields_info.primary_key_schema,
value_fields: data_fields_info.value_fields_idx,
value_stable: data_fields_info.value_stable,
output_value_fingerprinter: data_fields_info.output_value_fingerprinter,
})
})
})
Expand Down
19 changes: 11 additions & 8 deletions src/builder/exec_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub struct ImportOpExecutionContext {

pub struct ExportOpExecutionContext {
pub target_id: i32,
pub schema_version_id: usize,
}

pub struct FlowSetupExecutionContext {
Expand Down Expand Up @@ -107,12 +108,12 @@ fn build_import_op_exec_ctx(
Ok(ImportOpExecutionContext { source_id })
}

fn build_target_id(
fn build_export_op_exec_ctx(
analyzed_target_ss: &AnalyzedTargetSetupState,
existing_target_states: &HashMap<&setup::ResourceIdentifier, Vec<&setup::TargetSetupState>>,
metadata: &mut setup::FlowSetupMetadata,
target_states: &mut IndexMap<setup::ResourceIdentifier, setup::TargetSetupState>,
) -> Result<i32> {
) -> Result<ExportOpExecutionContext> {
let target_factory = get_target_factory(&analyzed_target_ss.target_kind)?;

let resource_id = setup::ResourceIdentifier {
Expand All @@ -121,7 +122,7 @@ fn build_target_id(
};
let existing_target_states = existing_target_states.get(&resource_id);
let mut compatible_target_ids = HashSet::<Option<i32>>::new();
let mut reusable_schema_version_ids = HashSet::<Option<i32>>::new();
let mut reusable_schema_version_ids = HashSet::<Option<usize>>::new();
for existing_state in existing_target_states.iter().flat_map(|v| v.iter()) {
let compatibility = if let Some(key_type) = &analyzed_target_ss.key_type
&& let Some(existing_key_type) = &existing_state.common.key_type
Expand Down Expand Up @@ -196,7 +197,10 @@ fn build_target_id(
});
}
}
Ok(target_id)
Ok(ExportOpExecutionContext {
target_id,
schema_version_id,
})
}

pub fn build_flow_setup_execution_context(
Expand Down Expand Up @@ -276,18 +280,17 @@ pub fn build_flow_setup_execution_context(
.targets
.iter()
.map(|analyzed_target_ss| {
let target_id = build_target_id(
build_export_op_exec_ctx(
analyzed_target_ss,
&target_states_by_name_type,
&mut metadata,
&mut target_states,
)?;
Ok(ExportOpExecutionContext { target_id })
)
})
.collect::<Result<Vec<_>>>()?;

for analyzed_target_ss in analyzed_ss.declarations.iter() {
build_target_id(
build_export_op_exec_ctx(
analyzed_target_ss,
&target_states_by_name_type,
&mut metadata,
Expand Down
2 changes: 2 additions & 0 deletions src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ pub struct AnalyzedExportOp {
/// If true, value is never changed on the same primary key.
/// This is guaranteed if the primary key contains auto-generated UUIDs.
pub value_stable: bool,
/// Fingerprinter of the output value.
pub output_value_fingerprinter: Fingerprinter,
}

pub struct AnalyzedExportTargetOpGroup {
Expand Down
7 changes: 6 additions & 1 deletion src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ impl<'a> RowIndexer<'a> {
let mut keys_info = Vec::new();
let collected_values =
&data.evaluate_output.collected_values[export_op.input.collector_idx as usize];
let value_fingerprinter = export_op
.output_value_fingerprinter
.clone()
.with(&export_op_exec_ctx.schema_version_id)?;
for value in collected_values.iter() {
let primary_key =
extract_primary_key_for_export(&export_op.primary_key_def, value)?;
Expand Down Expand Up @@ -629,7 +633,8 @@ impl<'a> RowIndexer<'a> {

let curr_fp = if !export_op.value_stable {
Some(
Fingerprinter::default()
value_fingerprinter
.clone()
.with(&field_values)?
.into_fingerprint(),
)
Expand Down
4 changes: 2 additions & 2 deletions src/setup/states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,8 @@ impl Display for ResourceIdentifier {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TargetSetupStateCommon {
pub target_id: i32,
pub schema_version_id: i32,
pub max_schema_version_id: i32,
pub schema_version_id: usize,
pub max_schema_version_id: usize,
#[serde(default)]
pub setup_by_user: bool,
#[serde(default)]
Expand Down
Loading