Skip to content

Commit ed9d6bc

Browse files
authored
fix(ack): always ack change event unless there's processing error (#769)
1 parent 90f8e24 commit ed9d6bc

File tree

1 file changed

+56
-54
lines changed

1 file changed

+56
-54
lines changed

src/execution/source_indexer.rs

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -169,73 +169,75 @@ impl SourceIndexingContext {
169169
}
170170
};
171171

172+
let _processing_permit = processing_sem.acquire().await?;
173+
let result = row_indexer::update_source_row(
174+
&SourceRowEvaluationContext {
175+
plan: &plan,
176+
import_op,
177+
schema,
178+
key: &key,
179+
import_op_idx: self.source_idx,
180+
},
181+
&self.setup_execution_ctx,
182+
source_data.value,
183+
&source_version,
184+
&pool,
185+
&update_stats,
186+
)
187+
.await?;
188+
let target_source_version = if let SkippedOr::Skipped(existing_source_version) = result
172189
{
173-
let _processing_permit = processing_sem.acquire().await?;
174-
let result = row_indexer::update_source_row(
175-
&SourceRowEvaluationContext {
176-
plan: &plan,
177-
import_op,
178-
schema,
179-
key: &key,
180-
import_op_idx: self.source_idx,
181-
},
182-
&self.setup_execution_ctx,
183-
source_data.value,
184-
&source_version,
185-
&pool,
186-
&update_stats,
187-
)
188-
.await?;
189-
let target_source_version =
190-
if let SkippedOr::Skipped(existing_source_version) = result {
191-
Some(existing_source_version)
192-
} else if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
193-
Some(source_version)
194-
} else {
195-
None
196-
};
197-
if let Some(target_source_version) = target_source_version {
198-
let mut state = self.state.lock().unwrap();
199-
let scan_generation = state.scan_generation;
200-
let entry = state.rows.entry(key.clone());
201-
match entry {
202-
hash_map::Entry::Occupied(mut entry) => {
203-
if !entry
204-
.get()
205-
.source_version
206-
.should_skip(&target_source_version, None)
207-
{
208-
if target_source_version.kind
209-
== row_indexer::SourceVersionKind::NonExistence
210-
{
211-
entry.remove();
212-
} else {
213-
let mut_entry = entry.get_mut();
214-
mut_entry.source_version = target_source_version;
215-
mut_entry.touched_generation = scan_generation;
216-
}
217-
}
218-
}
219-
hash_map::Entry::Vacant(entry) => {
190+
Some(existing_source_version)
191+
} else if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
192+
Some(source_version)
193+
} else {
194+
None
195+
};
196+
if let Some(target_source_version) = target_source_version {
197+
let mut state = self.state.lock().unwrap();
198+
let scan_generation = state.scan_generation;
199+
let entry = state.rows.entry(key.clone());
200+
match entry {
201+
hash_map::Entry::Occupied(mut entry) => {
202+
if !entry
203+
.get()
204+
.source_version
205+
.should_skip(&target_source_version, None)
206+
{
220207
if target_source_version.kind
221-
!= row_indexer::SourceVersionKind::NonExistence
208+
== row_indexer::SourceVersionKind::NonExistence
222209
{
223-
entry.insert(SourceRowIndexingState {
224-
source_version: target_source_version,
225-
touched_generation: scan_generation,
226-
..Default::default()
227-
});
210+
entry.remove();
211+
} else {
212+
let mut_entry = entry.get_mut();
213+
mut_entry.source_version = target_source_version;
214+
mut_entry.touched_generation = scan_generation;
228215
}
229216
}
230217
}
218+
hash_map::Entry::Vacant(entry) => {
219+
if target_source_version.kind
220+
!= row_indexer::SourceVersionKind::NonExistence
221+
{
222+
entry.insert(SourceRowIndexingState {
223+
source_version: target_source_version,
224+
touched_generation: scan_generation,
225+
..Default::default()
226+
});
227+
}
228+
}
231229
}
232230
}
231+
anyhow::Ok(())
232+
};
233+
let process_and_ack = async {
234+
process.await?;
233235
if let Some(ack_fn) = ack_fn {
234236
ack_fn().await?;
235237
}
236238
anyhow::Ok(())
237239
};
238-
if let Err(e) = process.await {
240+
if let Err(e) = process_and_ack.await {
239241
update_stats.num_errors.inc(1);
240242
error!(
241243
"{:?}",

0 commit comments

Comments
 (0)