Skip to content

Commit 15dfbab

Browse files
feat(tantivy): improve batching, concurrency, and index versioning (#2807)
Co-authored-by: yujonglee <[email protected]> Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 8252bc7 commit 15dfbab

File tree

2 files changed

+158
-15
lines changed

2 files changed

+158
-15
lines changed

plugins/tantivy/src/ext.rs

Lines changed: 118 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use std::sync::atomic::{AtomicU64, Ordering};
2+
use std::time::{Duration, Instant};
3+
14
use tantivy::collector::{Count, TopDocs};
25
use tantivy::query::{
36
BooleanQuery, BoostQuery, FuzzyTermQuery, Occur, PhraseQuery, Query, QueryParser, TermQuery,
@@ -78,24 +81,46 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
7881
pub async fn register_collection(&self, config: CollectionConfig) -> Result<(), crate::Error> {
7982
let base = self.manager.app_handle().path2().base()?;
8083
let index_path = base.join(&config.path);
84+
let version_path = index_path.join("schema_version");
8185

8286
std::fs::create_dir_all(&index_path)?;
8387

8488
let state = self.manager.state::<IndexState>();
85-
let mut guard = state.inner.lock().await;
89+
let mut guard = state.inner.write().await;
8690

8791
if guard.collections.contains_key(&config.name) {
8892
tracing::debug!("Collection '{}' already registered", config.name);
8993
return Ok(());
9094
}
9195

9296
let schema = (config.schema_builder)();
93-
let index = if index_path.join("meta.json").exists() {
97+
98+
let needs_reindex = if index_path.join("meta.json").exists() {
99+
let stored_version = std::fs::read_to_string(&version_path)
100+
.ok()
101+
.and_then(|s| s.trim().parse::<u32>().ok())
102+
.unwrap_or(0);
103+
stored_version != config.schema_version
104+
} else {
105+
false
106+
};
107+
108+
let index = if index_path.join("meta.json").exists() && !needs_reindex {
94109
Index::open_in_dir(&index_path)?
95110
} else {
111+
if needs_reindex {
112+
tracing::info!(
113+
"Schema version changed for collection '{}', re-creating index",
114+
config.name
115+
);
116+
std::fs::remove_dir_all(&index_path)?;
117+
std::fs::create_dir_all(&index_path)?;
118+
}
96119
Index::create_in_dir(&index_path, schema.clone())?
97120
};
98121

122+
std::fs::write(&version_path, config.schema_version.to_string())?;
123+
99124
register_tokenizers(&index);
100125

101126
let reader = index
@@ -110,16 +135,21 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
110135
index,
111136
reader,
112137
writer,
138+
auto_commit: config.auto_commit,
139+
commit_interval_ms: config.commit_interval_ms,
140+
pending_writes: AtomicU64::new(0),
141+
last_commit: std::sync::Mutex::new(Instant::now()),
113142
};
114143

115144
guard
116145
.collections
117146
.insert(config.name.clone(), collection_index);
118147

119148
tracing::info!(
120-
"Tantivy collection '{}' registered at {:?}",
149+
"Tantivy collection '{}' registered at {:?} (version: {})",
121150
config.name,
122-
index_path
151+
index_path,
152+
config.schema_version
123153
);
124154
Ok(())
125155
}
@@ -131,7 +161,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
131161
pub async fn search(&self, request: SearchRequest) -> Result<SearchResult, crate::Error> {
132162
let collection_name = Self::get_collection_name(request.collection);
133163
let state = self.manager.state::<IndexState>();
134-
let guard = state.inner.lock().await;
164+
let guard = state.inner.read().await;
135165

136166
let collection_index = guard
137167
.collections
@@ -355,7 +385,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
355385
pub async fn reindex(&self, collection: Option<String>) -> Result<(), crate::Error> {
356386
let collection_name = Self::get_collection_name(collection);
357387
let state = self.manager.state::<IndexState>();
358-
let mut guard = state.inner.lock().await;
388+
let mut guard = state.inner.write().await;
359389

360390
let collection_index = guard
361391
.collections
@@ -371,6 +401,9 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
371401

372402
writer.commit()?;
373403

404+
collection_index.pending_writes.store(0, Ordering::SeqCst);
405+
*collection_index.last_commit.lock().unwrap() = Instant::now();
406+
374407
tracing::info!(
375408
"Reindex completed for collection '{}'. Index cleared and ready for new documents. Fields: {:?}",
376409
collection_name,
@@ -387,7 +420,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
387420
) -> Result<(), crate::Error> {
388421
let collection_name = Self::get_collection_name(collection);
389422
let state = self.manager.state::<IndexState>();
390-
let mut guard = state.inner.lock().await;
423+
let mut guard = state.inner.write().await;
391424

392425
let collection_index = guard
393426
.collections
@@ -413,7 +446,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
413446
}
414447

415448
writer.add_document(doc)?;
416-
writer.commit()?;
449+
450+
collection_index
451+
.pending_writes
452+
.fetch_add(1, Ordering::SeqCst);
453+
454+
let should_commit = if collection_index.auto_commit {
455+
let last_commit = collection_index.last_commit.lock().unwrap();
456+
last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms)
457+
} else {
458+
true
459+
};
460+
461+
if should_commit {
462+
writer.commit()?;
463+
collection_index.pending_writes.store(0, Ordering::SeqCst);
464+
*collection_index.last_commit.lock().unwrap() = Instant::now();
465+
}
417466

418467
tracing::debug!(
419468
"Added document '{}' to collection '{}'",
@@ -431,7 +480,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
431480
) -> Result<(), crate::Error> {
432481
let collection_name = Self::get_collection_name(collection);
433482
let state = self.manager.state::<IndexState>();
434-
let mut guard = state.inner.lock().await;
483+
let mut guard = state.inner.write().await;
435484

436485
let collection_index = guard
437486
.collections
@@ -460,7 +509,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
460509
}
461510

462511
writer.add_document(doc)?;
463-
writer.commit()?;
512+
513+
collection_index
514+
.pending_writes
515+
.fetch_add(1, Ordering::SeqCst);
516+
517+
let should_commit = if collection_index.auto_commit {
518+
let last_commit = collection_index.last_commit.lock().unwrap();
519+
last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms)
520+
} else {
521+
true
522+
};
523+
524+
if should_commit {
525+
writer.commit()?;
526+
collection_index.pending_writes.store(0, Ordering::SeqCst);
527+
*collection_index.last_commit.lock().unwrap() = Instant::now();
528+
}
464529

465530
tracing::debug!(
466531
"Updated document '{}' in collection '{}'",
@@ -478,7 +543,7 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
478543
) -> Result<(), crate::Error> {
479544
let collection_name = Self::get_collection_name(collection);
480545
let state = self.manager.state::<IndexState>();
481-
let mut guard = state.inner.lock().await;
546+
let mut guard = state.inner.write().await;
482547

483548
let collection_index = guard
484549
.collections
@@ -491,7 +556,23 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
491556

492557
let id_term = Term::from_field_text(fields.id, &id);
493558
writer.delete_term(id_term);
494-
writer.commit()?;
559+
560+
collection_index
561+
.pending_writes
562+
.fetch_add(1, Ordering::SeqCst);
563+
564+
let should_commit = if collection_index.auto_commit {
565+
let last_commit = collection_index.last_commit.lock().unwrap();
566+
last_commit.elapsed() >= Duration::from_millis(collection_index.commit_interval_ms)
567+
} else {
568+
true
569+
};
570+
571+
if should_commit {
572+
writer.commit()?;
573+
collection_index.pending_writes.store(0, Ordering::SeqCst);
574+
*collection_index.last_commit.lock().unwrap() = Instant::now();
575+
}
495576

496577
tracing::debug!(
497578
"Removed document '{}' from collection '{}'",
@@ -501,6 +582,31 @@ impl<'a, R: tauri::Runtime, M: tauri::Manager<R>> Tantivy<'a, R, M> {
501582

502583
Ok(())
503584
}
585+
586+
pub async fn flush(&self, collection: Option<String>) -> Result<(), crate::Error> {
587+
let collection_name = Self::get_collection_name(collection);
588+
let state = self.manager.state::<IndexState>();
589+
let mut guard = state.inner.write().await;
590+
591+
let collection_index = guard
592+
.collections
593+
.get_mut(&collection_name)
594+
.ok_or_else(|| crate::Error::CollectionNotFound(collection_name.clone()))?;
595+
596+
let pending = collection_index.pending_writes.load(Ordering::SeqCst);
597+
if pending > 0 {
598+
collection_index.writer.commit()?;
599+
collection_index.pending_writes.store(0, Ordering::SeqCst);
600+
*collection_index.last_commit.lock().unwrap() = Instant::now();
601+
tracing::debug!(
602+
"Flushed {} pending writes for collection '{}'",
603+
pending,
604+
collection_name
605+
);
606+
}
607+
608+
Ok(())
609+
}
504610
}
505611

506612
pub trait TantivyPluginExt<R: tauri::Runtime> {

plugins/tantivy/src/lib.rs

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ mod tokenizer;
77

88
use serde::{Deserialize, Serialize};
99
use std::collections::HashMap;
10+
use std::sync::atomic::{AtomicU64, Ordering};
11+
use std::time::Instant;
1012
use tantivy::schema::Schema;
1113
use tantivy::{Index, IndexReader, IndexWriter};
1214
use tauri::Manager;
13-
use tokio::sync::Mutex;
15+
use tokio::sync::RwLock;
1416

1517
pub use error::{Error, Result};
1618
pub use ext::*;
@@ -99,18 +101,26 @@ pub struct SearchRequest {
99101
pub options: SearchOptions,
100102
}
101103

104+
pub const SCHEMA_VERSION: u32 = 1;
105+
102106
pub struct CollectionConfig {
103107
pub name: String,
104108
pub path: String,
105109
pub schema_builder: fn() -> Schema,
106110
pub auto_commit: bool,
111+
pub commit_interval_ms: u64,
112+
pub schema_version: u32,
107113
}
108114

109115
pub struct CollectionIndex {
110116
pub schema: Schema,
111117
pub index: Index,
112118
pub reader: IndexReader,
113119
pub writer: IndexWriter,
120+
pub auto_commit: bool,
121+
pub commit_interval_ms: u64,
122+
pub pending_writes: AtomicU64,
123+
pub last_commit: std::sync::Mutex<Instant>,
114124
}
115125

116126
pub struct IndexStateInner {
@@ -126,13 +136,13 @@ impl Default for IndexStateInner {
126136
}
127137

128138
pub struct IndexState {
129-
pub inner: Mutex<IndexStateInner>,
139+
pub inner: RwLock<IndexStateInner>,
130140
}
131141

132142
impl Default for IndexState {
133143
fn default() -> Self {
134144
Self {
135-
inner: Mutex::new(IndexStateInner::default()),
145+
inner: RwLock::new(IndexStateInner::default()),
136146
}
137147
}
138148
}
@@ -165,6 +175,8 @@ pub fn init() -> tauri::plugin::TauriPlugin<tauri::Wry> {
165175
path: "search_index".to_string(),
166176
schema_builder: schema::build_schema,
167177
auto_commit: true,
178+
commit_interval_ms: 1000,
179+
schema_version: SCHEMA_VERSION,
168180
};
169181

170182
if let Err(e) = handle.tantivy().register_collection(config).await {
@@ -174,6 +186,31 @@ pub fn init() -> tauri::plugin::TauriPlugin<tauri::Wry> {
174186

175187
Ok(())
176188
})
189+
.on_event(|app, event| {
190+
if let tauri::RunEvent::ExitRequested { .. } = event {
191+
let state = app.state::<IndexState>();
192+
if let Ok(mut guard) = state.inner.try_write() {
193+
for (name, collection) in guard.collections.iter_mut() {
194+
let pending = collection.pending_writes.load(Ordering::SeqCst);
195+
if pending > 0 {
196+
if let Err(e) = collection.writer.commit() {
197+
tracing::error!(
198+
"Failed to flush pending writes for collection '{}': {}",
199+
name,
200+
e
201+
);
202+
} else {
203+
tracing::info!(
204+
"Flushed {} pending writes for collection '{}' on exit",
205+
pending,
206+
name
207+
);
208+
}
209+
}
210+
}
211+
}
212+
}
213+
})
177214
.build()
178215
}
179216

0 commit comments

Comments
 (0)