Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/text_embedding/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import os
from numpy.typing import NDArray
import numpy as np
from datetime import timedelta


@cocoindex.transform_flow()
Expand Down Expand Up @@ -38,7 +39,8 @@ def text_embedding_flow(
Define an example flow that embeds text into a vector database.
"""
data_scope["documents"] = flow_builder.add_source(
cocoindex.sources.LocalFile(path="markdown_files")
cocoindex.sources.LocalFile(path="markdown_files"),
refresh_interval=timedelta(seconds=5),
)

doc_embeddings = data_scope.add_collector()
Expand Down
59 changes: 42 additions & 17 deletions src/execution/source_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,39 @@ impl SourceIndexingContext {
let source_version = SourceVersion::from_current_data(&source_data);
let processing_sem = {
let mut state = self.state.lock().unwrap();
let row_state = state.rows.entry(key.clone()).or_default();
if row_state
.source_version
.should_skip(&source_version, Some(update_stats.as_ref()))
{
return anyhow::Ok(());
let touched_generation = state.scan_generation;
match state.rows.entry(key.clone()) {
hash_map::Entry::Occupied(mut entry) => {
if entry
.get()
.source_version
.should_skip(&source_version, Some(update_stats.as_ref()))
{
return anyhow::Ok(());
}
let sem = entry.get().processing_sem.clone();
if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
entry.remove();
} else {
entry.get_mut().source_version = source_version.clone();
}
sem
}
hash_map::Entry::Vacant(entry) => {
if source_version.kind == row_indexer::SourceVersionKind::NonExistence {
update_stats.num_deletions.inc(1);
return anyhow::Ok(());
}
let new_entry = SourceRowIndexingState {
source_version: source_version.clone(),
touched_generation,
..Default::default()
};
let sem = new_entry.processing_sem.clone();
entry.insert(new_entry);
sem
}
}
row_state.source_version = source_version.clone();
row_state.processing_sem.clone()
};

let permit = processing_sem.acquire().await?;
Expand Down Expand Up @@ -183,11 +207,15 @@ impl SourceIndexingContext {
}
}
hash_map::Entry::Vacant(entry) => {
entry.insert(SourceRowIndexingState {
source_version: target_source_version,
touched_generation: scan_generation,
..Default::default()
});
if target_source_version.kind
!= row_indexer::SourceVersionKind::NonExistence
{
entry.insert(SourceRowIndexingState {
source_version: target_source_version,
touched_generation: scan_generation,
..Default::default()
});
}
}
}
}
Expand Down Expand Up @@ -277,15 +305,12 @@ impl SourceIndexingContext {

let deleted_key_versions = {
let mut deleted_key_versions = Vec::new();
let mut state = self.state.lock().unwrap();
let state = self.state.lock().unwrap();
for (key, row_state) in state.rows.iter() {
if row_state.touched_generation < scan_generation {
deleted_key_versions.push((key.clone(), row_state.source_version.ordinal));
}
}
for (key, _) in deleted_key_versions.iter() {
state.rows.remove(key);
}
deleted_key_versions
};
for (key, source_ordinal) in deleted_key_versions {
Expand Down