Skip to content

Commit a7dfea3

Browse files
authored
feat: add content hash support for change detection in source processing (#629)
* feat: add content hash support for change detection in source processing * refactor: remove content hash comments from tracking and indexing structures * style: clean up whitespace and formatting in test_content_hash.py * refactor: add type hints to test methods in test_content_hash.py * refactor: remove content hash handling from source executor options and streamline related logic * implement update function for source ordinal and fingerprints, enhancing tracking efficiency * enhance fast path logic in update_source_row to include version verification * revert all changes for files under ops * remove processed_source_content_hash from tracking structures and optimize update logic for source processing * implement content hash computation in EvaluationMemory and optimize update_source_row logic for efficient source processing * update source tracking logic to include process ordinal in database updates * refactor: optimize update_source_tracking_ordinal_and_logic by removing process_logic_fingerprint and enhancing content hash optimization logic in update_source_row * update EvaluationMemory initialization and streamline content hash handling in update_source_row for improved efficiency * remove test suite for content hash functionality in test_content_hash.py to streamline test coverage * simplify update_source_tracking_ordinal_and_logic by removing unnecessary parameters and enhance EvaluationMemory initialization for better clarity and performance * rename update_source_tracking_ordinal_and_logic to update_source_tracking_ordinal and streamline memoization info handling in update_source_row for improved clarity * refactor memoization info handling
1 parent 9568f6d commit a7dfea3

File tree

5 files changed

+382
-36
lines changed

5 files changed

+382
-36
lines changed

src/execution/db_tracking.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ pub struct TrackedTargetKeyInfo {
1313
pub key: serde_json::Value,
1414
pub additional_key: serde_json::Value,
1515
pub process_ordinal: i64,
16-
// None means deletion.
1716
pub fingerprint: Option<Fingerprint>,
1817
}
1918

@@ -84,6 +83,8 @@ pub struct SourceTrackingInfoForProcessing {
8483

8584
pub processed_source_ordinal: Option<i64>,
8685
pub process_logic_fingerprint: Option<Vec<u8>>,
86+
pub max_process_ordinal: Option<i64>,
87+
pub process_ordinal: Option<i64>,
8788
}
8889

8990
pub async fn read_source_tracking_info_for_processing(
@@ -93,7 +94,7 @@ pub async fn read_source_tracking_info_for_processing(
9394
pool: &PgPool,
9495
) -> Result<Option<SourceTrackingInfoForProcessing>> {
9596
let query_str = format!(
96-
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1 AND source_key = $2",
97+
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint, max_process_ordinal, process_ordinal FROM {} WHERE source_id = $1 AND source_key = $2",
9798
db_setup.table_name
9899
);
99100
let tracking_info = sqlx::query_as(&query_str)
@@ -305,3 +306,23 @@ pub async fn read_source_last_processed_info(
305306
.await?;
306307
Ok(last_processed_info)
307308
}
309+
310+
pub async fn update_source_tracking_ordinal(
311+
source_id: i32,
312+
source_key_json: &serde_json::Value,
313+
processed_source_ordinal: Option<i64>,
314+
db_setup: &TrackingTableSetupState,
315+
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
316+
) -> Result<()> {
317+
let query_str = format!(
318+
"UPDATE {} SET processed_source_ordinal = $3 WHERE source_id = $1 AND source_key = $2",
319+
db_setup.table_name
320+
);
321+
sqlx::query(&query_str)
322+
.bind(source_id) // $1
323+
.bind(source_key_json) // $2
324+
.bind(processed_source_ordinal) // $3
325+
.execute(db_executor)
326+
.await?;
327+
Ok(())
328+
}

src/execution/db_tracking_setup.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ async fn upgrade_tracking_table(
4242
);
4343
sqlx::query(&query).execute(pool).await?;
4444
}
45+
4546
Ok(())
4647
}
4748

src/execution/memoization.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ pub struct StoredMemoizationInfo {
2525

2626
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
2727
pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
28+
29+
#[serde(default, skip_serializing_if = "Option::is_none")]
30+
pub content_hash: Option<Fingerprint>,
2831
}
2932

3033
pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
@@ -156,7 +159,11 @@ impl EvaluationMemory {
156159
.into_iter()
157160
.filter_map(|(k, v)| v.into_stored().map(|uuids| (k, uuids)))
158161
.collect();
159-
Ok(StoredMemoizationInfo { cache, uuids })
162+
Ok(StoredMemoizationInfo {
163+
cache,
164+
uuids,
165+
content_hash: None,
166+
})
160167
}
161168

162169
pub fn get_cache_entry(

0 commit comments

Comments
 (0)