diff --git a/apps/desktop/src/hooks/useKeywords.ts b/apps/desktop/src/hooks/useKeywords.ts index 8ef2275404..38fca47a05 100644 --- a/apps/desktop/src/hooks/useKeywords.ts +++ b/apps/desktop/src/hooks/useKeywords.ts @@ -101,9 +101,7 @@ const extractKeyphraseMatches = (phrase: Keyphrase): string[] => }); const combineKeywords = (markdownWords: string[]): string[] => - Array.from(new Set(markdownWords)).filter( - (keyword) => keyword.length >= 2, - ); + Array.from(new Set(markdownWords)).filter((keyword) => keyword.length >= 2); const removeCodeBlocks = (text: string): string => text.replace(/```[\s\S]*?```/g, "").replace(/`[^`]+`/g, ""); diff --git a/apps/desktop/src/store/tinybase/persister/chat-shortcuts/index.ts b/apps/desktop/src/store/tinybase/persister/chat-shortcuts/index.ts index b21ff27d49..ebc2dec864 100644 --- a/apps/desktop/src/store/tinybase/persister/chat-shortcuts/index.ts +++ b/apps/desktop/src/store/tinybase/persister/chat-shortcuts/index.ts @@ -18,4 +18,3 @@ export function useChatShortcutPersister(store: Store) { [], ); } - diff --git a/plugins/tantivy/src/ext.rs b/plugins/tantivy/src/ext.rs index af28ab738a..609b132b9f 100644 --- a/plugins/tantivy/src/ext.rs +++ b/plugins/tantivy/src/ext.rs @@ -1,3 +1,6 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + use tantivy::collector::{Count, TopDocs}; use tantivy::query::{ BooleanQuery, BoostQuery, FuzzyTermQuery, Occur, PhraseQuery, Query, QueryParser, TermQuery, @@ -78,11 +81,12 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { pub async fn register_collection(&self, config: CollectionConfig) -> Result<(), crate::Error> { let base = self.manager.app_handle().path2().base()?; let index_path = base.join(&config.path); + let version_path = index_path.join("schema_version"); std::fs::create_dir_all(&index_path)?; let state = self.manager.state::(); - let mut guard = state.inner.lock().await; + let mut guard = state.inner.write().await; if guard.collections.contains_key(&config.name) { tracing::debug!("Collection '{}' already registered", config.name); @@ -90,12 +94,33 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { } let schema = (config.schema_builder)(); - let index = if index_path.join("meta.json").exists() { + + let needs_reindex = if index_path.join("meta.json").exists() { + let stored_version = std::fs::read_to_string(&version_path) + .ok() + .and_then(|s| s.trim().parse::().ok()) + .unwrap_or(0); + stored_version != config.schema_version + } else { + false + }; + + let index = if index_path.join("meta.json").exists() && !needs_reindex { Index::open_in_dir(&index_path)? } else { + if needs_reindex { + tracing::info!( + "Schema version changed for collection '{}', re-creating index", + config.name + ); + std::fs::remove_dir_all(&index_path)?; + std::fs::create_dir_all(&index_path)?; + } Index::create_in_dir(&index_path, schema.clone())? }; + std::fs::write(&version_path, config.schema_version.to_string())?; + register_tokenizers(&index); let reader = index @@ -110,6 +135,10 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { index, reader, writer, + auto_commit: config.auto_commit, + commit_interval_ms: config.commit_interval_ms, + pending_writes: AtomicU64::new(0), + last_commit: std::sync::Mutex::new(Instant::now()), }; guard @@ -117,9 +146,10 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { .insert(config.name.clone(), collection_index); tracing::info!( - "Tantivy collection '{}' registered at {:?}", + "Tantivy collection '{}' registered at {:?} (version: {})", config.name, - index_path + index_path, + config.schema_version ); Ok(()) } @@ -131,7 +161,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { pub async fn search(&self, request: SearchRequest) -> Result { let collection_name = Self::get_collection_name(request.collection); let state = self.manager.state::(); - let guard = state.inner.lock().await; + let guard = state.inner.read().await; let collection_index = guard .collections @@ -355,7 +385,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { pub async fn reindex(&self, collection: Option) -> Result<(), crate::Error> { let collection_name = Self::get_collection_name(collection); let state = self.manager.state::(); - let mut guard = state.inner.lock().await; + let mut guard = state.inner.write().await; let collection_index = guard .collections @@ -371,6 +401,9 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { writer.commit()?; + collection_index.pending_writes.store(0, Ordering::SeqCst); + *collection_index.last_commit.lock().unwrap() = Instant::now(); + tracing::info!( "Reindex completed for collection '{}'. Index cleared and ready for new documents. Fields: {:?}", collection_name, @@ -387,7 +420,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { ) -> Result<(), crate::Error> { let collection_name = Self::get_collection_name(collection); let state = self.manager.state::(); - let mut guard = state.inner.lock().await; + let mut guard = state.inner.write().await; let collection_index = guard .collections @@ -413,7 +446,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { } writer.add_document(doc)?; - writer.commit()?; + + collection_index + .pending_writes + .fetch_add(1, Ordering::SeqCst); + + let should_commit = if collection_index.auto_commit { + let last_commit = collection_index.last_commit.lock().unwrap(); + last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms) + } else { + true + }; + + if should_commit { + writer.commit()?; + collection_index.pending_writes.store(0, Ordering::SeqCst); + *collection_index.last_commit.lock().unwrap() = Instant::now(); + } tracing::debug!( "Added document '{}' to collection '{}'", @@ -431,7 +480,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { ) -> Result<(), crate::Error> { let collection_name = Self::get_collection_name(collection); let state = self.manager.state::(); - let mut guard = state.inner.lock().await; + let mut guard = state.inner.write().await; let collection_index = guard .collections @@ -460,7 +509,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { } writer.add_document(doc)?; - writer.commit()?; + + collection_index + .pending_writes + .fetch_add(1, Ordering::SeqCst); + + let should_commit = if collection_index.auto_commit { + let last_commit = collection_index.last_commit.lock().unwrap(); + last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms) + } else { + true + }; + + if should_commit { + writer.commit()?; + collection_index.pending_writes.store(0, Ordering::SeqCst); + *collection_index.last_commit.lock().unwrap() = Instant::now(); + } tracing::debug!( "Updated document '{}' in collection '{}'", @@ -478,7 +543,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { ) -> Result<(), crate::Error> { let collection_name = Self::get_collection_name(collection); let state = self.manager.state::(); - let mut guard = state.inner.lock().await; + let mut guard = state.inner.write().await; let collection_index = guard .collections @@ -491,7 +556,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { let id_term = Term::from_field_text(fields.id, &id); writer.delete_term(id_term); - writer.commit()?; + + collection_index + .pending_writes + .fetch_add(1, Ordering::SeqCst); + + let should_commit = if collection_index.auto_commit { + let last_commit = collection_index.last_commit.lock().unwrap(); + last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms) + } else { + true + }; + + if should_commit { + writer.commit()?; + collection_index.pending_writes.store(0, Ordering::SeqCst); + *collection_index.last_commit.lock().unwrap() = Instant::now(); + } tracing::debug!( "Removed document '{}' from collection '{}'", @@ -501,6 +582,31 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager> Tantivy<'a, R, M> { Ok(()) } + + pub async fn flush(&self, collection: Option) -> Result<(), crate::Error> { + let collection_name = Self::get_collection_name(collection); + let state = self.manager.state::(); + let mut guard = state.inner.write().await; + + let collection_index = guard + .collections + .get_mut(&collection_name) + .ok_or_else(|| crate::Error::CollectionNotFound(collection_name.clone()))?; + + let pending = collection_index.pending_writes.load(Ordering::SeqCst); + if pending > 0 { + collection_index.writer.commit()?; + collection_index.pending_writes.store(0, Ordering::SeqCst); + *collection_index.last_commit.lock().unwrap() = Instant::now(); + tracing::debug!( + "Flushed {} pending writes for collection '{}'", + pending, + collection_name + ); + } + + Ok(()) + } } pub trait TantivyPluginExt { diff --git a/plugins/tantivy/src/lib.rs b/plugins/tantivy/src/lib.rs index 4fa13699f0..66b7e31af1 100644 --- a/plugins/tantivy/src/lib.rs +++ b/plugins/tantivy/src/lib.rs @@ -7,10 +7,12 @@ mod tokenizer; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Instant; use tantivy::schema::Schema; use tantivy::{Index, IndexReader, IndexWriter}; use tauri::Manager; -use tokio::sync::Mutex; +use tokio::sync::RwLock; pub use error::{Error, Result}; pub use ext::*; @@ -99,11 +101,15 @@ pub struct SearchRequest { pub options: SearchOptions, } +pub const SCHEMA_VERSION: u32 = 1; + pub struct CollectionConfig { pub name: String, pub path: String, pub schema_builder: fn() -> Schema, pub auto_commit: bool, + pub commit_interval_ms: u64, + pub schema_version: u32, } pub struct CollectionIndex { @@ -111,6 +117,10 @@ pub struct CollectionIndex { pub index: Index, pub reader: IndexReader, pub writer: IndexWriter, + pub auto_commit: bool, + pub commit_interval_ms: u64, + pub pending_writes: AtomicU64, + pub last_commit: std::sync::Mutex, } pub struct IndexStateInner { @@ -126,13 +136,13 @@ impl Default for IndexStateInner { } pub struct IndexState { - pub inner: Mutex, + pub inner: RwLock, } impl Default for IndexState { fn default() -> Self { Self { - inner: Mutex::new(IndexStateInner::default()), + inner: RwLock::new(IndexStateInner::default()), } } } @@ -165,6 +175,8 @@ pub fn init() -> tauri::plugin::TauriPlugin { path: "search_index".to_string(), schema_builder: schema::build_schema, auto_commit: true, + commit_interval_ms: 1000, + schema_version: SCHEMA_VERSION, }; if let Err(e) = handle.tantivy().register_collection(config).await { @@ -174,6 +186,31 @@ pub fn init() -> tauri::plugin::TauriPlugin { Ok(()) }) + .on_event(|app, event| { + if let tauri::RunEvent::ExitRequested { .. } = event { + let state = app.state::(); + if let Ok(mut guard) = state.inner.try_write() { + for (name, collection) in guard.collections.iter_mut() { + let pending = collection.pending_writes.load(Ordering::SeqCst); + if pending > 0 { + if let Err(e) = collection.writer.commit() { + tracing::error!( + "Failed to flush pending writes for collection '{}': {}", + name, + e + ); + } else { + tracing::info!( + "Flushed {} pending writes for collection '{}' on exit", + pending, + name + ); + } + } + } + } + } + }) .build() }