Skip to content

Commit 26b8c16

Browse files
authored
[ENH][wal3] Make GC be delete-free for log/gc/GARBAGE (#4904)
## Description of changes AWS doesn't support delete of things with an If-Match header except in directory buckets and we are not using those. Instead overwrite using the same hand-over-hand pattern as the manifest. Load a GC, decide what to do while holding on the the ETag, and then overwrite the garbage with a sentinel rather than deleting it. ## Test plan CI - [wal3] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust ## Documentation Changes N/A
1 parent 72985bd commit 26b8c16

File tree

2 files changed

+107
-42
lines changed

2 files changed

+107
-42
lines changed

rust/wal3/src/gc.rs

Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use crate::{
1717

1818
////////////////////////////////////////////// Garbage /////////////////////////////////////////////
1919

20-
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
20+
#[derive(Debug, Clone, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
2121
pub struct Garbage {
2222
pub snapshots_to_drop: Vec<SnapshotPointer>,
2323
pub snapshots_to_make: Vec<Snapshot>,
@@ -33,6 +33,22 @@ pub struct Garbage {
3333
}
3434

3535
impl Garbage {
36+
pub fn empty() -> Self {
37+
Garbage {
38+
snapshots_to_drop: Vec::new(),
39+
snapshots_to_make: Vec::new(),
40+
snapshot_for_root: None,
41+
fragments_to_drop_start: FragmentSeqNo(0),
42+
fragments_to_drop_limit: FragmentSeqNo(0),
43+
setsum_to_discard: Setsum::default(),
44+
first_to_keep: LogPosition::from_offset(1),
45+
}
46+
}
47+
48+
pub fn is_empty(&self) -> bool {
49+
*self == Self::empty()
50+
}
51+
3652
pub fn path(prefix: &str) -> String {
3753
format!("{}/gc/GARBAGE", prefix)
3854
}
@@ -151,32 +167,62 @@ impl Garbage {
151167
options: &ThrottleOptions,
152168
storage: &Storage,
153169
prefix: &str,
170+
existing: Option<&ETag>,
171+
) -> Result<Option<ETag>, Error> {
172+
self.install_new_snapshots(storage, prefix, options).await?;
173+
Self::transition(options, storage, prefix, existing, self).await
174+
}
175+
176+
#[tracing::instrument(skip(self, storage), err(Display))]
177+
pub async fn reset(
178+
&self,
179+
options: &ThrottleOptions,
180+
storage: &Storage,
181+
prefix: &str,
182+
existing: &ETag,
154183
) -> Result<Option<ETag>, Error> {
155184
self.install_new_snapshots(storage, prefix, options).await?;
185+
match Self::transition(options, storage, prefix, Some(existing), &Self::empty()).await {
186+
Ok(e_tag) => Ok(e_tag),
187+
Err(Error::LogContentionFailure) => Ok(None),
188+
Err(err) => Err(err),
189+
}
190+
}
191+
192+
async fn transition(
193+
options: &ThrottleOptions,
194+
storage: &Storage,
195+
prefix: &str,
196+
existing: Option<&ETag>,
197+
replacement: &Self,
198+
) -> Result<Option<ETag>, Error> {
156199
let exp_backoff = crate::backoff::ExponentialBackoff::new(
157200
options.throughput as f64,
158201
options.headroom as f64,
159202
);
160203
let mut retry_count = 0;
161204
loop {
162205
let path = Self::path(prefix);
163-
let payload = serde_json::to_string(&self)
206+
let payload = serde_json::to_string(replacement)
164207
.map_err(|e| {
165208
Error::CorruptManifest(format!("could not encode JSON garbage: {e:?}"))
166209
})?
167210
.into_bytes();
168-
let options = PutOptions::if_not_exists(StorageRequestPriority::P0);
211+
let options = if let Some(e_tag) = existing {
212+
PutOptions::if_matches(e_tag, StorageRequestPriority::P0)
213+
} else {
214+
PutOptions::if_not_exists(StorageRequestPriority::P0)
215+
};
169216
match storage.put_bytes(&path, payload, options).await {
170217
Ok(e_tag) => return Ok(e_tag),
171218
Err(StorageError::Precondition { path: _, source: _ }) => {
172-
// NOTE(rescrv): We know that someone put the file before us, and therefore we
173-
// know this write failed. Because the garbage file is created and deleted
174-
// we cannot just overwrite, so fail with log contention and let higher level
175-
// protocol decide.
176-
return Err(Error::LogContentionRetry);
219+
// NOTE(rescrv): We know that we put the file. The e_tag no longer matches.
220+
// Therefore, we know someone else transitioned the file and our reset should
221+
// be a NOP.
222+
return Err(Error::LogContentionFailure);
177223
}
178224
Err(e) => {
179-
tracing::error!("error uploading manifest: {e:?}");
225+
tracing::error!("error uploading garbage: {e:?}");
180226
let backoff = exp_backoff.next();
181227
if backoff > Duration::from_secs(60) || retry_count >= 3 {
182228
return Err(Arc::new(e).into());

rust/wal3/src/writer.rs

Lines changed: 52 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::time::{Duration, Instant, SystemTime};
55

66
use arrow::array::{ArrayRef, BinaryArray, RecordBatch, UInt64Array};
77
use chroma_storage::admissioncontrolleds3::StorageRequestPriority;
8-
use chroma_storage::{DeleteOptions, GetOptions, PutOptions, Storage, StorageError};
8+
use chroma_storage::{DeleteOptions, ETag, GetOptions, PutOptions, Storage, StorageError};
99
use parquet::arrow::ArrowWriter;
1010
use parquet::basic::Compression;
1111
use parquet::file::properties::WriterProperties;
@@ -582,6 +582,15 @@ impl OnceLogWriter {
582582

583583
#[tracing::instrument(skip(self))]
584584
async fn garbage_collect(&self, options: &GarbageCollectionOptions) -> Result<(), Error> {
585+
self.garbage_collect_recursive(options, false, None).await
586+
}
587+
588+
async fn garbage_collect_recursive(
589+
&self,
590+
options: &GarbageCollectionOptions,
591+
base: bool,
592+
existing: Option<&ETag>,
593+
) -> Result<(), Error> {
585594
let cutoff = self.garbage_collection_cutoff().await?;
586595
self.manifest_manager.heartbeat().await?;
587596
let garbage = self
@@ -590,16 +599,40 @@ impl OnceLogWriter {
590599
.compute_garbage(options, cutoff, &())
591600
.await?;
592601
let (garbage, e_tag) = match garbage
593-
.install(&self.options.throttle_manifest, &self.storage, &self.prefix)
602+
.install(
603+
&self.options.throttle_manifest,
604+
&self.storage,
605+
&self.prefix,
606+
existing,
607+
)
594608
.await
595609
{
596610
Ok(e_tag) => (garbage, e_tag),
597-
Err(Error::LogContentionRetry) => {
611+
Err(Error::LogContentionFailure)
612+
| Err(Error::LogContentionRetry)
613+
| Err(Error::LogContentionDurable) => {
598614
match Garbage::load(&self.options.throttle_manifest, &self.storage, &self.prefix)
599615
.await
600616
{
601-
Ok(Some((garbage, e_tag))) => (garbage, e_tag),
602-
Ok(None) => return Err(Error::LogContentionFailure),
617+
Ok(Some((garbage, e_tag))) => {
618+
if garbage.is_empty() {
619+
if base {
620+
return Err(Error::LogContentionRetry);
621+
} else {
622+
return Box::pin(self.garbage_collect_recursive(
623+
options,
624+
true,
625+
e_tag.as_ref(),
626+
))
627+
.await;
628+
}
629+
} else {
630+
(garbage, e_tag)
631+
}
632+
}
633+
Ok(None) => {
634+
return Err(Error::LogContentionFailure);
635+
}
603636
Err(err) => {
604637
return Err(err);
605638
}
@@ -609,10 +642,15 @@ impl OnceLogWriter {
609642
return Err(err);
610643
}
611644
};
645+
let Some(e_tag) = e_tag else {
646+
return Err(Error::GarbageCollection(
647+
"installed garbage without an ETag".to_string(),
648+
));
649+
};
612650
let paths = garbage
613651
.prefixed_paths_to_delete(&self.prefix)
614652
.collect::<Vec<_>>();
615-
self.manifest_manager.apply_garbage(garbage).await?;
653+
self.manifest_manager.apply_garbage(garbage.clone()).await?;
616654
let exp_backoff: ExponentialBackoff = options.throttle.into();
617655
let start = Instant::now();
618656
for path in paths {
@@ -637,33 +675,14 @@ impl OnceLogWriter {
637675
}
638676
}
639677
}
640-
loop {
641-
let options = if let Some(e_tag) = e_tag.as_ref() {
642-
DeleteOptions::if_matches(e_tag, StorageRequestPriority::P0)
643-
} else {
644-
DeleteOptions::default()
645-
};
646-
match self
647-
.storage
648-
.delete(&Garbage::path(&self.prefix), options)
649-
.await
650-
{
651-
Ok(()) => break,
652-
Err(StorageError::NotFound { .. }) => break,
653-
Err(err) => {
654-
tracing::error!("could not cleanup garbage: {err:?}");
655-
if start.elapsed() > Duration::from_secs(600) {
656-
tracing::error!("could not cleanup garbage within 10 minutes, returning");
657-
return Err(Error::StorageError(Arc::new(err)));
658-
}
659-
let mut backoff = exp_backoff.next();
660-
if backoff > Duration::from_secs(600) {
661-
backoff = Duration::from_secs(600);
662-
}
663-
tokio::time::sleep(backoff).await;
664-
}
665-
}
666-
}
678+
garbage
679+
.reset(
680+
&self.options.throttle_manifest,
681+
&self.storage,
682+
&self.prefix,
683+
&e_tag,
684+
)
685+
.await?;
667686
Ok(())
668687
}
669688

0 commit comments

Comments
 (0)