Skip to content

Commit a445e12

Browse files
authored
[ENH] Do not do any mutations of the manifest from within GC. (#5050)
## Description of changes

Three-phase GC still used the open-and-recover path. This causes contention between GC and the writer, especially for hotly written logs.

## Test plan

CI

## Documentation Changes

N/A
1 parent 1dbcd22 commit a445e12

File tree

5 files changed

+87
-13
lines changed

5 files changed

+87
-13
lines changed

rust/garbage_collector/src/operators/delete_unused_logs.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use chroma_system::{Operator, OperatorType};
1111
use chroma_types::CollectionUuid;
1212
use futures::future::try_join_all;
1313
use thiserror::Error;
14-
use wal3::{GarbageCollectionOptions, LogPosition, LogWriter, LogWriterOptions};
14+
use wal3::{GarbageCollectionOptions, GarbageCollector, LogPosition, LogWriterOptions};
1515

1616
#[derive(Clone, Debug)]
1717
pub struct DeleteUnusedLogsOperator {
@@ -67,12 +67,11 @@ impl Operator<DeleteUnusedLogsInput, DeleteUnusedLogsOutput> for DeleteUnusedLog
6767
let storage_clone = storage_arc.clone();
6868
let mut logs = self.logs.clone();
6969
log_gc_futures.push(async move {
70-
let writer = match LogWriter::open(
70+
let writer = match GarbageCollector::open(
7171
LogWriterOptions::default(),
7272
storage_clone,
7373
&collection_id.storage_prefix_for_log(),
7474
"garbage collection service",
75-
(),
7675
)
7776
.await
7877
{

rust/garbage_collector/src/operators/truncate_dirty_log.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use chroma_storage::Storage;
77
use chroma_system::{Operator, OperatorType};
88
use futures::future::try_join_all;
99
use thiserror::Error;
10-
use wal3::{GarbageCollectionOptions, LogWriter, LogWriterOptions};
10+
use wal3::{GarbageCollectionOptions, GarbageCollector, LogWriterOptions};
1111

1212
#[derive(Clone, Debug)]
1313
pub struct TruncateDirtyLogOperator {
@@ -56,12 +56,11 @@ impl Operator<TruncateDirtyLogInput, TruncateDirtyLogOutput> for TruncateDirtyLo
5656
let mut replica_id = 0u64;
5757
loop {
5858
let dirty_log_prefix = format!("dirty-rust-log-service-{replica_id}");
59-
match LogWriter::open(
59+
match GarbageCollector::open(
6060
LogWriterOptions::default(),
6161
storage_arc.clone(),
6262
&dirty_log_prefix,
6363
"garbage collection service",
64-
(),
6564
)
6665
.await
6766
{

rust/wal3/src/gc.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ use chroma_storage::{
1010
};
1111

1212
use crate::manifest::unprefixed_snapshot_path;
13+
use crate::writer::OnceLogWriter;
1314
use crate::{
1415
deserialize_setsum, prefixed_fragment_path, serialize_setsum, Error, Fragment, FragmentSeqNo,
15-
LogPosition, Manifest, ScrubError, Snapshot, SnapshotCache, SnapshotPointer, ThrottleOptions,
16+
GarbageCollectionOptions, LogPosition, LogWriterOptions, Manifest, ScrubError, Snapshot,
17+
SnapshotCache, SnapshotPointer, ThrottleOptions,
1618
};
1719

1820
////////////////////////////////////////////// Garbage /////////////////////////////////////////////
@@ -467,3 +469,48 @@ impl Garbage {
467469
Ok(drop_acc)
468470
}
469471
}
472+
473+
///////////////////////////////////////// GarbageCollector /////////////////////////////////////////
474+
475+
/// Handle used to run garbage collection against a log.
///
/// It wraps a shared [`OnceLogWriter`] and exposes only the phase 1
/// (compute garbage) and phase 3 (delete garbage) entry points; no phase 2
/// (manifest update) method is exposed on this type.
pub struct GarbageCollector {
476+
// The underlying log handle that every method on this type delegates to.
log: Arc<OnceLogWriter>,
477+
}
478+
479+
impl GarbageCollector {
480+
/// Open the log into a state where it can be garbage collected.
///
/// Delegates to `OnceLogWriter::open_for_read_only_and_stale_ops`, passing
/// `prefix` and `writer` as owned strings and `Arc::new(())` as the
/// `mark_dirty` argument.
///
/// # Errors
///
/// Returns whatever error `OnceLogWriter::open_for_read_only_and_stale_ops`
/// reports.
481+
pub async fn open(
482+
options: LogWriterOptions,
483+
storage: Arc<Storage>,
484+
prefix: &str,
485+
writer: &str,
486+
) -> Result<Self, Error> {
487+
let log = OnceLogWriter::open_for_read_only_and_stale_ops(
488+
options.clone(),
489+
Arc::clone(&storage),
490+
prefix.to_string(),
491+
writer.to_string(),
492+
Arc::new(()),
493+
)
494+
.await?;
495+
Ok(Self { log })
496+
}
497+
498+
/// Run phase 1 of garbage collection (compute garbage) on the wrapped log.
///
/// Forwards `options` and `keep_at_least` unchanged to
/// `OnceLogWriter::garbage_collect_phase1_compute_garbage` and returns its
/// result. That method documents `Ok(true)` as "there is garbage to act
/// upon" and `Ok(false)` as "no garbage to act upon".
pub async fn garbage_collect_phase1_compute_garbage(
499+
&self,
500+
options: &GarbageCollectionOptions,
501+
keep_at_least: Option<LogPosition>,
502+
) -> Result<bool, Error> {
503+
self.log
504+
.garbage_collect_phase1_compute_garbage(options, keep_at_least)
505+
.await
506+
}
507+
508+
/// Run phase 3 of garbage collection (delete garbage) on the wrapped log.
///
/// Forwards `options` unchanged to
/// `OnceLogWriter::garbage_collect_phase3_delete_garbage` and returns its
/// result. That method documents its post-condition as: gc/GARBAGE and the
/// files it references get deleted.
pub async fn garbage_collect_phase3_delete_garbage(
509+
&self,
510+
options: &GarbageCollectionOptions,
511+
) -> Result<(), Error> {
512+
self.log
513+
.garbage_collect_phase3_delete_garbage(options)
514+
.await
515+
}
516+
}

rust/wal3/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub use batch_manager::BatchManager;
2222
pub use copy::copy;
2323
pub use cursors::{Cursor, CursorName, CursorStore, Witness};
2424
pub use destroy::destroy;
25-
pub use gc::Garbage;
25+
pub use gc::{Garbage, GarbageCollector};
2626
pub use manifest::{unprefixed_snapshot_path, Manifest, Snapshot, SnapshotPointer};
2727
pub use manifest_manager::ManifestManager;
2828
pub use reader::{Limits, LogReader};

rust/wal3/src/writer.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ impl Drop for LogWriter {
491491
/// structure as the recovery procedure does, this allows us to re-use exactly one code path for
492492
/// both. That code path can then be well-tested because any contention state gets exercised from
493493
/// the perspective of initialization.
494-
struct OnceLogWriter {
494+
pub(crate) struct OnceLogWriter {
495495
/// LogWriterOptions are fixed at log creation time.
496496
/// LogWriter is intentionally cheap to construct and destroy.
497497
/// Reopen the log to change the options.
@@ -573,6 +573,35 @@ impl OnceLogWriter {
573573
Ok(this)
574574
}
575575

576+
/// Construct a `OnceLogWriter` from its parts and return it in an `Arc`.
///
/// The body builds a `BatchManager` from `options.throttle_fragment` and a
/// `ManifestManager` from the manifest throttle/snapshot options, `storage`,
/// `prefix`, and `writer`, then assembles the struct with `done` set to
/// `false`. No other steps are performed in this function.
///
/// NOTE(review): the name suggests this is for callers that only read or
/// perform "stale" operations (such as garbage collection) and presumably
/// skips the recovery done by the regular open path — confirm against
/// `OnceLogWriter::open` and `ManifestManager::new`.
///
/// # Errors
///
/// Returns `Error::Internal` when `BatchManager::new` yields `None`, and
/// propagates any error from `ManifestManager::new`.
pub(crate) async fn open_for_read_only_and_stale_ops(
577+
options: LogWriterOptions,
578+
storage: Arc<Storage>,
579+
prefix: String,
580+
writer: String,
581+
mark_dirty: Arc<dyn MarkDirty>,
582+
) -> Result<Arc<Self>, Error> {
583+
let done = AtomicBool::new(false);
584+
// `BatchManager::new` yields an `Option`; `None` is reported as `Error::Internal`.
let batch_manager =
585+
BatchManager::new(options.throttle_fragment).ok_or_else(|| Error::Internal)?;
586+
let manifest_manager = ManifestManager::new(
587+
options.throttle_manifest,
588+
options.snapshot_manifest,
589+
Arc::clone(&storage),
590+
prefix.clone(),
591+
writer,
592+
)
593+
.await?;
594+
Ok(Arc::new(Self {
595+
options,
596+
storage,
597+
prefix,
598+
done,
599+
mark_dirty,
600+
manifest_manager,
601+
batch_manager,
602+
}))
603+
}
604+
576605
fn shutdown(&self) {
577606
self.batch_manager.shutdown();
578607
self.manifest_manager.shutdown();
@@ -727,7 +756,7 @@ impl OnceLogWriter {
727756
/// Returns Ok(false) if there is no garbage to act upon (e.g., it's already been collected).
728757
/// Returns Ok(true) if there is garbage to act upon.
729758
#[tracing::instrument(skip(self, options))]
730-
async fn garbage_collect_phase1_compute_garbage(
759+
pub(crate) async fn garbage_collect_phase1_compute_garbage(
731760
&self,
732761
options: &GarbageCollectionOptions,
733762
keep_at_least: Option<LogPosition>,
@@ -799,7 +828,7 @@ impl OnceLogWriter {
799828
/// Post-condition:
800829
/// - contents of gc/GARBAGE are removed from manifest/MANIFEST.
801830
#[tracing::instrument(skip(self, _options))]
802-
async fn garbage_collect_phase2_update_manifest(
831+
pub(crate) async fn garbage_collect_phase2_update_manifest(
803832
&self,
804833
_options: &GarbageCollectionOptions,
805834
) -> Result<(), Error> {
@@ -828,7 +857,7 @@ impl OnceLogWriter {
828857
/// Post-condition:
829858
/// - gc/GARBAGE and the files it references get deleted.
830859
#[tracing::instrument(skip(self, options))]
831-
async fn garbage_collect_phase3_delete_garbage(
860+
pub(crate) async fn garbage_collect_phase3_delete_garbage(
832861
&self,
833862
options: &GarbageCollectionOptions,
834863
) -> Result<(), Error> {
@@ -907,7 +936,7 @@ impl OnceLogWriter {
907936
}
908937

909938
#[tracing::instrument(skip(self))]
910-
async fn garbage_collect(
939+
pub(crate) async fn garbage_collect(
911940
&self,
912941
options: &GarbageCollectionOptions,
913942
keep_at_least: Option<LogPosition>,

0 commit comments

Comments
 (0)