Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions apps/desktop/src/hooks/useKeywords.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ const extractKeyphraseMatches = (phrase: Keyphrase): string[] =>
});

const combineKeywords = (markdownWords: string[]): string[] =>
Array.from(new Set(markdownWords)).filter(
(keyword) => keyword.length >= 2,
);
Array.from(new Set(markdownWords)).filter((keyword) => keyword.length >= 2);

// Strips Markdown code from `text`: fenced ``` blocks are removed first, then
// any remaining inline `code` spans. Everything else is returned unchanged.
const removeCodeBlocks = (text: string): string => {
  const withoutFences = text.replace(/```[\s\S]*?```/g, "");
  const withoutInlineCode = withoutFences.replace(/`[^`]+`/g, "");
  return withoutInlineCode;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@ export function useChatShortcutPersister(store: Store) {
[],
);
}

130 changes: 118 additions & 12 deletions plugins/tantivy/src/ext.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use tantivy::collector::{Count, TopDocs};
use tantivy::query::{
BooleanQuery, BoostQuery, FuzzyTermQuery, Occur, PhraseQuery, Query, QueryParser, TermQuery,
Expand Down Expand Up @@ -78,24 +81,46 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
pub async fn register_collection(&self, config: CollectionConfig) -> Result<(), crate::Error> {
let base = self.manager.app_handle().path2().base()?;
let index_path = base.join(&config.path);
let version_path = index_path.join("schema_version");

std::fs::create_dir_all(&index_path)?;

let state = self.manager.state::<IndexState>();
let mut guard = state.inner.lock().await;
let mut guard = state.inner.write().await;

if guard.collections.contains_key(&config.name) {
tracing::debug!("Collection '{}' already registered", config.name);
return Ok(());
}

let schema = (config.schema_builder)();
let index = if index_path.join("meta.json").exists() {

let needs_reindex = if index_path.join("meta.json").exists() {
let stored_version = std::fs::read_to_string(&version_path)
.ok()
.and_then(|s| s.trim().parse::<u32>().ok())
.unwrap_or(0);
stored_version != config.schema_version
} else {
false
};

let index = if index_path.join("meta.json").exists() && !needs_reindex {
Index::open_in_dir(&index_path)?
} else {
if needs_reindex {
tracing::info!(
"Schema version changed for collection '{}', re-creating index",
config.name
);
std::fs::remove_dir_all(&index_path)?;
std::fs::create_dir_all(&index_path)?;
}
Index::create_in_dir(&index_path, schema.clone())?
};

std::fs::write(&version_path, config.schema_version.to_string())?;

register_tokenizers(&index);

let reader = index
Expand All @@ -110,16 +135,21 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
index,
reader,
writer,
auto_commit: config.auto_commit,
commit_interval_ms: config.commit_interval_ms,
pending_writes: AtomicU64::new(0),
last_commit: std::sync::Mutex::new(Instant::now()),
};

guard
.collections
.insert(config.name.clone(), collection_index);

tracing::info!(
"Tantivy collection '{}' registered at {:?}",
"Tantivy collection '{}' registered at {:?} (version: {})",
config.name,
index_path
index_path,
config.schema_version
);
Ok(())
}
Expand All @@ -131,7 +161,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
pub async fn search(&self, request: SearchRequest) -> Result<SearchResult, crate::Error> {
let collection_name = Self::get_collection_name(request.collection);
let state = self.manager.state::<IndexState>();
let guard = state.inner.lock().await;
let guard = state.inner.read().await;

let collection_index = guard
.collections
Expand Down Expand Up @@ -355,7 +385,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
pub async fn reindex(&self, collection: Option<String>) -> Result<(), crate::Error> {
let collection_name = Self::get_collection_name(collection);
let state = self.manager.state::<IndexState>();
let mut guard = state.inner.lock().await;
let mut guard = state.inner.write().await;

let collection_index = guard
.collections
Expand All @@ -371,6 +401,9 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {

writer.commit()?;

collection_index.pending_writes.store(0, Ordering::SeqCst);
*collection_index.last_commit.lock().unwrap() = Instant::now();

tracing::info!(
"Reindex completed for collection '{}'. Index cleared and ready for new documents. Fields: {:?}",
collection_name,
Expand All @@ -387,7 +420,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
) -> Result<(), crate::Error> {
let collection_name = Self::get_collection_name(collection);
let state = self.manager.state::<IndexState>();
let mut guard = state.inner.lock().await;
let mut guard = state.inner.write().await;

let collection_index = guard
.collections
Expand All @@ -413,7 +446,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
}

writer.add_document(doc)?;
writer.commit()?;

collection_index
.pending_writes
.fetch_add(1, Ordering::SeqCst);

let should_commit = if collection_index.auto_commit {
let last_commit = collection_index.last_commit.lock().unwrap();
last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms)
} else {
true
};

if should_commit {
writer.commit()?;
collection_index.pending_writes.store(0, Ordering::SeqCst);
*collection_index.last_commit.lock().unwrap() = Instant::now();
}

tracing::debug!(
"Added document '{}' to collection '{}'",
Expand All @@ -431,7 +480,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
) -> Result<(), crate::Error> {
let collection_name = Self::get_collection_name(collection);
let state = self.manager.state::<IndexState>();
let mut guard = state.inner.lock().await;
let mut guard = state.inner.write().await;

let collection_index = guard
.collections
Expand Down Expand Up @@ -460,7 +509,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
}

writer.add_document(doc)?;
writer.commit()?;

collection_index
.pending_writes
.fetch_add(1, Ordering::SeqCst);

let should_commit = if collection_index.auto_commit {
let last_commit = collection_index.last_commit.lock().unwrap();
last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms)
} else {
true
};

if should_commit {
writer.commit()?;
collection_index.pending_writes.store(0, Ordering::SeqCst);
*collection_index.last_commit.lock().unwrap() = Instant::now();
}

tracing::debug!(
"Updated document '{}' in collection '{}'",
Expand All @@ -478,7 +543,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
) -> Result<(), crate::Error> {
let collection_name = Self::get_collection_name(collection);
let state = self.manager.state::<IndexState>();
let mut guard = state.inner.lock().await;
let mut guard = state.inner.write().await;

let collection_index = guard
.collections
Expand All @@ -491,7 +556,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {

let id_term = Term::from_field_text(fields.id, &id);
writer.delete_term(id_term);
writer.commit()?;

collection_index
.pending_writes
.fetch_add(1, Ordering::SeqCst);

let should_commit = if collection_index.auto_commit {
let last_commit = collection_index.last_commit.lock().unwrap();
last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms)
} else {
true
};

if should_commit {
writer.commit()?;
collection_index.pending_writes.store(0, Ordering::SeqCst);
*collection_index.last_commit.lock().unwrap() = Instant::now();
}

tracing::debug!(
"Removed document '{}' from collection '{}'",
Expand All @@ -501,6 +582,31 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {

Ok(())
}

pub async fn flush(&self, collection: Option<String>) -> Result<(), crate::Error> {
let collection_name = Self::get_collection_name(collection);
let state = self.manager.state::<IndexState>();
let mut guard = state.inner.write().await;

let collection_index = guard
.collections
.get_mut(&collection_name)
.ok_or_else(|| crate::Error::CollectionNotFound(collection_name.clone()))?;

let pending = collection_index.pending_writes.load(Ordering::SeqCst);
if pending > 0 {
collection_index.writer.commit()?;
collection_index.pending_writes.store(0, Ordering::SeqCst);
*collection_index.last_commit.lock().unwrap() = Instant::now();
tracing::debug!(
"Flushed {} pending writes for collection '{}'",
pending,
collection_name
);
}

Ok(())
}
}

pub trait TantivyPluginExt<R: tauri::Runtime> {
Expand Down
43 changes: 40 additions & 3 deletions plugins/tantivy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ mod tokenizer;

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use tantivy::schema::Schema;
use tantivy::{Index, IndexReader, IndexWriter};
use tauri::Manager;
use tokio::sync::Mutex;
use tokio::sync::RwLock;

pub use error::{Error, Result};
pub use ext::*;
Expand Down Expand Up @@ -99,18 +101,26 @@ pub struct SearchRequest {
pub options: SearchOptions,
}

/// Schema version passed by `init` for the default collection.
///
/// `register_collection` compares a collection's configured version with the
/// number stored in the index directory's `schema_version` file; when they
/// differ, the on-disk index is deleted and re-created.
pub const SCHEMA_VERSION: u32 = 1;

/// Settings handed to `register_collection` to open or create one collection.
pub struct CollectionConfig {
/// Key under which the collection is stored in the `collections` map;
/// registering a name that is already present is a no-op.
pub name: String,
/// Directory of the index, joined onto the application's base path.
pub path: String,
/// Called during registration to build the collection's schema.
pub schema_builder: fn() -> Schema,
/// When `true`, a write only commits once `commit_interval_ms` has elapsed
/// since the last commit; when `false`, every write commits immediately.
pub auto_commit: bool,
/// Minimum time between commits, in milliseconds, used when `auto_commit` is `true`.
pub commit_interval_ms: u64,
/// Expected schema version. If an existing index stores a different value
/// (or none, which is read as 0), the index directory is removed and re-created.
pub schema_version: u32,
}

/// Runtime state kept for one registered collection in the `collections` map.
pub struct CollectionIndex {
// NOTE(review): presumably the schema returned by the config's
// `schema_builder` — the assignment is not visible here; confirm.
pub schema: Schema,
/// Index opened from, or created in, the collection's directory.
pub index: Index,
/// Reader built from `index` during registration.
pub reader: IndexReader,
/// Writer whose `commit()` persists buffered changes.
pub writer: IndexWriter,
/// Copied from `CollectionConfig::auto_commit` at registration.
pub auto_commit: bool,
/// Copied from `CollectionConfig::commit_interval_ms` at registration.
pub commit_interval_ms: u64,
/// Number of write operations recorded since the last commit; incremented
/// after each add/delete and reset to 0 whenever a commit succeeds.
pub pending_writes: AtomicU64,
/// Instant of the most recent commit; initialised to the registration time.
pub last_commit: std::sync::Mutex<Instant>,
}

pub struct IndexStateInner {
Expand All @@ -126,13 +136,13 @@ impl Default for IndexStateInner {
}

pub struct IndexState {
pub inner: Mutex<IndexStateInner>,
pub inner: RwLock<IndexStateInner>,
}

impl Default for IndexState {
fn default() -> Self {
Self {
inner: Mutex::new(IndexStateInner::default()),
inner: RwLock::new(IndexStateInner::default()),
}
}
}
Expand Down Expand Up @@ -165,6 +175,8 @@ pub fn init() -> tauri::plugin::TauriPlugin<tauri::Wry> {
path: "search_index".to_string(),
schema_builder: schema::build_schema,
auto_commit: true,
commit_interval_ms: 1000,
schema_version: SCHEMA_VERSION,
};

if let Err(e) = handle.tantivy().register_collection(config).await {
Expand All @@ -174,6 +186,31 @@ pub fn init() -> tauri::plugin::TauriPlugin<tauri::Wry> {

Ok(())
})
.on_event(|app, event| {
if let tauri::RunEvent::ExitRequested { .. } = event {
let state = app.state::<IndexState>();
if let Ok(mut guard) = state.inner.try_write() {
for (name, collection) in guard.collections.iter_mut() {
let pending = collection.pending_writes.load(Ordering::SeqCst);
if pending > 0 {
if let Err(e) = collection.writer.commit() {
tracing::error!(
"Failed to flush pending writes for collection '{}': {}",
name,
e
);
} else {
tracing::info!(
"Flushed {} pending writes for collection '{}' on exit",
pending,
name
);
}
}
}
}
}
})
.build()
}

Expand Down
Loading