Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
aeeb1c3
feat: add content hash support for change detection in source processing
vumichien Jun 17, 2025
0306c76
refactor: remove content hash comments from tracking and indexing str…
vumichien Jun 17, 2025
ef7b5d6
style: clean up whitespace and formatting in test_content_hash.py
vumichien Jun 17, 2025
8eccb07
refactor: add type hints to test methods in test_content_hash.py
vumichien Jun 17, 2025
e18946b
refactor: remove content hash handling from source executor options a…
vumichien Jun 19, 2025
4e8398e
implement update function for source ordinal and fingerprints, enhanc…
vumichien Jun 19, 2025
914efeb
enhance fast path logic in update_source_row to include version verif…
vumichien Jun 19, 2025
86d58e2
revert all changes for files under ops
vumichien Jun 20, 2025
486e14b
remove processed_source_content_hash from tracking structures and opt…
vumichien Jun 21, 2025
fb9bb40
implement content hash computation in EvaluationMemory and optimize u…
vumichien Jun 21, 2025
e20bc54
update source tracking logic to include process ordinal in database u…
vumichien Jun 21, 2025
61164ce
refactor: optimize update_source_tracking_ordinal_and_logic by removi…
vumichien Jun 24, 2025
f214b23
update EvaluationMemory initialization and streamline content hash ha…
vumichien Jun 25, 2025
b681c65
remove test suite for content hash functionality in test_content_hash…
vumichien Jun 26, 2025
9a2cbfe
simplify update_source_tracking_ordinal_and_logic by removing unneces…
vumichien Jun 26, 2025
fc010c4
rename update_source_tracking_ordinal_and_logic to update_source_trac…
vumichien Jun 27, 2025
e81f06b
refactor memoization info handling
vumichien Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions src/execution/db_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct TrackedTargetKeyInfo {
pub key: serde_json::Value,
pub additional_key: serde_json::Value,
pub process_ordinal: i64,
// None means deletion.
pub fingerprint: Option<Fingerprint>,
}

Expand Down Expand Up @@ -84,6 +83,8 @@ pub struct SourceTrackingInfoForProcessing {

pub processed_source_ordinal: Option<i64>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub max_process_ordinal: Option<i64>,
pub process_ordinal: Option<i64>,
}

pub async fn read_source_tracking_info_for_processing(
Expand All @@ -93,7 +94,7 @@ pub async fn read_source_tracking_info_for_processing(
pool: &PgPool,
) -> Result<Option<SourceTrackingInfoForProcessing>> {
let query_str = format!(
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
db_setup.table_name
);
let tracking_info = sqlx::query_as(&query_str)
Expand Down Expand Up @@ -305,3 +306,23 @@ pub async fn read_source_last_processed_info(
.await?;
Ok(last_processed_info)
}

/// Overwrites the `processed_source_ordinal` column of a single tracking row.
///
/// The row is selected by `source_id` and `source_key_json` in the table
/// named by `db_setup.table_name`. Only `processed_source_ordinal` is
/// written; no other column appears in the statement.
///
/// The result of the `UPDATE` (including how many rows matched) is not
/// inspected, so a key with no existing row is not reported as an error here.
///
/// # Errors
///
/// Propagates any error returned by executing the statement on `db_executor`.
pub async fn update_source_tracking_ordinal(
    source_id: i32,
    source_key_json: &serde_json::Value,
    processed_source_ordinal: Option<i64>,
    db_setup: &TrackingTableSetupState,
    db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<()> {
    // The table name is interpolated because it cannot be sent as a bind
    // parameter; every value goes through a numbered placeholder instead.
    let tracking_table = &db_setup.table_name;
    let update_sql = format!(
        "UPDATE {} SET processed_source_ordinal = $3 WHERE source_id = $1 AND source_key = $2",
        tracking_table
    );

    // Bind order must follow the placeholder numbering: $1, $2, $3.
    let statement = sqlx::query(&update_sql)
        .bind(source_id)
        .bind(source_key_json)
        .bind(processed_source_ordinal);

    statement.execute(db_executor).await?;
    Ok(())
}
3 changes: 1 addition & 2 deletions src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async fn upgrade_tracking_table(
);
sqlx::query(&query).execute(pool).await?;
}

Ok(())
}

Expand Down Expand Up @@ -154,8 +155,6 @@ impl ResourceSetupStatus for TrackingTableSetupStatus {
(None, None) => SetupChangeType::NoChange,
}
}


}

impl TrackingTableSetupStatus {
Expand Down
2 changes: 1 addition & 1 deletion src/execution/dumper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use futures::future::try_join_all;
use futures::StreamExt;
use futures::future::try_join_all;
use indexmap::IndexMap;
use itertools::Itertools;
use serde::ser::SerializeSeq;
Expand Down
11 changes: 9 additions & 2 deletions src/execution/memoization.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::{bail, Result};
use anyhow::{Result, bail};
use serde::{Deserialize, Serialize};
use std::{
borrow::Cow,
Expand All @@ -25,6 +25,9 @@ pub struct StoredMemoizationInfo {

#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,

#[serde(default, skip_serializing_if = "Option::is_none")]
pub content_hash: Option<Fingerprint>,
}

pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
Expand Down Expand Up @@ -156,7 +159,11 @@ impl EvaluationMemory {
.into_iter()
.filter_map(|(k, v)| v.into_stored().map(|uuids| (k, uuids)))
.collect();
Ok(StoredMemoizationInfo { cache, uuids })
Ok(StoredMemoizationInfo {
cache,
uuids,
content_hash: None,
})
}

pub fn get_cache_entry(
Expand Down
Loading