Skip to content

Commit 557f255

Browse files
authored
feat(fine-grain-fp): use fine grain data lineage to detect logic changes (#1292)
* feat(fine-grain-fp): use fine grain data lineage to detect logic changes * fix: incorporate target schema id, and use tuple for encoding * minor consolidate
1 parent 5050026 commit 557f255

File tree

9 files changed

+348
-92
lines changed

9 files changed

+348
-92
lines changed

rust/cocoindex/src/builder/analyzer.rs

Lines changed: 198 additions & 55 deletions
Large diffs are not rendered by default.

rust/cocoindex/src/builder/flow_builder.rs

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
use crate::{
2-
base::schema::EnrichedValueType, prelude::*, py::Pythonized, setup::ObjectSetupChange,
2+
base::schema::EnrichedValueType, builder::plan::FieldDefFingerprint, prelude::*,
3+
py::Pythonized, setup::ObjectSetupChange,
34
};
45

6+
use cocoindex_utils::fingerprint::Fingerprinter;
57
use pyo3::{exceptions::PyException, prelude::*};
68
use pyo3_async_runtimes::tokio::future_into_py;
79
use std::{collections::btree_map, ops::Deref};
@@ -119,8 +121,8 @@ impl DataSlice {
119121
spec::ValueMapping::Field(spec::FieldMapping { scope, field_path }) => {
120122
let data_scope_builder = self.scope.data.lock().unwrap();
121123
let struct_schema = {
122-
let (_, val_type) = data_scope_builder
123-
.analyze_field_path(field_path)
124+
let (_, val_type, _) = data_scope_builder
125+
.analyze_field_path(field_path, self.scope.base_value_def_fp.clone())
124126
.into_py_result()?;
125127
match &val_type.typ {
126128
ValueTypeBuilder::Struct(struct_type) => struct_type,
@@ -171,7 +173,8 @@ impl DataSlice {
171173
spec::ValueMapping::Constant(c) => c.schema.clone(),
172174
spec::ValueMapping::Field(v) => {
173175
let data_scope_builder = self.scope.data.lock().unwrap();
174-
let (_, val_type) = data_scope_builder.analyze_field_path(&v.field_path)?;
176+
let (_, val_type, _) = data_scope_builder
177+
.analyze_field_path(&v.field_path, self.scope.base_value_def_fp.clone())?;
175178
EnrichedValueType::from_alternative(val_type)?
176179
}
177180
};
@@ -257,6 +260,7 @@ impl FlowBuilder {
257260
spec::ROOT_SCOPE_NAME.to_string(),
258261
None,
259262
Arc::new(Mutex::new(DataScopeBuilder::new())),
263+
FieldDefFingerprint::default(),
260264
);
261265
let flow_inst_context = build_flow_instance_context(
262266
name,
@@ -366,7 +370,19 @@ impl FlowBuilder {
366370
{
367371
let mut root_data_scope = self.root_op_scope.data.lock().unwrap();
368372
root_data_scope
369-
.add_field(name.clone(), &value_type)
373+
.add_field(
374+
name.clone(),
375+
&value_type,
376+
FieldDefFingerprint {
377+
source_op_names: HashSet::from([name.clone()]),
378+
fingerprint: Fingerprinter::default()
379+
.with("input")
380+
.into_py_result()?
381+
.with(&name)
382+
.into_py_result()?
383+
.into_fingerprint(),
384+
},
385+
)
370386
.into_py_result()?;
371387
}
372388
let result = Self::last_field_to_data_slice(&self.root_op_scope).into_py_result()?;
@@ -545,11 +561,17 @@ impl FlowBuilder {
545561
auto_uuid_field,
546562
);
547563
{
564+
// TODO: Pass in the right field def fingerprint
548565
let mut collector = collector.collector.lock().unwrap();
549566
if let Some(collector) = collector.as_mut() {
550-
collector.merge_schema(&collector_schema).into_py_result()?;
567+
collector
568+
.collect(&collector_schema, FieldDefFingerprint::default())
569+
.into_py_result()?;
551570
} else {
552-
*collector = Some(CollectorBuilder::new(Arc::new(collector_schema)));
571+
*collector = Some(CollectorBuilder::new(
572+
Arc::new(collector_schema),
573+
FieldDefFingerprint::default(),
574+
));
553575
}
554576
}
555577

rust/cocoindex/src/builder/plan.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,24 @@ pub struct AnalyzedOpOutput {
5353
pub field_idx: u32,
5454
}
5555

56+
/// Tracks which affects value of the field, to detect changes of logic.
57+
#[derive(Debug, Clone)]
58+
pub struct FieldDefFingerprint {
59+
/// Name of sources that affect value of the field.
60+
pub source_op_names: HashSet<String>,
61+
/// Fingerprint of the logic that affects value of the field.
62+
pub fingerprint: Fingerprint,
63+
}
64+
65+
impl Default for FieldDefFingerprint {
66+
fn default() -> Self {
67+
Self {
68+
source_op_names: HashSet::new(),
69+
fingerprint: Fingerprinter::default().into_fingerprint(),
70+
}
71+
}
72+
}
73+
5674
pub struct AnalyzedImportOp {
5775
pub name: String,
5876
pub executor: Box<dyn SourceExecutor>,
@@ -121,6 +139,7 @@ pub struct AnalyzedExportOp {
121139
pub value_stable: bool,
122140
/// Fingerprinter of the output value.
123141
pub output_value_fingerprinter: Fingerprinter,
142+
pub def_fp: FieldDefFingerprint,
124143
}
125144

126145
pub struct AnalyzedExportTargetOpGroup {
@@ -141,20 +160,8 @@ pub struct AnalyzedOpScope {
141160
pub scope_qualifier: String,
142161
}
143162

144-
pub struct ExecutionPlanLogicFingerprint {
145-
pub current: Fingerprint,
146-
pub legacy: Fingerprint,
147-
}
148-
149-
impl ExecutionPlanLogicFingerprint {
150-
pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
151-
self.current.as_slice() == other.as_ref() || self.legacy.as_slice() == other.as_ref()
152-
}
153-
}
154-
155163
pub struct ExecutionPlan {
156-
pub logic_fingerprint: ExecutionPlanLogicFingerprint,
157-
164+
pub legacy_fingerprint: Vec<Fingerprint>,
158165
pub import_ops: Vec<AnalyzedImportOp>,
159166
pub op_scope: AnalyzedOpScope,
160167
pub export_ops: Vec<AnalyzedExportOp>,

rust/cocoindex/src/execution/dumper.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::execution::indexing_status::SourceLogicFingerprint;
12
use crate::prelude::*;
23

34
use futures::{StreamExt, future::try_join_all};
@@ -71,6 +72,7 @@ impl<'a> Dumper<'a> {
7172
import_op: &'a AnalyzedImportOp,
7273
key: &value::KeyValue,
7374
key_aux_info: &serde_json::Value,
75+
source_logic_fp: &SourceLogicFingerprint,
7476
collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
7577
) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
7678
where
@@ -83,6 +85,7 @@ impl<'a> Dumper<'a> {
8385
schema: self.schema,
8486
key,
8587
import_op_idx,
88+
source_logic_fp,
8689
},
8790
key_aux_info,
8891
self.setup_execution_ctx,
@@ -139,6 +142,12 @@ impl<'a> Dumper<'a> {
139142
key_aux_info: serde_json::Value,
140143
file_path: PathBuf,
141144
) -> Result<()> {
145+
let source_logic_fp = SourceLogicFingerprint::new(
146+
self.plan,
147+
import_op_idx,
148+
&self.setup_execution_ctx.export_ops,
149+
self.plan.legacy_fingerprint.clone(),
150+
)?;
142151
let _permit = import_op
143152
.concurrency_controller
144153
.acquire(concur_control::BYTES_UNKNOWN_YET)
@@ -150,6 +159,7 @@ impl<'a> Dumper<'a> {
150159
import_op,
151160
&key,
152161
&key_aux_info,
162+
&source_logic_fp,
153163
&mut collected_values_buffer,
154164
)
155165
.await

rust/cocoindex/src/execution/evaluator.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::execution::indexing_status::SourceLogicFingerprint;
12
use crate::prelude::*;
23

34
use anyhow::{Context, Ok};
@@ -634,6 +635,7 @@ pub struct SourceRowEvaluationContext<'a> {
634635
pub schema: &'a schema::FlowSchema,
635636
pub key: &'a value::KeyValue,
636637
pub import_op_idx: usize,
638+
pub source_logic_fp: &'a SourceLogicFingerprint,
637639
}
638640

639641
#[derive(Debug)]

rust/cocoindex/src/execution/indexing_status.rs

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,47 @@ use crate::prelude::*;
33
use super::db_tracking;
44
use super::evaluator;
55
use futures::try_join;
6+
use utils::fingerprint::{Fingerprint, Fingerprinter};
7+
8+
pub struct SourceLogicFingerprint {
9+
pub current: Fingerprint,
10+
pub legacy: Vec<Fingerprint>,
11+
}
12+
13+
impl SourceLogicFingerprint {
14+
pub fn new(
15+
exec_plan: &plan::ExecutionPlan,
16+
source_idx: usize,
17+
export_exec_ctx: &[exec_ctx::ExportOpExecutionContext],
18+
legacy: Vec<Fingerprint>,
19+
) -> Result<Self> {
20+
let import_op = &exec_plan.import_ops[source_idx];
21+
let mut fp = Fingerprinter::default();
22+
if exec_plan.import_ops.len() != export_exec_ctx.len() {
23+
bail!("Import op count does not match export op count");
24+
}
25+
for (export_op, export_op_exec_ctx) in
26+
std::iter::zip(exec_plan.export_ops.iter(), export_exec_ctx.iter())
27+
{
28+
if export_op.def_fp.source_op_names.contains(&import_op.name) {
29+
fp = fp.with(&(
30+
&export_op.def_fp.fingerprint,
31+
&export_op_exec_ctx.target_id,
32+
&export_op_exec_ctx.schema_version_id,
33+
))?;
34+
}
35+
}
36+
Ok(Self {
37+
current: fp.into_fingerprint(),
38+
legacy,
39+
})
40+
}
41+
42+
pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
43+
self.current.as_slice() == other.as_ref()
44+
|| self.legacy.iter().any(|fp| fp.as_slice() == other.as_ref())
45+
}
46+
}
647

748
#[derive(Debug, Serialize)]
849
pub struct SourceRowLastProcessedInfo {
@@ -54,7 +95,7 @@ pub async fn get_source_row_indexing_status(
5495
is_logic_current: l
5596
.process_logic_fingerprint
5697
.as_ref()
57-
.map_or(false, |fp| src_eval_ctx.plan.logic_fingerprint.matches(fp)),
98+
.map_or(false, |fp| src_eval_ctx.source_logic_fp.matches(fp)),
5899
});
59100
let current = SourceRowInfo {
60101
ordinal: current.ordinal,

rust/cocoindex/src/execution/row_indexer.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::execution::indexing_status::SourceLogicFingerprint;
12
use crate::prelude::*;
23

34
use base64::Engine;
@@ -55,7 +56,7 @@ impl SourceVersion {
5556
pub fn from_stored(
5657
stored_ordinal: Option<i64>,
5758
stored_fp: &Option<Vec<u8>>,
58-
curr_fp: &ExecutionPlanLogicFingerprint,
59+
curr_fp: &SourceLogicFingerprint,
5960
) -> Self {
6061
Self {
6162
ordinal: Ordinal(stored_ordinal),
@@ -74,7 +75,7 @@ impl SourceVersion {
7475

7576
pub fn from_stored_processing_info(
7677
info: &db_tracking::SourceTrackingInfoForProcessing,
77-
curr_fp: &ExecutionPlanLogicFingerprint,
78+
curr_fp: &SourceLogicFingerprint,
7879
) -> Self {
7980
Self::from_stored(
8081
info.processed_source_ordinal,
@@ -85,7 +86,7 @@ impl SourceVersion {
8586

8687
pub fn from_stored_precommit_info(
8788
info: &db_tracking::SourceTrackingInfoForPrecommit,
88-
curr_fp: &ExecutionPlanLogicFingerprint,
89+
curr_fp: &SourceLogicFingerprint,
8990
) -> Self {
9091
Self::from_stored(
9192
info.processed_source_ordinal,
@@ -240,7 +241,7 @@ impl<'a> RowIndexer<'a> {
240241
Some(info) => {
241242
let existing_version = SourceVersion::from_stored_processing_info(
242243
info,
243-
&self.src_eval_ctx.plan.logic_fingerprint,
244+
&self.src_eval_ctx.source_logic_fp,
244245
);
245246

246247
// First check ordinal-based skipping
@@ -486,7 +487,7 @@ impl<'a> RowIndexer<'a> {
486487
// Check 1: Same check as precommit - verify no newer version exists
487488
let existing_source_version = SourceVersion::from_stored_precommit_info(
488489
&existing_tracking_info,
489-
&self.src_eval_ctx.plan.logic_fingerprint,
490+
&self.src_eval_ctx.source_logic_fp,
490491
);
491492
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
492493
return Ok(Some(SkippedOr::Skipped(
@@ -537,7 +538,6 @@ impl<'a> RowIndexer<'a> {
537538
let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
538539
let export_ops = &self.src_eval_ctx.plan.export_ops;
539540
let export_ops_exec_ctx = &self.setup_execution_ctx.export_ops;
540-
let logic_fp = &self.src_eval_ctx.plan.logic_fingerprint;
541541

542542
let mut txn = self.pool.begin().await?;
543543

@@ -551,8 +551,10 @@ impl<'a> RowIndexer<'a> {
551551
if self.mode == super::source_indexer::UpdateMode::Normal
552552
&& let Some(tracking_info) = &tracking_info
553553
{
554-
let existing_source_version =
555-
SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp);
554+
let existing_source_version = SourceVersion::from_stored_precommit_info(
555+
&tracking_info,
556+
&self.src_eval_ctx.source_logic_fp,
557+
);
556558
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
557559
return Ok(SkippedOr::Skipped(
558560
existing_source_version,
@@ -834,7 +836,7 @@ impl<'a> RowIndexer<'a> {
834836
cleaned_staging_target_keys,
835837
source_version.ordinal.into(),
836838
source_fp,
837-
&self.src_eval_ctx.plan.logic_fingerprint.current.0,
839+
&self.src_eval_ctx.source_logic_fp.current.0,
838840
precommit_metadata.process_ordinal,
839841
self.process_time.timestamp_micros(),
840842
precommit_metadata.new_target_keys,

rust/cocoindex/src/execution/source_indexer.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
use crate::{execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*};
1+
use crate::{
2+
execution::{
3+
indexing_status::SourceLogicFingerprint, row_indexer::ContentHashBasedCollapsingBaseline,
4+
},
5+
prelude::*,
6+
};
27
use utils::batching;
38

49
use futures::future::Ready;
@@ -62,6 +67,7 @@ pub struct SourceIndexingContext {
6267
needs_to_track_rows_to_retry: bool,
6368

6469
update_once_batcher: batching::Batcher<UpdateOnceRunner>,
70+
source_logic_fp: SourceLogicFingerprint,
6571
}
6672

6773
pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -258,6 +264,12 @@ impl SourceIndexingContext {
258264
let mut rows = HashMap::new();
259265
let mut rows_to_retry: Option<HashSet<value::KeyValue>> = None;
260266
let scan_generation = 0;
267+
let source_logic_fp = SourceLogicFingerprint::new(
268+
&plan,
269+
source_idx,
270+
&setup_execution_ctx.export_ops,
271+
plan.legacy_fingerprint.clone(),
272+
)?;
261273
{
262274
let mut key_metadata_stream = list_state.list(
263275
setup_execution_ctx.import_ops[source_idx].source_id,
@@ -282,7 +294,7 @@ impl SourceIndexingContext {
282294
source_version: SourceVersion::from_stored(
283295
key_metadata.processed_source_ordinal,
284296
&key_metadata.process_logic_fingerprint,
285-
&plan.logic_fingerprint,
297+
&source_logic_fp,
286298
),
287299
content_version_fp: key_metadata.processed_source_fp,
288300
},
@@ -307,6 +319,7 @@ impl SourceIndexingContext {
307319
UpdateOnceRunner,
308320
batching::BatchingOptions::default(),
309321
),
322+
source_logic_fp,
310323
}))
311324
}
312325

@@ -347,6 +360,7 @@ impl SourceIndexingContext {
347360
schema,
348361
key: &row_input.key,
349362
import_op_idx: self.source_idx,
363+
source_logic_fp: &self.source_logic_fp,
350364
};
351365
let process_time = chrono::Utc::now();
352366
let operation_in_process_stats_cloned = operation_in_process_stats.clone();

0 commit comments

Comments
 (0)