@@ -24,7 +24,7 @@ use std::ops::Range;
2424use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
2525use std:: sync:: Arc ;
2626
27- use anyhow:: bail;
27+ use anyhow:: { bail, Context } ;
2828use async_trait:: async_trait;
2929use fail:: fail_point;
3030use itertools:: Itertools ;
@@ -33,7 +33,7 @@ use quickwit_metastore::{Metastore, SplitMetadata};
3333use quickwit_storage:: SplitPayloadBuilder ;
3434use tantivy:: chrono:: Utc ;
3535use tokio:: sync:: oneshot:: Receiver ;
36- use tokio:: sync:: Semaphore ;
36+ use tokio:: sync:: { OwnedSemaphorePermit , Semaphore } ;
3737use tracing:: { info, info_span, warn, Instrument , Span } ;
3838
3939use crate :: models:: { PackagedSplit , PackagedSplitBatch , PublishOperation , PublisherMessage } ;
@@ -66,6 +66,16 @@ impl Uploader {
6666 counters : Default :: default ( ) ,
6767 }
6868 }
69+
70+ async fn acquire_semaphore (
71+ & self ,
72+ ctx : & ActorContext < Self > ,
73+ ) -> anyhow:: Result < OwnedSemaphorePermit > {
74+ let _guard = ctx. protect_zone ( ) ;
75+ Semaphore :: acquire_owned ( self . concurrent_upload_permits . clone ( ) )
76+ . await
77+ . context ( "The uploader semaphore is closed. (This should never happen.)" )
78+ }
6979}
7080
7181#[ derive( Clone , Debug , Default ) ]
@@ -197,10 +207,7 @@ impl AsyncActor for Uploader {
197207 // For instance, when sending a message on a downstream actor with a saturated
198208 // mailbox.
199209 // This is meant to be fixed with ParallelActors.
200- let permit_guard = {
201- let _guard = ctx. protect_zone ( ) ;
202- Semaphore :: acquire_owned ( self . concurrent_upload_permits . clone ( ) ) . await
203- } ;
210+ let permit_guard = self . acquire_semaphore ( ctx) . await ?;
204211 let kill_switch = ctx. kill_switch ( ) . clone ( ) ;
205212 let split_ids = batch. split_ids ( ) ;
206213 if kill_switch. is_dead ( ) {
0 commit comments