Skip to content

Commit e20bc54

Browse files
committed
update source tracking logic to include process ordinal in database updates
1 parent fb9bb40 commit e20bc54

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

src/execution/db_tracking.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ pub async fn update_source_tracking_ordinal_and_logic(
312312
source_key_json: &serde_json::Value,
313313
processed_source_ordinal: Option<i64>,
314314
process_logic_fingerprint: &[u8],
315+
process_ordinal: i64,
315316
process_time_micros: i64,
316317
db_setup: &TrackingTableSetupState,
317318
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
@@ -320,7 +321,8 @@ pub async fn update_source_tracking_ordinal_and_logic(
320321
"UPDATE {} SET \
321322
processed_source_ordinal = $3, \
322323
process_logic_fingerprint = $4, \
323-
process_time_micros = $5 \
324+
process_ordinal = $5, \
325+
process_time_micros = $6 \
324326
WHERE source_id = $1 AND source_key = $2",
325327
db_setup.table_name
326328
);
@@ -329,7 +331,8 @@ pub async fn update_source_tracking_ordinal_and_logic(
329331
.bind(source_key_json) // $2
330332
.bind(processed_source_ordinal) // $3
331333
.bind(process_logic_fingerprint) // $4
332-
.bind(process_time_micros) // $5
334+
.bind(process_ordinal) // $5
335+
.bind(process_time_micros) // $6
333336
.execute(db_executor)
334337
.await?;
335338
Ok(())

src/execution/row_indexer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -634,11 +634,15 @@ pub async fn update_source_row(
634634
.and_then(|info| info.process_ordinal);
635635
if current_info.process_ordinal == original_process_ordinal {
636636
// Safe to apply optimization - just update tracking table
637+
let new_process_ordinal = (current_info.max_process_ordinal + 1)
638+
.max(process_time.timestamp_millis());
639+
637640
db_tracking::update_source_tracking_ordinal_and_logic(
638641
src_eval_ctx.import_op.source_id,
639642
&source_key_json,
640643
source_version.ordinal.0,
641644
&src_eval_ctx.plan.logic_fingerprint.0,
645+
new_process_ordinal,
642646
process_time.timestamp_micros(),
643647
&src_eval_ctx.plan.tracking_table_setup,
644648
&mut *txn,

0 commit comments

Comments
 (0)