diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 45bcec819..8f85c747c 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -169,73 +169,75 @@ impl SourceIndexingContext { } }; + let _processing_permit = processing_sem.acquire().await?; + let result = row_indexer::update_source_row( + &SourceRowEvaluationContext { + plan: &plan, + import_op, + schema, + key: &key, + import_op_idx: self.source_idx, + }, + &self.setup_execution_ctx, + source_data.value, + &source_version, + &pool, + &update_stats, + ) + .await?; + let target_source_version = if let SkippedOr::Skipped(existing_source_version) = result { - let _processing_permit = processing_sem.acquire().await?; - let result = row_indexer::update_source_row( - &SourceRowEvaluationContext { - plan: &plan, - import_op, - schema, - key: &key, - import_op_idx: self.source_idx, - }, - &self.setup_execution_ctx, - source_data.value, - &source_version, - &pool, - &update_stats, - ) - .await?; - let target_source_version = - if let SkippedOr::Skipped(existing_source_version) = result { - Some(existing_source_version) - } else if source_version.kind == row_indexer::SourceVersionKind::NonExistence { - Some(source_version) - } else { - None - }; - if let Some(target_source_version) = target_source_version { - let mut state = self.state.lock().unwrap(); - let scan_generation = state.scan_generation; - let entry = state.rows.entry(key.clone()); - match entry { - hash_map::Entry::Occupied(mut entry) => { - if !entry - .get() - .source_version - .should_skip(&target_source_version, None) - { - if target_source_version.kind - == row_indexer::SourceVersionKind::NonExistence - { - entry.remove(); - } else { - let mut_entry = entry.get_mut(); - mut_entry.source_version = target_source_version; - mut_entry.touched_generation = scan_generation; - } - } - } - hash_map::Entry::Vacant(entry) => { + Some(existing_source_version) + } else if source_version.kind == row_indexer::SourceVersionKind::NonExistence { + Some(source_version) + } else { + None + }; + if let Some(target_source_version) = target_source_version { + let mut state = self.state.lock().unwrap(); + let scan_generation = state.scan_generation; + let entry = state.rows.entry(key.clone()); + match entry { + hash_map::Entry::Occupied(mut entry) => { + if !entry + .get() + .source_version + .should_skip(&target_source_version, None) + { if target_source_version.kind - != row_indexer::SourceVersionKind::NonExistence + == row_indexer::SourceVersionKind::NonExistence { - entry.insert(SourceRowIndexingState { - source_version: target_source_version, - touched_generation: scan_generation, - ..Default::default() - }); + entry.remove(); + } else { + let mut_entry = entry.get_mut(); + mut_entry.source_version = target_source_version; + mut_entry.touched_generation = scan_generation; } } } + hash_map::Entry::Vacant(entry) => { + if target_source_version.kind + != row_indexer::SourceVersionKind::NonExistence + { + entry.insert(SourceRowIndexingState { + source_version: target_source_version, + touched_generation: scan_generation, + ..Default::default() + }); + } + } } } + anyhow::Ok(()) + }; + let process_and_ack = async { + process.await?; if let Some(ack_fn) = ack_fn { ack_fn().await?; } anyhow::Ok(()) }; - if let Err(e) = process.await { + if let Err(e) = process_and_ack.await { update_stats.num_errors.inc(1); error!( "{:?}",