Skip to content

Commit 381d26d

Browse files
emmaling27Convex, Inc.
authored andcommitted
Clean up get_compactable_segments (#27054)
GitOrigin-RevId: 0b3b506f7b1c7c1d0e90ced2564fec7bd4453339
1 parent fee94c7 commit 381d26d

File tree

3 files changed

+17
-25
lines changed

3 files changed

+17
-25
lines changed

crates/database/src/index_workers/search_compactor.rs

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -250,38 +250,30 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
250250
))
251251
}
252252

253-
fn max_compactable_segments<'a>(
253+
fn get_compactable_segments<'a>(
254254
segments: Vec<&'a T::Segment>,
255255
developer_config: &T::DeveloperConfig,
256256
compaction_config: &CompactionConfig,
257257
) -> anyhow::Result<Option<Vec<&'a T::Segment>>> {
258-
let mut size: u64 = 0;
259-
let segments = segments
258+
let mut total_size: u64 = 0;
259+
let segments_with_size: Vec<_> = segments
260260
.into_iter()
261-
.map(|segment| Ok((segment, segment.statistics()?.num_documents())))
262-
// Allow errors to be propagated by giving them the lowest value for the ascending sort
263-
// We might ignore errors anyway if there are enough successful and empty segments, but
264-
// that seems unlikely.
265-
.sorted_by_key(|result: &anyhow::Result<_>|
266-
result.as_ref().map(|(_, num_docs)| *num_docs).unwrap_or(0)
267-
)
268-
.map_ok(|(segment, _)| {
269-
Ok((segment, segment.total_size_bytes(developer_config)?))
270-
})
271-
.flatten()
272-
.take_while(|segment| {
273-
let Ok((_, segment_size_bytes)) = segment else {
274-
// Propagate the error to the outer collect.
275-
return true;
276-
};
261+
.map(|segment| anyhow::Ok((segment, segment.total_size_bytes(developer_config)?)))
262+
.try_collect()?;
263+
// Sort segments in ascending size order and take as many as we can fit in the
264+
// max segment
265+
let segments = segments_with_size
266+
.into_iter()
267+
.sorted_by_key(|(_, size)| *size)
268+
.take_while(|(_segment, segment_size_bytes)| {
277269
// Some extra paranoia to fail loudly if we've misplaced some zeros somewhere.
278-
size = size
270+
total_size = total_size
279271
.checked_add(*segment_size_bytes)
280272
.context("Overflowed size!")
281273
.unwrap();
282-
size <= compaction_config.max_segment_size_bytes
274+
total_size <= compaction_config.max_segment_size_bytes
283275
})
284-
.collect::<anyhow::Result<Vec<_>>>()?;
276+
.collect::<Vec<_>>();
285277
if segments.len() >= compaction_config.min_compaction_segments as usize {
286278
Ok(Some(
287279
segments
@@ -321,13 +313,13 @@ impl<RT: Runtime, T: SearchIndex> SearchIndexCompactor<RT, T> {
321313
// Compact small segments first because it's quick and reducing the total number
322314
// of segments helps us minimize query costs.
323315
let compact_small =
324-
Self::max_compactable_segments(small_segments, developer_config, compaction_config)?;
316+
Self::get_compactable_segments(small_segments, developer_config, compaction_config)?;
325317
if let Some(compact_small) = compact_small {
326318
return Ok((to_owned(compact_small), CompactionReason::SmallSegments));
327319
}
328320
// Next check to see if we have too many larger segments and if so, compact
329321
// them.
330-
let compact_large = Self::max_compactable_segments(
322+
let compact_large = Self::get_compactable_segments(
331323
large_segments
332324
.clone()
333325
.into_iter()

crates/database/src/metrics.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -880,6 +880,7 @@ pub fn log_non_deleted_documents_per_search_index(count: u64, search_type: Searc
880880
const COMPACTION_REASON_LABEL: &str = "compaction_reason";
881881

882882
pub enum CompactionReason {
883+
// TODO(emma) remove and replace with optional result
883884
Unknown,
884885
SmallSegments,
885886
LargeSegments,

crates/database/src/text_index_worker/compactor.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use crate::{
1818

1919
pub type TextIndexCompactor<RT> = SearchIndexCompactor<RT, TextSearchIndex>;
2020

21-
#[allow(dead_code)]
2221
pub(crate) fn new_text_compactor<RT: Runtime>(
2322
database: Database<RT>,
2423
searcher: Arc<dyn Searcher>,

0 commit comments

Comments
 (0)