Skip to content

Commit 1f8c34d

Browse files
authored
feat: improve logic fp: avoid source changes and count fn behavior ver (#1182)
1 parent ee5519f commit 1f8c34d

File tree

6 files changed

+92
-14
lines changed

6 files changed

+92
-14
lines changed

src/builder/analyzer.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,10 +1141,37 @@ pub async fn analyze_flow(
11411141
declarations: declarations_analyzed_ss,
11421142
};
11431143

1144-
let logic_fingerprint = Fingerprinter::default()
1144+
let legacy_fingerprint = Fingerprinter::default()
11451145
.with(&flow_inst)?
11461146
.with(&flow_schema.schema)?
11471147
.into_fingerprint();
1148+
1149+
fn append_reactive_op_scope(
1150+
mut fingerprinter: Fingerprinter,
1151+
reactive_ops: &[NamedSpec<ReactiveOpSpec>],
1152+
) -> Result<Fingerprinter> {
1153+
fingerprinter = fingerprinter.with(&reactive_ops.len())?;
1154+
for reactive_op in reactive_ops.iter() {
1155+
fingerprinter = fingerprinter.with(&reactive_op.name)?;
1156+
match &reactive_op.spec {
1157+
ReactiveOpSpec::Transform(_) => {}
1158+
ReactiveOpSpec::ForEach(foreach_op) => {
1159+
fingerprinter = fingerprinter.with(&foreach_op.field_path)?;
1160+
fingerprinter =
1161+
append_reactive_op_scope(fingerprinter, &foreach_op.op_scope.ops)?;
1162+
}
1163+
ReactiveOpSpec::Collect(collect_op) => {
1164+
fingerprinter = fingerprinter.with(collect_op)?;
1165+
}
1166+
}
1167+
}
1168+
Ok(fingerprinter)
1169+
}
1170+
let current_fingerprinter =
1171+
append_reactive_op_scope(Fingerprinter::default(), &flow_inst.reactive_ops)?
1172+
.with(&flow_inst.export_ops)?
1173+
.with(&flow_inst.declarations)?
1174+
.with(&flow_schema.schema)?;
11481175
let plan_fut = async move {
11491176
let (import_ops, op_scope, export_ops) = try_join3(
11501177
try_join_all(import_ops_futs),
@@ -1153,8 +1180,40 @@ pub async fn analyze_flow(
11531180
)
11541181
.await?;
11551182

1183+
fn append_function_behavior(
1184+
mut fingerprinter: Fingerprinter,
1185+
reactive_ops: &[AnalyzedReactiveOp],
1186+
) -> Result<Fingerprinter> {
1187+
for reactive_op in reactive_ops.iter() {
1188+
match reactive_op {
1189+
AnalyzedReactiveOp::Transform(transform_op) => {
1190+
fingerprinter = fingerprinter.with(&transform_op.name)?.with(
1191+
&transform_op
1192+
.function_exec_info
1193+
.fingerprinter
1194+
.clone()
1195+
.into_fingerprint(),
1196+
)?;
1197+
}
1198+
AnalyzedReactiveOp::ForEach(foreach_op) => {
1199+
fingerprinter = append_function_behavior(
1200+
fingerprinter,
1201+
&foreach_op.op_scope.reactive_ops,
1202+
)?;
1203+
}
1204+
_ => {}
1205+
}
1206+
}
1207+
Ok(fingerprinter)
1208+
}
1209+
let current_fingerprint =
1210+
append_function_behavior(current_fingerprinter, &op_scope.reactive_ops)?
1211+
.into_fingerprint();
11561212
Ok(ExecutionPlan {
1157-
logic_fingerprint,
1213+
logic_fingerprint: ExecutionPlanLogicFingerprint {
1214+
current: current_fingerprint,
1215+
legacy: legacy_fingerprint,
1216+
},
11581217
import_ops,
11591218
op_scope,
11601219
export_ops,

src/builder/plan.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,19 @@ pub struct AnalyzedOpScope {
133133
pub scope_qualifier: String,
134134
}
135135

136+
pub struct ExecutionPlanLogicFingerprint {
137+
pub current: Fingerprint,
138+
pub legacy: Fingerprint,
139+
}
140+
141+
impl ExecutionPlanLogicFingerprint {
142+
pub fn matches(&self, other: impl AsRef<[u8]>) -> bool {
143+
self.current.as_slice() == other.as_ref() || self.legacy.as_slice() == other.as_ref()
144+
}
145+
}
146+
136147
pub struct ExecutionPlan {
137-
pub logic_fingerprint: Fingerprint,
148+
pub logic_fingerprint: ExecutionPlanLogicFingerprint,
138149

139150
pub import_ops: Vec<AnalyzedImportOp>,
140151
pub op_scope: AnalyzedOpScope,

src/execution/indexing_status.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,10 @@ pub async fn get_source_row_indexing_status(
5151
processing_time: l
5252
.process_time_micros
5353
.and_then(chrono::DateTime::<chrono::Utc>::from_timestamp_micros),
54-
is_logic_current: Some(src_eval_ctx.plan.logic_fingerprint.0.as_slice())
55-
== l.process_logic_fingerprint.as_deref(),
54+
is_logic_current: l
55+
.process_logic_fingerprint
56+
.as_ref()
57+
.map_or(false, |fp| src_eval_ctx.plan.logic_fingerprint.matches(fp)),
5658
});
5759
let current = SourceRowInfo {
5860
ordinal: current

src/execution/row_indexer.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,13 @@ impl SourceVersion {
5555
pub fn from_stored(
5656
stored_ordinal: Option<i64>,
5757
stored_fp: &Option<Vec<u8>>,
58-
curr_fp: Fingerprint,
58+
curr_fp: &ExecutionPlanLogicFingerprint,
5959
) -> Self {
6060
Self {
6161
ordinal: Ordinal(stored_ordinal),
6262
kind: match &stored_fp {
6363
Some(stored_fp) => {
64-
if stored_fp.as_slice() == curr_fp.0.as_slice() {
64+
if curr_fp.matches(stored_fp) {
6565
SourceVersionKind::CurrentLogic
6666
} else {
6767
SourceVersionKind::DifferentLogic
@@ -74,7 +74,7 @@ impl SourceVersion {
7474

7575
pub fn from_stored_processing_info(
7676
info: &db_tracking::SourceTrackingInfoForProcessing,
77-
curr_fp: Fingerprint,
77+
curr_fp: &ExecutionPlanLogicFingerprint,
7878
) -> Self {
7979
Self::from_stored(
8080
info.processed_source_ordinal,
@@ -85,7 +85,7 @@ impl SourceVersion {
8585

8686
pub fn from_stored_precommit_info(
8787
info: &db_tracking::SourceTrackingInfoForPrecommit,
88-
curr_fp: Fingerprint,
88+
curr_fp: &ExecutionPlanLogicFingerprint,
8989
) -> Self {
9090
Self::from_stored(
9191
info.processed_source_ordinal,
@@ -240,7 +240,7 @@ impl<'a> RowIndexer<'a> {
240240
Some(info) => {
241241
let existing_version = SourceVersion::from_stored_processing_info(
242242
info,
243-
self.src_eval_ctx.plan.logic_fingerprint,
243+
&self.src_eval_ctx.plan.logic_fingerprint,
244244
);
245245

246246
// First check ordinal-based skipping
@@ -486,7 +486,7 @@ impl<'a> RowIndexer<'a> {
486486
// Check 1: Same check as precommit - verify no newer version exists
487487
let existing_source_version = SourceVersion::from_stored_precommit_info(
488488
&existing_tracking_info,
489-
self.src_eval_ctx.plan.logic_fingerprint,
489+
&self.src_eval_ctx.plan.logic_fingerprint,
490490
);
491491
if existing_source_version.should_skip(source_version, Some(self.update_stats)) {
492492
return Ok(Some(SkippedOr::Skipped(
@@ -537,7 +537,7 @@ impl<'a> RowIndexer<'a> {
537537
let db_setup = &self.setup_execution_ctx.setup_state.tracking_table;
538538
let export_ops = &self.src_eval_ctx.plan.export_ops;
539539
let export_ops_exec_ctx = &self.setup_execution_ctx.export_ops;
540-
let logic_fp = self.src_eval_ctx.plan.logic_fingerprint;
540+
let logic_fp = &self.src_eval_ctx.plan.logic_fingerprint;
541541

542542
let mut txn = self.pool.begin().await?;
543543

@@ -834,7 +834,7 @@ impl<'a> RowIndexer<'a> {
834834
cleaned_staging_target_keys,
835835
source_version.ordinal.into(),
836836
source_fp,
837-
&self.src_eval_ctx.plan.logic_fingerprint.0,
837+
&self.src_eval_ctx.plan.logic_fingerprint.current.0,
838838
precommit_metadata.process_ordinal,
839839
self.process_time.timestamp_micros(),
840840
precommit_metadata.new_target_keys,

src/execution/source_indexer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ impl SourceIndexingContext {
284284
source_version: SourceVersion::from_stored(
285285
key_metadata.processed_source_ordinal,
286286
&key_metadata.process_logic_fingerprint,
287-
plan.logic_fingerprint,
287+
&plan.logic_fingerprint,
288288
),
289289
content_version_fp: key_metadata.processed_source_fp,
290290
},

src/utils/fingerprint.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ impl Fingerprint {
5757
}
5858
}
5959

60+
impl AsRef<[u8]> for Fingerprint {
61+
fn as_ref(&self) -> &[u8] {
62+
&self.0
63+
}
64+
}
65+
6066
impl Serialize for Fingerprint {
6167
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
6268
where

0 commit comments

Comments
 (0)