Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
aeeb1c3
feat: add content hash support for change detection in source processing
vumichien Jun 17, 2025
0306c76
refactor: remove content hash comments from tracking and indexing str…
vumichien Jun 17, 2025
ef7b5d6
style: clean up whitespace and formatting in test_content_hash.py
vumichien Jun 17, 2025
8eccb07
refactor: add type hints to test methods in test_content_hash.py
vumichien Jun 17, 2025
e18946b
refactor: remove content hash handling from source executor options a…
vumichien Jun 19, 2025
4e8398e
implement update function for source ordinal and fingerprints, enhanc…
vumichien Jun 19, 2025
914efeb
enhance fast path logic in update_source_row to include version verif…
vumichien Jun 19, 2025
86d58e2
revert all changes for files under ops
vumichien Jun 20, 2025
486e14b
remove processed_source_content_hash from tracking structures and opt…
vumichien Jun 21, 2025
fb9bb40
implement content hash computation in EvaluationMemory and optimize u…
vumichien Jun 21, 2025
e20bc54
update source tracking logic to include process ordinal in database u…
vumichien Jun 21, 2025
61164ce
refactor: optimize update_source_tracking_ordinal_and_logic by removi…
vumichien Jun 24, 2025
f214b23
update EvaluationMemory initialization and streamline content hash ha…
vumichien Jun 25, 2025
b681c65
remove test suite for content hash functionality in test_content_hash…
vumichien Jun 26, 2025
9a2cbfe
simplify update_source_tracking_ordinal_and_logic by removing unneces…
vumichien Jun 26, 2025
fc010c4
rename update_source_tracking_ordinal_and_logic to update_source_trac…
vumichien Jun 27, 2025
e81f06b
refactor memoization info handling
vumichien Jun 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
514 changes: 514 additions & 0 deletions python/cocoindex/tests/test_content_hash.py

Large diffs are not rendered by default.

29 changes: 18 additions & 11 deletions src/execution/db_tracking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub struct TrackedTargetKeyInfo {
pub key: serde_json::Value,
pub additional_key: serde_json::Value,
pub process_ordinal: i64,
// None means deletion.
pub fingerprint: Option<Fingerprint>,
}

Expand Down Expand Up @@ -84,6 +83,8 @@ pub struct SourceTrackingInfoForProcessing {

pub processed_source_ordinal: Option<i64>,
pub process_logic_fingerprint: Option<Vec<u8>>,
/// Content hash of the processed source for change detection
pub processed_source_content_hash: Option<Vec<u8>>,
}

pub async fn read_source_tracking_info_for_processing(
Expand All @@ -93,7 +94,7 @@ pub async fn read_source_tracking_info_for_processing(
pool: &PgPool,
) -> Result<Option<SourceTrackingInfoForProcessing>> {
let query_str = format!(
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT memoization_info, processed_source_ordinal, process_logic_fingerprint, processed_source_content_hash FROM {} WHERE source_id = $1 AND source_key = $2",
db_setup.table_name
);
let tracking_info = sqlx::query_as(&query_str)
Expand All @@ -112,6 +113,7 @@ pub struct SourceTrackingInfoForPrecommit {

pub processed_source_ordinal: Option<i64>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub processed_source_content_hash: Option<Vec<u8>>,
pub process_ordinal: Option<i64>,
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
}
Expand All @@ -123,7 +125,7 @@ pub async fn read_source_tracking_info_for_precommit(
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
) -> Result<Option<SourceTrackingInfoForPrecommit>> {
let query_str = format!(
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_logic_fingerprint, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT max_process_ordinal, staging_target_keys, processed_source_ordinal, process_logic_fingerprint, processed_source_content_hash, process_ordinal, target_keys FROM {} WHERE source_id = $1 AND source_key = $2",
db_setup.table_name
);
let precommit_tracking_info = sqlx::query_as(&query_str)
Expand Down Expand Up @@ -196,6 +198,7 @@ pub async fn commit_source_tracking_info(
staging_target_keys: TrackedTargetKeyForSource,
processed_source_ordinal: Option<i64>,
logic_fingerprint: &[u8],
processed_source_content_hash: Option<&[u8]>,
process_ordinal: i64,
process_time_micros: i64,
target_keys: TrackedTargetKeyForSource,
Expand All @@ -208,12 +211,12 @@ pub async fn commit_source_tracking_info(
"INSERT INTO {} ( \
source_id, source_key, \
max_process_ordinal, staging_target_keys, \
processed_source_ordinal, process_logic_fingerprint, process_ordinal, process_time_micros, target_keys) \
VALUES ($1, $2, $6 + 1, $3, $4, $5, $6, $7, $8)",
processed_source_ordinal, process_logic_fingerprint, processed_source_content_hash, process_ordinal, process_time_micros, target_keys) \
VALUES ($1, $2, $7 + 1, $3, $4, $5, $6, $7, $8, $9)",
db_setup.table_name
),
WriteAction::Update => format!(
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, process_ordinal = $6, process_time_micros = $7, target_keys = $8 WHERE source_id = $1 AND source_key = $2",
"UPDATE {} SET staging_target_keys = $3, processed_source_ordinal = $4, process_logic_fingerprint = $5, processed_source_content_hash = $6, process_ordinal = $7, process_time_micros = $8, target_keys = $9 WHERE source_id = $1 AND source_key = $2",
db_setup.table_name
),
};
Expand All @@ -223,9 +226,10 @@ pub async fn commit_source_tracking_info(
.bind(sqlx::types::Json(staging_target_keys)) // $3
.bind(processed_source_ordinal) // $4
.bind(logic_fingerprint) // $5
.bind(process_ordinal) // $6
.bind(process_time_micros) // $7
.bind(sqlx::types::Json(target_keys)) // $8
.bind(processed_source_content_hash) // $6
.bind(process_ordinal) // $7
.bind(process_time_micros) // $8
.bind(sqlx::types::Json(target_keys)) // $9
.execute(db_executor)
.await?;
Ok(())
Expand Down Expand Up @@ -254,6 +258,7 @@ pub struct TrackedSourceKeyMetadata {
pub source_key: serde_json::Value,
pub processed_source_ordinal: Option<i64>,
pub process_logic_fingerprint: Option<Vec<u8>>,
pub processed_source_content_hash: Option<Vec<u8>>,
}

pub struct ListTrackedSourceKeyMetadataState {
Expand All @@ -274,7 +279,7 @@ impl ListTrackedSourceKeyMetadataState {
pool: &'a PgPool,
) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
self.query_str = format!(
"SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1",
"SELECT source_key, processed_source_ordinal, process_logic_fingerprint, processed_source_content_hash FROM {} WHERE source_id = $1",
db_setup.table_name
);
sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)
Expand All @@ -285,6 +290,8 @@ impl ListTrackedSourceKeyMetadataState {
/// Last-processed state of a single tracked source row.
///
/// This is the value type returned (wrapped in `Result<Option<_>>`) by
/// `read_source_last_processed_info`, whose query selects the four columns
/// with the same names as the fields below.
pub struct SourceLastProcessedInfo {
/// Value of the `processed_source_ordinal` column; `None` when it is NULL.
pub processed_source_ordinal: Option<i64>,
/// Value of the `process_logic_fingerprint` column; `None` when it is NULL.
pub process_logic_fingerprint: Option<Vec<u8>>,
// NOTE(review): no reader of this field is visible in this view; the lint is
// presumably silenced because the column is selected but not yet consumed —
// confirm, and drop the `allow` once a caller uses it.
#[allow(dead_code)]
/// Value of the `processed_source_content_hash` column (content hash of the
/// processed source); `None` when it is NULL.
pub processed_source_content_hash: Option<Vec<u8>>,
/// Value of the `process_time_micros` column; `None` when it is NULL.
/// Presumably a timestamp in microseconds, going by the name — confirm.
pub process_time_micros: Option<i64>,
}

Expand All @@ -295,7 +302,7 @@ pub async fn read_source_last_processed_info(
pool: &PgPool,
) -> Result<Option<SourceLastProcessedInfo>> {
let query_str = format!(
"SELECT processed_source_ordinal, process_logic_fingerprint, process_time_micros FROM {} WHERE source_id = $1 AND source_key = $2",
"SELECT processed_source_ordinal, process_logic_fingerprint, processed_source_content_hash, process_time_micros FROM {} WHERE source_id = $1 AND source_key = $2",
db_setup.table_name
);
let last_processed_info = sqlx::query_as(&query_str)
Expand Down
10 changes: 9 additions & 1 deletion src/execution/db_tracking_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub fn default_tracking_table_name(flow_name: &str) -> String {
)
}

pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 1;
pub const CURRENT_TRACKING_TABLE_VERSION: i32 = 2;

async fn upgrade_tracking_table(
pool: &PgPool,
Expand Down Expand Up @@ -42,6 +42,14 @@ async fn upgrade_tracking_table(
);
sqlx::query(&query).execute(pool).await?;
}

if existing_version_id < 2 && target_version_id >= 2 {
let query = format!(
"ALTER TABLE {table_name} ADD COLUMN IF NOT EXISTS processed_source_content_hash BYTEA;",
);
sqlx::query(&query).execute(pool).await?;
}

Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/execution/dumper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use futures::future::try_join_all;
use futures::StreamExt;
use futures::future::try_join_all;
use indexmap::IndexMap;
use itertools::Itertools;
use serde::ser::SerializeSeq;
Expand Down
Loading