Skip to content

Commit f7e13a7

Browse files
sjudd (Convex, Inc.) authored and committed
Use try_join helpers in incremental_index (#26425)
GitOrigin-RevId: 336d5b1d63ec2a3c5a05165b2e09d22809d33cf3
1 parent c6fa649 commit f7e13a7

File tree

7 files changed

+60
-36
lines changed

7 files changed

+60
-36
lines changed

crates/common/src/runtime/mod.rs

Lines changed: 21 additions & 8 deletions
Original file line number · Diff line number · Diff line change
@@ -112,10 +112,21 @@ pub async fn try_join_buffered<
112112
+ Send
113113
+ 'static,
114114
) -> anyhow::Result<C> {
115-
stream::iter(tasks.map(|task| try_join(rt.clone(), name, task)))
116-
.buffered(JOIN_BUFFER_SIZE)
117-
.try_collect()
118-
.await
115+
assert_send(
116+
stream::iter(tasks.map(|task| assert_send(try_join(rt.clone(), name, assert_send(task)))))
117+
.buffered(JOIN_BUFFER_SIZE)
118+
.try_collect(),
119+
)
120+
.await
121+
}
122+
123+
// Work around "higher-ranked lifetime errors" due to the borrow checker's
124+
// inability (bug) to determine that some futures are in fact send. See
125+
// https://github.com/rust-lang/rust/issues/102211#issuecomment-1367900125
126+
fn assert_send<'a, T>(
127+
fut: impl 'a + Send + Future<Output = T>,
128+
) -> impl 'a + Send + Future<Output = T> {
129+
fut
119130
}
120131

121132
pub async fn try_join_buffer_unordered<
@@ -129,10 +140,12 @@ pub async fn try_join_buffer_unordered<
129140
+ Send
130141
+ 'static,
131142
) -> anyhow::Result<C> {
132-
stream::iter(tasks.map(|task| try_join(rt.clone(), name, task)))
133-
.buffer_unordered(JOIN_BUFFER_SIZE)
134-
.try_collect()
135-
.await
143+
assert_send(
144+
stream::iter(tasks.map(|task| try_join(rt.clone(), name, task)))
145+
.buffer_unordered(JOIN_BUFFER_SIZE)
146+
.try_collect(),
147+
)
148+
.await
136149
}
137150

138151
pub async fn try_join<RT: Runtime, T: Send + 'static>(

crates/database/src/index_workers/index_meta.rs

Lines changed: 2 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -103,7 +103,8 @@ pub trait SearchIndex: Clone + Debug {
103103

104104
fn estimate_document_size(schema: &Self::Schema, doc: &ResolvedDocument) -> u64;
105105

106-
async fn build_disk_index(
106+
async fn build_disk_index<RT: Runtime>(
107+
rt: &RT,
107108
schema: &Self::Schema,
108109
index_path: &PathBuf,
109110
documents: DocumentStream<'_>,

crates/database/src/index_workers/search_flusher.rs

Lines changed: 1 addition & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -564,6 +564,7 @@ impl<RT: Runtime, T: SearchIndexConfigParser + 'static> SearchFlusher<RT, T> {
564564
params.database.retention_validator(),
565565
);
566566
let new_segment = T::IndexType::build_disk_index(
567+
&params.runtime,
567568
&qdrant_schema,
568569
&index_path,
569570
documents,

crates/database/src/text_index_worker/text_meta.rs

Lines changed: 3 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -208,7 +208,8 @@ impl SearchIndex for TextSearchIndex {
208208
schema.estimate_size(doc)
209209
}
210210

211-
async fn build_disk_index(
211+
async fn build_disk_index<RT: Runtime>(
212+
rt: &RT,
212213
schema: &Self::Schema,
213214
index_path: &PathBuf,
214215
documents: DocumentStream<'_>,
@@ -222,6 +223,7 @@ impl SearchIndex for TextSearchIndex {
222223
let revision_stream = Box::pin(stream_revision_pairs(documents, &reader));
223224

224225
build_new_segment(
226+
rt,
225227
revision_stream,
226228
schema.clone(),
227229
index_path,

crates/database/src/vector_index_worker/vector_meta.rs

Lines changed: 2 additions & 1 deletion
Original file line number · Diff line number · Diff line change
@@ -216,7 +216,8 @@ impl SearchIndex for VectorSearchIndex {
216216
schema.estimate_vector_size() as u64
217217
}
218218

219-
async fn build_disk_index(
219+
async fn build_disk_index<RT: Runtime>(
220+
_rt: &RT,
220221
schema: &Self::Schema,
221222
index_path: &PathBuf,
222223
documents: DocumentStream<'_>,

crates/search/src/incremental_index.rs

Lines changed: 27 additions & 23 deletions
Original file line number · Diff line number · Diff line change
@@ -18,15 +18,13 @@ use common::{
1818
bounded_thread_pool::BoundedThreadPool,
1919
id_tracker::StaticIdTracker,
2020
persistence::DocumentRevisionStream,
21-
runtime::Runtime,
21+
runtime::{
22+
try_join_buffer_unordered,
23+
Runtime,
24+
},
2225
types::ObjectKey,
2326
};
24-
use futures::{
25-
stream,
26-
FutureExt,
27-
StreamExt,
28-
TryStreamExt,
29-
};
27+
use futures::TryStreamExt;
3028
use storage::{
3129
Storage,
3230
StorageExt,
@@ -303,7 +301,8 @@ impl PreviousTextSegments {
303301
/// Note the descending order requirement can be relaxed if the caller can
304302
/// guarantee that no deletes will be present in the stream. A caller can do so
305303
/// when providing this function with a stream from table iterator for example.
306-
pub async fn build_new_segment(
304+
pub async fn build_new_segment<RT: Runtime>(
305+
rt: &RT,
307306
revision_stream: DocumentRevisionStream<'_>,
308307
tantivy_schema: TantivySearchIndexSchema,
309308
dir: &Path,
@@ -405,6 +404,7 @@ pub async fn build_new_segment(
405404
);
406405

407406
let segments_term_metadata = get_all_segment_term_metadata(
407+
rt,
408408
search_storage,
409409
segment_term_metadata_fetcher,
410410
term_deletes_by_segment,
@@ -447,7 +447,8 @@ fn get_size(path: &PathBuf) -> anyhow::Result<u64> {
447447
std::fs::read_dir(path)?.try_fold(0, |acc, curr| Ok(acc + get_size(&curr?.path())?))
448448
}
449449

450-
async fn get_all_segment_term_metadata(
450+
async fn get_all_segment_term_metadata<RT: Runtime>(
451+
rt: &RT,
451452
storage: Arc<dyn Storage>,
452453
segment_term_metadata_fetcher: Arc<dyn SegmentTermMetadataFetcher>,
453454
term_deletes_by_segment: BTreeMap<ObjectKey, TermDeletionsByField>,
@@ -470,12 +471,9 @@ async fn get_all_segment_term_metadata(
470471
}
471472
},
472473
);
473-
// TODO: Use `try_join_buffer_unordered` helper here. We couldn't figure out the
474-
// higher-ranked lifetime error.
475-
let segments_term_metadata: Vec<_> = stream::iter(segment_term_metadata_futs)
476-
.buffer_unordered(10)
477-
.try_collect::<Vec<_>>()
478-
.await?;
474+
let segments_term_metadata: Vec<_> =
475+
try_join_buffer_unordered(rt.clone(), "text_term_metadata", segment_term_metadata_futs)
476+
.await?;
479477
Ok(segments_term_metadata)
480478
}
481479

@@ -545,15 +543,21 @@ pub async fn fetch_compact_and_upload_text_segment<RT: Runtime>(
545543
segments: Vec<FragmentedTextStorageKeys>,
546544
) -> anyhow::Result<FragmentedTextSegment> {
547545
let _storage = storage.clone();
548-
let opened_segments: Vec<_> = stream::iter(segments)
549-
.map(move |segment| fetch_text_segment(cache.clone(), storage.clone(), segment).boxed())
550-
.buffer_unordered(20)
551-
.and_then(|paths: TextSegmentPaths| {
546+
let opened_segments = try_join_buffer_unordered(
547+
rt.clone(),
548+
"text_segment_merge",
549+
segments.into_iter().map(move |segment| {
552550
let pool = blocking_thread_pool.clone();
553-
async move { pool.execute(|| open_text_segment_for_merge(paths)).await? }
554-
})
555-
.try_collect()
556-
.await?;
551+
let storage = storage.clone();
552+
let cache = cache.clone();
553+
let segment = segment.clone();
554+
async move {
555+
let paths = fetch_text_segment(cache, storage, segment).await?;
556+
pool.execute(|| open_text_segment_for_merge(paths)).await?
557+
}
558+
}),
559+
)
560+
.await?;
557561

558562
let dir = TempDir::new()?;
559563
let new_segment = merge_segments(opened_segments, dir.path()).await?;

crates/search/src/searcher/searcher.rs

Lines changed: 4 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -1734,9 +1734,10 @@ mod tests {
17341734
let revision_stream = futures::stream::iter(revisions).boxed();
17351735
let mut previous_segments = PreviousTextSegments::default();
17361736
let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt.clone())?);
1737-
let segment_term_metadata_fetcher = Arc::new(InProcessSearcher::new(rt).await?);
1737+
let segment_term_metadata_fetcher = Arc::new(InProcessSearcher::new(rt.clone()).await?);
17381738

17391739
let new_segment = build_new_segment(
1740+
&rt,
17401741
revision_stream,
17411742
schema.clone(),
17421743
test_dir.path(),
@@ -1929,9 +1930,10 @@ mod tests {
19291930
// tests because they don't use the same directory that the indexes are stored
19301931
// in, which means we must use empty PreviousTextSegments in these tests.
19311932
let storage: Arc<dyn Storage> = Arc::new(LocalDirStorage::new(rt.clone())?);
1932-
let segment_term_metadata_fetcher = Arc::new(InProcessSearcher::new(rt).await?);
1933+
let segment_term_metadata_fetcher = Arc::new(InProcessSearcher::new(rt.clone()).await?);
19331934
let mut previous_segments = PreviousTextSegments::default();
19341935
let new_segment = build_new_segment(
1936+
&rt,
19351937
revision_stream,
19361938
schema.clone(),
19371939
index_dir,

0 commit comments

Comments (0)