Skip to content

Commit fb84432

Browse files
committed
skip reprocessing of entire rows based on finegrained lineage information
1 parent 5db5971 commit fb84432

File tree

6 files changed

+140
-29
lines changed

6 files changed

+140
-29
lines changed

src/builder/analyzer.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,17 @@ pub async fn analyze_flow(
11201120
.with(&flow_inst)?
11211121
.with(&flow_schema.schema)?
11221122
.into_fingerprint();
1123+
1124+
// Build a lineage fingerprint that focuses on export-level field lineage.
1125+
// It include export op identity, target kind, primary key schema and the
1126+
// output value fingerprinter (which is derived from the output value schema)
1127+
// so that benign changes (like exclude_pattern on sources or intermediate
1128+
// field ordering) that don't affect exported field lineage will not change
1129+
// this fingerprint.
1130+
let mut lineage_fp = Fingerprinter::default();
1131+
lineage_fp = lineage_fp.with(&flow_inst.export_ops)?;
1132+
lineage_fp = lineage_fp.with(&flow_schema.schema)?;
1133+
let lineage_fingerprint = lineage_fp.into_fingerprint();
11231134
let plan_fut = async move {
11241135
let (import_ops, op_scope, export_ops) = try_join3(
11251136
try_join_all(import_ops_futs),
@@ -1130,6 +1141,7 @@ pub async fn analyze_flow(
11301141

11311142
Ok(ExecutionPlan {
11321143
logic_fingerprint,
1144+
lineage_fingerprint,
11331145
import_ops,
11341146
op_scope,
11351147
export_ops,

src/builder/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@ pub struct AnalyzedOpScope {
135135

136136
pub struct ExecutionPlan {
137137
pub logic_fingerprint: Fingerprint,
138+
/// Coarser-grained fingerprint that captures field-level lineage relevant to outputs/exports.
139+
/// Changes that don't affect lineage should not modify this fingerprint.
140+
pub lineage_fingerprint: Fingerprint,
138141

139142
pub import_ops: Vec<AnalyzedImportOp>,
140143
pub op_scope: AnalyzedOpScope,

src/execution/db_tracking.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ pub struct SourceTrackingInfoForProcessing {
8888
pub processed_source_ordinal: Option<i64>,
8989
pub processed_source_fp: Option<Vec<u8>>,
9090
pub process_logic_fingerprint: Option<Vec<u8>>,
91+
pub process_lineage_fingerprint: Option<Vec<u8>>,
9192
pub max_process_ordinal: Option<i64>,
9293
pub process_ordinal: Option<i64>,
9394
}
@@ -99,7 +100,7 @@ pub async fn read_source_tracking_info_for_processing(
99100
pool: &PgPool,
100101
) -> Result<Option<SourceTrackingInfoForProcessing>> {
101102
let query_str = format!(
102-
"SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
103+
"SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
103104
if db_setup.has_fast_fingerprint_column {
104105
"processed_source_fp"
105106
} else {
@@ -124,6 +125,7 @@ pub struct SourceTrackingInfoForPrecommit {
124125
pub processed_source_ordinal: Option<i64>,
125126
pub processed_source_fp: Option<Vec<u8>>,
126127
pub process_logic_fingerprint: Option<Vec<u8>>,
128+
pub process_lineage_fingerprint: Option<Vec<u8>>,
127129
pub process_ordinal: Option<i64>,
128130
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
129131
}
@@ -135,7 +137,7 @@ pub async fn read_source_tracking_info_for_precommit(
135137
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
136138
) -> Result<Option<SourceTrackingInfoForPrecommit>> {
137139
let query_str = format!(
138-
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
140+
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
139141
if db_setup.has_fast_fingerprint_column {
140142
"processed_source_fp"
141143
} else {
@@ -240,6 +242,7 @@ pub async fn commit_source_tracking_info(
240242
processed_source_ordinal: Option<i64>,
241243
processed_source_fp: Option<Vec<u8>>,
242244
logic_fingerprint: &[u8],
245+
lineage_fingerprint: &[u8],
243246
process_ordinal: i64,
244247
process_time_micros: i64,
245248
target_keys: TrackedTargetKeyForSource,
@@ -252,25 +255,25 @@ pub async fn commit_source_tracking_info(
252255
"INSERT INTO {} ( \
253256
source_id, source_key, \
254257
max_process_ordinal, staging_target_keys, \
255-
processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys{}) \
256-
VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})",
258+
processed_source_ordinal, process_logic_fingerprint, process_lineage_fingerprint, process_ordinal, process_time_micros, target_keys{}) \
259+
VALUES ($1, $2, $7 + 1, $3, $4, $5, $6, $7, $8, $9{})",
257260
db_setup.table_name,
258261
if db_setup.has_fast_fingerprint_column {
259262
", processed_source_fp"
260263
} else {
261264
""
262265
},
263266
if db_setup.has_fast_fingerprint_column {
264-
", $9"
267+
", $10"
265268
} else {
266269
""
267270
},
268271
),
269272
WriteAction::Update => format!(
270-
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8{} WHERE source_id = $1 AND source_key = $2",
273+
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_lineage_fingerprint = $6, process_ordinal = $7, process_time_micros = $8, target_keys = $9{} WHERE source_id = $1 AND source_key = $2",
271274
db_setup.table_name,
272275
if db_setup.has_fast_fingerprint_column {
273-
", processed_source_fp = $9"
276+
", processed_source_fp = $10"
274277
} else {
275278
""
276279
},
@@ -282,12 +285,12 @@ pub async fn commit_source_tracking_info(
282285
.bind(sqlx::types::Json(staging_target_keys)) // $3
283286
.bind(processed_source_ordinal) // $4
284287
.bind(logic_fingerprint) // $5
285-
.bind(process_ordinal) // $6
286-
.bind(process_time_micros) // $7
287-
.bind(sqlx::types::Json(target_keys)); // $8
288-
288+
.bind(lineage_fingerprint) // $6
289+
.bind(process_ordinal) // $7
290+
.bind(process_time_micros) // $8
291+
.bind(sqlx::types::Json(target_keys)); // $9
289292
if db_setup.has_fast_fingerprint_column {
290-
query = query.bind(processed_source_fp); // $9
293+
query = query.bind(processed_source_fp); // $10
291294
}
292295
query.execute(db_executor).await?;
293296

@@ -317,7 +320,8 @@ pub struct TrackedSourceKeyMetadata {
317320
pub source_key: serde_json::Value,
318321
pub processed_source_ordinal: Option<i64>,
319322
pub processed_source_fp: Option<Vec<u8>>,
320-
pub process_logic_fingerprint: Option<Vec<u8>>,
323+
// pub process_logic_fingerprint: Option<Vec<u8>>,
324+
pub process_lineage_fingerprint: Option<Vec<u8>>,
321325
pub max_process_ordinal: Option<i64>,
322326
pub process_ordinal: Option<i64>,
323327
}
@@ -341,7 +345,7 @@ impl ListTrackedSourceKeyMetadataState {
341345
) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
342346
self.query_str = format!(
343347
"SELECT \
344-
source_key, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal \
348+
source_key, processed_source_ordinal, {}, process_logic_fingerprint, process_lineage_fingerprint, max_process_ordinal, process_ordinal \
345349
FROM {} WHERE source_id = $1",
346350
if db_setup.has_fast_fingerprint_column {
347351
"processed_source_fp"

src/execution/db_tracking_setup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ async fn upgrade_tracking_table(
4545
processed_source_ordinal BIGINT,
4646
{opt_fast_fingerprint_column}
4747
process_logic_fingerprint BYTEA,
48+
process_lineage_fingerprint BYTEA,
4849
process_ordinal BIGINT,
4950
process_time_micros BIGINT,
5051
target_keys JSONB,

src/execution/row_indexer.rs

Lines changed: 104 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ pub struct SourceVersion {
5454
impl SourceVersion {
5555
pub fn from_stored(
5656
stored_ordinal: Option<i64>,
57-
stored_fp: &Option<Vec<u8>>,
58-
curr_fp: Fingerprint,
57+
stored_lineage_fp: &Option<Vec<u8>>,
58+
curr_lineage_fp: Fingerprint,
5959
) -> Self {
6060
Self {
6161
ordinal: Ordinal(stored_ordinal),
62-
kind: match &stored_fp {
62+
kind: match &stored_lineage_fp {
6363
Some(stored_fp) => {
64-
if stored_fp.as_slice() == curr_fp.0.as_slice() {
64+
if stored_fp.as_slice() == curr_lineage_fp.0.as_slice() {
6565
SourceVersionKind::CurrentLogic
6666
} else {
6767
SourceVersionKind::DifferentLogic
@@ -74,23 +74,23 @@ impl SourceVersion {
7474

7575
pub fn from_stored_processing_info(
7676
info: &db_tracking::SourceTrackingInfoForProcessing,
77-
curr_fp: Fingerprint,
77+
curr_lineage_fp: Fingerprint,
7878
) -> Self {
7979
Self::from_stored(
8080
info.processed_source_ordinal,
81-
&info.process_logic_fingerprint,
82-
curr_fp,
81+
&info.process_lineage_fingerprint,
82+
curr_lineage_fp,
8383
)
8484
}
8585

8686
pub fn from_stored_precommit_info(
8787
info: &db_tracking::SourceTrackingInfoForPrecommit,
88-
curr_fp: Fingerprint,
88+
curr_lineage_fp: Fingerprint,
8989
) -> Self {
9090
Self::from_stored(
9191
info.processed_source_ordinal,
92-
&info.process_logic_fingerprint,
93-
curr_fp,
92+
&info.process_lineage_fingerprint,
93+
curr_lineage_fp,
9494
)
9595
}
9696

@@ -119,9 +119,16 @@ impl SourceVersion {
119119
// Never process older ordinals to maintain consistency
120120
let should_skip = match (self.ordinal.0, target.ordinal.0) {
121121
(Some(existing_ordinal), Some(target_ordinal)) => {
122-
// Skip if target ordinal is older, or same ordinal with same/older logic version
123-
existing_ordinal > target_ordinal
124-
|| (existing_ordinal == target_ordinal && self.kind >= target.kind)
122+
// Skip if target ordinal is older
123+
if existing_ordinal > target_ordinal {
124+
true
125+
// If ordinals are equal, only skip if logic is unchanged (CurrentLogic)
126+
} else if existing_ordinal == target_ordinal {
127+
self.kind == SourceVersionKind::CurrentLogic
128+
&& target.kind == SourceVersionKind::CurrentLogic
129+
} else {
130+
false
131+
}
125132
}
126133
_ => false,
127134
};
@@ -835,6 +842,7 @@ impl<'a> RowIndexer<'a> {
835842
source_version.ordinal.into(),
836843
source_fp,
837844
&self.src_eval_ctx.plan.logic_fingerprint.0,
845+
&self.src_eval_ctx.plan.lineage_fingerprint.0,
838846
precommit_metadata.process_ordinal,
839847
self.process_time.timestamp_micros(),
840848
precommit_metadata.new_target_keys,
@@ -1076,4 +1084,87 @@ mod tests {
10761084
"After optimization, same ordinal should be skipped"
10771085
);
10781086
}
1087+
1088+
#[test]
1089+
fn test_lineage_fingerprint_skip_logic_happy_path() {
1090+
// Happy path: lineage fingerprint matches, should skip if ordinal is same or older
1091+
let stored_version = SourceVersion {
1092+
ordinal: Ordinal(Some(100)),
1093+
kind: SourceVersionKind::CurrentLogic,
1094+
};
1095+
let target_version = SourceVersion {
1096+
ordinal: Ordinal(Some(100)),
1097+
kind: SourceVersionKind::CurrentLogic,
1098+
};
1099+
assert!(stored_version.should_skip(&target_version, None));
1100+
// Newer ordinal should not skip
1101+
let newer_version = SourceVersion {
1102+
ordinal: Ordinal(Some(101)),
1103+
kind: SourceVersionKind::CurrentLogic,
1104+
};
1105+
assert!(!stored_version.should_skip(&newer_version, None));
1106+
}
1107+
1108+
#[test]
1109+
fn test_lineage_fingerprint_skip_logic_benign_change() {
1110+
// Benign change: lineage fingerprint unchanged, ordinal increases
1111+
let stored_version = SourceVersion {
1112+
ordinal: Ordinal(Some(100)),
1113+
kind: SourceVersionKind::CurrentLogic,
1114+
};
1115+
let benign_version = SourceVersion {
1116+
ordinal: Ordinal(Some(101)),
1117+
kind: SourceVersionKind::CurrentLogic,
1118+
};
1119+
assert!(!stored_version.should_skip(&benign_version, None));
1120+
}
1121+
1122+
#[test]
1123+
fn test_lineage_fingerprint_skip_logic_breaking_change() {
1124+
// Breaking change: lineage fingerprint changes, should not skip even if ordinal is same
1125+
let stored_version = SourceVersion {
1126+
ordinal: Ordinal(Some(100)),
1127+
kind: SourceVersionKind::CurrentLogic,
1128+
};
1129+
let breaking_version = SourceVersion {
1130+
ordinal: Ordinal(Some(100)),
1131+
kind: SourceVersionKind::DifferentLogic,
1132+
};
1133+
assert!(!stored_version.should_skip(&breaking_version, None));
1134+
}
1135+
1136+
#[test]
1137+
fn test_lineage_fingerprint_skip_logic_edge_cases() {
1138+
// Edge case: missing ordinals
1139+
let stored_version = SourceVersion {
1140+
ordinal: Ordinal(None),
1141+
kind: SourceVersionKind::CurrentLogic,
1142+
};
1143+
let target_version = SourceVersion {
1144+
ordinal: Ordinal(None),
1145+
kind: SourceVersionKind::CurrentLogic,
1146+
};
1147+
// Should not skip if ordinals are missing
1148+
assert!(!stored_version.should_skip(&target_version, None));
1149+
// Edge case: stored ordinal is older
1150+
let stored_version = SourceVersion {
1151+
ordinal: Ordinal(Some(99)),
1152+
kind: SourceVersionKind::CurrentLogic,
1153+
};
1154+
let target_version = SourceVersion {
1155+
ordinal: Ordinal(Some(100)),
1156+
kind: SourceVersionKind::CurrentLogic,
1157+
};
1158+
assert!(!stored_version.should_skip(&target_version, None));
1159+
// Edge case: stored ordinal is newer
1160+
let stored_version = SourceVersion {
1161+
ordinal: Ordinal(Some(101)),
1162+
kind: SourceVersionKind::CurrentLogic,
1163+
};
1164+
let target_version = SourceVersion {
1165+
ordinal: Ordinal(Some(100)),
1166+
kind: SourceVersionKind::CurrentLogic,
1167+
};
1168+
assert!(stored_version.should_skip(&target_version, None));
1169+
}
10791170
}

src/execution/source_indexer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -283,8 +283,8 @@ impl SourceIndexingContext {
283283
version_state: SourceRowVersionState {
284284
source_version: SourceVersion::from_stored(
285285
key_metadata.processed_source_ordinal,
286-
&key_metadata.process_logic_fingerprint,
287-
plan.logic_fingerprint,
286+
&key_metadata.process_lineage_fingerprint,
287+
plan.lineage_fingerprint,
288288
),
289289
content_version_fp: key_metadata.processed_source_fp,
290290
},

0 commit comments

Comments
 (0)