Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions src/builder/analyzer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,10 +1141,37 @@ pub async fn analyze_flow(
declarations: declarations_analyzed_ss,
};

let logic_fingerprint = Fingerprinter::default()
let legacy_fingerprint = Fingerprinter::default()
.with(&flow_inst)?
.with(&flow_schema.schema)?
.into_fingerprint();

fn append_reactive_op_scope(
mut fingerprinter: Fingerprinter,
reactive_ops: &[NamedSpec<ReactiveOpSpec>],
) -> Result<Fingerprinter> {
fingerprinter = fingerprinter.with(&reactive_ops.len())?;
for reactive_op in reactive_ops.iter() {
fingerprinter = fingerprinter.with(&reactive_op.name)?;
match &reactive_op.spec {
ReactiveOpSpec::Transform(_) => {}
ReactiveOpSpec::ForEach(foreach_op) => {
fingerprinter = fingerprinter.with(&foreach_op.field_path)?;
fingerprinter =
append_reactive_op_scope(fingerprinter, &foreach_op.op_scope.ops)?;
}
ReactiveOpSpec::Collect(collect_op) => {
fingerprinter = fingerprinter.with(collect_op)?;
}
}
}
Ok(fingerprinter)
}
let current_fingerprinter =
append_reactive_op_scope(Fingerprinter::default(), &flow_inst.reactive_ops)?
.with(&flow_inst.export_ops)?
.with(&flow_inst.declarations)?
.with(&flow_schema.schema)?;
let plan_fut = async move {
let (import_ops, op_scope, export_ops) = try_join3(
try_join_all(import_ops_futs),
Expand All @@ -1153,8 +1180,40 @@ pub async fn analyze_flow(
)
.await?;

fn append_function_behavior(
mut fingerprinter: Fingerprinter,
reactive_ops: &[AnalyzedReactiveOp],
) -> Result<Fingerprinter> {
for reactive_op in reactive_ops.iter() {
match reactive_op {
AnalyzedReactiveOp::Transform(transform_op) => {
fingerprinter = fingerprinter.with(&transform_op.name)?.with(
&transform_op
.function_exec_info
.fingerprinter
.clone()
.into_fingerprint(),
)?;
}
AnalyzedReactiveOp::ForEach(foreach_op) => {
fingerprinter = append_function_behavior(
fingerprinter,
&foreach_op.op_scope.reactive_ops,
)?;
}
_ => {}
}
}
Ok(fingerprinter)
}
let current_fingerprint =
append_function_behavior(current_fingerprinter, &op_scope.reactive_ops)?
.into_fingerprint();
Ok(ExecutionPlan {
logic_fingerprint,
logic_fingerprint: ExecutionPlanLogicFingerprint {
current: current_fingerprint,
legacy: legacy_fingerprint,
},
import_ops,
op_scope,
export_ops,
Expand Down
13 changes: 12 additions & 1 deletion src/builder/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,19 @@ pub struct AnalyzedOpScope {
pub scope_qualifier: String,
}

pub struct ExecutionPlanLogicFingerprint {
pub current: Fingerprint,
pub legacy: Fingerprint,
}

impl ExecutionPlanLogicFingerprint {
pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
self.current.as_slice() == other.as_ref() || self.legacy.as_slice() == other.as_ref()
}
}

pub struct ExecutionPlan {
pub logic_fingerprint: Fingerprint,
pub logic_fingerprint: ExecutionPlanLogicFingerprint,

pub import_ops: Vec<AnalyzedImportOp>,
pub op_scope: AnalyzedOpScope,
Expand Down
6 changes: 4 additions & 2 deletions src/execution/indexing_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ pub async fn get_source_row_indexing_status(
processing_time: l
.process_time_micros
.and_then(chrono::DateTime::<chrono::Utc>::from_timestamp_micros),
is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
== l.process_logic_fingerprint.as_deref(),
is_logic_current: l
.process_logic_fingerprint
.as_ref()
.map_or(false, |fp| src_eval_ctx.plan.logic_fingerprint.matches(fp)),
});
let current = SourceRowInfo {
ordinal: current
Expand Down
16 changes: 8 additions & 8 deletions src/execution/row_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@ impl SourceVersion {
pub fn from_stored(
stored_ordinal: Option<i64>,
stored_fp: &Option<Vec<u8>>,
curr_fp: Fingerprint,
curr_fp: &ExecutionPlanLogicFingerprint,
) -> Self {
Self {
ordinal: Ordinal(stored_ordinal),
kind: match &stored_fp {
Some(stored_fp) => {
if stored_fp.as_slice() == curr_fp.0.as_slice() {
if curr_fp.matches(stored_fp) {
SourceVersionKind::CurrentLogic
} else {
SourceVersionKind::DifferentLogic
Expand All @@ -74,7 +74,7 @@ impl SourceVersion {

pub fn from_stored_processing_info(
info: &db_tracking::SourceTrackingInfoForProcessing,
curr_fp: Fingerprint,
curr_fp: &ExecutionPlanLogicFingerprint,
) -> Self {
Self::from_stored(
info.processed_source_ordinal,
Expand All @@ -85,7 +85,7 @@ impl SourceVersion {

pub fn from_stored_precommit_info(
info: &db_tracking::SourceTrackingInfoForPrecommit,
curr_fp: Fingerprint,
curr_fp: &ExecutionPlanLogicFingerprint,
) -> Self {
Self::from_stored(
info.processed_source_ordinal,
Expand Down Expand Up @@ -240,7 +240,7 @@ impl<'a> RowIndexer<'a> {
Some(info) => {
let existing_version = SourceVersion::from_stored_processing_info(
info,
self.src_eval_ctx.plan.logic_fingerprint,
&self.src_eval_ctx.plan.logic_fingerprint,
);

// First check ordinal-based skipping
Expand Down Expand Up @@ -486,7 +486,7 @@ impl<'a> RowIndexer<'a> {
// Check 1: Same check as precommit - verify no newer version exists
let existing_source_version = SourceVersion::from_stored_precommit_info(
&existing_tracking_info,
self.src_eval_ctx.plan.logic_fingerprint,
&self.src_eval_ctx.plan.logic_fingerprint,
);
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
return Ok(Some(SkippedOr::Skipped(
Expand Down Expand Up @@ -537,7 +537,7 @@ impl<'a> RowIndexer<'a> {
let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
let export_ops = &self.src_eval_ctx.plan.export_ops;
let export_ops_exec_ctx = &self.setup_execution_ctx.export_ops;
let logic_fp = self.src_eval_ctx.plan.logic_fingerprint;
let logic_fp = &self.src_eval_ctx.plan.logic_fingerprint;

let mut txn = self.pool.begin().await?;

Expand Down Expand Up @@ -834,7 +834,7 @@ impl<'a> RowIndexer<'a> {
cleaned_staging_target_keys,
source_version.ordinal.into(),
source_fp,
&self.src_eval_ctx.plan.logic_fingerprint.0,
&self.src_eval_ctx.plan.logic_fingerprint.current.0,
precommit_metadata.process_ordinal,
self.process_time.timestamp_micros(),
precommit_metadata.new_target_keys,
Expand Down
2 changes: 1 addition & 1 deletion src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl SourceIndexingContext {
source_version: SourceVersion::from_stored(
key_metadata.processed_source_ordinal,
&key_metadata.process_logic_fingerprint,
plan.logic_fingerprint,
&plan.logic_fingerprint,
),
content_version_fp: key_metadata.processed_source_fp,
},
Expand Down
6 changes: 6 additions & 0 deletions src/utils/fingerprint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ impl Fingerprint {
}
}

impl AsRef<[u8]> for Fingerprint {
fn as_ref(&self) -> &[u8] {
&self.0
}
}

impl Serialize for Fingerprint {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down