Skip to content

Commit 748a73d

Browse files
authored
Merge pull request #1765 from tursodatabase/compaction-failure-error-report
correctly report error on compaction failure
2 parents 54ff421 + eb59a04 commit 748a73d

File tree

1 file changed

+46
-27
lines changed
  • libsql-wal/src/storage/compaction

1 file changed

+46
-27
lines changed

libsql-wal/src/storage/compaction/mod.rs

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ impl<B> Compactor<B> {
207207
out_index: Option<&'a mut MapBuilder<Vec<u8>>>,
208208
mut progress: impl FnMut(u32, u32) + 'a,
209209
) -> (
210-
impl Stream<Item = Result<(CompactedFrameHeader, Bytes)>> + 'a,
210+
impl Stream<Item = crate::storage::Result<(CompactedFrameHeader, Bytes)>> + 'a,
211211
CompactedSegmentHeader,
212212
)
213213
where
@@ -363,7 +363,7 @@ impl<B> Compactor<B> {
363363
let mut builder = MapBuilder::new(Vec::new()).unwrap();
364364

365365
let (sender, mut receiver) = tokio::sync::mpsc::channel::<crate::storage::Result<Bytes>>(1);
366-
let handle: JoinHandle<Result<()>> = match out_path {
366+
let mut handle: JoinHandle<Result<()>> = match out_path {
367367
Some(path) => {
368368
let path = path.join(&format!("{new_key}.seg"));
369369
let mut data_file = tokio::fs::File::create(path).await?;
@@ -397,37 +397,56 @@ impl<B> Compactor<B> {
397397
}
398398
};
399399

400-
let (stream, segment_header) = self
401-
.dedup_stream(set.clone(), Some(&mut builder), progress)
402-
.await;
400+
let send_fut = async {
401+
let (stream, segment_header) = self
402+
.dedup_stream(set.clone(), Some(&mut builder), progress)
403+
.await;
403404

404-
sender
405-
.send(Ok(Bytes::copy_from_slice(segment_header.as_bytes())))
406-
.await
407-
.unwrap();
405+
if sender
406+
.send(Ok(Bytes::copy_from_slice(segment_header.as_bytes())))
407+
.await
408+
.is_err()
409+
{
410+
return;
411+
}
408412

409-
{
410-
tokio::pin!(stream);
411-
loop {
412-
match stream.next().await {
413-
Some(Ok((frame_header, frame_data))) => {
414-
sender
415-
.send(Ok(Bytes::copy_from_slice(frame_header.as_bytes())))
416-
.await
417-
.unwrap();
418-
sender.send(Ok(frame_data)).await.unwrap();
419-
}
420-
Some(Err(_e)) => {
421-
panic!()
422-
// sender.send(Err(e.into())).await.unwrap();
413+
{
414+
tokio::pin!(stream);
415+
loop {
416+
match stream.next().await {
417+
Some(Ok((frame_header, frame_data))) => {
418+
if sender
419+
.send(Ok(Bytes::copy_from_slice(frame_header.as_bytes())))
420+
.await
421+
.is_err()
422+
{
423+
return;
424+
}
425+
if sender.send(Ok(frame_data)).await.is_err() {
426+
return;
427+
}
428+
}
429+
Some(Err(e)) => {
430+
sender.send(Err(e.into())).await.unwrap();
431+
return;
432+
}
433+
None => break,
423434
}
424-
None => break,
425435
}
436+
437+
drop(sender);
426438
}
427-
drop(sender);
428-
}
439+
};
440+
441+
tokio::select! {
442+
res = &mut handle => {
443+
res.unwrap()?;
444+
},
445+
_ = send_fut => {
446+
handle.await.unwrap()?;
429447

430-
handle.await.unwrap()?;
448+
}
449+
}
431450

432451
let index = builder.into_inner().unwrap();
433452
match out_path {

0 commit comments

Comments
 (0)