Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 199 additions & 55 deletions rust/cocoindex/src/builder/analyzer.rs

Large diffs are not rendered by default.

36 changes: 29 additions & 7 deletions rust/cocoindex/src/builder/flow_builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::{
base::schema::EnrichedValueType, prelude::*, py::Pythonized, setup::ObjectSetupChange,
base::schema::EnrichedValueType, builder::plan::FieldDefFingerprint, prelude::*,
py::Pythonized, setup::ObjectSetupChange,
};

use cocoindex_utils::fingerprint::Fingerprinter;
use pyo3::{exceptions::PyException, prelude::*};
use pyo3_async_runtimes::tokio::future_into_py;
use std::{collections::btree_map, ops::Deref};
Expand Down Expand Up @@ -119,8 +121,8 @@ impl DataSlice {
spec::ValueMapping::Field(spec::FieldMapping { scope, field_path }) => {
let data_scope_builder = self.scope.data.lock().unwrap();
let struct_schema = {
let (_, val_type) = data_scope_builder
.analyze_field_path(field_path)
let (_, val_type, _) = data_scope_builder
.analyze_field_path(field_path, self.scope.base_value_def_fp.clone())
.into_py_result()?;
match &val_type.typ {
ValueTypeBuilder::Struct(struct_type) => struct_type,
Expand Down Expand Up @@ -171,7 +173,8 @@ impl DataSlice {
spec::ValueMapping::Constant(c) => c.schema.clone(),
spec::ValueMapping::Field(v) => {
let data_scope_builder = self.scope.data.lock().unwrap();
let (_, val_type) = data_scope_builder.analyze_field_path(&v.field_path)?;
let (_, val_type, _) = data_scope_builder
.analyze_field_path(&v.field_path, self.scope.base_value_def_fp.clone())?;
EnrichedValueType::from_alternative(val_type)?
}
};
Expand Down Expand Up @@ -257,6 +260,7 @@ impl FlowBuilder {
spec::ROOT_SCOPE_NAME.to_string(),
None,
Arc::new(Mutex::new(DataScopeBuilder::new())),
FieldDefFingerprint::default(),
);
let flow_inst_context = build_flow_instance_context(
name,
Expand Down Expand Up @@ -366,7 +370,19 @@ impl FlowBuilder {
{
let mut root_data_scope = self.root_op_scope.data.lock().unwrap();
root_data_scope
.add_field(name.clone(), &value_type)
.add_field(
name.clone(),
&value_type,
FieldDefFingerprint {
source_op_names: HashSet::from([name.clone()]),
fingerprint: Fingerprinter::default()
.with("input")
.into_py_result()?
.with(&name)
.into_py_result()?
.into_fingerprint(),
},
)
.into_py_result()?;
}
let result = Self::last_field_to_data_slice(&self.root_op_scope).into_py_result()?;
Expand Down Expand Up @@ -545,11 +561,17 @@ impl FlowBuilder {
auto_uuid_field,
);
{
// TODO: Pass in the right field def fingerprint
let mut collector = collector.collector.lock().unwrap();
if let Some(collector) = collector.as_mut() {
collector.merge_schema(&collector_schema).into_py_result()?;
collector
.collect(&collector_schema, FieldDefFingerprint::default())
.into_py_result()?;
} else {
*collector = Some(CollectorBuilder::new(Arc::new(collector_schema)));
*collector = Some(CollectorBuilder::new(
Arc::new(collector_schema),
FieldDefFingerprint::default(),
));
}
}

Expand Down
30 changes: 17 additions & 13 deletions rust/cocoindex/src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,21 @@ pub struct AnalyzedOpOutput {
pub field_idx: u32,
}

#[derive(Debug, Clone)]
pub struct FieldDefFingerprint {
pub source_op_names: HashSet<String>,
pub fingerprint: Fingerprint,
}

impl Default for FieldDefFingerprint {
fn default() -> Self {
Self {
source_op_names: HashSet::new(),
fingerprint: Fingerprinter::default().into_fingerprint(),
}
}
}

pub struct AnalyzedImportOp {
pub name: String,
pub executor: Box<dyn SourceExecutor>,
Expand Down Expand Up @@ -121,6 +136,7 @@ pub struct AnalyzedExportOp {
pub value_stable: bool,
/// Fingerprinter of the output value.
pub output_value_fingerprinter: Fingerprinter,
pub def_fp: FieldDefFingerprint,
}

pub struct AnalyzedExportTargetOpGroup {
Expand All @@ -141,20 +157,8 @@ pub struct AnalyzedOpScope {
pub scope_qualifier: String,
}

pub struct ExecutionPlanLogicFingerprint {
pub current: Fingerprint,
pub legacy: Fingerprint,
}

impl ExecutionPlanLogicFingerprint {
pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
self.current.as_slice() == other.as_ref() || self.legacy.as_slice() == other.as_ref()
}
}

pub struct ExecutionPlan {
pub logic_fingerprint: ExecutionPlanLogicFingerprint,

pub legacy_fingerprint: Vec<Fingerprint>,
pub import_ops: Vec<AnalyzedImportOp>,
pub op_scope: AnalyzedOpScope,
pub export_ops: Vec<AnalyzedExportOp>,
Expand Down
10 changes: 10 additions & 0 deletions rust/cocoindex/src/execution/dumper.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::execution::indexing_status::SourceLogicFingerprint;
use crate::prelude::*;

use futures::{StreamExt, future::try_join_all};
Expand Down Expand Up @@ -71,6 +72,7 @@ impl<'a> Dumper<'a> {
import_op: &'a AnalyzedImportOp,
key: &value::KeyValue,
key_aux_info: &serde_json::Value,
source_logic_fp: &SourceLogicFingerprint,
collected_values_buffer: &'b mut Vec<Vec<value::FieldValues>>,
) -> Result<Option<IndexMap<&'b str, TargetExportData<'b>>>>
where
Expand All @@ -83,6 +85,7 @@ impl<'a> Dumper<'a> {
schema: self.schema,
key,
import_op_idx,
source_logic_fp,
},
key_aux_info,
self.setup_execution_ctx,
Expand Down Expand Up @@ -139,6 +142,12 @@ impl<'a> Dumper<'a> {
key_aux_info: serde_json::Value,
file_path: PathBuf,
) -> Result<()> {
let source_logic_fp = SourceLogicFingerprint::new(
self.plan,
import_op_idx,
&self.setup_execution_ctx.export_ops,
self.plan.legacy_fingerprint.clone(),
)?;
let _permit = import_op
.concurrency_controller
.acquire(concur_control::BYTES_UNKNOWN_YET)
Expand All @@ -150,6 +159,7 @@ impl<'a> Dumper<'a> {
import_op,
&key,
&key_aux_info,
&source_logic_fp,
&mut collected_values_buffer,
)
.await
Expand Down
2 changes: 2 additions & 0 deletions rust/cocoindex/src/execution/evaluator.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::execution::indexing_status::SourceLogicFingerprint;
use crate::prelude::*;

use anyhow::{Context, Ok};
Expand Down Expand Up @@ -634,6 +635,7 @@ pub struct SourceRowEvaluationContext<'a> {
pub schema: &'a schema::FlowSchema,
pub key: &'a value::KeyValue,
pub import_op_idx: usize,
pub source_logic_fp: &'a SourceLogicFingerprint,
}

#[derive(Debug)]
Expand Down
37 changes: 36 additions & 1 deletion rust/cocoindex/src/execution/indexing_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,41 @@ use crate::prelude::*;
use super::db_tracking;
use super::evaluator;
use futures::try_join;
use utils::fingerprint::{Fingerprint, Fingerprinter};

pub struct SourceLogicFingerprint {
pub current: Fingerprint,
pub legacy: Vec<Fingerprint>,
}

impl SourceLogicFingerprint {
pub fn new(
exec_plan: &plan::ExecutionPlan,
source_idx: usize,
export_exec_ctx: &[exec_ctx::ExportOpExecutionContext],
legacy: Vec<Fingerprint>,
) -> Result<Self> {
let import_op = &exec_plan.import_ops[source_idx];
let mut fp = Fingerprinter::default();
for (export_op, export_op_exec_ctx) in
std::iter::zip(exec_plan.export_ops.iter(), export_exec_ctx.iter())
{
if export_op.def_fp.source_op_names.contains(&import_op.name) {
fp = fp.with(&export_op_exec_ctx.target_id)?;
fp = fp.with(&export_op.def_fp.fingerprint)?;
}
}
Ok(Self {
current: fp.into_fingerprint(),
legacy,
})
}

pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
self.current.as_slice() == other.as_ref()
|| self.legacy.iter().any(|fp| fp.as_slice() == other.as_ref())
}
}

#[derive(Debug, Serialize)]
pub struct SourceRowLastProcessedInfo {
Expand Down Expand Up @@ -54,7 +89,7 @@ pub async fn get_source_row_indexing_status(
is_logic_current: l
.process_logic_fingerprint
.as_ref()
.map_or(false, |fp| src_eval_ctx.plan.logic_fingerprint.matches(fp)),
.map_or(false, |fp| src_eval_ctx.source_logic_fp.matches(fp)),
});
let current = SourceRowInfo {
ordinal: current.ordinal,
Expand Down
20 changes: 11 additions & 9 deletions rust/cocoindex/src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::execution::indexing_status::SourceLogicFingerprint;
use crate::prelude::*;

use base64::Engine;
Expand Down Expand Up @@ -55,7 +56,7 @@ impl SourceVersion {
pub fn from_stored(
stored_ordinal: Option<i64>,
stored_fp: &Option<Vec<u8>>,
curr_fp: &ExecutionPlanLogicFingerprint,
curr_fp: &SourceLogicFingerprint,
) -> Self {
Self {
ordinal: Ordinal(stored_ordinal),
Expand All @@ -74,7 +75,7 @@ impl SourceVersion {

pub fn from_stored_processing_info(
info: &db_tracking::SourceTrackingInfoForProcessing,
curr_fp: &ExecutionPlanLogicFingerprint,
curr_fp: &SourceLogicFingerprint,
) -> Self {
Self::from_stored(
info.processed_source_ordinal,
Expand All @@ -85,7 +86,7 @@ impl SourceVersion {

pub fn from_stored_precommit_info(
info: &db_tracking::SourceTrackingInfoForPrecommit,
curr_fp: &ExecutionPlanLogicFingerprint,
curr_fp: &SourceLogicFingerprint,
) -> Self {
Self::from_stored(
info.processed_source_ordinal,
Expand Down Expand Up @@ -240,7 +241,7 @@ impl<'a> RowIndexer<'a> {
Some(info) => {
let existing_version = SourceVersion::from_stored_processing_info(
info,
&self.src_eval_ctx.plan.logic_fingerprint,
&self.src_eval_ctx.source_logic_fp,
);

// First check ordinal-based skipping
Expand Down Expand Up @@ -486,7 +487,7 @@ impl<'a> RowIndexer<'a> {
// Check 1: Same check as precommit - verify no newer version exists
let existing_source_version = SourceVersion::from_stored_precommit_info(
&existing_tracking_info,
&self.src_eval_ctx.plan.logic_fingerprint,
&self.src_eval_ctx.source_logic_fp,
);
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
return Ok(Some(SkippedOr::Skipped(
Expand Down Expand Up @@ -537,7 +538,6 @@ impl<'a> RowIndexer<'a> {
let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
let export_ops = &self.src_eval_ctx.plan.export_ops;
let export_ops_exec_ctx = &self.setup_execution_ctx.export_ops;
let logic_fp = &self.src_eval_ctx.plan.logic_fingerprint;

let mut txn = self.pool.begin().await?;

Expand All @@ -551,8 +551,10 @@ impl<'a> RowIndexer<'a> {
if self.mode == super::source_indexer::UpdateMode::Normal
&& let Some(tracking_info) = &tracking_info
{
let existing_source_version =
SourceVersion::from_stored_precommit_info(&tracking_info, logic_fp);
let existing_source_version = SourceVersion::from_stored_precommit_info(
&tracking_info,
&self.src_eval_ctx.source_logic_fp,
);
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
return Ok(SkippedOr::Skipped(
existing_source_version,
Expand Down Expand Up @@ -834,7 +836,7 @@ impl<'a> RowIndexer<'a> {
cleaned_staging_target_keys,
source_version.ordinal.into(),
source_fp,
&self.src_eval_ctx.plan.logic_fingerprint.current.0,
&self.src_eval_ctx.source_logic_fp.current.0,
precommit_metadata.process_ordinal,
self.process_time.timestamp_micros(),
precommit_metadata.new_target_keys,
Expand Down
18 changes: 16 additions & 2 deletions rust/cocoindex/src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::{execution::row_indexer::ContentHashBasedCollapsingBaseline, prelude::*};
use crate::{
execution::{
indexing_status::SourceLogicFingerprint, row_indexer::ContentHashBasedCollapsingBaseline,
},
prelude::*,
};
use utils::batching;

use futures::future::Ready;
Expand Down Expand Up @@ -62,6 +67,7 @@ pub struct SourceIndexingContext {
needs_to_track_rows_to_retry: bool,

update_once_batcher: batching::Batcher<UpdateOnceRunner>,
source_logic_fp: SourceLogicFingerprint,
}

pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
Expand Down Expand Up @@ -258,6 +264,12 @@ impl SourceIndexingContext {
let mut rows = HashMap::new();
let mut rows_to_retry: Option<HashSet<value::KeyValue>> = None;
let scan_generation = 0;
let source_logic_fp = SourceLogicFingerprint::new(
&plan,
source_idx,
&setup_execution_ctx.export_ops,
plan.legacy_fingerprint.clone(),
)?;
{
let mut key_metadata_stream = list_state.list(
setup_execution_ctx.import_ops[source_idx].source_id,
Expand All @@ -282,7 +294,7 @@ impl SourceIndexingContext {
source_version: SourceVersion::from_stored(
key_metadata.processed_source_ordinal,
&key_metadata.process_logic_fingerprint,
&plan.logic_fingerprint,
&source_logic_fp,
),
content_version_fp: key_metadata.processed_source_fp,
},
Expand All @@ -307,6 +319,7 @@ impl SourceIndexingContext {
UpdateOnceRunner,
batching::BatchingOptions::default(),
),
source_logic_fp,
}))
}

Expand Down Expand Up @@ -347,6 +360,7 @@ impl SourceIndexingContext {
schema,
key: &row_input.key,
import_op_idx: self.source_idx,
source_logic_fp: &self.source_logic_fp,
};
let process_time = chrono::Utc::now();
let operation_in_process_stats_cloned = operation_in_process_stats.clone();
Expand Down
Loading
Loading