Skip to content

Commit fb9bb40

Browse files
committed
Implement content hash computation in `EvaluationMemory` and optimize the `update_source_row` logic for efficient source processing
1 parent 486e14b commit fb9bb40

File tree

3 files changed

+130
-39
lines changed

3 files changed

+130
-39
lines changed

src/execution/db_tracking.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,31 @@ pub async fn read_source_last_processed_info(
306306
.await?;
307307
Ok(last_processed_info)
308308
}
309+
310+
/// Updates the bookkeeping columns of one tracking-table row.
///
/// Issues a single `UPDATE` against `db_setup.table_name` that overwrites
/// `processed_source_ordinal`, `process_logic_fingerprint` and
/// `process_time_micros` on the row matched by `source_id` and `source_key`.
/// No other column appears in the statement.
///
/// # Parameters
/// - `source_id` / `source_key_json`: bound as `$1` / `$2`; together they form the `WHERE` clause.
/// - `processed_source_ordinal`: bound as `$3`; being an `Option`, `None` is bound as-is
///   (presumably stored as SQL `NULL` -- confirm against the column definition).
/// - `process_logic_fingerprint`: bound as `$4`.
/// - `process_time_micros`: bound as `$5`.
/// - `db_setup`: only its `table_name` is read here.
/// - `db_executor`: the executor the statement runs on.
///
/// # Errors
/// Propagates any error returned by executing the statement.
///
/// NOTE(review): the execution result is discarded, so a statement that matches
/// zero rows still returns `Ok(())`.
pub async fn update_source_tracking_ordinal_and_logic(
311+
source_id: i32,
312+
source_key_json: &serde_json::Value,
313+
processed_source_ordinal: Option<i64>,
314+
process_logic_fingerprint: &[u8],
315+
process_time_micros: i64,
316+
db_setup: &TrackingTableSetupState,
317+
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
318+
) -> Result<()> {
319+
// The table name is interpolated into the SQL text; every value is passed
// as a bound parameter ($1..$5) below.
let query_str = format!(
320+
"UPDATE {} SET \
321+
processed_source_ordinal = $3, \
322+
process_logic_fingerprint = $4, \
323+
process_time_micros = $5 \
324+
WHERE source_id = $1 AND source_key = $2",
325+
db_setup.table_name
326+
);
327+
sqlx::query(&query_str)
328+
.bind(source_id) // $1
329+
.bind(source_key_json) // $2
330+
.bind(processed_source_ordinal) // $3
331+
.bind(process_logic_fingerprint) // $4
332+
.bind(process_time_micros) // $5
333+
.execute(db_executor)
334+
.await?;
335+
// The query result (including the affected-row count) is intentionally not inspected here.
Ok(())
336+
}

src/execution/memoization.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub struct EvaluationMemory {
8282
cache: Option<Mutex<HashMap<Fingerprint, CacheEntry>>>,
8383
uuids: Mutex<HashMap<Fingerprint, UuidEntry>>,
8484
evaluation_only: bool,
85+
content_hash: Option<Fingerprint>,
8586
}
8687

8788
impl EvaluationMemory {
@@ -123,6 +124,7 @@ impl EvaluationMemory {
123124
.collect(),
124125
),
125126
evaluation_only: options.evaluation_only,
127+
content_hash: None,
126128
}
127129
}
128130

@@ -162,7 +164,7 @@ impl EvaluationMemory {
162164
Ok(StoredMemoizationInfo {
163165
cache,
164166
uuids,
165-
content_hash: None,
167+
content_hash: self.content_hash,
166168
})
167169
}
168170

@@ -210,6 +212,10 @@ impl EvaluationMemory {
210212
Ok(Some(result))
211213
}
212214

215+
/// Records `content_hash` on this `EvaluationMemory`, replacing any value set earlier.
///
/// The field starts out as `None` when the memory is constructed, and is copied into
/// the `content_hash` of the `StoredMemoizationInfo` built elsewhere in this impl.
pub fn set_content_hash(&mut self, content_hash: Fingerprint) {
216+
self.content_hash = Some(content_hash);
217+
}
218+
213219
pub fn next_uuid(&self, key: Fingerprint) -> Result<uuid::Uuid> {
214220
let mut uuids = self.uuids.lock().unwrap();
215221

src/execution/row_indexer.rs

Lines changed: 95 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -597,26 +597,16 @@ pub async fn update_source_row(
597597
None => (None, None, false, None),
598598
};
599599

600-
let (output, stored_mem_info) = match source_value {
601-
interface::SourceValue::Existence(source_value) => {
602-
let evaluation_memory = EvaluationMemory::new(
603-
process_time,
604-
memoization_info.clone(),
605-
EvaluationMemoryOptions {
606-
enable_cache: true,
607-
evaluation_only: false,
608-
},
609-
);
610-
let output =
611-
evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
612-
let stored_info = evaluation_memory.into_stored()?;
613-
614-
if let (Some(existing_info), Some(existing_content_hash)) =
615-
(&memoization_info, &stored_info.content_hash)
616-
{
617-
if can_use_content_hash_optimization
618-
&& existing_info.content_hash.as_ref() == Some(existing_content_hash)
619-
{
600+
if let interface::SourceValue::Existence(ref source_value) = source_value {
601+
if let Some(existing_info) = &memoization_info {
602+
if can_use_content_hash_optimization {
603+
// Compute content hash directly from source data (no expensive evaluation needed)
604+
let current_content_hash = Fingerprinter::default()
605+
.with(source_value)?
606+
.into_fingerprint();
607+
608+
if existing_info.content_hash.as_ref() == Some(&current_content_hash) {
609+
// Content hash matches - try optimization
620610
let mut txn = pool.begin().await?;
621611

622612
let current_tracking_info =
@@ -642,24 +632,18 @@ pub async fn update_source_row(
642632
let original_process_ordinal = original_tracking_info
643633
.as_ref()
644634
.and_then(|info| info.process_ordinal);
645-
if current_info.process_ordinal != original_process_ordinal {
646-
} else {
647-
let query_str = format!(
648-
"UPDATE {} SET \
649-
processed_source_ordinal = $3, \
650-
process_logic_fingerprint = $4, \
651-
process_time_micros = $5 \
652-
WHERE source_id = $1 AND source_key = $2",
653-
src_eval_ctx.plan.tracking_table_setup.table_name
654-
);
655-
sqlx::query(&query_str)
656-
.bind(src_eval_ctx.import_op.source_id) // $1
657-
.bind(&source_key_json) // $2
658-
.bind(source_version.ordinal.0) // $3
659-
.bind(&src_eval_ctx.plan.logic_fingerprint.0) // $4
660-
.bind(process_time.timestamp_micros()) // $5
661-
.execute(&mut *txn)
662-
.await?;
635+
if current_info.process_ordinal == original_process_ordinal {
636+
// Safe to apply optimization - just update tracking table
637+
db_tracking::update_source_tracking_ordinal_and_logic(
638+
src_eval_ctx.import_op.source_id,
639+
&source_key_json,
640+
source_version.ordinal.0,
641+
&src_eval_ctx.plan.logic_fingerprint.0,
642+
process_time.timestamp_micros(),
643+
&src_eval_ctx.plan.tracking_table_setup,
644+
&mut *txn,
645+
)
646+
.await?;
663647

664648
txn.commit().await?;
665649
update_stats.num_no_change.inc(1);
@@ -668,6 +652,29 @@ pub async fn update_source_row(
668652
}
669653
}
670654
}
655+
}
656+
}
657+
658+
let (output, stored_mem_info) = match source_value {
659+
interface::SourceValue::Existence(source_value) => {
660+
let mut evaluation_memory = EvaluationMemory::new(
661+
process_time,
662+
memoization_info.clone(),
663+
EvaluationMemoryOptions {
664+
enable_cache: true,
665+
evaluation_only: false,
666+
},
667+
);
668+
669+
// Compute and set content hash from source data
670+
let content_hash = Fingerprinter::default()
671+
.with(&source_value)?
672+
.into_fingerprint();
673+
evaluation_memory.set_content_hash(content_hash);
674+
675+
let output =
676+
evaluate_source_entry(src_eval_ctx, source_value, &evaluation_memory).await?;
677+
let stored_info = evaluation_memory.into_stored()?;
671678

672679
(Some(output), stored_info)
673680
}
@@ -1071,4 +1078,54 @@ mod tests {
10711078
// The newer version should cause skipping
10721079
assert!(newer_concurrent_version.should_skip(&target_version, None));
10731080
}
1081+
1082+
// Verifies the fingerprinting used for content hashes: `FieldValues` with equal
// fields yield equal fingerprints, and changing one field changes the fingerprint.
// Only `Fingerprinter` is exercised; no database or evaluation is involved.
#[test]
1083+
fn test_content_hash_computation() {
1084+
use crate::base::value::{BasicValue, FieldValues, Value};
1085+
use crate::utils::fingerprint::Fingerprinter;
1086+
1087+
// Test that content hash is computed correctly from source data
1088+
let source_data1 = FieldValues {
1089+
fields: vec![
1090+
Value::Basic(BasicValue::Str("Hello".into())),
1091+
Value::Basic(BasicValue::Int64(42)),
1092+
],
1093+
};
1094+
1095+
// Same field values as `source_data1`, built as a separate instance.
let source_data2 = FieldValues {
1096+
fields: vec![
1097+
Value::Basic(BasicValue::Str("Hello".into())),
1098+
Value::Basic(BasicValue::Int64(42)),
1099+
],
1100+
};
1101+
1102+
// Differs from the first two only in the string field.
let source_data3 = FieldValues {
1103+
fields: vec![
1104+
Value::Basic(BasicValue::Str("World".into())), // Different content
1105+
Value::Basic(BasicValue::Int64(42)),
1106+
],
1107+
};
1108+
1109+
// Each hash is produced by a fresh `Fingerprinter`, so no state is shared between them.
let hash1 = Fingerprinter::default()
1110+
.with(&source_data1)
1111+
.unwrap()
1112+
.into_fingerprint();
1113+
1114+
let hash2 = Fingerprinter::default()
1115+
.with(&source_data2)
1116+
.unwrap()
1117+
.into_fingerprint();
1118+
1119+
let hash3 = Fingerprinter::default()
1120+
.with(&source_data3)
1121+
.unwrap()
1122+
.into_fingerprint();
1123+
1124+
// Same content should produce same hash
1125+
assert_eq!(hash1, hash2);
1126+
1127+
// Different content should produce different hash
1128+
assert_ne!(hash1, hash3);
1129+
assert_ne!(hash2, hash3);
1130+
}
10741131
}

0 commit comments

Comments
 (0)