Skip to content

Commit 7105afc

Browse files
authored
fix: fully clear in-memory states for deleted source entry (#686)
1 parent 03fce6d commit 7105afc

File tree

2 files changed

+45
-18
lines changed

2 files changed

+45
-18
lines changed

examples/text_embedding/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
from numpy.typing import NDArray
88
import numpy as np
9+
from datetime import timedelta
910

1011

1112
@cocoindex.transform_flow()
@@ -38,7 +39,8 @@ def text_embedding_flow(
3839
Define an example flow that embeds text into a vector database.
3940
"""
4041
data_scope["documents"] = flow_builder.add_source(
41-
cocoindex.sources.LocalFile(path="markdown_files")
42+
cocoindex.sources.LocalFile(path="markdown_files"),
43+
refresh_interval=timedelta(seconds=5),
4244
)
4345

4446
doc_embeddings = data_scope.add_collector()

src/execution/source_indexer.rs

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,39 @@ impl SourceIndexingContext {
125125
let source_version = SourceVersion::from_current_data(&source_data);
126126
let processing_sem = {
127127
let mut state = self.state.lock().unwrap();
128-
let row_state = state.rows.entry(key.clone()).or_default();
129-
if row_state
130-
.source_version
131-
.should_skip(&source_version, Some(update_stats.as_ref()))
132-
{
133-
return anyhow::Ok(());
128+
let touched_generation = state.scan_generation;
129+
match state.rows.entry(key.clone()) {
130+
hash_map::Entry::Occupied(mut entry) => {
131+
if entry
132+
.get()
133+
.source_version
134+
.should_skip(&source_version, Some(update_stats.as_ref()))
135+
{
136+
return anyhow::Ok(());
137+
}
138+
let sem = entry.get().processing_sem.clone();
139+
if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
140+
entry.remove();
141+
} else {
142+
entry.get_mut().source_version = source_version.clone();
143+
}
144+
sem
145+
}
146+
hash_map::Entry::Vacant(entry) => {
147+
if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
148+
update_stats.num_deletions.inc(1);
149+
return anyhow::Ok(());
150+
}
151+
let new_entry = SourceRowIndexingState {
152+
source_version: source_version.clone(),
153+
touched_generation,
154+
..Default::default()
155+
};
156+
let sem = new_entry.processing_sem.clone();
157+
entry.insert(new_entry);
158+
sem
159+
}
134160
}
135-
row_state.source_version = source_version.clone();
136-
row_state.processing_sem.clone()
137161
};
138162

139163
let permit = processing_sem.acquire().await?;
@@ -183,11 +207,15 @@ impl SourceIndexingContext {
183207
}
184208
}
185209
hash_map::Entry::Vacant(entry) => {
186-
entry.insert(SourceRowIndexingState {
187-
source_version: target_source_version,
188-
touched_generation: scan_generation,
189-
..Default::default()
190-
});
210+
if target_source_version.kind
211+
!= row_indexer::SourceVersionKind::NonExistence
212+
{
213+
entry.insert(SourceRowIndexingState {
214+
source_version: target_source_version,
215+
touched_generation: scan_generation,
216+
..Default::default()
217+
});
218+
}
191219
}
192220
}
193221
}
@@ -277,15 +305,12 @@ impl SourceIndexingContext {
277305

278306
let deleted_key_versions = {
279307
let mut deleted_key_versions = Vec::new();
280-
let mut state = self.state.lock().unwrap();
308+
let state = self.state.lock().unwrap();
281309
for (key, row_state) in state.rows.iter() {
282310
if row_state.touched_generation < scan_generation {
283311
deleted_key_versions.push((key.clone(), row_state.source_version.ordinal));
284312
}
285313
}
286-
for (key, _) in deleted_key_versions.iter() {
287-
state.rows.remove(key);
288-
}
289314
deleted_key_versions
290315
};
291316
for (key, source_ordinal) in deleted_key_versions {

0 commit comments

Comments
 (0)