diff --git a/rust/cocoindex/src/builder/analyzer.rs b/rust/cocoindex/src/builder/analyzer.rs
index a7d57a1d..712e2e1c 100644
--- a/rust/cocoindex/src/builder/analyzer.rs
+++ b/rust/cocoindex/src/builder/analyzer.rs
@@ -123,6 +123,7 @@ impl TryFrom<&TableSchema> for TableSchemaBuilder {
             kind: schema.kind,
             sub_scope: Arc::new(Mutex::new(DataScopeBuilder {
                 data: (&schema.row).try_into()?,
+                added_fields_def_fp: Default::default(),
             })),
         })
     }
@@ -343,44 +344,88 @@ fn try_merge_collector_schemas(
     })
 }
 
+struct FieldDefFingerprintBuilder {
+    source_op_names: HashSet<String>,
+    fingerprinter: Fingerprinter,
+}
+
+impl FieldDefFingerprintBuilder {
+    pub fn new() -> Self {
+        Self {
+            source_op_names: HashSet::new(),
+            fingerprinter: Fingerprinter::default(),
+        }
+    }
+
+    pub fn add(&mut self, key: Option<&str>, def_fp: FieldDefFingerprint) -> Result<()> {
+        self.source_op_names.extend(def_fp.source_op_names);
+        let mut fingerprinter = std::mem::take(&mut self.fingerprinter);
+        if let Some(key) = key {
+            fingerprinter = fingerprinter.with(key)?;
+        }
+        fingerprinter = fingerprinter.with(def_fp.fingerprint.as_slice())?;
+        self.fingerprinter = fingerprinter;
+        Ok(())
+    }
+
+    pub fn build(self) -> FieldDefFingerprint {
+        FieldDefFingerprint {
+            source_op_names: self.source_op_names,
+            fingerprint: self.fingerprinter.into_fingerprint(),
+        }
+    }
+}
+
 #[derive(Debug)]
 pub(super) struct CollectorBuilder {
     pub schema: Arc<CollectorSchema>,
     pub is_used: bool,
+    pub def_fps: Vec<FieldDefFingerprint>,
 }
 
 impl CollectorBuilder {
-    pub fn new(schema: Arc<CollectorSchema>) -> Self {
+    pub fn new(schema: Arc<CollectorSchema>, def_fp: FieldDefFingerprint) -> Self {
         Self {
             schema,
             is_used: false,
+            def_fps: vec![def_fp],
         }
     }
 
-    pub fn merge_schema(&mut self, schema: &CollectorSchema) -> Result<()> {
+    pub fn collect(&mut self, schema: &CollectorSchema, def_fp: FieldDefFingerprint) -> Result<()> {
         if self.is_used {
             api_bail!("Collector is already used");
         }
         let existing_schema = Arc::make_mut(&mut self.schema);
         *existing_schema = try_merge_collector_schemas(existing_schema, schema)?;
+        self.def_fps.push(def_fp);
         Ok(())
     }
 
-    pub fn use_schema(&mut self) -> Arc<CollectorSchema> {
+    pub fn use_collection(&mut self) -> Result<(Arc<CollectorSchema>, FieldDefFingerprint)> {
         self.is_used = true;
-        self.schema.clone()
+
+        self.def_fps
+            .sort_by(|a, b| a.fingerprint.as_slice().cmp(b.fingerprint.as_slice()));
+        let mut def_fp_builder = FieldDefFingerprintBuilder::new();
+        for def_fp in self.def_fps.iter() {
+            def_fp_builder.add(None, def_fp.clone())?;
+        }
+        Ok((self.schema.clone(), def_fp_builder.build()))
     }
 }
 
 #[derive(Debug)]
 pub(super) struct DataScopeBuilder {
     pub data: StructSchemaBuilder,
+    pub added_fields_def_fp: IndexMap<FieldName, FieldDefFingerprint>,
 }
 
 impl DataScopeBuilder {
     pub fn new() -> Self {
         Self {
             data: Default::default(),
+            added_fields_def_fp: Default::default(),
         }
     }
 
@@ -392,12 +437,14 @@ impl DataScopeBuilder {
         &mut self,
         name: FieldName,
         value_type: &EnrichedValueType,
+        def_fp: FieldDefFingerprint,
     ) -> Result<AnalyzedOpOutput> {
         let field_index = self.data.add_field(FieldSchema {
-            name,
+            name: name.clone(),
             value_type: EnrichedValueType::from_alternative(value_type)?,
             description: None,
         })?;
+        self.added_fields_def_fp.insert(name, def_fp);
        Ok(AnalyzedOpOutput {
            field_idx: field_index,
        })
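`FieldDefFingerprintBuilder` folds parts in the order `add` is called, which is why `use_collection` sorts `def_fps` before folding: the combined digest must not depend on the order in which collect sites were analyzed. A minimal std-only sketch of that property (the `u64` digest and `DefaultHasher` are stand-ins for the crate's `Fingerprinter`/`Fingerprint`, not its actual API):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Stand-in for feeding one part into a Fingerprinter.
fn part(s: &str) -> u64 {
    let mut h = DefaultHasher::new();
    s.hash(&mut h);
    h.finish()
}

// Stand-in for `use_collection`: sort parts, then fold them into one digest,
// so the result is independent of collect order.
fn combine(mut parts: Vec<u64>) -> u64 {
    parts.sort_unstable();
    let mut h = DefaultHasher::new();
    for p in parts {
        p.hash(&mut h);
    }
    h.finish()
}

fn main() {
    assert_eq!(
        combine(vec![part("a"), part("b")]),
        combine(vec![part("b"), part("a")])
    );
}
```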
@@ -407,12 +454,15 @@ impl DataScopeBuilder {
     pub fn analyze_field_path<'a>(
         &'a self,
         field_path: &'_ FieldPath,
+        base_def_fp: FieldDefFingerprint,
     ) -> Result<(
         AnalyzedLocalFieldReference,
         &'a EnrichedValueType<ValueTypeBuilder>,
+        FieldDefFingerprint,
     )> {
         let mut indices = Vec::with_capacity(field_path.len());
         let mut struct_schema = &self.data;
+        let mut def_fp = base_def_fp;
         if field_path.is_empty() {
             bail!("Field path is empty");
@@ -424,6 +474,13 @@ impl DataScopeBuilder {
             let (field_idx, field) = struct_schema.find_field(field_name).ok_or_else(|| {
                 api_error!("Field {} not found", field_path[0..(i + 1)].join("."))
             })?;
+            if let Some(added_def_fp) = self.added_fields_def_fp.get(field_name) {
+                def_fp = added_def_fp.clone();
+            } else {
+                def_fp.fingerprint = Fingerprinter::default()
+                    .with(&("field", &def_fp.fingerprint, field_name))?
+                    .into_fingerprint();
+            };
             indices.push(field_idx);
             if i + 1 >= field_path.len() {
                 break &field.value_type;
@@ -442,6 +499,7 @@ impl DataScopeBuilder {
                 fields_idx: indices,
             },
             value_type,
+            def_fp,
         ))
     }
 }
@@ -463,15 +521,16 @@ impl OpScopeStates {
         &mut self,
         collector_name: FieldName,
         schema: CollectorSchema,
+        def_fp: FieldDefFingerprint,
     ) -> Result<AnalyzedLocalCollectorReference> {
         let existing_len = self.collectors.len();
         let idx = match self.collectors.entry(collector_name) {
             indexmap::map::Entry::Occupied(mut entry) => {
-                entry.get_mut().merge_schema(&schema)?;
+                entry.get_mut().collect(&schema, def_fp)?;
                 entry.index()
             }
             indexmap::map::Entry::Vacant(entry) => {
-                entry.insert(CollectorBuilder::new(Arc::new(schema)));
+                entry.insert(CollectorBuilder::new(Arc::new(schema), def_fp));
                 existing_len
             }
         };
@@ -483,16 +542,22 @@ impl OpScopeStates {
     pub fn consume_collector(
         &mut self,
         collector_name: &FieldName,
-    ) -> Result<(AnalyzedLocalCollectorReference, Arc<CollectorSchema>)> {
+    ) -> Result<(
+        AnalyzedLocalCollectorReference,
+        Arc<CollectorSchema>,
+        FieldDefFingerprint,
+    )> {
         let (collector_idx, _, collector) = self
             .collectors
             .get_full_mut(collector_name)
             .ok_or_else(|| api_error!("Collector not found: {}", collector_name))?;
+        let (schema, def_fp) = collector.use_collection()?;
         Ok((
             AnalyzedLocalCollectorReference {
                 collector_idx: collector_idx as u32,
             },
-            collector.use_schema(),
+            schema,
+            def_fp,
         ))
     }
 
@@ -522,6 +587,7 @@ pub struct OpScope {
     pub parent: Option<(Arc<OpScope>, spec::FieldPath)>,
     pub(super) data: Arc<Mutex<DataScopeBuilder>>,
     pub(super) states: Mutex<OpScopeStates>,
+    pub(super) base_value_def_fp: FieldDefFingerprint,
 }
 
 struct Iter<'a>(Option<&'a OpScope>);
@@ -545,12 +611,14 @@ impl OpScope {
         name: String,
         parent: Option<(Arc<OpScope>, spec::FieldPath)>,
         data: Arc<Mutex<DataScopeBuilder>>,
+        base_value_def_fp: FieldDefFingerprint,
     ) -> Arc<Self> {
         Arc::new(Self {
             name,
             parent,
             data,
             states: Mutex::default(),
+            base_value_def_fp,
         })
     }
 
@@ -558,12 +626,13 @@ impl OpScope {
         &self,
         name: FieldName,
         value_type: EnrichedValueType,
+        def_fp: FieldDefFingerprint,
     ) -> Result<AnalyzedOpOutput> {
         let op_output = self
             .data
             .lock()
             .unwrap()
-            .add_field(name.clone(), &value_type)?;
+            .add_field(name.clone(), &value_type, def_fp)?;
         self.states
             .lock()
             .unwrap()
@@ -591,19 +660,21 @@ impl OpScope {
         scope_name: String,
         field_path: &FieldPath,
     ) -> Result<(AnalyzedLocalFieldReference, Arc<OpScope>)> {
-        let (local_field_ref, sub_data_scope) = {
+        let (local_field_ref, sub_data_scope, def_fp) = {
             let data_scope = self.data.lock().unwrap();
-            let (local_field_ref, value_type) = data_scope.analyze_field_path(field_path)?;
+            let (local_field_ref, value_type, def_fp) =
+                data_scope.analyze_field_path(field_path, self.base_value_def_fp.clone())?;
             let sub_data_scope = match &value_type.typ {
                 ValueTypeBuilder::Table(table_type) => table_type.sub_scope.clone(),
                 _ => api_bail!("ForEach only works on collection, field {field_path} is not"),
             };
-            (local_field_ref, sub_data_scope)
+            (local_field_ref, sub_data_scope, def_fp)
         };
         let sub_op_scope = OpScope::new(
             scope_name,
             Some((self.clone(), field_path.clone())),
             sub_data_scope,
+            def_fp,
         );
         Ok((local_field_ref, sub_op_scope))
     }
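The chaining in `analyze_field_path` has two cases: a field that was added by an op keeps that op's recorded fingerprint, while any other path segment derives its fingerprint from its parent's. A rough std-only model of that walk (the map, names and `u64` digest are illustrative, not the crate's types):

```rust
use std::collections::HashMap;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Mirrors feeding ("field", parent_fp, field_name) into a fresh hasher.
fn derive(parent: u64, field: &str) -> u64 {
    let mut h = DefaultHasher::new();
    ("field", parent, field).hash(&mut h);
    h.finish()
}

// Walk a field path, preferring fingerprints recorded for op-added fields
// (the analogue of `added_fields_def_fp`) over derivation from the parent.
fn resolve(base: u64, path: &[&str], added: &HashMap<&str, u64>) -> u64 {
    let mut fp = base;
    for name in path {
        fp = match added.get(name) {
            Some(op_fp) => *op_fp, // field produced by an op: keep its fingerprint
            None => derive(fp, name),
        };
    }
    fp
}

fn main() {
    let added = HashMap::from([("chunks", 42u64)]);
    // Same base and path always resolve to the same digest.
    assert_eq!(
        resolve(0, &["chunks", "text"], &added),
        resolve(0, &["chunks", "text"], &added)
    );
}
```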
@@ -639,34 +710,51 @@ fn find_scope<'a>(scope_name: &ScopeName, op_scope: &'a OpScope) -> Result<(u32,
 fn analyze_struct_mapping(
     mapping: &StructMapping,
     op_scope: &OpScope,
-) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>)> {
+) -> Result<(AnalyzedStructMapping, Vec<FieldSchema>, FieldDefFingerprint)> {
     let mut field_mappings = Vec::with_capacity(mapping.fields.len());
     let mut field_schemas = Vec::with_capacity(mapping.fields.len());
+
+    let mut fields_def_fps = Vec::with_capacity(mapping.fields.len());
     for field in mapping.fields.iter() {
-        let (field_mapping, value_type) = analyze_value_mapping(&field.spec, op_scope)?;
+        let (field_mapping, value_type, field_def_fp) =
+            analyze_value_mapping(&field.spec, op_scope)?;
         field_mappings.push(field_mapping);
         field_schemas.push(FieldSchema {
             name: field.name.clone(),
             value_type,
             description: None,
         });
+        fields_def_fps.push((field.name.as_str(), field_def_fp));
+    }
+    fields_def_fps.sort_by_key(|(name, _)| *name);
+    let mut def_fp_builder = FieldDefFingerprintBuilder::new();
+    for (name, def_fp) in fields_def_fps {
+        def_fp_builder.add(Some(name), def_fp)?;
     }
     Ok((
         AnalyzedStructMapping {
             fields: field_mappings,
         },
         field_schemas,
+        def_fp_builder.build(),
     ))
 }
 
 fn analyze_value_mapping(
     value_mapping: &ValueMapping,
     op_scope: &OpScope,
-) -> Result<(AnalyzedValueMapping, EnrichedValueType)> {
+) -> Result<(AnalyzedValueMapping, EnrichedValueType, FieldDefFingerprint)> {
     let result = match value_mapping {
         ValueMapping::Constant(v) => {
             let value = value::Value::from_json(v.value.clone(), &v.schema.typ)?;
-            (AnalyzedValueMapping::Constant { value }, v.schema.clone())
+            let value_mapping = AnalyzedValueMapping::Constant { value };
+            let def_fp = FieldDefFingerprint {
+                source_op_names: HashSet::new(),
+                fingerprint: Fingerprinter::default()
+                    .with(&("constant", &v.value, &v.schema.without_attrs()))?
+                    .into_fingerprint(),
+            };
+            (value_mapping, v.schema.clone(), def_fp)
         }
         ValueMapping::Field(v) => {
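`analyze_struct_mapping` keys each child fingerprint by field name and sorts by name before folding, so two specs listing the same fields in a different order fingerprint identically, while a rename changes the digest. A hedged sketch of that property (std types only, toy `u64` digests):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Fold (name, fingerprint) pairs, sorted by name, into one digest — the same
// shape as calling FieldDefFingerprintBuilder::add(Some(name), ..) in order.
fn struct_fp(mut fields: Vec<(&str, u64)>) -> u64 {
    fields.sort_by_key(|(name, _)| *name);
    let mut h = DefaultHasher::new();
    for (name, fp) in fields {
        (name, fp).hash(&mut h);
    }
    h.finish()
}

fn main() {
    let a = struct_fp(vec![("title", 1), ("body", 2)]);
    let b = struct_fp(vec![("body", 2), ("title", 1)]);
    assert_eq!(a, b); // field order in the spec is irrelevant
    let c = struct_fp(vec![("body", 2), ("heading", 1)]);
    assert_ne!(a, c); // renaming a field changes the digest
}
```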
@@ -675,14 +763,14 @@ fn analyze_value_mapping(
                 None => (0, op_scope),
             };
             let data_scope = op_scope.data.lock().unwrap();
-            let (local_field_ref, value_type) = data_scope.analyze_field_path(&v.field_path)?;
-            (
-                AnalyzedValueMapping::Field(AnalyzedFieldReference {
-                    local: local_field_ref,
-                    scope_up_level,
-                }),
-                EnrichedValueType::from_alternative(value_type)?,
-            )
+            let (local_field_ref, value_type, def_fp) =
+                data_scope.analyze_field_path(&v.field_path, op_scope.base_value_def_fp.clone())?;
+            let schema = EnrichedValueType::from_alternative(value_type)?;
+            let value_mapping = AnalyzedValueMapping::Field(AnalyzedFieldReference {
+                local: local_field_ref,
+                scope_up_level,
+            });
+            (value_mapping, schema, def_fp)
         }
     };
     Ok(result)
@@ -691,17 +779,21 @@ fn analyze_value_mapping(
 fn analyze_input_fields(
     arg_bindings: &[OpArgBinding],
     op_scope: &OpScope,
-) -> Result<Vec<OpArgSchema>> {
-    let mut input_field_schemas = Vec::with_capacity(arg_bindings.len());
+) -> Result<(Vec<OpArgSchema>, FieldDefFingerprint)> {
+    let mut op_arg_schemas = Vec::with_capacity(arg_bindings.len());
+    let mut def_fp_builder = FieldDefFingerprintBuilder::new();
     for arg_binding in arg_bindings.iter() {
-        let (analyzed_value, value_type) = analyze_value_mapping(&arg_binding.value, op_scope)?;
-        input_field_schemas.push(OpArgSchema {
+        let (analyzed_value, value_type, def_fp) =
+            analyze_value_mapping(&arg_binding.value, op_scope)?;
+        let op_arg_schema = OpArgSchema {
             name: arg_binding.arg_name.clone(),
             value_type,
             analyzed_value: analyzed_value.clone(),
-        });
+        };
+        def_fp_builder.add(arg_binding.arg_name.0.as_ref().map(|n| n.as_str()), def_fp)?;
+        op_arg_schemas.push(op_arg_schema);
     }
-    Ok(input_field_schemas)
+    Ok((op_arg_schemas, def_fp_builder.build()))
 }
 
 fn add_collector(
@@ -709,13 +801,14 @@ fn add_collector(
     collector_name: FieldName,
     schema: CollectorSchema,
     op_scope: &OpScope,
+    def_fp: FieldDefFingerprint,
 ) -> Result<AnalyzedCollectorReference> {
     let (scope_up_level, scope) = find_scope(scope_name, op_scope)?;
     let local_ref = scope
         .states
         .lock()
         .unwrap()
-        .add_collector(collector_name, schema)?;
+        .add_collector(collector_name, schema, def_fp)?;
     Ok(AnalyzedCollectorReference {
         local: local_ref,
         scope_up_level,
@@ -729,6 +822,7 @@ struct ExportDataFieldsInfo {
     value_fields_idx: Vec<u32>,
     value_stable: bool,
     output_value_fingerprinter: Fingerprinter,
+    def_fp: FieldDefFingerprint,
 }
 
 impl AnalyzerContext {
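Unlike struct fields, `analyze_input_fields` folds argument fingerprints in call order and mixes in the argument name only when one is present. A small illustration of why that distinguishes positional from named bindings (assumed shapes, std only):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Mirrors FieldDefFingerprintBuilder::add(key, def_fp): the optional key
// (argument name) is hashed before the argument's own fingerprint.
fn args_fp(args: &[(Option<&str>, u64)]) -> u64 {
    let mut h = DefaultHasher::new();
    for (name, fp) in args {
        if let Some(name) = name {
            name.hash(&mut h);
        }
        fp.hash(&mut h);
    }
    h.finish()
}

fn main() {
    // Positional arguments are order-sensitive...
    assert_ne!(
        args_fp(&[(None, 1), (None, 2)]),
        args_fp(&[(None, 2), (None, 1)])
    );
    // ...and a named argument contributes its name as well as its value.
    assert_ne!(args_fp(&[(Some("text"), 1)]), args_fp(&[(Some("query"), 1)]));
}
```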
@@ -748,7 +842,13 @@ impl AnalyzerContext {
         let op_name = import_op.name;
         let primary_key_schema = Box::from(output_type.typ.key_schema());
 
-        let output = op_scope.add_op_output(op_name.clone(), output_type)?;
+        let def_fp = FieldDefFingerprint {
+            source_op_names: HashSet::from([op_name.clone()]),
+            fingerprint: Fingerprinter::default()
+                .with(&("import", &op_name))?
+                .into_fingerprint(),
+        };
+        let output = op_scope.add_op_output(op_name.clone(), output_type, def_fp)?;
 
         let concur_control_options = import_op
             .spec
@@ -781,12 +881,11 @@ impl AnalyzerContext {
         op_scope: &Arc<OpScope>,
         reactive_op: &NamedSpec<ReactiveOpSpec>,
     ) -> Result<BoxFuture<'static, Result<AnalyzedReactiveOp>>> {
-        let op_scope_clone = op_scope.clone();
         let reactive_op_clone = reactive_op.clone();
         let reactive_op_name = reactive_op.name.clone();
         let result_fut = match reactive_op_clone.spec {
             ReactiveOpSpec::Transform(op) => {
-                let input_field_schemas =
+                let (input_field_schemas, input_def_fp) =
                     analyze_input_fields(&op.inputs, op_scope).with_context(|| {
                         format!("Preparing inputs for transform op: {}", reactive_op_name)
                     })?;
@@ -805,8 +904,23 @@ impl AnalyzerContext {
                     .with(&op.op)?
                     .with(&build_output.output_type.without_attrs())?
                     .with(&build_output.behavior_version)?;
-                let output =
-                    op_scope.add_op_output(reactive_op_name.clone(), build_output.output_type)?;
+
+                let def_fp = FieldDefFingerprint {
+                    source_op_names: input_def_fp.source_op_names,
+                    fingerprint: Fingerprinter::default()
+                        .with(&(
+                            "transform",
+                            &op.op,
+                            &input_def_fp.fingerprint,
+                            &build_output.behavior_version,
+                        ))?
+                        .into_fingerprint(),
+                };
+                let output = op_scope.add_op_output(
+                    reactive_op_name.clone(),
+                    build_output.output_type,
+                    def_fp,
+                )?;
 
                 let op_name = reactive_op_name.clone();
                 let op_kind = op.op.kind.clone();
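Taken together, the import and transform arms define a small algebra: an import op seeds `(sources = {op}, fp = H("import", op))`, and a transform derives `(sources = union of input sources, fp = H("transform", op spec, input fp, behavior version))`. A compact std-only model of that flow (the tags, names and `u64` digests stand in for the crate's `Fingerprinter`):

```rust
use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

#[derive(Clone)]
struct DefFp {
    sources: HashSet<String>, // which import ops can affect this value
    fp: u64,                  // digest of the logic that produces it
}

fn h(t: impl Hash) -> u64 {
    let mut s = DefaultHasher::new();
    t.hash(&mut s);
    s.finish()
}

fn import(op_name: &str) -> DefFp {
    DefFp {
        sources: HashSet::from([op_name.to_string()]),
        fp: h(("import", op_name)),
    }
}

fn transform(kind: &str, behavior_version: Option<u32>, input: &DefFp) -> DefFp {
    DefFp {
        // The output depends on exactly the sources its inputs depend on.
        sources: input.sources.clone(),
        fp: h(("transform", kind, input.fp, behavior_version)),
    }
}

fn main() {
    let docs = import("documents");
    let chunks = transform("SplitRecursively", None, &docs);
    // Provenance survives the chain; the digest reflects the whole pipeline.
    assert!(chunks.sources.contains("documents"));
    assert_ne!(chunks.fp, docs.fp);
}
```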
@@ -889,22 +1003,33 @@ impl AnalyzerContext {
             }
             ReactiveOpSpec::Collect(op) => {
-                let (struct_mapping, fields_schema) =
-                    analyze_struct_mapping(&op.input, &op_scope_clone)?;
+                let (struct_mapping, fields_schema, mut def_fp) =
+                    analyze_struct_mapping(&op.input, op_scope)?;
                 let has_auto_uuid_field = op.auto_uuid_field.is_some();
+                def_fp.fingerprint = Fingerprinter::default()
+                    .with(&(
+                        "collect",
+                        &def_fp.fingerprint,
+                        &fields_schema,
+                        &has_auto_uuid_field,
+                    ))?
+                    .into_fingerprint();
                 let fingerprinter = Fingerprinter::default().with(&fields_schema)?;
+                let input_field_names: Vec<FieldName> =
+                    fields_schema.iter().map(|f| f.name.clone()).collect();
                 let collector_ref = add_collector(
                     &op.scope_name,
                     op.collector_name.clone(),
                     CollectorSchema::from_fields(fields_schema, op.auto_uuid_field.clone()),
-                    &op_scope_clone,
+                    op_scope,
+                    def_fp,
                 )?;
+                let op_scope = op_scope.clone();
                 async move {
                     // Get the merged collector schema after adding
                     let collector_schema: Arc<CollectorSchema> = {
-                        let scope = find_scope(&op.scope_name, &op_scope_clone)?.1;
+                        let scope = find_scope(&op.scope_name, &op_scope)?.1;
                         let states = scope.states.lock().unwrap();
                         let collector = states.collectors.get(&op.collector_name).unwrap();
                         collector.schema.clone()
                     };
@@ -955,11 +1080,12 @@ impl AnalyzerContext {
         let mut data_fields_infos = Vec::<ExportDataFieldsInfo>::new();
         for idx in export_op_group.op_idx.iter() {
             let export_op = &flow_inst.export_ops[*idx];
-            let (local_collector_ref, collector_schema) = op_scope
-                .states
-                .lock()
-                .unwrap()
-                .consume_collector(&export_op.spec.collector_name)?;
+            let (local_collector_ref, collector_schema, def_fp) =
+                op_scope
+                    .states
+                    .lock()
+                    .unwrap()
+                    .consume_collector(&export_op.spec.collector_name)?;
             let (value_fields_schema, data_collection_info) =
                 match &export_op.spec.index_options.primary_key_fields {
                     Some(fields) => {
@@ -1002,6 +1128,7 @@ impl AnalyzerContext {
                     value_fields_idx,
                     value_stable,
                     output_value_fingerprinter,
+                    def_fp,
                 },
             )
         }
@@ -1071,6 +1198,14 @@ impl AnalyzerContext {
             };
             targets_analyzed_ss[*idx] = Some(export_op_ss);
 
+            let def_fp = FieldDefFingerprint {
+                source_op_names: data_fields_info.def_fp.source_op_names,
+                fingerprint: Fingerprinter::default()
+                    .with("export")?
+                    .with(&data_fields_info.def_fp.fingerprint)?
+                    .with(&export_op.spec.target)?
+                    .into_fingerprint(),
+            };
             Ok(async move {
                 trace!("Start building executor for export op `{op_name}`");
                 let export_context = data_coll_output
@@ -1088,6 +1223,7 @@ impl AnalyzerContext {
                     value_fields: data_fields_info.value_fields_idx,
                     value_stable: data_fields_info.value_stable,
                     output_value_fingerprinter: data_fields_info.output_value_fingerprinter,
+                    def_fp,
                 })
             })
         })
@@ -1183,7 +1319,12 @@ pub async fn analyze_flow(
         flow_ctx,
     };
     let root_data_scope = Arc::new(Mutex::new(DataScopeBuilder::new()));
-    let root_op_scope = OpScope::new(ROOT_SCOPE_NAME.to_string(), None, root_data_scope);
+    let root_op_scope = OpScope::new(
+        ROOT_SCOPE_NAME.to_string(),
+        None,
+        root_data_scope,
+        FieldDefFingerprint::default(),
+    );
     let mut import_ops_futs = Vec::with_capacity(flow_inst.import_ops.len());
     for import_op in flow_inst.import_ops.iter() {
         import_ops_futs.push(
@@ -1260,7 +1401,7 @@ pub async fn analyze_flow(
         declarations: declarations_analyzed_ss,
     };
 
-    let legacy_fingerprint = Fingerprinter::default()
+    let legacy_fingerprint_v1 = Fingerprinter::default()
         .with(&flow_inst)?
         .with(&flow_schema.schema)?
         .into_fingerprint();
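The export fingerprint layers the target spec on top of the collected data's fingerprint, so pointing the same collector at a different target (or changing the logic that feeds it) yields a different digest. A hedged sketch, with `target` standing in for the serialized target spec:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Same shape as Fingerprinter::default().with("export")?.with(&value_fp)?
// .with(&target)?, using a plain std hasher as a stand-in.
fn export_fp(value_fp: u64, target: &str) -> u64 {
    let mut h = DefaultHasher::new();
    ("export", value_fp, target).hash(&mut h);
    h.finish()
}

fn main() {
    let value_fp = 0xC0C0;
    // Same data, different target => different export fingerprint.
    assert_ne!(
        export_fp(value_fp, "postgres:doc_embeddings"),
        export_fp(value_fp, "qdrant:doc_embeddings")
    );
}
```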
@@ -1325,14 +1466,11 @@ pub async fn analyze_flow(
         }
         Ok(fingerprinter)
     }
-    let current_fingerprint =
+    let legacy_fingerprint_v2 =
         append_function_behavior(current_fingerprinter, &op_scope.reactive_ops)?
             .into_fingerprint();
 
     Ok(ExecutionPlan {
-        logic_fingerprint: ExecutionPlanLogicFingerprint {
-            current: current_fingerprint,
-            legacy: legacy_fingerprint,
-        },
+        legacy_fingerprint: vec![legacy_fingerprint_v1, legacy_fingerprint_v2],
         import_ops,
         op_scope,
         export_ops,
@@ -1358,18 +1496,23 @@ pub async fn analyze_transient_flow<'a>(
     };
     let mut input_fields = vec![];
     for field in flow_inst.input_fields.iter() {
-        let analyzed_field = root_data_scope.add_field(field.name.clone(), &field.value_type)?;
+        let analyzed_field = root_data_scope.add_field(
+            field.name.clone(),
+            &field.value_type,
+            FieldDefFingerprint::default(),
+        )?;
         input_fields.push(analyzed_field);
     }
     let root_op_scope = OpScope::new(
         ROOT_SCOPE_NAME.to_string(),
         None,
         Arc::new(Mutex::new(root_data_scope)),
+        FieldDefFingerprint::default(),
     );
     let op_scope_fut = analyzer_ctx
         .analyze_op_scope(&root_op_scope, &flow_inst.reactive_ops)
         .await?;
-    let (output_value, output_type) =
+    let (output_value, output_type, _) =
         analyze_value_mapping(&flow_inst.output_value, &root_op_scope)?;
     let data_schema = build_flow_schema(&root_op_scope)?;
     let plan_fut = async move {
diff --git a/rust/cocoindex/src/builder/flow_builder.rs b/rust/cocoindex/src/builder/flow_builder.rs
index b6d38bb2..c60c88f4 100644
--- a/rust/cocoindex/src/builder/flow_builder.rs
+++ b/rust/cocoindex/src/builder/flow_builder.rs
@@ -1,7 +1,9 @@
 use crate::{
-    base::schema::EnrichedValueType, prelude::*, py::Pythonized, setup::ObjectSetupChange,
+    base::schema::EnrichedValueType, builder::plan::FieldDefFingerprint, prelude::*,
+    py::Pythonized, setup::ObjectSetupChange,
 };
+use cocoindex_utils::fingerprint::Fingerprinter;
 use pyo3::{exceptions::PyException, prelude::*};
 use pyo3_async_runtimes::tokio::future_into_py;
 use std::{collections::btree_map, ops::Deref};
@@ -119,8 +121,8 @@ impl DataSlice {
             spec::ValueMapping::Field(spec::FieldMapping { scope, field_path }) => {
                 let data_scope_builder = self.scope.data.lock().unwrap();
                 let struct_schema = {
-                    let (_, val_type) = data_scope_builder
-                        .analyze_field_path(field_path)
+                    let (_, val_type, _) = data_scope_builder
+                        .analyze_field_path(field_path, self.scope.base_value_def_fp.clone())
                         .into_py_result()?;
                     match &val_type.typ {
                         ValueTypeBuilder::Struct(struct_type) => struct_type,
@@ -171,7 +173,8 @@ impl DataSlice {
             spec::ValueMapping::Constant(c) => c.schema.clone(),
             spec::ValueMapping::Field(v) => {
                 let data_scope_builder = self.scope.data.lock().unwrap();
-                let (_, val_type) = data_scope_builder.analyze_field_path(&v.field_path)?;
+                let (_, val_type, _) = data_scope_builder
+                    .analyze_field_path(&v.field_path, self.scope.base_value_def_fp.clone())?;
                 EnrichedValueType::from_alternative(val_type)?
             }
         };
@@ -257,6 +260,7 @@ impl FlowBuilder {
             spec::ROOT_SCOPE_NAME.to_string(),
             None,
             Arc::new(Mutex::new(DataScopeBuilder::new())),
+            FieldDefFingerprint::default(),
         );
         let flow_inst_context = build_flow_instance_context(
             name,
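Root scopes are seeded with `FieldDefFingerprint::default()` (empty source set, fingerprint of zero input), so every flow starts its derivation chain from the same neutral value. In miniature, under the same stand-in types as above:

```rust
use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::Hasher;

struct DefFp {
    sources: HashSet<String>,
    fp: u64,
}

impl Default for DefFp {
    // Like the real impl: no sources, digest of an empty hasher.
    fn default() -> Self {
        Self {
            sources: HashSet::new(),
            fp: DefaultHasher::new().finish(),
        }
    }
}

fn main() {
    // Two root scopes built independently share the same neutral base,
    // so identical flows derive identical field fingerprints from it.
    assert_eq!(DefFp::default().fp, DefFp::default().fp);
    assert!(DefFp::default().sources.is_empty());
}
```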
@@ -366,7 +370,19 @@ impl FlowBuilder {
         {
             let mut root_data_scope = self.root_op_scope.data.lock().unwrap();
             root_data_scope
-                .add_field(name.clone(), &value_type)
+                .add_field(
+                    name.clone(),
+                    &value_type,
+                    FieldDefFingerprint {
+                        source_op_names: HashSet::from([name.clone()]),
+                        fingerprint: Fingerprinter::default()
+                            .with("input")
+                            .into_py_result()?
+                            .with(&name)
+                            .into_py_result()?
+                            .into_fingerprint(),
+                    },
+                )
                 .into_py_result()?;
         }
         let result = Self::last_field_to_data_slice(&self.root_op_scope).into_py_result()?;
@@ -545,11 +561,17 @@ impl FlowBuilder {
             auto_uuid_field,
         );
         {
+            // TODO: Pass in the right field def fingerprint
             let mut collector = collector.collector.lock().unwrap();
             if let Some(collector) = collector.as_mut() {
-                collector.merge_schema(&collector_schema).into_py_result()?;
+                collector
+                    .collect(&collector_schema, FieldDefFingerprint::default())
+                    .into_py_result()?;
             } else {
-                *collector = Some(CollectorBuilder::new(Arc::new(collector_schema)));
+                *collector = Some(CollectorBuilder::new(
+                    Arc::new(collector_schema),
+                    FieldDefFingerprint::default(),
+                ));
             }
         }
diff --git a/rust/cocoindex/src/builder/plan.rs b/rust/cocoindex/src/builder/plan.rs
index 7a2fbddc..bf7cbab0 100644
--- a/rust/cocoindex/src/builder/plan.rs
+++ b/rust/cocoindex/src/builder/plan.rs
@@ -53,6 +53,24 @@ pub struct AnalyzedOpOutput {
     pub field_idx: u32,
 }
 
+/// Tracks what affects the value of the field, to detect changes in logic.
+#[derive(Debug, Clone)]
+pub struct FieldDefFingerprint {
+    /// Names of the sources that affect the value of the field.
+    pub source_op_names: HashSet<String>,
+    /// Fingerprint of the logic that affects the value of the field.
+    pub fingerprint: Fingerprint,
+}
+
+impl Default for FieldDefFingerprint {
+    fn default() -> Self {
+        Self {
+            source_op_names: HashSet::new(),
+            fingerprint: Fingerprinter::default().into_fingerprint(),
+        }
+    }
+}
+
 pub struct AnalyzedImportOp {
     pub name: String,
     pub executor: Box<dyn SourceExecutor>,
@@ -121,6 +139,7 @@ pub struct AnalyzedExportOp {
     pub value_stable: bool,
     /// Fingerprinter of the output value.
     pub output_value_fingerprinter: Fingerprinter,
+    pub def_fp: FieldDefFingerprint,
 }
 
 pub struct AnalyzedExportTargetOpGroup {
@@ -141,20 +160,8 @@ pub struct AnalyzedOpScope {
     pub scope_qualifier: String,
 }
 
-pub struct ExecutionPlanLogicFingerprint {
-    pub current: Fingerprint,
-    pub legacy: Fingerprint,
-}
-
-impl ExecutionPlanLogicFingerprint {
-    pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
-        self.current.as_slice() == other.as_ref() || self.legacy.as_slice() == other.as_ref()
-    }
-}
-
 pub struct ExecutionPlan {
-    pub logic_fingerprint: ExecutionPlanLogicFingerprint,
-
+    pub legacy_fingerprint: Vec<Fingerprint>,
     pub import_ops: Vec<AnalyzedImportOp>,
     pub op_scope: AnalyzedOpScope,
     pub export_ops: Vec<AnalyzedExportOp>,
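With `ExecutionPlanLogicFingerprint` gone, matching follows the pattern below: a stored fingerprint counts as current if it equals the new per-source digest or any plan-level legacy digest, so rows indexed before the fingerprint scheme changed are not needlessly reprocessed. A sketch with assumed 8-byte digests:

```rust
struct SourceLogicFp {
    current: [u8; 8],
    legacy: Vec<[u8; 8]>,
}

impl SourceLogicFp {
    // Same shape as SourceLogicFingerprint::matches in this PR.
    fn matches(&self, stored: impl AsRef<[u8]>) -> bool {
        let stored = stored.as_ref();
        self.current.as_slice() == stored
            || self.legacy.iter().any(|fp| fp.as_slice() == stored)
    }
}

fn main() {
    let fp = SourceLogicFp { current: [1; 8], legacy: vec![[2; 8], [3; 8]] };
    assert!(fp.matches([1; 8])); // written under the current scheme
    assert!(fp.matches([3; 8])); // written before the scheme change: still fine
    assert!(!fp.matches([9; 8])); // logic actually changed => reprocess
}
```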
diff --git a/rust/cocoindex/src/execution/dumper.rs b/rust/cocoindex/src/execution/dumper.rs
index a4549023..a1607d0e 100644
--- a/rust/cocoindex/src/execution/dumper.rs
+++ b/rust/cocoindex/src/execution/dumper.rs
@@ -1,3 +1,4 @@
+use crate::execution::indexing_status::SourceLogicFingerprint;
 use crate::prelude::*;
 
 use futures::{StreamExt, future::try_join_all};
@@ -71,6 +72,7 @@ impl<'a> Dumper<'a> {
         import_op: &'a AnalyzedImportOp,
         key: &value::KeyValue,
         key_aux_info: &serde_json::Value,
+        source_logic_fp: &SourceLogicFingerprint,
         collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
     ) -> Result>>>
     where
             &SourceRowEvaluationContext {
                 plan: self.plan,
                 import_op,
                 schema: self.schema,
                 key,
                 import_op_idx,
+                source_logic_fp,
             },
             key_aux_info,
             self.setup_execution_ctx,
@@ -139,6 +142,12 @@ impl<'a> Dumper<'a> {
         key_aux_info: serde_json::Value,
         file_path: PathBuf,
     ) -> Result<()> {
+        let source_logic_fp = SourceLogicFingerprint::new(
+            self.plan,
+            import_op_idx,
+            &self.setup_execution_ctx.export_ops,
+            self.plan.legacy_fingerprint.clone(),
+        )?;
         let _permit = import_op
             .concurrency_controller
             .acquire(concur_control::BYTES_UNKNOWN_YET)
             .await?;
                 import_op,
                 &key,
                 &key_aux_info,
+                &source_logic_fp,
                 &mut collected_values_buffer,
             )
             .await
diff --git a/rust/cocoindex/src/execution/evaluator.rs b/rust/cocoindex/src/execution/evaluator.rs
index f3d83f46..23eec838 100644
--- a/rust/cocoindex/src/execution/evaluator.rs
+++ b/rust/cocoindex/src/execution/evaluator.rs
@@ -1,3 +1,4 @@
+use crate::execution::indexing_status::SourceLogicFingerprint;
 use crate::prelude::*;
 
 use anyhow::{Context, Ok};
@@ -634,6 +635,7 @@ pub struct SourceRowEvaluationContext<'a> {
     pub schema: &'a schema::FlowSchema,
     pub key: &'a value::KeyValue,
     pub import_op_idx: usize,
+    pub source_logic_fp: &'a SourceLogicFingerprint,
 }
 
 #[derive(Debug)]
diff --git a/rust/cocoindex/src/execution/indexing_status.rs b/rust/cocoindex/src/execution/indexing_status.rs
index 39daf8ab..b0138032 100644
--- a/rust/cocoindex/src/execution/indexing_status.rs
+++ b/rust/cocoindex/src/execution/indexing_status.rs
@@ -3,6 +3,47 @@ use crate::prelude::*;
 use super::db_tracking;
 use super::evaluator;
 use futures::try_join;
+use utils::fingerprint::{Fingerprint, Fingerprinter};
+
+pub struct SourceLogicFingerprint {
+    pub current: Fingerprint,
+    pub legacy: Vec<Fingerprint>,
+}
+
+impl SourceLogicFingerprint {
+    pub fn new(
+        exec_plan: &plan::ExecutionPlan,
+        source_idx: usize,
+        export_exec_ctx: &[exec_ctx::ExportOpExecutionContext],
+        legacy: Vec<Fingerprint>,
+    ) -> Result<Self> {
+        let import_op = &exec_plan.import_ops[source_idx];
+        let mut fp = Fingerprinter::default();
+        if exec_plan.export_ops.len() != export_exec_ctx.len() {
+            bail!("Export op count does not match export execution context count");
+        }
+        for (export_op, export_op_exec_ctx) in
+            std::iter::zip(exec_plan.export_ops.iter(), export_exec_ctx.iter())
+        {
+            if export_op.def_fp.source_op_names.contains(&import_op.name) {
+                fp = fp.with(&(
+                    &export_op.def_fp.fingerprint,
+                    &export_op_exec_ctx.target_id,
+                    &export_op_exec_ctx.schema_version_id,
+                ))?;
+            }
+        }
+        Ok(Self {
+            current: fp.into_fingerprint(),
+            legacy,
+        })
+    }
+
+    pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
+        self.current.as_slice() == other.as_ref()
+            || self.legacy.iter().any(|fp| fp.as_slice() == other.as_ref())
+    }
+}
 
 #[derive(Debug, Serialize)]
 pub struct SourceRowLastProcessedInfo {
@@ -54,7 +95,7 @@ pub async fn get_source_row_indexing_status(
             is_logic_current: l
                 .process_logic_fingerprint
                 .as_ref()
-                .map_or(false, |fp| src_eval_ctx.plan.logic_fingerprint.matches(fp)),
+                .map_or(false, |fp| src_eval_ctx.source_logic_fp.matches(fp)),
         });
     let current = SourceRowInfo {
         ordinal: current.ordinal,
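The payoff of threading `def_fp` everywhere shows up in `SourceLogicFingerprint::new`: only export ops whose `source_op_names` include this import contribute to the source's digest. The sketch below demonstrates the consequence — changing an export fed by a different source leaves this source's fingerprint, and therefore its rows, untouched (toy types, std hasher):

```rust
use std::collections::HashSet;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

struct Export {
    sources: HashSet<&'static str>, // source ops this export depends on
    fp: u64,                        // the export's def fingerprint
    target_id: u32,
    schema_version_id: u32,
}

// Per-source digest: fold in only the exports that depend on `source`.
fn source_logic_fp(source: &str, exports: &[Export]) -> u64 {
    let mut h = DefaultHasher::new();
    for e in exports {
        if e.sources.contains(source) {
            (e.fp, e.target_id, e.schema_version_id).hash(&mut h);
        }
    }
    h.finish()
}

fn main() {
    let mut exports = vec![
        Export { sources: HashSet::from(["docs"]), fp: 1, target_id: 1, schema_version_id: 1 },
        Export { sources: HashSet::from(["logs"]), fp: 2, target_id: 2, schema_version_id: 1 },
    ];
    let before = source_logic_fp("docs", &exports);
    exports[1].fp = 99; // change logic that only touches the "logs" source
    assert_eq!(before, source_logic_fp("docs", &exports)); // "docs" rows stay put
}
```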
diff --git a/rust/cocoindex/src/execution/row_indexer.rs b/rust/cocoindex/src/execution/row_indexer.rs
index 09d6a4c4..d2395d00 100644
--- a/rust/cocoindex/src/execution/row_indexer.rs
+++ b/rust/cocoindex/src/execution/row_indexer.rs
@@ -1,3 +1,4 @@
+use crate::execution::indexing_status::SourceLogicFingerprint;
 use crate::prelude::*;
 
 use base64::Engine;
@@ -55,7 +56,7 @@ impl SourceVersion {
     pub fn from_stored(
         stored_ordinal: Option<i64>,
         stored_fp: &Option<Vec<u8>>,
-        curr_fp: &ExecutionPlanLogicFingerprint,
+        curr_fp: &SourceLogicFingerprint,
     ) -> Self {
         Self {
             ordinal: Ordinal(stored_ordinal),
@@ -74,7 +75,7 @@ impl SourceVersion {
 
     pub fn from_stored_processing_info(
         info: &db_tracking::SourceTrackingInfoForProcessing,
-        curr_fp: &ExecutionPlanLogicFingerprint,
+        curr_fp: &SourceLogicFingerprint,
     ) -> Self {
         Self::from_stored(
             info.processed_source_ordinal,
@@ -85,7 +86,7 @@ impl SourceVersion {
 
     pub fn from_stored_precommit_info(
         info: &db_tracking::SourceTrackingInfoForPrecommit,
-        curr_fp: &ExecutionPlanLogicFingerprint,
+        curr_fp: &SourceLogicFingerprint,
     ) -> Self {
         Self::from_stored(
             info.processed_source_ordinal,
@@ -240,7 +241,7 @@ impl<'a> RowIndexer<'a> {
             Some(info) => {
                 let existing_version = SourceVersion::from_stored_processing_info(
                     info,
                    &self.src_eval_ctx.source_logic_fp,
                 );

                // First check ordinal-based skipping
@@ -486,7 +487,7 @@ impl<'a> RowIndexer<'a> {
         // Check 1: Same check as precommit - verify no newer version exists
         let existing_source_version = SourceVersion::from_stored_precommit_info(
             &existing_tracking_info,
-            &self.src_eval_ctx.plan.logic_fingerprint,
+            &self.src_eval_ctx.source_logic_fp,
         );
         if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
             return Ok(Some(SkippedOr::Skipped(
@@ -537,7 +538,6 @@ impl<'a> RowIndexer<'a> {
         let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
         let export_ops = &self.src_eval_ctx.plan.export_ops;
         let export_ops_exec_ctx = &self.setup_execution_ctx.export_ops;
-        let logic_fp = &self.src_eval_ctx.plan.logic_fingerprint;
 
         let mut txn = self.pool.begin().await?;
 
@@ -551,8 +551,10 @@ impl<'a> RowIndexer<'a> {
         if self.mode == super::source_indexer::UpdateMode::Normal
             && let Some(tracking_info) = &tracking_info
         {
-            let existing_source_version =
-                SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp);
+            let existing_source_version = SourceVersion::from_stored_precommit_info(
+                &tracking_info,
+                &self.src_eval_ctx.source_logic_fp,
+            );
             if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
                 return Ok(SkippedOr::Skipped(
                     existing_source_version,
@@ -834,7 +836,7 @@ impl<'a> RowIndexer<'a> {
             cleaned_staging_target_keys,
             source_version.ordinal.into(),
             source_fp,
-            &self.src_eval_ctx.plan.logic_fingerprint.current.0,
+            &self.src_eval_ctx.source_logic_fp.current.0,
             precommit_metadata.process_ordinal,
             self.process_time.timestamp_micros(),
             precommit_metadata.new_target_keys,
diff --git a/rust/cocoindex/src/execution/source_indexer.rs b/rust/cocoindex/src/execution/source_indexer.rs
index 5ea48e65..4fa11e00 100644
--- a/rust/cocoindex/src/execution/source_indexer.rs
+++ b/rust/cocoindex/src/execution/source_indexer.rs
@@ -1,4 +1,9 @@
-use crate::{execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*};
+use crate::{
+    execution::{
+        indexing_status::SourceLogicFingerprint, row_indexer::ContentHashBasedCollapsingBaseline,
+    },
+    prelude::*,
+};
 use utils::batching;
 
 use futures::future::Ready;
@@ -62,6 +67,7 @@ pub struct SourceIndexingContext {
     needs_to_track_rows_to_retry: bool,
     update_once_batcher: batching::Batcher<UpdateOnceRunner>,
+    source_logic_fp: SourceLogicFingerprint,
 }
 
 pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -258,6 +264,12 @@ impl SourceIndexingContext {
         let mut rows = HashMap::new();
         let mut rows_to_retry: Option<HashSet<value::KeyValue>> = None;
         let scan_generation = 0;
+        let source_logic_fp = SourceLogicFingerprint::new(
+            &plan,
+            source_idx,
+            &setup_execution_ctx.export_ops,
+            plan.legacy_fingerprint.clone(),
+        )?;
         {
             let mut key_metadata_stream = list_state.list(
                 setup_execution_ctx.import_ops[source_idx].source_id,
@@ -282,7 +294,7 @@ impl SourceIndexingContext {
                         source_version: SourceVersion::from_stored(
                             key_metadata.processed_source_ordinal,
                             &key_metadata.process_logic_fingerprint,
-                            &plan.logic_fingerprint,
+                            &source_logic_fp,
                         ),
                         content_version_fp: key_metadata.processed_source_fp,
                     },
@@ -307,6 +319,7 @@ impl SourceIndexingContext {
                 UpdateOnceRunner,
                 batching::BatchingOptions::default(),
             ),
+            source_logic_fp,
         }))
     }
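In `SourceVersion::from_stored`, the stored row's fingerprint decides whether its last processing still reflects current logic; a mismatch forces reprocessing even when the source ordinal has not advanced. A reduced model of that decision (names and the skip rule are assumptions for illustration, not the crate's exact `should_skip`):

```rust
#[derive(PartialEq)]
enum VersionKind {
    CurrentLogic,   // stored fingerprint matches the current or a legacy digest
    DifferentLogic, // logic changed since the row was processed
}

fn classify(stored_fp: Option<&[u8]>, current: &[u8], legacy: &[&[u8]]) -> VersionKind {
    match stored_fp {
        Some(fp) if fp == current || legacy.contains(&fp) => VersionKind::CurrentLogic,
        _ => VersionKind::DifferentLogic,
    }
}

fn should_reprocess(stored_ordinal: i64, new_ordinal: i64, kind: VersionKind) -> bool {
    // Ordinal-based skipping only applies when the logic is unchanged.
    new_ordinal > stored_ordinal || kind == VersionKind::DifferentLogic
}

fn main() {
    let kind = classify(Some(&[1, 2]), &[9, 9], &[&[1, 2]]);
    assert!(kind == VersionKind::CurrentLogic); // matched a legacy digest
    assert!(!should_reprocess(5, 5, kind)); // same ordinal, same logic: skip
}
```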
@@ -347,6 +360,7 @@ impl SourceIndexingContext {
             schema,
             key: &row_input.key,
             import_op_idx: self.source_idx,
+            source_logic_fp: &self.source_logic_fp,
         };
         let process_time = chrono::Utc::now();
         let operation_in_process_stats_cloned = operation_in_process_stats.clone();
diff --git a/rust/cocoindex/src/service/flows.rs b/rust/cocoindex/src/service/flows.rs
index d901c500..8ffb4ed3 100644
--- a/rust/cocoindex/src/service/flows.rs
+++ b/rust/cocoindex/src/service/flows.rs
@@ -1,7 +1,8 @@
+use crate::execution::indexing_status::SourceLogicFingerprint;
 use crate::prelude::*;
 
 use crate::execution::{evaluator, indexing_status, memoization, row_indexer, stats};
-use crate::lib_context::LibContext;
+use crate::lib_context::{FlowExecutionContext, LibContext};
 use crate::service::query_handler::{QueryHandlerSpec, QueryInput, QueryOutput};
 use crate::{base::schema::FlowSchema, ops::interface::SourceExecutorReadOptions};
 use axum::{
@@ -150,10 +151,15 @@ struct SourceRowKeyContextHolder<'a> {
     schema: &'a FlowSchema,
     key: value::KeyValue,
     key_aux_info: serde_json::Value,
+    source_logic_fp: SourceLogicFingerprint,
 }
 
 impl<'a> SourceRowKeyContextHolder<'a> {
-    async fn create(flow_ctx: &'a FlowContext, source_row_key: SourceRowKeyParams) -> Result<Self> {
+    async fn create(
+        flow_ctx: &'a FlowContext,
+        execution_ctx: &FlowExecutionContext,
+        source_row_key: SourceRowKeyParams,
+    ) -> Result<Self> {
         let schema = &flow_ctx.flow.data_schema;
         let import_op_idx = flow_ctx
             .flow
@@ -181,12 +187,19 @@ impl<'a> SourceRowKeyContextHolder<'a> {
             .map(|s| utils::deser::from_json_str(&s))
             .transpose()?
             .unwrap_or_default();
+        let source_logic_fp = SourceLogicFingerprint::new(
+            &plan,
+            import_op_idx,
+            &execution_ctx.setup_execution_context.export_ops,
+            plan.legacy_fingerprint.clone(),
+        )?;
         Ok(Self {
             plan,
             import_op_idx,
             schema,
             key,
             key_aux_info,
+            source_logic_fp,
         })
     }
 
@@ -197,6 +210,7 @@ impl<'a> SourceRowKeyContextHolder<'a> {
             schema: self.schema,
             key: &self.key,
             import_op_idx: self.import_op_idx,
+            source_logic_fp: &self.source_logic_fp,
         }
     }
 }
@@ -207,8 +221,9 @@ pub async fn evaluate_data(
     State(lib_context): State<Arc<LibContext>>,
 ) -> Result, ApiError> {
     let flow_ctx = lib_context.get_flow_context(&flow_name)?;
-    let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
     let execution_ctx = flow_ctx.use_execution_ctx().await?;
+    let source_row_key_ctx =
+        SourceRowKeyContextHolder::create(&flow_ctx, &execution_ctx, query).await?;
     let evaluate_output = row_indexer::evaluate_source_entry_with_memory(
         &source_row_key_ctx.as_context(),
         &source_row_key_ctx.key_aux_info,
@@ -258,9 +273,9 @@ pub async fn get_row_indexing_status(
     State(lib_context): State<Arc<LibContext>>,
 ) -> Result, ApiError> {
     let flow_ctx = lib_context.get_flow_context(&flow_name)?;
-    let source_row_key_ctx = SourceRowKeyContextHolder::create(&flow_ctx, query).await?;
-    let execution_ctx = flow_ctx.use_execution_ctx().await?;
+    let execution_ctx = flow_ctx.use_execution_ctx().await?;
+    let source_row_key_ctx =
+        SourceRowKeyContextHolder::create(&flow_ctx, &execution_ctx, query).await?;
     let indexing_status = indexing_status::get_source_row_indexing_status(
         &source_row_key_ctx.as_context(),
         &source_row_key_ctx.key_aux_info,