
Commit 102fde5

feat(fast-fp): add process_logic_fingerprint for tracking table and use it for content change check (#892)
* feat(fast-fp): add `process_logic_fingerprint` for tracking table
* feat(fast-fp): store and check `process_logic_fingerprint` column
1 parent 4cda7ce commit 102fde5

File tree

7 files changed: +103 -45 lines changed


src/builder/exec_ctx.rs

Lines changed: 3 additions & 0 deletions
@@ -288,6 +288,9 @@ pub fn build_flow_setup_execution_context(
                     db_tracking_setup::default_source_state_table_name(&flow_inst.name)
                 })
             }),
+            has_fast_fingerprint_column: metadata
+                .features
+                .contains(setup::flow_features::FAST_FINGERPRINT),
         },
         targets: target_states,
         metadata,
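
The new field is derived from the flow's feature set. Below is a standalone sketch (not part of the commit) of that membership check, assuming `metadata.features` is a `BTreeSet<String>` as `default_features()` further down suggests:

use std::collections::BTreeSet;

const FAST_FINGERPRINT: &str = "fast_fingerprint";

// Derive the flag the same way the execution context does: the column is only
// expected when the flow's feature set contains the fast_fingerprint feature.
fn has_fast_fingerprint_column(features: &BTreeSet<String>) -> bool {
    features.contains(FAST_FINGERPRINT)
}

fn main() {
    let features = BTreeSet::from_iter([FAST_FINGERPRINT.to_string()]);
    assert!(has_fast_fingerprint_column(&features));
    assert!(!has_fast_fingerprint_column(&BTreeSet::new()));
}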

src/execution/db_tracking.rs

Lines changed: 36 additions & 10 deletions
@@ -82,6 +82,7 @@ pub struct SourceTrackingInfoForProcessing {
     pub memoization_info: Option<sqlx::types::Json<Option<StoredMemoizationInfo>>>,
 
     pub processed_source_ordinal: Option<i64>,
+    pub processed_source_fp: Option<Vec<u8>>,
     pub process_logic_fingerprint: Option<Vec<u8>>,
     pub max_process_ordinal: Option<i64>,
     pub process_ordinal: Option<i64>,
@@ -94,7 +95,12 @@ pub async fn read_source_tracking_info_for_processing(
     pool: &PgPool,
 ) -> Result<Option<SourceTrackingInfoForProcessing>> {
     let query_str = format!(
-        "SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
+        "SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
+        if db_setup.has_fast_fingerprint_column {
+            "processed_source_fp"
+        } else {
+            "NULL::bytea AS processed_source_fp"
+        },
         db_setup.table_name
     );
     let tracking_info = sqlx::query_as(&query_str)
@@ -198,6 +204,7 @@ pub async fn commit_source_tracking_info(
     source_key_json: &serde_json::Value,
     staging_target_keys: TrackedTargetKeyForSource,
     processed_source_ordinal: Option<i64>,
+    processed_source_fp: Option<Vec<u8>>,
     logic_fingerprint: &[u8],
     process_ordinal: i64,
     process_time_micros: i64,
@@ -211,26 +218,45 @@
             "INSERT INTO {} ( \
                 source_id, source_key, \
                 max_process_ordinal, staging_target_keys, \
-                processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys) \
-                VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8)",
-            db_setup.table_name
+                processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys{}) \
+                VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})",
+            db_setup.table_name,
+            if db_setup.has_fast_fingerprint_column {
+                ", processed_source_fp"
+            } else {
+                ""
+            },
+            if db_setup.has_fast_fingerprint_column {
+                ", $9"
+            } else {
+                ""
+            },
         ),
         WriteAction::Update => format!(
-            "UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8 WHERE source_id = $1 AND source_key = $2",
-            db_setup.table_name
+            "UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8{} WHERE source_id = $1 AND source_key = $2",
+            db_setup.table_name,
+            if db_setup.has_fast_fingerprint_column {
+                ", processed_source_fp = $9"
+            } else {
+                ""
+            },
         ),
     };
-    sqlx::query(&query_str)
+    let mut query = sqlx::query(&query_str)
         .bind(source_id) // $1
        .bind(source_key_json) // $2
        .bind(sqlx::types::Json(staging_target_keys)) // $3
        .bind(processed_source_ordinal) // $4
        .bind(logic_fingerprint) // $5
        .bind(process_ordinal) // $6
        .bind(process_time_micros) // $7
-        .bind(sqlx::types::Json(target_keys)) // $8
-        .execute(db_executor)
-        .await?;
+        .bind(sqlx::types::Json(target_keys)); // $8
+
+    if db_setup.has_fast_fingerprint_column {
+        query = query.bind(processed_source_fp); // $9
+    }
+    query.execute(db_executor).await?;
+
     Ok(())
 }
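
For clarity, here is a self-contained sketch (not the commit's code; the helper name and table name are made up) of how the two optional fragments expand the INSERT text when the fast-fingerprint column is present:

/// Standalone sketch of the conditional query building shown in the diff above.
fn insert_query(table_name: &str, has_fast_fp: bool) -> String {
    format!(
        "INSERT INTO {table_name} (source_id, source_key, max_process_ordinal, \
         staging_target_keys, processed_source_ordinal, process_logic_fingerprint, \
         process_ordinal, process_time_micros, target_keys{}) \
         VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})",
        if has_fast_fp { ", processed_source_fp" } else { "" },
        if has_fast_fp { ", $9" } else { "" },
    )
}

fn main() {
    // With the feature enabled, both the column and its $9 placeholder are appended.
    assert!(insert_query("tracking_table", true).ends_with("$8, $9)"));
    // Without it, the statement is identical to the pre-change query.
    assert!(insert_query("tracking_table", false).ends_with("$8)"));
}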

src/execution/db_tracking_setup.rs

Lines changed: 13 additions & 11 deletions
@@ -22,12 +22,16 @@ pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;
 
 async fn upgrade_tracking_table(
     pool: &PgPool,
-    table_name: &str,
+    desired_state: &TrackingTableSetupState,
     existing_version_id: i32,
-    target_version_id: i32,
 ) -> Result<()> {
-    if existing_version_id < 1 && target_version_id >= 1 {
-        let query = format!(
+    if existing_version_id < 1 && desired_state.version_id >= 1 {
+        let table_name = &desired_state.table_name;
+        let opt_fast_fingerprint_column = desired_state
+            .has_fast_fingerprint_column
+            .then(|| "processed_source_fp BYTEA,")
+            .unwrap_or("");
+        let query = format!(
             "CREATE TABLE IF NOT EXISTS {table_name} (
                 source_id INTEGER NOT NULL,
                 source_key JSONB NOT NULL,
@@ -39,6 +43,7 @@ async fn upgrade_tracking_table(
 
                 -- Update after applying the changes to the target storage.
                 processed_source_ordinal BIGINT,
+                {opt_fast_fingerprint_column}
                 process_logic_fingerprint BYTEA,
                 process_ordinal BIGINT,
                 process_time_micros BIGINT,
@@ -73,6 +78,8 @@ pub struct TrackingTableSetupState {
     pub version_id: i32,
     #[serde(default)]
     pub source_state_table_name: Option<String>,
+    #[serde(default)]
+    pub has_fast_fingerprint_column: bool,
 }
 
 #[derive(Debug)]
@@ -248,13 +255,8 @@ impl TrackingTableSetupChange {
             }
 
             if self.min_existing_version_id != Some(desired.version_id) {
-                upgrade_tracking_table(
-                    pool,
-                    &desired.table_name,
-                    self.min_existing_version_id.unwrap_or(0),
-                    desired.version_id,
-                )
-                .await?;
+                upgrade_tracking_table(pool, desired, self.min_existing_version_id.unwrap_or(0))
+                    .await?;
             }
         } else {
             for lagacy_name in self.legacy_tracking_table_names.iter() {
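
A trimmed, standalone sketch (columns abridged, table name hypothetical) of how the optional `processed_source_fp BYTEA,` fragment lands in the generated DDL:

/// Abridged sketch of the CREATE TABLE text produced by upgrade_tracking_table;
/// only a few columns are shown and the table name is a placeholder.
fn create_table_ddl(table_name: &str, has_fast_fingerprint_column: bool) -> String {
    let opt_fast_fingerprint_column = has_fast_fingerprint_column
        .then(|| "processed_source_fp BYTEA,")
        .unwrap_or("");
    format!(
        "CREATE TABLE IF NOT EXISTS {table_name} (
            source_id INTEGER NOT NULL,
            source_key JSONB NOT NULL,
            processed_source_ordinal BIGINT,
            {opt_fast_fingerprint_column}
            process_logic_fingerprint BYTEA,
            PRIMARY KEY (source_id, source_key)
        );"
    )
}

fn main() {
    // The BYTEA column only appears when the flow opted into the feature.
    assert!(create_table_ddl("t", true).contains("processed_source_fp BYTEA,"));
    assert!(!create_table_ddl("t", false).contains("processed_source_fp"));
}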

src/execution/memoization.rs

Lines changed: 2 additions & 1 deletion
@@ -26,8 +26,9 @@ pub struct StoredMemoizationInfo {
     #[serde(default, skip_serializing_if = "HashMap::is_empty")]
     pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
 
+    /// TO BE DEPRECATED. Use the new `processed_source_fp` column instead.
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub content_hash: Option<Fingerprint>,
+    pub content_hash: Option<String>,
 }
 
 pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
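
The type change from `Option<Fingerprint>` to `Option<String>` goes hand in hand with row_indexer.rs storing the hash as base64 text in the legacy path. A minimal round-trip sketch (byte values are arbitrary), using the base64 crate's standard engine:

use base64::Engine;
use base64::prelude::BASE64_STANDARD;

fn main() {
    // Hypothetical raw fingerprint bytes; real values come from the Fingerprint
    // type in src/utils/fingerprint.rs.
    let fp: Vec<u8> = vec![0xde, 0xad, 0xbe, 0xef, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11];

    // Stored form in memoization_info.content_hash (now a String).
    let stored: String = BASE64_STANDARD.encode(&fp);

    // Read back and compared as raw bytes during the content-hash check.
    let decoded: Vec<u8> = BASE64_STANDARD.decode(&stored).unwrap();
    assert_eq!(decoded, fp);
}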

src/execution/row_indexer.rs

Lines changed: 43 additions & 22 deletions
@@ -1,5 +1,7 @@
 use crate::prelude::*;
 
+use base64::Engine;
+use base64::prelude::BASE64_STANDARD;
 use futures::future::try_join_all;
 use sqlx::PgPool;
 use std::collections::{HashMap, HashSet};
@@ -419,6 +421,7 @@ async fn commit_source_tracking_info(
     source_id: i32,
     source_key_json: &serde_json::Value,
     source_version: &SourceVersion,
+    source_fp: Option<Vec<u8>>,
     logic_fingerprint: &[u8],
     precommit_metadata: PrecommitMetadata,
     process_timestamp: &chrono::DateTime<chrono::Utc>,
@@ -482,6 +485,7 @@ async fn commit_source_tracking_info(
         source_key_json,
         cleaned_staging_target_keys,
         source_version.ordinal.into(),
+        source_fp,
         logic_fingerprint,
         precommit_metadata.process_ordinal,
         process_timestamp.timestamp_micros(),
@@ -508,7 +512,7 @@ async fn try_content_hash_optimization(
     src_eval_ctx: &SourceRowEvaluationContext<'_>,
     source_key_json: &serde_json::Value,
     source_version: &SourceVersion,
-    current_hash: &crate::utils::fingerprint::Fingerprint,
+    current_hash: &[u8],
     tracking_info: &db_tracking::SourceTrackingInfoForProcessing,
     existing_version: &Option<SourceVersion>,
     db_setup: &db_tracking_setup::TrackingTableSetupState,
@@ -523,21 +527,31 @@
         return Ok(None);
     }
 
-    if tracking_info
-        .max_process_ordinal
-        .zip(tracking_info.process_ordinal)
-        .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord)
-    {
-        return Ok(None);
-    }
+    let existing_hash: Option<Cow<'_, Vec<u8>>> = if db_setup.has_fast_fingerprint_column {
+        tracking_info
+            .processed_source_fp
+            .as_ref()
+            .map(|fp| Cow::Borrowed(fp))
+    } else {
+        if tracking_info
+            .max_process_ordinal
+            .zip(tracking_info.process_ordinal)
+            .is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord)
+        {
+            return Ok(None);
+        }
 
-    let existing_hash = tracking_info
-        .memoization_info
-        .as_ref()
-        .and_then(|info| info.0.as_ref())
-        .and_then(|stored_info| stored_info.content_hash.as_ref());
+        tracking_info
+            .memoization_info
+            .as_ref()
+            .and_then(|info| info.0.as_ref())
+            .and_then(|stored_info| stored_info.content_hash.as_ref())
+            .map(|content_hash| BASE64_STANDARD.decode(content_hash))
+            .transpose()?
+            .map(Cow::Owned)
+    };
 
-    if existing_hash != Some(current_hash) {
+    if existing_hash.as_ref().map(|fp| fp.as_slice()) != Some(current_hash) {
         return Ok(None);
     }
 
@@ -641,6 +655,8 @@ pub async fn update_source_row(
     pool: &PgPool,
     update_stats: &stats::UpdateStats,
 ) -> Result<SkippedOr<()>> {
+    let tracking_setup_state = &setup_execution_ctx.setup_state.tracking_table;
+
     let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
     let process_time = chrono::Utc::now();
     let source_id = setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id;
@@ -689,10 +705,10 @@
             src_eval_ctx,
             &source_key_json,
             source_version,
-            current_hash,
+            current_hash.as_slice(),
             existing_tracking_info,
             &existing_version,
-            &setup_execution_ctx.setup_state.tracking_table,
+            tracking_setup_state,
             update_stats,
             pool,
         )
@@ -702,7 +718,7 @@
         }
     }
 
-    let (output, stored_mem_info) = {
+    let (output, stored_mem_info, source_fp) = {
         let extracted_memoization_info = existing_tracking_info
             .and_then(|info| info.memoization_info)
             .and_then(|info| info.0);
@@ -721,11 +737,15 @@
                 let output =
                     evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
                 let mut stored_info = evaluation_memory.into_stored()?;
-                stored_info.content_hash = current_content_hash;
-
-                (Some(output), stored_info)
+                let content_hash = current_content_hash.map(|fp| fp.0.to_vec());
+                if tracking_setup_state.has_fast_fingerprint_column {
+                    (Some(output), stored_info, content_hash)
+                } else {
+                    stored_info.content_hash = content_hash.map(|fp| BASE64_STANDARD.encode(fp));
+                    (Some(output), stored_info, None)
+                }
             }
-            interface::SourceValue::NonExistence => (None, Default::default()),
+            interface::SourceValue::NonExistence => (None, Default::default(), None),
         }
     };
 
@@ -788,10 +808,11 @@
         source_id,
         &source_key_json,
         source_version,
+        source_fp,
         &src_eval_ctx.plan.logic_fingerprint.0,
         precommit_output.metadata,
        &process_time,
-        &setup_execution_ctx.setup_state.tracking_table,
+        tracking_setup_state,
         pool,
     )
     .await?;
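
The heart of try_content_hash_optimization is now a byte-level comparison that works for either source of the stored hash: borrowed straight from the new processed_source_fp column, or owned after base64-decoding the legacy memoization field. A standalone sketch of that comparison (the function name is illustrative, not from the commit):

use std::borrow::Cow;

/// Sketch of the comparison used by the content-hash check: the existing hash may
/// be borrowed (fast-fingerprint column) or owned (decoded legacy base64 value),
/// and either way it is compared against the freshly computed hash as raw bytes.
fn hash_unchanged(existing: Option<Cow<'_, Vec<u8>>>, current: &[u8]) -> bool {
    existing.as_ref().map(|fp| fp.as_slice()) == Some(current)
}

fn main() {
    let stored = vec![1u8, 2, 3];
    assert!(hash_unchanged(Some(Cow::Borrowed(&stored)), &[1, 2, 3]));
    assert!(!hash_unchanged(Some(Cow::Owned(vec![9, 9, 9])), &[1, 2, 3]));
    assert!(!hash_unchanged(None, &[1, 2, 3])); // no prior hash, so reprocess
}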

src/setup/flow_features.rs

Lines changed: 2 additions & 1 deletion
@@ -1,7 +1,8 @@
 use crate::prelude::*;
 
 pub const SOURCE_STATE_TABLE: &str = "source_state_table";
+pub const FAST_FINGERPRINT: &str = "fast_fingerprint";
 
 pub fn default_features() -> BTreeSet<String> {
-    BTreeSet::new()
+    BTreeSet::from_iter([FAST_FINGERPRINT.to_string()])
 }
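
Since FAST_FINGERPRINT is only in default_features() for newly created flows, setup states persisted before this change simply deserialize with the flag off, thanks to #[serde(default)]. A trimmed sketch of that backward-compatible behavior (struct fields abridged, serde_json assumed as the persistence format):

use serde::Deserialize;

/// Abridged stand-in for TrackingTableSetupState; only the fields needed to show
/// the default behavior are included.
#[derive(Debug, Deserialize)]
struct TrackingTableSetupStateSketch {
    table_name: String,
    version_id: i32,
    #[serde(default)]
    has_fast_fingerprint_column: bool,
}

fn main() {
    // A state persisted before this commit has no such field...
    let old = r#"{"table_name": "t", "version_id": 1}"#;
    let parsed: TrackingTableSetupStateSketch = serde_json::from_str(old).unwrap();
    // ...so it deserializes with the flag off and no processed_source_fp column is assumed.
    assert!(!parsed.has_fast_fingerprint_column);
    assert_eq!(parsed.table_name, "t");
    assert_eq!(parsed.version_id, 1);
}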

src/utils/fingerprint.rs

Lines changed: 4 additions & 0 deletions
@@ -51,6 +51,10 @@ impl Fingerprint {
             Err(e) => bail!("Fingerprint bytes length is unexpected: {}", e.len()),
         }
     }
+
+    pub fn as_slice(&self) -> &[u8] {
+        &self.0
+    }
 }
 
 impl Serialize for Fingerprint {
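
A tiny standalone illustration of the new accessor (the real Fingerprint wraps a fixed-size byte array; the 16-byte size here is only an assumption for the example):

/// Hypothetical stand-in for the real Fingerprint newtype in src/utils/fingerprint.rs.
struct Fingerprint([u8; 16]);

impl Fingerprint {
    // Mirrors the new accessor: expose the raw bytes without cloning, so callers
    // like try_content_hash_optimization can compare hashes as &[u8].
    pub fn as_slice(&self) -> &[u8] {
        &self.0
    }
}

fn main() {
    let fp = Fingerprint([7u8; 16]);
    assert_eq!(fp.as_slice().len(), 16);
}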
