Skip to content

Commit 4e8398e

Browse files
committed
implement update function for source ordinal and fingerprints, enhancing tracking efficiency
1 parent e18946b commit 4e8398e

File tree

5 files changed

+142
-249
lines changed

5 files changed

+142
-249
lines changed

src/execution/db_tracking.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ pub struct SourceTrackingInfoForProcessing {
8383

8484
pub processed_source_ordinal: Option<i64>,
8585
pub process_logic_fingerprint: Option<Vec<u8>>,
86-
/// Content hash of the processed source for change detection
86+
#[allow(dead_code)]
8787
pub processed_source_content_hash: Option<Vec<u8>>,
8888
}
8989

@@ -113,6 +113,7 @@ pub struct SourceTrackingInfoForPrecommit {
113113

114114
pub processed_source_ordinal: Option<i64>,
115115
pub process_logic_fingerprint: Option<Vec<u8>>,
116+
#[allow(dead_code)]
116117
pub processed_source_content_hash: Option<Vec<u8>>,
117118
pub process_ordinal: Option<i64>,
118119
pub target_keys: Option<sqlx::types::Json<TrackedTargetKeyForSource>>,
@@ -235,6 +236,41 @@ pub async fn commit_source_tracking_info(
235236
Ok(())
236237
}
237238

239+
pub async fn update_source_ordinal_and_fingerprints_only(
240+
source_id: i32,
241+
source_key_json: &serde_json::Value,
242+
processed_source_ordinal: Option<i64>,
243+
logic_fingerprint: &[u8],
244+
processed_source_content_hash: Option<&[u8]>,
245+
process_ordinal: i64,
246+
process_time_micros: i64,
247+
db_setup: &TrackingTableSetupState,
248+
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
249+
) -> Result<()> {
250+
let query_str = format!(
251+
"UPDATE {} SET \
252+
max_process_ordinal = GREATEST(max_process_ordinal, $3), \
253+
processed_source_ordinal = $4, \
254+
process_logic_fingerprint = $5, \
255+
processed_source_content_hash = $6, \
256+
process_ordinal = $3, \
257+
process_time_micros = $7 \
258+
WHERE source_id = $1 AND source_key = $2",
259+
db_setup.table_name
260+
);
261+
sqlx::query(&query_str)
262+
.bind(source_id) // $1
263+
.bind(source_key_json) // $2
264+
.bind(process_ordinal) // $3
265+
.bind(processed_source_ordinal) // $4
266+
.bind(logic_fingerprint) // $5
267+
.bind(processed_source_content_hash) // $6
268+
.bind(process_time_micros) // $7
269+
.execute(db_executor)
270+
.await?;
271+
Ok(())
272+
}
273+
238274
pub async fn delete_source_tracking_info(
239275
source_id: i32,
240276
source_key_json: &serde_json::Value,
@@ -258,7 +294,6 @@ pub struct TrackedSourceKeyMetadata {
258294
pub source_key: serde_json::Value,
259295
pub processed_source_ordinal: Option<i64>,
260296
pub process_logic_fingerprint: Option<Vec<u8>>,
261-
pub processed_source_content_hash: Option<Vec<u8>>,
262297
}
263298

264299
pub struct ListTrackedSourceKeyMetadataState {
@@ -279,7 +314,7 @@ impl ListTrackedSourceKeyMetadataState {
279314
pool: &'a PgPool,
280315
) -> impl Stream<Item = Result<TrackedSourceKeyMetadata, sqlx::Error>> + 'a {
281316
self.query_str = format!(
282-
"SELECT source_key, processed_source_ordinal, process_logic_fingerprint, processed_source_content_hash FROM {} WHERE source_id = $1",
317+
"SELECT source_key, processed_source_ordinal, process_logic_fingerprint FROM {} WHERE source_id = $1",
283318
db_setup.table_name
284319
);
285320
sqlx::query_as(&self.query_str).bind(source_id).fetch(pool)

src/execution/memoization.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::{bail, Result};
1+
use anyhow::{Result, bail};
22
use serde::{Deserialize, Serialize};
33
use std::{
44
borrow::Cow,
@@ -25,6 +25,9 @@ pub struct StoredMemoizationInfo {
2525

2626
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
2727
pub uuids: HashMap<Fingerprint, Vec<uuid::Uuid>>,
28+
29+
#[serde(default, skip_serializing_if = "Option::is_none")]
30+
pub content_hash: Option<Fingerprint>,
2831
}
2932

3033
pub type CacheEntryCell = Arc<tokio::sync::OnceCell<Result<value::Value, SharedError>>>;
@@ -156,7 +159,11 @@ impl EvaluationMemory {
156159
.into_iter()
157160
.filter_map(|(k, v)| v.into_stored().map(|uuids| (k, uuids)))
158161
.collect();
159-
Ok(StoredMemoizationInfo { cache, uuids })
162+
Ok(StoredMemoizationInfo {
163+
cache,
164+
uuids,
165+
content_hash: None,
166+
})
160167
}
161168

162169
pub fn get_cache_entry(

0 commit comments

Comments
 (0)