Skip to content

Commit 3cfd800

Browse files
committed
feat(fast-fp): add process_logic_fingerprint for tracking table
1 parent 4cda7ce commit 3cfd800

File tree

3 files changed

+18
-12
lines changed

3 files changed

+18
-12
lines changed

src/builder/exec_ctx.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,9 @@ pub fn build_flow_setup_execution_context(
288288
db_tracking_setup::default_source_state_table_name(&flow_inst.name)
289289
})
290290
}),
291+
has_fast_fingerprint_column: metadata
292+
.features
293+
.contains(setup::flow_features::FAST_FINGERPRINT),
291294
},
292295
targets: target_states,
293296
metadata,

src/execution/db_tracking_setup.rs

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,16 @@ pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;
2222

2323
async fn upgrade_tracking_table(
2424
pool: &PgPool,
25-
table_name: &str,
25+
desired_state: &TrackingTableSetupState,
2626
existing_version_id: i32,
27-
target_version_id: i32,
2827
) -> Result<()> {
29-
if existing_version_id < 1 && target_version_id >= 1 {
30-
let query = format!(
28+
if existing_version_id < 1 && desired_state.version_id >= 1 {
29+
let table_name = &desired_state.table_name;
30+
let opt_fast_fingerprint_column = desired_state
31+
.has_fast_fingerprint_column
32+
.then(|| "processed_source_fp BYTEA,")
33+
.unwrap_or("");
34+
let query = format!(
3135
"CREATE TABLE IF NOT EXISTS {table_name} (
3236
source_id INTEGER NOT NULL,
3337
source_key JSONB NOT NULL,
@@ -39,6 +43,7 @@ async fn upgrade_tracking_table(
3943
4044
-- Update after applying the changes to the target storage.
4145
processed_source_ordinal BIGINT,
46+
{opt_fast_fingerprint_column}
4247
process_logic_fingerprint BYTEA,
4348
process_ordinal BIGINT,
4449
process_time_micros BIGINT,
@@ -73,6 +78,8 @@ pub struct TrackingTableSetupState {
7378
pub version_id: i32,
7479
#[serde(default)]
7580
pub source_state_table_name: Option<String>,
81+
#[serde(default)]
82+
pub has_fast_fingerprint_column: bool,
7683
}
7784

7885
#[derive(Debug)]
@@ -248,13 +255,8 @@ impl TrackingTableSetupChange {
248255
}
249256

250257
if self.min_existing_version_id != Some(desired.version_id) {
251-
upgrade_tracking_table(
252-
pool,
253-
&desired.table_name,
254-
self.min_existing_version_id.unwrap_or(0),
255-
desired.version_id,
256-
)
257-
.await?;
258+
upgrade_tracking_table(pool, desired, self.min_existing_version_id.unwrap_or(0))
259+
.await?;
258260
}
259261
} else {
260262
for lagacy_name in self.legacy_tracking_table_names.iter() {

src/setup/flow_features.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::prelude::*;
22

33
pub const SOURCE_STATE_TABLE: &str = "source_state_table";
4+
pub const FAST_FINGERPRINT: &str = "fast_fingerprint";
45

56
pub fn default_features() -> BTreeSet<String> {
6-
BTreeSet::new()
7+
BTreeSet::from_iter([FAST_FINGERPRINT.to_string()])
78
}

0 commit comments

Comments
 (0)