Skip to content

Commit f2daf84

Browse files
committed
fix: fully clear in-memory states for deleted source entry
1 parent 03fce6d commit f2daf84

File tree

1 file changed

+40
-17
lines changed

1 file changed

+40
-17
lines changed

src/execution/source_indexer.rs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,37 @@ impl SourceIndexingContext {
125125
let source_version = SourceVersion::from_current_data(&source_data);
126126
let processing_sem = {
127127
let mut state = self.state.lock().unwrap();
128-
let row_state = state.rows.entry(key.clone()).or_default();
129-
if row_state
130-
.source_version
131-
.should_skip(&source_version, Some(update_stats.as_ref()))
132-
{
133-
return anyhow::Ok(());
128+
match state.rows.entry(key.clone()) {
129+
hash_map::Entry::Occupied(mut entry) => {
130+
if entry
131+
.get()
132+
.source_version
133+
.should_skip(&source_version, Some(update_stats.as_ref()))
134+
{
135+
return anyhow::Ok(());
136+
}
137+
let sem = entry.get().processing_sem.clone();
138+
if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
139+
entry.remove();
140+
} else {
141+
entry.get_mut().source_version = source_version.clone();
142+
}
143+
sem
144+
}
145+
hash_map::Entry::Vacant(entry) => {
146+
if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
147+
update_stats.num_deletions.inc(1);
148+
return anyhow::Ok(());
149+
}
150+
let new_entry = SourceRowIndexingState {
151+
source_version: source_version.clone(),
152+
..Default::default()
153+
};
154+
let sem = new_entry.processing_sem.clone();
155+
entry.insert(new_entry);
156+
sem
157+
}
134158
}
135-
row_state.source_version = source_version.clone();
136-
row_state.processing_sem.clone()
137159
};
138160

139161
let permit = processing_sem.acquire().await?;
@@ -183,11 +205,15 @@ impl SourceIndexingContext {
183205
}
184206
}
185207
hash_map::Entry::Vacant(entry) => {
186-
entry.insert(SourceRowIndexingState {
187-
source_version: target_source_version,
188-
touched_generation: scan_generation,
189-
..Default::default()
190-
});
208+
if target_source_version.kind
209+
!= row_indexer::SourceVersionKind::NonExistence
210+
{
211+
entry.insert(SourceRowIndexingState {
212+
source_version: target_source_version,
213+
touched_generation: scan_generation,
214+
..Default::default()
215+
});
216+
}
191217
}
192218
}
193219
}
@@ -277,15 +303,12 @@ impl SourceIndexingContext {
277303

278304
let deleted_key_versions = {
279305
let mut deleted_key_versions = Vec::new();
280-
let mut state = self.state.lock().unwrap();
306+
let state = self.state.lock().unwrap();
281307
for (key, row_state) in state.rows.iter() {
282308
if row_state.touched_generation < scan_generation {
283309
deleted_key_versions.push((key.clone(), row_state.source_version.ordinal));
284310
}
285311
}
286-
for (key, _) in deleted_key_versions.iter() {
287-
state.rows.remove(key);
288-
}
289312
deleted_key_versions
290313
};
291314
for (key, source_ordinal) in deleted_key_versions {

0 commit comments

Comments
 (0)