diff --git a/examples/text_embedding/main.py b/examples/text_embedding/main.py index 07e83f031..af99585a4 100644 --- a/examples/text_embedding/main.py +++ b/examples/text_embedding/main.py @@ -6,6 +6,7 @@ import os from numpy.typing import NDArray import numpy as np +from datetime import timedelta @cocoindex.transform_flow() @@ -38,7 +39,8 @@ def text_embedding_flow( Define an example flow that embeds text into a vector database. """ data_scope["documents"] = flow_builder.add_source( - cocoindex.sources.LocalFile(path="markdown_files") + cocoindex.sources.LocalFile(path="markdown_files"), + refresh_interval=timedelta(seconds=5), ) doc_embeddings = data_scope.add_collector() diff --git a/src/execution/source_indexer.rs b/src/execution/source_indexer.rs index 33b8198e2..5982425b5 100644 --- a/src/execution/source_indexer.rs +++ b/src/execution/source_indexer.rs @@ -125,15 +125,39 @@ impl SourceIndexingContext { let source_version = SourceVersion::from_current_data(&source_data); let processing_sem = { let mut state = self.state.lock().unwrap(); - let row_state = state.rows.entry(key.clone()).or_default(); - if row_state - .source_version - .should_skip(&source_version, Some(update_stats.as_ref())) - { - return anyhow::Ok(()); + let touched_generation = state.scan_generation; + match state.rows.entry(key.clone()) { + hash_map::Entry::Occupied(mut entry) => { + if entry + .get() + .source_version + .should_skip(&source_version, Some(update_stats.as_ref())) + { + return anyhow::Ok(()); + } + let sem = entry.get().processing_sem.clone(); + if source_version.kind == row_indexer::SourceVersionKind::NonExistence { + entry.remove(); + } else { + entry.get_mut().source_version = source_version.clone(); + } + sem + } + hash_map::Entry::Vacant(entry) => { + if source_version.kind == row_indexer::SourceVersionKind::NonExistence { + update_stats.num_deletions.inc(1); + return anyhow::Ok(()); + } + let new_entry = SourceRowIndexingState { + source_version: source_version.clone(), + touched_generation, + ..Default::default() + }; + let sem = new_entry.processing_sem.clone(); + entry.insert(new_entry); + sem + } } - row_state.source_version = source_version.clone(); - row_state.processing_sem.clone() }; let permit = processing_sem.acquire().await?; @@ -183,11 +207,15 @@ impl SourceIndexingContext { } } hash_map::Entry::Vacant(entry) => { - entry.insert(SourceRowIndexingState { - source_version: target_source_version, - touched_generation: scan_generation, - ..Default::default() - }); + if target_source_version.kind + != row_indexer::SourceVersionKind::NonExistence + { + entry.insert(SourceRowIndexingState { + source_version: target_source_version, + touched_generation: scan_generation, + ..Default::default() + }); + } } } } @@ -277,15 +305,12 @@ impl SourceIndexingContext { let deleted_key_versions = { let mut deleted_key_versions = Vec::new(); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); for (key, row_state) in state.rows.iter() { if row_state.touched_generation < scan_generation { deleted_key_versions.push((key.clone(), row_state.source_version.ordinal)); } } - for (key, _) in deleted_key_versions.iter() { - state.rows.remove(key); - } deleted_key_versions }; for (key, source_ordinal) in deleted_key_versions {