@@ -207,7 +207,7 @@ impl<B> Compactor<B> {
207
207
out_index : Option < & ' a mut MapBuilder < Vec < u8 > > > ,
208
208
mut progress : impl FnMut ( u32 , u32 ) + ' a ,
209
209
) -> (
210
- impl Stream < Item = Result < ( CompactedFrameHeader , Bytes ) > > + ' a ,
210
+ impl Stream < Item = crate :: storage :: Result < ( CompactedFrameHeader , Bytes ) > > + ' a ,
211
211
CompactedSegmentHeader ,
212
212
)
213
213
where
@@ -363,7 +363,7 @@ impl<B> Compactor<B> {
363
363
let mut builder = MapBuilder :: new ( Vec :: new ( ) ) . unwrap ( ) ;
364
364
365
365
let ( sender, mut receiver) = tokio:: sync:: mpsc:: channel :: < crate :: storage:: Result < Bytes > > ( 1 ) ;
366
- let handle: JoinHandle < Result < ( ) > > = match out_path {
366
+ let mut handle: JoinHandle < Result < ( ) > > = match out_path {
367
367
Some ( path) => {
368
368
let path = path. join ( & format ! ( "{new_key}.seg" ) ) ;
369
369
let mut data_file = tokio:: fs:: File :: create ( path) . await ?;
@@ -397,37 +397,56 @@ impl<B> Compactor<B> {
397
397
}
398
398
} ;
399
399
400
- let ( stream, segment_header) = self
401
- . dedup_stream ( set. clone ( ) , Some ( & mut builder) , progress)
402
- . await ;
400
+ let send_fut = async {
401
+ let ( stream, segment_header) = self
402
+ . dedup_stream ( set. clone ( ) , Some ( & mut builder) , progress)
403
+ . await ;
403
404
404
- sender
405
- . send ( Ok ( Bytes :: copy_from_slice ( segment_header. as_bytes ( ) ) ) )
406
- . await
407
- . unwrap ( ) ;
405
+ if sender
406
+ . send ( Ok ( Bytes :: copy_from_slice ( segment_header. as_bytes ( ) ) ) )
407
+ . await
408
+ . is_err ( )
409
+ {
410
+ return ;
411
+ }
408
412
409
- {
410
- tokio:: pin!( stream) ;
411
- loop {
412
- match stream. next ( ) . await {
413
- Some ( Ok ( ( frame_header, frame_data) ) ) => {
414
- sender
415
- . send ( Ok ( Bytes :: copy_from_slice ( frame_header. as_bytes ( ) ) ) )
416
- . await
417
- . unwrap ( ) ;
418
- sender. send ( Ok ( frame_data) ) . await . unwrap ( ) ;
419
- }
420
- Some ( Err ( _e) ) => {
421
- panic ! ( )
422
- // sender.send(Err(e.into())).await.unwrap();
413
+ {
414
+ tokio:: pin!( stream) ;
415
+ loop {
416
+ match stream. next ( ) . await {
417
+ Some ( Ok ( ( frame_header, frame_data) ) ) => {
418
+ if sender
419
+ . send ( Ok ( Bytes :: copy_from_slice ( frame_header. as_bytes ( ) ) ) )
420
+ . await
421
+ . is_err ( )
422
+ {
423
+ return ;
424
+ }
425
+ if sender. send ( Ok ( frame_data) ) . await . is_err ( ) {
426
+ return ;
427
+ }
428
+ }
429
+ Some ( Err ( e) ) => {
430
+ sender. send ( Err ( e. into ( ) ) ) . await . unwrap ( ) ;
431
+ return ;
432
+ }
433
+ None => break ,
423
434
}
424
- None => break ,
425
435
}
436
+
437
+ drop ( sender) ;
426
438
}
427
- drop ( sender) ;
428
- }
439
+ } ;
440
+
441
+ tokio:: select! {
442
+ res = & mut handle => {
443
+ res. unwrap( ) ?;
444
+ } ,
445
+ _ = send_fut => {
446
+ handle. await . unwrap( ) ?;
429
447
430
- handle. await . unwrap ( ) ?;
448
+ }
449
+ }
431
450
432
451
let index = builder. into_inner ( ) . unwrap ( ) ;
433
452
match out_path {
0 commit comments