Skip to content

Commit 5d5b3b3

Browse files
authored
[ENH]: de-dupe block path prefixes during GC to reduce memory usage (#5766)
1 parent a7c7865 commit 5d5b3b3

File tree

4 files changed: +347 additions, -113 deletions

rust/garbage_collector/src/garbage_collector_orchestrator_v2.rs

Lines changed: 11 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -21,8 +21,8 @@ use crate::operators::list_files_at_version::{
2121
ListFilesAtVersionsOperator,
2222
};
2323
use crate::types::{
24-
version_graph_to_collection_dependency_graph, CleanupMode, GarbageCollectorResponse,
25-
VersionGraph, VersionGraphNode,
24+
version_graph_to_collection_dependency_graph, CleanupMode, FilePathRefCountSet,
25+
GarbageCollectorResponse, VersionGraph, VersionGraphNode,
2626
};
2727
use async_trait::async_trait;
2828
use chroma_blockstore::RootManager;
@@ -68,7 +68,7 @@ pub struct GarbageCollectorOrchestrator {
6868
versions_to_delete_output: Option<ComputeVersionsToDeleteOutput>,
6969
delete_unused_file_output: Option<DeleteUnusedFilesOutput>,
7070
delete_unused_log_output: Option<DeleteUnusedLogsOutput>,
71-
file_ref_counts: HashMap<String, u32>,
71+
file_ref_counts: FilePathRefCountSet,
7272
num_pending_tasks: usize,
7373
min_versions_to_keep: u32,
7474
enable_log_gc: bool,
@@ -119,7 +119,7 @@ impl GarbageCollectorOrchestrator {
119119
cleanup_mode,
120120
result_channel: None,
121121
version_files: HashMap::new(),
122-
file_ref_counts: HashMap::new(),
122+
file_ref_counts: FilePathRefCountSet::new(),
123123
versions_to_delete_output: None,
124124
delete_unused_file_output: None,
125125
delete_unused_log_output: None,
@@ -705,10 +705,8 @@ impl GarbageCollectorOrchestrator {
705705
output.version
706706
);
707707

708-
for file_path in output.file_paths {
709-
let count = self.file_ref_counts.entry(file_path).or_insert(0);
710-
*count += 1;
711-
}
708+
self.file_ref_counts
709+
.merge(FilePathRefCountSet::from_set(output.file_paths, 1));
712710
}
713711
CollectionVersionAction::Delete => {
714712
tracing::debug!(
@@ -718,9 +716,8 @@ impl GarbageCollectorOrchestrator {
718716
output.version
719717
);
720718

721-
for file_path in output.file_paths {
722-
self.file_ref_counts.entry(file_path).or_insert(0);
723-
}
719+
self.file_ref_counts
720+
.merge(FilePathRefCountSet::from_set(output.file_paths, 0));
724721
}
725722
}
726723

@@ -733,17 +730,8 @@ impl GarbageCollectorOrchestrator {
733730
) -> Result<(), GarbageCollectorError> {
734731
// We now have results for all ListFilesAtVersionsOperator tasks that we spawned
735732
tracing::trace!("File ref counts: {:#?}", self.file_ref_counts);
736-
let file_paths_to_delete = self
737-
.file_ref_counts
738-
.iter()
739-
.filter_map(|(path, count)| {
740-
if *count == 0 {
741-
Some(path.clone())
742-
} else {
743-
None
744-
}
745-
})
746-
.collect::<Vec<_>>();
733+
734+
let file_paths_to_delete = self.file_ref_counts.as_set(0);
747735

748736
let delete_percentage =
749737
file_paths_to_delete.len() as f32 / self.file_ref_counts.len() as f32 * 100.0;
@@ -785,7 +773,6 @@ impl GarbageCollectorOrchestrator {
785773
)),
786774
DeleteUnusedFilesInput {
787775
unused_s3_files: file_paths_to_delete,
788-
hnsw_prefixes_for_deletion: vec![],
789776
},
790777
ctx.receiver(),
791778
self.context.task_cancellation_token.clone(),
@@ -822,7 +809,7 @@ impl GarbageCollectorOrchestrator {
822809
return Ok(());
823810
}
824811

825-
self.num_files_deleted += output.deleted_files.len() as u32;
812+
self.num_files_deleted += output.num_files_deleted as u32;
826813

827814
let versions_to_delete = self.versions_to_delete_output.as_ref().ok_or(
828815
GarbageCollectorError::InvariantViolation(

rust/garbage_collector/src/operators/delete_unused_files.rs

Lines changed: 42 additions & 74 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,13 @@
11
use crate::types::CleanupMode;
2+
use crate::types::FilePathSet;
23
use crate::types::RENAMED_FILE_PREFIX;
34
use async_trait::async_trait;
45
use chroma_error::{ChromaError, ErrorCodes};
56
use chroma_storage::Storage;
67
use chroma_storage::StorageError;
78
use chroma_system::{Operator, OperatorType};
89
use futures::stream::StreamExt;
9-
use std::collections::HashSet;
10+
use futures::TryStreamExt;
1011
use thiserror::Error;
1112

1213
#[derive(Clone, Debug)]
@@ -39,13 +40,12 @@ impl DeleteUnusedFilesOperator {
3940

4041
#[derive(Debug)]
4142
pub struct DeleteUnusedFilesInput {
42-
pub unused_s3_files: Vec<String>,
43-
pub hnsw_prefixes_for_deletion: Vec<String>,
43+
pub unused_s3_files: FilePathSet,
4444
}
4545

4646
#[derive(Debug)]
4747
pub struct DeleteUnusedFilesOutput {
48-
pub deleted_files: HashSet<String>,
48+
pub num_files_deleted: usize,
4949
}
5050

5151
#[derive(Error, Debug)]
@@ -80,15 +80,10 @@ impl Operator<DeleteUnusedFilesInput, DeleteUnusedFilesOutput> for DeleteUnusedF
8080
) -> Result<DeleteUnusedFilesOutput, DeleteUnusedFilesError> {
8181
tracing::debug!(
8282
files_count = input.unused_s3_files.len(),
83-
hnsw_prefixes_count = input.hnsw_prefixes_for_deletion.len(),
8483
cleanup_mode = ?self.cleanup_mode,
8584
"Starting deletion of unused files"
8685
);
8786

88-
// Create a list that contains all files that will be deleted.
89-
let mut all_files = input.unused_s3_files.clone();
90-
all_files.extend(input.hnsw_prefixes_for_deletion.clone());
91-
9287
// NOTE(rohit):
9388
// We don't want to fail the entire operation if one file fails to rename or delete.
9489
// It's possible that the file was already renamed/deleted in the last run that
@@ -97,8 +92,8 @@ impl Operator<DeleteUnusedFilesInput, DeleteUnusedFilesOutput> for DeleteUnusedF
9792
CleanupMode::DryRunV2 => {}
9893
CleanupMode::Rename => {
9994
// Soft delete - rename the file
100-
if !all_files.is_empty() {
101-
let mut rename_stream = futures::stream::iter(all_files.clone())
95+
if !input.unused_s3_files.is_empty() {
96+
let mut rename_stream = futures::stream::iter(input.unused_s3_files.iter())
10297
.map(move |file_path| {
10398
let new_path = self.get_rename_path(&file_path);
10499
self.rename_with_path(file_path, new_path)
@@ -123,18 +118,22 @@ impl Operator<DeleteUnusedFilesInput, DeleteUnusedFilesOutput> for DeleteUnusedF
123118
}
124119
CleanupMode::DeleteV2 => {
125120
// Hard delete - remove the file
126-
if !all_files.is_empty() {
127-
// The S3 DeleteObjects API allows up to 1000 objects per request
128-
for chunk in all_files.chunks(1000) {
129-
let result = self.storage.delete_many(chunk).await?;
130-
if !result.errors.is_empty() {
121+
if !input.unused_s3_files.is_empty() {
122+
let mut delete_stream = futures::stream::iter(input.unused_s3_files.iter())
123+
// The S3 DeleteObjects API allows up to 1000 objects per request
124+
.chunks(1000)
125+
.then(|chunk| async move { self.storage.delete_many(chunk).await })
126+
.boxed();
127+
128+
while let Some(delete_result) = delete_stream.try_next().await? {
129+
if !delete_result.errors.is_empty() {
131130
// Log the errors but don't fail the operation
132-
for error in result.errors {
131+
for error in delete_result.errors {
133132
match error {
134133
StorageError::NotFound { path, source } => {
135-
tracing::warn!("Delete file {path} not found: {source}")
134+
tracing::info!("Delete file {path} not found: {source}")
136135
}
137-
err => return Err(DeleteUnusedFilesError::StorageError(err)),
136+
err => tracing::error!("Failed to delete: {err}"),
138137
}
139138
}
140139
}
@@ -144,13 +143,11 @@ impl Operator<DeleteUnusedFilesInput, DeleteUnusedFilesOutput> for DeleteUnusedF
144143
}
145144

146145
Ok(DeleteUnusedFilesOutput {
147-
deleted_files: all_files.into_iter().collect(),
146+
num_files_deleted: input.unused_s3_files.len(),
148147
})
149148
}
150149
}
151150

152-
impl DeleteUnusedFilesOperator {}
153-
154151
#[cfg(test)]
155152
mod tests {
156153
use super::*;
@@ -167,11 +164,14 @@ mod tests {
167164
.unwrap();
168165
}
169166

170-
async fn setup_test_files(storage: &Storage) -> (Vec<String>, Vec<String>) {
167+
async fn setup_test_files(storage: &Storage) -> FilePathSet {
168+
let mut set = FilePathSet::new();
169+
171170
// Create regular test files
172171
let test_files = vec!["file1.txt", "file2.txt"];
173172
for file in &test_files {
174173
create_test_file(storage, file, b"test content").await;
174+
set.insert_path(file.to_string());
175175
}
176176

177177
// Create HNSW test files
@@ -181,40 +181,31 @@ mod tests {
181181
.collect::<Vec<String>>();
182182
for file in &hnsw_files {
183183
create_test_file(storage, file, b"test content").await;
184+
set.insert_path(file.to_string());
184185
}
185186

186-
(
187-
test_files.iter().map(|s| s.to_string()).collect(),
188-
hnsw_files.iter().map(|s| s.to_string()).collect(),
189-
)
187+
set
190188
}
191189

192190
#[tokio::test]
193191
async fn test_dry_run_mode() {
194192
let tmp_dir = TempDir::new().unwrap();
195193
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
196-
let (test_files, hnsw_files) = setup_test_files(&storage).await;
194+
let unused_s3_files = setup_test_files(&storage).await;
197195

198196
let operator = DeleteUnusedFilesOperator::new(
199197
storage.clone(),
200198
CleanupMode::DryRunV2,
201199
"test_tenant".to_string(),
202200
);
203201
let input = DeleteUnusedFilesInput {
204-
unused_s3_files: test_files.clone(),
205-
hnsw_prefixes_for_deletion: hnsw_files.clone(),
202+
unused_s3_files: unused_s3_files.clone(),
206203
};
207204

208-
let result = operator.run(&input).await.unwrap();
205+
operator.run(&input).await.unwrap();
209206

210207
// Verify original files still exist
211-
for file in &test_files {
212-
assert!(result.deleted_files.contains(file));
213-
assert!(Path::new(&tmp_dir.path().join(file)).exists());
214-
}
215-
216-
for file in &hnsw_files {
217-
assert!(result.deleted_files.contains(file));
208+
for file in unused_s3_files.iter() {
218209
assert!(Path::new(&tmp_dir.path().join(file)).exists());
219210
}
220211
}
@@ -223,71 +214,50 @@ mod tests {
223214
async fn test_rename_mode() {
224215
let tmp_dir = TempDir::new().unwrap();
225216
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
226-
let (test_files, hnsw_files) = setup_test_files(&storage).await;
217+
let unused_s3_files = setup_test_files(&storage).await;
227218

228219
let operator = DeleteUnusedFilesOperator::new(
229220
storage.clone(),
230221
CleanupMode::Rename,
231222
"test_tenant".to_string(),
232223
);
233224
let input = DeleteUnusedFilesInput {
234-
unused_s3_files: test_files.clone(),
235-
hnsw_prefixes_for_deletion: hnsw_files.clone(),
225+
unused_s3_files: unused_s3_files.clone(),
236226
};
237227

238-
let result = operator.run(&input).await.unwrap();
239-
240-
// Verify regular files were moved to deleted directory
241-
for file in &test_files {
242-
let original_path = tmp_dir.path().join(file);
243-
let new_path = tmp_dir
244-
.path()
245-
.join(format!("{}{}/{}", RENAMED_FILE_PREFIX, "test_tenant", file));
246-
assert!(!original_path.exists());
247-
assert!(new_path.exists());
248-
assert!(result.deleted_files.contains(file));
249-
}
228+
operator.run(&input).await.unwrap();
250229

251-
// Verify HNSW files were moved to deleted directory
252-
for file in &hnsw_files {
253-
let original_path = tmp_dir.path().join(file);
230+
// Verify files were moved to deleted directory
231+
for file in unused_s3_files.iter() {
232+
let original_path = tmp_dir.path().join(&file);
254233
let new_path = tmp_dir
255234
.path()
256235
.join(format!("{}{}/{}", RENAMED_FILE_PREFIX, "test_tenant", file));
257236
assert!(!original_path.exists());
258237
assert!(new_path.exists());
259-
assert!(result.deleted_files.contains(file));
260238
}
261239
}
262240

263241
#[tokio::test]
264242
async fn test_delete_mode() {
265243
let tmp_dir = TempDir::new().unwrap();
266244
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
267-
let (test_files, hnsw_files) = setup_test_files(&storage).await;
245+
let unused_s3_files = setup_test_files(&storage).await;
268246

269247
let operator = DeleteUnusedFilesOperator::new(
270248
storage.clone(),
271249
CleanupMode::DeleteV2,
272250
"test_tenant".to_string(),
273251
);
274252
let input = DeleteUnusedFilesInput {
275-
unused_s3_files: test_files.clone(),
276-
hnsw_prefixes_for_deletion: hnsw_files.clone(),
253+
unused_s3_files: unused_s3_files.clone(),
277254
};
278255

279-
let result = operator.run(&input).await.unwrap();
256+
operator.run(&input).await.unwrap();
280257

281-
// Verify regular files were deleted
282-
for file in &test_files {
283-
assert!(!Path::new(&tmp_dir.path().join(file)).exists());
284-
assert!(result.deleted_files.contains(file));
285-
}
286-
287-
// Verify HNSW files were deleted
288-
for file in &hnsw_files {
258+
// Verify files were deleted
259+
for file in unused_s3_files.iter() {
289260
assert!(!Path::new(&tmp_dir.path().join(file)).exists());
290-
assert!(result.deleted_files.contains(file));
291261
}
292262
}
293263

@@ -296,7 +266,8 @@ mod tests {
296266
let tmp_dir = TempDir::new().unwrap();
297267
let storage = Storage::Local(LocalStorage::new(tmp_dir.path().to_str().unwrap()));
298268

299-
let unused_files = vec!["nonexistent.txt".to_string()];
269+
let mut unused_files = FilePathSet::new();
270+
unused_files.insert_path("nonexistent.txt".to_string());
300271

301272
// Test Delete mode - should succeed but record the error in deletion list
302273
let delete_operator = DeleteUnusedFilesOperator::new(
@@ -307,7 +278,6 @@ mod tests {
307278
let result = delete_operator
308279
.run(&DeleteUnusedFilesInput {
309280
unused_s3_files: unused_files.clone(),
310-
hnsw_prefixes_for_deletion: vec![],
311281
})
312282
.await;
313283
assert!(result.is_ok());
@@ -321,7 +291,6 @@ mod tests {
321291
let result = rename_operator
322292
.run(&DeleteUnusedFilesInput {
323293
unused_s3_files: unused_files.clone(),
324-
hnsw_prefixes_for_deletion: vec![],
325294
})
326295
.await;
327296
assert!(result.is_ok());
@@ -335,7 +304,6 @@ mod tests {
335304
let result = list_operator
336305
.run(&DeleteUnusedFilesInput {
337306
unused_s3_files: unused_files,
338-
hnsw_prefixes_for_deletion: vec![],
339307
})
340308
.await;
341309
assert!(result.is_ok());

0 commit comments

Comments
 (0)