
Commit bea12d5

Correctly calculate logic fp and improve msgs for stats. (#226)
1 parent 5fd2ee4 commit bea12d5

3 files changed (+90, -72 lines)

src/builder/analyzer.rs

Lines changed: 57 additions & 53 deletions
@@ -1094,63 +1094,67 @@ pub fn analyze_flow(
         // TODO: Fill it with a meaningful value.
         targets: IndexMap::new(),
     };
-    let plan_fut = {
-        let analyzer_ctx = AnalyzerContext { registry, flow_ctx };
-        let mut root_exec_scope = ExecutionScope {
-            name: ROOT_SCOPE_NAME,
-            data: &mut root_data_scope,
-        };
-        let source_ops_futs = flow_inst
-            .source_ops
-            .iter()
-            .map(|source_op| {
-                let existing_source_states = source_states_by_name.get(source_op.name.as_str());
-                analyzer_ctx.analyze_source_op(
-                    root_exec_scope.data,
-                    source_op.clone(),
-                    Some(&mut setup_state.metadata),
-                    existing_source_states,
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
-        let op_scope_fut = analyzer_ctx.analyze_op_scope(
-            &mut root_exec_scope,
-            &flow_inst.reactive_ops,
-            RefList::Nil,
-        )?;
-        let export_ops_futs = flow_inst
-            .export_ops
-            .iter()
-            .map(|export_op| {
-                analyzer_ctx.analyze_export_op(
-                    root_exec_scope.data,
-                    export_op.clone(),
-                    Some(&mut setup_state),
-                    &target_states_by_name_type,
-                )
-            })
-            .collect::<Result<Vec<_>>>()?;
 
-        let tracking_table_setup = setup_state.tracking_table.clone();
-        async move {
-            let (source_ops, op_scope, export_ops) = try_join3(
-                try_join_all(source_ops_futs),
-                op_scope_fut,
-                try_join_all(export_ops_futs),
+    let analyzer_ctx = AnalyzerContext { registry, flow_ctx };
+    let mut root_exec_scope = ExecutionScope {
+        name: ROOT_SCOPE_NAME,
+        data: &mut root_data_scope,
+    };
+    let source_ops_futs = flow_inst
+        .source_ops
+        .iter()
+        .map(|source_op| {
+            let existing_source_states = source_states_by_name.get(source_op.name.as_str());
+            analyzer_ctx.analyze_source_op(
+                root_exec_scope.data,
+                source_op.clone(),
+                Some(&mut setup_state.metadata),
+                existing_source_states,
             )
-            .await?;
-
-            Ok(ExecutionPlan {
-                tracking_table_setup,
-                logic_fingerprint: vec![0; 8], // TODO: Fill it with a meaningful value automatically
-                source_ops,
-                op_scope,
-                export_ops,
-            })
-        }
+        })
+        .collect::<Result<Vec<_>>>()?;
+    let op_scope_fut = analyzer_ctx.analyze_op_scope(
+        &mut root_exec_scope,
+        &flow_inst.reactive_ops,
+        RefList::Nil,
+    )?;
+    let export_ops_futs = flow_inst
+        .export_ops
+        .iter()
+        .map(|export_op| {
+            analyzer_ctx.analyze_export_op(
+                root_exec_scope.data,
+                export_op.clone(),
+                Some(&mut setup_state),
+                &target_states_by_name_type,
+            )
+        })
+        .collect::<Result<Vec<_>>>()?;
+
+    let tracking_table_setup = setup_state.tracking_table.clone();
+    let data_schema = root_data_scope.into_data_schema()?;
+    let logic_fingerprint = Fingerprinter::default()
+        .with(&flow_inst)?
+        .with(&data_schema)?
+        .into_fingerprint();
+    let plan_fut = async move {
+        let (source_ops, op_scope, export_ops) = try_join3(
+            try_join_all(source_ops_futs),
+            op_scope_fut,
+            try_join_all(export_ops_futs),
+        )
+        .await?;
+
+        Ok(ExecutionPlan {
+            tracking_table_setup,
+            logic_fingerprint,
+            source_ops,
+            op_scope,
+            export_ops,
+        })
     };
 
-    Ok((root_data_scope.into_data_schema()?, plan_fut, setup_state))
+    Ok((data_schema, plan_fut, setup_state))
 }
 
 pub fn analyze_transient_flow<'a>(
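
The substance of this hunk: logic_fingerprint is now derived from the flow spec (flow_inst) and the derived data schema via Fingerprinter, replacing the vec![0; 8] placeholder, and the schema is computed before the async block so the plan and the returned tuple share it. Below is a rough, self-contained illustration of that builder-style chaining; it is not the crate's actual utils::fingerprint implementation, and the hashing scheme, error type, and 8-byte output are assumptions made only for the example.

use std::collections::hash_map::DefaultHasher;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};

// Illustrative stand-ins for the crate's Fingerprint / Fingerprinter types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Fingerprint([u8; 8]);

#[derive(Default)]
struct Fingerprinter {
    hasher: DefaultHasher,
}

impl Fingerprinter {
    // Fold one more input into the running hash; chaining mirrors the style in the
    // diff (`.with(&flow_inst)?.with(&data_schema)?`). Hashing the Debug output is
    // a simplification for this sketch.
    fn with<T: Debug>(mut self, value: &T) -> Result<Self, std::fmt::Error> {
        format!("{value:?}").hash(&mut self.hasher);
        Ok(self)
    }

    fn into_fingerprint(self) -> Fingerprint {
        Fingerprint(self.hasher.finish().to_le_bytes())
    }
}

fn main() -> Result<(), std::fmt::Error> {
    // Stand-ins for the flow spec and the schema derived from it.
    let flow_inst = "example_flow_spec";
    let data_schema = "example_data_schema";

    let logic_fingerprint = Fingerprinter::default()
        .with(&flow_inst)?
        .with(&data_schema)?
        .into_fingerprint();

    // Any change to the flow spec or the derived schema produces different bytes,
    // which is what the indexer later compares against the per-row stored value.
    println!("logic fingerprint: {:02x?}", logic_fingerprint.0);
    Ok(())
}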

src/builder/plan.rs

Lines changed: 2 additions & 2 deletions
@@ -6,7 +6,7 @@ use crate::base::schema::ValueType;
 use crate::base::value;
 use crate::execution::db_tracking_setup;
 use crate::ops::interface::*;
-use crate::utils::fingerprint::Fingerprinter;
+use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
 
 #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
 pub struct AnalyzedLocalFieldReference {
@@ -126,7 +126,7 @@ pub struct AnalyzedOpScope {
 
 pub struct ExecutionPlan {
     pub tracking_table_setup: db_tracking_setup::TrackingTableSetupState,
-    pub logic_fingerprint: Vec<u8>,
+    pub logic_fingerprint: Fingerprint,
 
     pub source_ops: Vec<AnalyzedSourceOp>,
     pub op_scope: AnalyzedOpScope,
src/execution/indexer.rs

Lines changed: 31 additions & 17 deletions
@@ -22,28 +22,39 @@ use super::evaluator::{evaluate_source_entry, ScopeValueBuilder};
 
 #[derive(Debug, Serialize, Default)]
 pub struct UpdateStats {
+    pub num_skipped: AtomicUsize,
     pub num_insertions: AtomicUsize,
     pub num_deletions: AtomicUsize,
-    pub num_already_exists: AtomicUsize,
+    pub num_repreocesses: AtomicUsize,
     pub num_errors: AtomicUsize,
 }
 
 impl std::fmt::Display for UpdateStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        let num_source_rows = self.num_insertions.load(Relaxed)
-            + self.num_deletions.load(Relaxed)
-            + self.num_already_exists.load(Relaxed);
-        write!(f, "{num_source_rows} source rows processed",)?;
-        if self.num_errors.load(Relaxed) > 0 {
-            write!(f, " with {} ERRORS", self.num_errors.load(Relaxed))?;
+        let num_skipped = self.num_skipped.load(Relaxed);
+        if num_skipped > 0 {
+            write!(f, "{} rows skipped", num_skipped)?;
+        }
+
+        let num_insertions = self.num_insertions.load(Relaxed);
+        let num_deletions = self.num_deletions.load(Relaxed);
+        let num_reprocesses = self.num_repreocesses.load(Relaxed);
+        let num_source_rows = num_insertions + num_deletions + num_reprocesses;
+        if num_source_rows > 0 {
+            if num_skipped > 0 {
+                write!(f, ", ")?;
+            }
+            write!(f, "{num_source_rows} source rows processed",)?;
+
+            let num_errors = self.num_errors.load(Relaxed);
+            if num_errors > 0 {
+                write!(f, " with {num_errors} ERRORS",)?;
+            }
+            write!(
+                f,
+                ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed",
+            )?;
         }
-        write!(
-            f,
-            ": {} added, {} removed, {} already exists",
-            self.num_insertions.load(Relaxed),
-            self.num_deletions.load(Relaxed),
-            self.num_already_exists.load(Relaxed)
-        )?;
         Ok(())
     }
 }
@@ -495,8 +506,11 @@ pub async fn update_source_entry(
        (Some(source_ordinal), Some(existing_source_ordinal)) => {
            if source_ordinal < existing_source_ordinal
                || (source_ordinal == existing_source_ordinal
-                    && existing_logic_fingerprint == source_op.)
+                    && existing_logic_fingerprint.as_ref().map(|v| v.as_slice())
+                        == Some(plan.logic_fingerprint.0.as_slice()))
            {
+                // TODO: We should detect based on finer grain fingerprint.
+                stats.num_skipped.fetch_add(1, Relaxed);
                return Ok(());
            }
        }
@@ -538,7 +552,7 @@ pub async fn update_source_entry(
    };
    if already_exists {
        if output.is_some() {
-            stats.num_already_exists.fetch_add(1, Relaxed);
+            stats.num_repreocesses.fetch_add(1, Relaxed);
        } else {
            stats.num_deletions.fetch_add(1, Relaxed);
        }
@@ -590,7 +604,7 @@ pub async fn update_source_entry(
        source_op.source_id,
        &source_key_json,
        source_ordinal.map(|o| o.into()),
-        &plan.logic_fingerprint,
+        &plan.logic_fingerprint.0,
        precommit_output.metadata,
        &process_timestamp,
        &plan.tracking_table_setup,
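
With these changes the stats message distinguishes rows skipped by the ordinal/fingerprint check from rows that were actually reprocessed, and it only prints the parts that are non-zero. The sketch below shows the resulting strings using plain integers in place of the AtomicUsize counters; format_stats is a hypothetical stand-in for UpdateStats's Display impl, and its wording (including "repocessed") mirrors the diff's format string exactly.

// Simplified sketch of the new message format.
fn format_stats(
    num_skipped: usize,
    num_insertions: usize,
    num_deletions: usize,
    num_reprocesses: usize,
    num_errors: usize,
) -> String {
    let mut out = String::new();
    if num_skipped > 0 {
        out.push_str(&format!("{num_skipped} rows skipped"));
    }
    let num_source_rows = num_insertions + num_deletions + num_reprocesses;
    if num_source_rows > 0 {
        // Separate the two clauses when both are present, as the Display impl does.
        if num_skipped > 0 {
            out.push_str(", ");
        }
        out.push_str(&format!("{num_source_rows} source rows processed"));
        if num_errors > 0 {
            out.push_str(&format!(" with {num_errors} ERRORS"));
        }
        out.push_str(&format!(
            ": {num_insertions} added, {num_deletions} removed, {num_reprocesses} repocessed"
        ));
    }
    out
}

fn main() {
    // Prints: 10 rows skipped, 5 source rows processed: 3 added, 1 removed, 1 repocessed
    println!("{}", format_stats(10, 3, 1, 1, 0));
    // Prints: 4 rows skipped  (nothing processed, so no processed clause at all)
    println!("{}", format_stats(4, 0, 0, 0, 0));
}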
