Skip to content

Commit b406e05

Browse files
authored
feat(rows-to-retry): touch process ordinal on failure when needed (#1001)
1 parent 1a4c075 commit b406e05

File tree

3 files changed

+58
-6
lines changed

3 files changed

+58
-6
lines changed

src/execution/db_tracking.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,30 @@ pub async fn precommit_source_tracking_info(
184184
Ok(())
185185
}
186186

187+
/// Upserts the tracking row of a single source key so that its
/// `max_process_ordinal` moves forward.
///
/// A row is inserted into the table named by `db_setup.table_name` with
/// `source_id`, `source_key_json`, `process_ordinal` (stored as
/// `max_process_ordinal`) and a default `TrackedTargetKeyForSource` stored as
/// JSON in `staging_target_keys`. When a row with the same
/// `(source_id, source_key)` already exists, only `max_process_ordinal` is
/// updated: it becomes the greater of the stored value plus one and
/// `process_ordinal`.
///
/// # Errors
///
/// Returns any error produced while executing the statement on `db_executor`.
pub async fn touch_max_process_ordinal(
188+
source_id: i32,
189+
source_key_json: &serde_json::Value,
190+
process_ordinal: i64,
191+
db_setup: &TrackingTableSetupState,
192+
db_executor: impl sqlx::Executor<'_, Database = sqlx::Postgres>,
193+
) -> Result<()> {
194+
// Only the table name is interpolated into the SQL text; every row value is
// passed as a bound parameter ($1..$4) further down.
// NOTE(review): assumes `db_setup.table_name` is a trusted identifier — confirm.
let query_str = format!(
195+
"INSERT INTO {} AS t (source_id, source_key, max_process_ordinal, staging_target_keys) \
196+
VALUES ($1, $2, $3, $4) \
197+
ON CONFLICT (source_id, source_key) DO UPDATE SET \
198+
max_process_ordinal = GREATEST(t.max_process_ordinal + 1, EXCLUDED.max_process_ordinal)",
199+
db_setup.table_name,
200+
);
201+
// Bind order matches the placeholders: $1 source_id, $2 source_key,
// $3 max_process_ordinal, $4 staging_target_keys (default value as JSON).
sqlx::query(&query_str)
202+
.bind(source_id)
203+
.bind(source_key_json)
204+
.bind(process_ordinal)
205+
.bind(sqlx::types::Json(TrackedTargetKeyForSource::default()))
206+
.execute(db_executor)
207+
.await?;
208+
Ok(())
209+
}
210+
187211
#[derive(sqlx::FromRow, Debug)]
188212
pub struct SourceTrackingInfoForCommit {
189213
pub staging_target_keys: sqlx::types::Json<TrackedTargetKeyForSource>,

src/execution/row_indexer.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,13 @@ impl<'a> RowIndexer<'a> {
199199
src_eval_ctx: &'a SourceRowEvaluationContext<'_>,
200200
setup_execution_ctx: &'a exec_ctx::FlowSetupExecutionContext,
201201
mode: super::source_indexer::UpdateMode,
202+
process_time: chrono::DateTime<chrono::Utc>,
202203
update_stats: &'a stats::UpdateStats,
203204
pool: &'a PgPool,
204205
) -> Result<Self> {
205206
Ok(Self {
206207
source_id: setup_execution_ctx.import_ops[src_eval_ctx.import_op_idx].source_id,
207-
process_time: chrono::Utc::now(),
208+
process_time,
208209
source_key_json: serde_json::to_value(src_eval_ctx.key)?,
209210

210211
src_eval_ctx,
@@ -216,10 +217,11 @@ impl<'a> RowIndexer<'a> {
216217
}
217218

218219
pub async fn update_source_row(
219-
&mut self,
220+
&self,
220221
source_version: &SourceVersion,
221222
source_value: interface::SourceValue,
222223
source_version_fp: Option<Vec<u8>>,
224+
ordinal_touched: &mut bool,
223225
) -> Result<SkippedOr<()>> {
224226
let tracking_setup_state = &self.setup_execution_ctx.setup_state.tracking_table;
225227
// Phase 1: Check existing tracking info and apply optimizations
@@ -335,6 +337,7 @@ impl<'a> RowIndexer<'a> {
335337
}),
336338
)
337339
.await?;
340+
*ordinal_touched = true;
338341
let precommit_output = match precommit_output {
339342
SkippedOr::Normal(output) => output,
340343
SkippedOr::Skipped(v, fp) => return Ok(SkippedOr::Skipped(v, fp)),
@@ -398,7 +401,7 @@ impl<'a> RowIndexer<'a> {
398401
}
399402

400403
pub async fn try_collapse(
401-
&mut self,
404+
&self,
402405
source_version: &SourceVersion,
403406
content_version_fp: &[u8],
404407
existing_version: &SourceVersion,
@@ -538,7 +541,7 @@ impl<'a> RowIndexer<'a> {
538541
.map(|info| info.max_process_ordinal)
539542
.unwrap_or(0)
540543
+ 1)
541-
.max(self.process_time.timestamp_millis());
544+
.max(Self::process_ordinal_from_time(self.process_time));
542545
let existing_process_ordinal = tracking_info.as_ref().and_then(|info| info.process_ordinal);
543546

544547
let mut tracking_info_for_targets = HashMap::<i32, TrackingInfoForTarget>::new();
@@ -820,6 +823,10 @@ impl<'a> RowIndexer<'a> {
820823

821824
Ok(())
822825
}
826+
827+
/// Derives a process ordinal from a process time.
///
/// The ordinal is the value returned by `timestamp_millis()` on the given
/// UTC timestamp. Keeping the conversion in one function lets every caller
/// that turns a process time into an ordinal use the same rule.
pub fn process_ordinal_from_time(process_time: chrono::DateTime<chrono::Utc>) -> i64 {
828+
process_time.timestamp_millis()
829+
}
823830
}
824831

825832
pub async fn evaluate_source_entry_with_memory(

src/execution/source_indexer.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ pub struct SourceIndexingContext {
6363
update_sem: Semaphore,
6464
state: Mutex<SourceIndexingState>,
6565
setup_execution_ctx: Arc<exec_ctx::FlowSetupExecutionContext>,
66+
needs_to_track_rows_to_retry: bool,
6667
}
6768

6869
pub const NO_ACK: Option<fn() -> Ready<Result<()>>> = None;
@@ -296,6 +297,7 @@ impl SourceIndexingContext {
296297
Ok(Self {
297298
flow,
298299
source_idx,
300+
needs_to_track_rows_to_retry: rows_to_retry.is_some(),
299301
state: Mutex::new(SourceIndexingState {
300302
rows,
301303
scan_generation,
@@ -331,17 +333,20 @@ impl SourceIndexingContext {
331333
key: &row_input.key,
332334
import_op_idx: self.source_idx,
333335
};
334-
let mut row_indexer = row_indexer::RowIndexer::new(
336+
let process_time = chrono::Utc::now();
337+
let row_indexer = row_indexer::RowIndexer::new(
335338
&eval_ctx,
336339
&self.setup_execution_ctx,
337340
mode,
341+
process_time,
338342
&update_stats,
339343
&pool,
340344
)?;
341345

342346
let source_data = row_input.data;
343347
let mut row_state_operator =
344348
LocalSourceRowStateOperator::new(&row_input.key, &self.state, &update_stats);
349+
let mut ordinal_touched = false;
345350
let result = {
346351
let row_state_operator = &mut row_state_operator;
347352
let row_key = &row_input.key;
@@ -434,7 +439,12 @@ impl SourceIndexingContext {
434439
}
435440

436441
let result = row_indexer
437-
.update_source_row(&source_version, value, content_version_fp.clone())
442+
.update_source_row(
443+
&source_version,
444+
value,
445+
content_version_fp.clone(),
446+
&mut ordinal_touched,
447+
)
438448
.await?;
439449
if let SkippedOr::Skipped(version, fp) = result {
440450
row_state_operator
@@ -449,6 +459,17 @@ impl SourceIndexingContext {
449459
row_state_operator.commit();
450460
} else {
451461
row_state_operator.rollback();
462+
if !ordinal_touched && self.needs_to_track_rows_to_retry {
463+
let source_key_json = serde_json::to_value(&row_input.key)?;
464+
db_tracking::touch_max_process_ordinal(
465+
self.setup_execution_ctx.import_ops[self.source_idx].source_id,
466+
&source_key_json,
467+
row_indexer::RowIndexer::process_ordinal_from_time(process_time),
468+
&self.setup_execution_ctx.setup_state.tracking_table,
469+
&pool,
470+
)
471+
.await?;
472+
}
452473
}
453474
result
454475
};

0 commit comments

Comments
 (0)