3 changes: 3 additions & 0 deletions src/builder/exec_ctx.rs
@@ -288,6 +288,9 @@ pub fn build_flow_setup_execution_context(
db_tracking_setup::default_source_state_table_name(&flow_inst.name)
})
}),
has_fast_fingerprint_column: metadata
.features
.contains(setup::flow_features::FAST_FINGERPRINT),
},
targets: target_states,
metadata,
46 changes: 36 additions & 10 deletions src/execution/db_tracking.rs
@@ -82,6 +82,7 @@ pub struct SourceTrackingInfoForProcessing {
pub memoization_info: Option<sqlx::types::Json<Option<StoredMemoizationInfo>>>,

pub processed_source_ordinal: Option<i64>,
pub processed_source_fp: Option<Vec<u8>>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub max_process_ordinal: Option<i64>,
pub process_ordinal: Option<i64>,
@@ -94,7 +95,12 @@ pub async fn read_source_tracking_info_for_processing(
pool: &PgPool,
) -> Result<Option<SourceTrackingInfoForProcessing>> {
let query_str = format!(
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT memoization_info, processed_source_ordinal, {}, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
if db_setup.has_fast_fingerprint_column {
"processed_source_fp"
} else {
"NULL::bytea AS processed_source_fp"
},
db_setup.table_name
);
let tracking_info = sqlx::query_as(&query_str)
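
Note on the fallback arm above: `SourceTrackingInfoForProcessing` always carries a `processed_source_fp` field, so `sqlx::query_as` expects a column of that name in every result set; selecting `NULL::bytea AS processed_source_fp` keeps the result shape and the decoded type identical when the real column is absent. A minimal sketch of the same pattern, with an illustrative table name:

// Sketch only: table and column selection are illustrative, not code from this PR.
#[derive(sqlx::FromRow)]
struct Row {
    processed_source_ordinal: Option<i64>,
    processed_source_fp: Option<Vec<u8>>, // NULL::bytea decodes to None
}

async fn read(pool: &sqlx::PgPool, has_column: bool) -> sqlx::Result<Option<Row>> {
    let sql = format!(
        "SELECT processed_source_ordinal, {} FROM my_flow__tracking WHERE source_id = $1",
        if has_column {
            "processed_source_fp"
        } else {
            "NULL::bytea AS processed_source_fp"
        },
    );
    sqlx::query_as(&sql).bind(42_i32).fetch_optional(pool).await
}
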
@@ -198,6 +204,7 @@ pub async fn commit_source_tracking_info(
source_key_json: &serde_json::Value,
staging_target_keys: TrackedTargetKeyForSource,
processed_source_ordinal: Option<i64>,
processed_source_fp: Option<Vec<u8>>,
logic_fingerprint: &[u8],
process_ordinal: i64,
process_time_micros: i64,
@@ -211,26 +218,45 @@
"INSERT INTO {} ( \
source_id, source_key, \
max_process_ordinal, staging_target_keys, \
processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys) \
VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8)",
db_setup.table_name
processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys{}) \
VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8{})",
db_setup.table_name,
if db_setup.has_fast_fingerprint_column {
", processed_source_fp"
} else {
""
},
if db_setup.has_fast_fingerprint_column {
", $9"
} else {
""
},
),
WriteAction::Update => format!(
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8 WHERE source_id = $1 AND source_key = $2",
db_setup.table_name
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8{} WHERE source_id = $1 AND source_key = $2",
db_setup.table_name,
if db_setup.has_fast_fingerprint_column {
", processed_source_fp = $9"
} else {
""
},
),
};
sqlx::query(&query_str)
let mut query = sqlx::query(&query_str)
.bind(source_id) // $1
.bind(source_key_json) // $2
.bind(sqlx::types::Json(staging_target_keys)) // $3
.bind(processed_source_ordinal) // $4
.bind(logic_fingerprint) // $5
.bind(process_ordinal) // $6
.bind(process_time_micros) // $7
.bind(sqlx::types::Json(target_keys)) // $8
.execute(db_executor)
.await?;
.bind(sqlx::types::Json(target_keys)); // $8

if db_setup.has_fast_fingerprint_column {
query = query.bind(processed_source_fp); // $9
}
query.execute(db_executor).await?;

Ok(())
}
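
The commit path above keeps SQL placeholders and binds in lockstep: `$9` only appears in the statement when the column exists, and the ninth bind is attached under the same condition, so both branches stay well-formed. A condensed sketch of that pattern (table and column names illustrative):

// Sketch only: demonstrates the conditional placeholder + conditional bind
// pairing; not the actual statement from this PR.
async fn upsert(
    pool: &sqlx::PgPool,
    has_extra_column: bool,
    a: i64,
    extra: Option<Vec<u8>>,
) -> sqlx::Result<()> {
    let sql = format!(
        "INSERT INTO t (a{cols}) VALUES ($1{vals})",
        cols = if has_extra_column { ", extra" } else { "" },
        vals = if has_extra_column { ", $2" } else { "" },
    );
    let mut query = sqlx::query(&sql).bind(a); // $1
    if has_extra_column {
        query = query.bind(extra); // $2, only when the column exists
    }
    query.execute(pool).await?;
    Ok(())
}
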

24 changes: 13 additions & 11 deletions src/execution/db_tracking_setup.rs
@@ -22,12 +22,16 @@ pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;

async fn upgrade_tracking_table(
pool: &PgPool,
table_name: &str,
desired_state: &TrackingTableSetupState,
existing_version_id: i32,
target_version_id: i32,
) -> Result<()> {
if existing_version_id < 1 && target_version_id >= 1 {
let query = format!(
if existing_version_id < 1 && desired_state.version_id >= 1 {
let table_name = &desired_state.table_name;
let opt_fast_fingerprint_column = desired_state
.has_fast_fingerprint_column
.then(|| "processed_source_fp BYTEA,")
.unwrap_or("");
let query = format!(
"CREATE TABLE IF NOT EXISTS {table_name} (
source_id INTEGER NOT NULL,
source_key JSONB NOT NULL,
@@ -39,6 +43,7 @@ async fn upgrade_tracking_table(

-- Update after applying the changes to the target storage.
processed_source_ordinal BIGINT,
{opt_fast_fingerprint_column}
process_logic_fingerprint BYTEA,
process_ordinal BIGINT,
process_time_micros BIGINT,
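
The optional column is spliced into the DDL with `bool::then(..).unwrap_or("")`, which collapses the flag to either the column clause or an empty string so the `CREATE TABLE` template stays a single literal. A self-contained sketch of that helper:

// Sketch only: the extracted pattern, not code from this PR.
fn opt_fast_fingerprint_column(has_fast_fingerprint_column: bool) -> &'static str {
    has_fast_fingerprint_column
        .then(|| "processed_source_fp BYTEA,")
        .unwrap_or("")
}

fn main() {
    assert_eq!(opt_fast_fingerprint_column(true), "processed_source_fp BYTEA,");
    assert_eq!(opt_fast_fingerprint_column(false), "");
}
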
@@ -73,6 +78,8 @@ pub struct TrackingTableSetupState {
pub version_id: i32,
#[serde(default)]
pub source_state_table_name: Option<String>,
#[serde(default)]
pub has_fast_fingerprint_column: bool,
}
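
`#[serde(default)]` is what makes the new field backward compatible: a setup state persisted before this PR has no `has_fast_fingerprint_column` key and simply deserializes with the flag off. A simplified sketch (struct trimmed to the fields shown above):

// Sketch only: simplified mirror of the struct above for illustration.
use serde::Deserialize;

#[derive(Deserialize)]
struct TrackingTableSetupState {
    table_name: String,
    version_id: i32,
    #[serde(default)]
    has_fast_fingerprint_column: bool,
}

fn main() {
    let legacy = r#"{"table_name": "t__tracking", "version_id": 1}"#;
    let state: TrackingTableSetupState = serde_json::from_str(legacy).unwrap();
    assert!(!state.has_fast_fingerprint_column); // pre-existing states read as false
}
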

#[derive(Debug)]
@@ -248,13 +255,8 @@ impl TrackingTableSetupChange {
}

if self.min_existing_version_id != Some(desired.version_id) {
upgrade_tracking_table(
pool,
&desired.table_name,
self.min_existing_version_id.unwrap_or(0),
desired.version_id,
)
.await?;
upgrade_tracking_table(pool, desired, self.min_existing_version_id.unwrap_or(0))
.await?;
}
} else {
for lagacy_name in self.legacy_tracking_table_names.iter() {
3 changes: 2 additions & 1 deletion src/execution/memoization.rs
@@ -26,8 +26,9 @@ pub struct StoredMemoizationInfo {
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,

/// TO BE DEPRECATED. Use the new `processed_source_fp` column instead.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub content_hash: Option<Fingerprint>,
pub content_hash: Option<String>,
}

pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
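
With `content_hash` now an `Option<String>`, the legacy path stores the fingerprint as base64 text inside the JSON memoization blob rather than as a binary serde value. The round-trip, as used by `row_indexer.rs` below (sketch only, illustrative bytes):

// Sketch only: encode on the legacy write path, decode on the read path.
use base64::Engine;
use base64::prelude::BASE64_STANDARD;

fn main() {
    let fingerprint: Vec<u8> = vec![0xde, 0xad, 0xbe, 0xef];
    let stored: String = BASE64_STANDARD.encode(&fingerprint); // into content_hash
    let decoded: Vec<u8> = BASE64_STANDARD.decode(&stored).unwrap(); // back to raw bytes
    assert_eq!(decoded, fingerprint);
}
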
65 changes: 43 additions & 22 deletions src/execution/row_indexer.rs
@@ -1,5 +1,7 @@
use crate::prelude::*;

use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use futures::future::try_join_all;
use sqlx::PgPool;
use std::collections::{HashMap, HashSet};
@@ -419,6 +421,7 @@ async fn commit_source_tracking_info(
source_id: i32,
source_key_json: &serde_json::Value,
source_version: &SourceVersion,
source_fp: Option<Vec<u8>>,
logic_fingerprint: &[u8],
precommit_metadata: PrecommitMetadata,
process_timestamp: &chrono::DateTime<chrono::Utc>,
@@ -482,6 +485,7 @@ async fn commit_source_tracking_info(
source_key_json,
cleaned_staging_target_keys,
source_version.ordinal.into(),
source_fp,
logic_fingerprint,
precommit_metadata.process_ordinal,
process_timestamp.timestamp_micros(),
@@ -508,7 +512,7 @@ async fn try_content_hash_optimization(
src_eval_ctx: &SourceRowEvaluationContext<'_>,
source_key_json: &serde_json::Value,
source_version: &SourceVersion,
current_hash: &crate::utils::fingerprint::Fingerprint,
current_hash: &[u8],
tracking_info: &db_tracking::SourceTrackingInfoForProcessing,
existing_version: &Option<SourceVersion>,
db_setup: &db_tracking_setup::TrackingTableSetupState,
@@ -523,21 +527,31 @@
return Ok(None);
}

if tracking_info
.max_process_ordinal
.zip(tracking_info.process_ordinal)
.is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord)
{
return Ok(None);
}
let existing_hash: Option<Cow<'_, Vec<u8>>> = if db_setup.has_fast_fingerprint_column {
tracking_info
.processed_source_fp
.as_ref()
.map(|fp| Cow::Borrowed(fp))
} else {
if tracking_info
.max_process_ordinal
.zip(tracking_info.process_ordinal)
.is_none_or(|(max_ord, proc_ord)| max_ord != proc_ord)
{
return Ok(None);
}

let existing_hash = tracking_info
.memoization_info
.as_ref()
.and_then(|info| info.0.as_ref())
.and_then(|stored_info| stored_info.content_hash.as_ref());
tracking_info
.memoization_info
.as_ref()
.and_then(|info| info.0.as_ref())
.and_then(|stored_info| stored_info.content_hash.as_ref())
.map(|content_hash| BASE64_STANDARD.decode(content_hash))
.transpose()?
.map(Cow::Owned)
};

if existing_hash != Some(current_hash) {
if existing_hash.as_ref().map(|fp| fp.as_slice()) != Some(current_hash) {
return Ok(None);
}
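
The `Cow` keeps the fast path allocation-free: bytes from the `processed_source_fp` column are borrowed directly, while the legacy base64 decode necessarily produces an owned buffer, and a single slice comparison then serves both. A condensed sketch of that shape (signature simplified, and the legacy ordinal check omitted):

// Sketch only: condensed from the branch above, names simplified.
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
use std::borrow::Cow;

fn existing_hash<'a>(
    column_fp: Option<&'a Vec<u8>>,
    legacy_b64: Option<&str>,
) -> Option<Cow<'a, Vec<u8>>> {
    if let Some(fp) = column_fp {
        return Some(Cow::Borrowed(fp)); // fast path: zero-copy borrow
    }
    legacy_b64
        .and_then(|s| BASE64_STANDARD.decode(s).ok()) // legacy path: allocates
        .map(Cow::Owned)
}
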

@@ -641,6 +655,8 @@ pub async fn update_source_row(
pool: &PgPool,
update_stats: &stats::UpdateStats,
) -> Result<SkippedOr<()>> {
let tracking_setup_state = &setup_execution_ctx.setup_state.tracking_table;

let source_key_json = serde_json::to_value(src_eval_ctx.key)?;
let process_time = chrono::Utc::now();
let source_id = setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id;
@@ -689,10 +705,10 @@
src_eval_ctx,
&source_key_json,
source_version,
current_hash,
current_hash.as_slice(),
existing_tracking_info,
&existing_version,
&setup_execution_ctx.setup_state.tracking_table,
tracking_setup_state,
update_stats,
pool,
)
@@ -702,7 +718,7 @@
}
}

let (output, stored_mem_info) = {
let (output, stored_mem_info, source_fp) = {
let extracted_memoization_info = existing_tracking_info
.and_then(|info| info.memoization_info)
.and_then(|info| info.0);
@@ -721,11 +737,15 @@
let output =
evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
let mut stored_info = evaluation_memory.into_stored()?;
stored_info.content_hash = current_content_hash;

(Some(output), stored_info)
let content_hash = current_content_hash.map(|fp| fp.0.to_vec());
if tracking_setup_state.has_fast_fingerprint_column {
(Some(output), stored_info, content_hash)
} else {
stored_info.content_hash = content_hash.map(|fp| BASE64_STANDARD.encode(fp));
(Some(output), stored_info, None)
}
}
interface::SourceValue::NonExistence => (None, Default::default()),
interface::SourceValue::NonExistence => (None, Default::default(), None),
}
};
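
On the write path the two branches route the same bytes differently: with the fast column, the raw fingerprint travels as its own value down to the `$9` bind; without it, the bytes are base64-encoded into the memoization JSON as before. A condensed sketch of the fork (names simplified):

// Sketch only: (fast-column bytes, legacy JSON field) routing, simplified.
use base64::Engine;
use base64::prelude::BASE64_STANDARD;

fn route_fingerprint(
    has_fast_fingerprint_column: bool,
    content_hash: Option<Vec<u8>>,
) -> (Option<Vec<u8>>, Option<String>) {
    if has_fast_fingerprint_column {
        (content_hash, None) // raw bytes -> processed_source_fp bind
    } else {
        (None, content_hash.map(|fp| BASE64_STANDARD.encode(fp))) // legacy content_hash
    }
}
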

@@ -788,10 +808,11 @@ pub async fn update_source_row(
source_id,
&source_key_json,
source_version,
source_fp,
&src_eval_ctx.plan.logic_fingerprint.0,
precommit_output.metadata,
&process_time,
&setup_execution_ctx.setup_state.tracking_table,
tracking_setup_state,
pool,
)
.await?;
3 changes: 2 additions & 1 deletion src/setup/flow_features.rs
@@ -1,7 +1,8 @@
use crate::prelude::*;

pub const SOURCE_STATE_TABLE: &str = "source_state_table";
pub const FAST_FINGERPRINT: &str = "fast_fingerprint";

pub fn default_features() -> BTreeSet<String> {
BTreeSet::new()
BTreeSet::from_iter([FAST_FINGERPRINT.to_string()])
}
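
Because `default_features()` now seeds `FAST_FINGERPRINT`, newly created flows get the raw-bytes column from the start, while flows whose persisted metadata predates the feature keep the legacy base64-in-JSON path; the gate in `exec_ctx.rs` above is just a set lookup. A tiny sketch of that gating:

// Sketch only: the feature-set lookup that drives has_fast_fingerprint_column.
use std::collections::BTreeSet;

const FAST_FINGERPRINT: &str = "fast_fingerprint";

fn main() {
    let new_flow: BTreeSet<String> = BTreeSet::from_iter([FAST_FINGERPRINT.to_string()]);
    let legacy_flow: BTreeSet<String> = BTreeSet::new();
    assert!(new_flow.contains(FAST_FINGERPRINT)); // gets the bytea column
    assert!(!legacy_flow.contains(FAST_FINGERPRINT)); // stays on the legacy path
}
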
4 changes: 4 additions & 0 deletions src/utils/fingerprint.rs
@@ -51,6 +51,10 @@ impl Fingerprint {
Err(e) => bail!("Fingerprint bytes length is unexpected: {}", e.len()),
}
}

pub fn as_slice(&self) -> &[u8] {
&self.0
}
}

impl Serialize for Fingerprint {