@@ -53,6 +53,9 @@ use databend_common_sql::PlanExtras;
5353use log:: info;
5454use parking_lot:: Mutex ;
5555use pin_project_lite:: pin_project;
56+ use tokio:: sync:: AcquireError as TokioAcquireError ;
57+ use tokio:: sync:: OwnedSemaphorePermit ;
58+ use tokio:: sync:: Semaphore ;
5659use tokio:: time:: error:: Elapsed ;
5760
5861use crate :: sessions:: QueryContext ;
@@ -87,6 +90,8 @@ pub(crate) struct Inner<Data: QueueData> {
8790pub struct QueueManager < Data : QueueData > {
8891 permits : usize ,
8992 meta_store : MetaStore ,
93+ semaphore : Arc < Semaphore > ,
94+ global_statement_queue : bool ,
9095 queue : Mutex < HashMap < Data :: Key , Inner < Data > > > ,
9196}
9297
@@ -101,23 +106,33 @@ impl<Data: QueueData> QueueManager<Data> {
101106 } ;
102107
103108 info ! ( "queue manager permits: {:?}" , permits) ;
104- GlobalInstance :: set ( Self :: create ( permits, metastore) ) ;
109+ GlobalInstance :: set ( Self :: create (
110+ permits,
111+ metastore,
112+ conf. query . global_statement_queue ,
113+ ) ) ;
105114 Ok ( ( ) )
106115 }
107116
108117 pub fn instance ( ) -> Arc < Self > {
109118 GlobalInstance :: get :: < Arc < Self > > ( )
110119 }
111120
112- pub fn create ( mut permits : usize , meta_store : MetaStore ) -> Arc < QueueManager < Data > > {
121+ pub fn create (
122+ mut permits : usize ,
123+ meta_store : MetaStore ,
124+ global_statement_queue : bool ,
125+ ) -> Arc < QueueManager < Data > > {
113126 if permits == 0 {
114127 permits = usize:: MAX >> 4 ;
115128 }
116129
117130 Arc :: new ( QueueManager {
118131 permits,
119132 meta_store,
133+ global_statement_queue,
120134 queue : Mutex :: new ( HashMap :: new ( ) ) ,
135+ semaphore : Arc :: new ( Semaphore :: new ( permits) ) ,
121136 } )
122137 }
123138
@@ -156,21 +171,35 @@ impl<Data: QueueData> QueueManager<Data> {
156171 ) ;
157172
158173 let timeout = data. timeout ( ) ;
159- let semaphore_acquire = self . meta_store . new_acquired (
160- data. get_lock_key ( ) ,
161- self . permits as u64 ,
162- data. get_key ( ) , // ID of this acquirer
163- data. lock_ttl ( ) ,
164- ) ;
165174
166- let future = AcquireQueueFuture :: create (
167- Arc :: new ( data) ,
168- tokio:: time:: timeout ( timeout, semaphore_acquire) ,
169- self . clone ( ) ,
170- ) ;
171175 let start_time = SystemTime :: now ( ) ;
176+ let acquire_res = match self . global_statement_queue {
177+ true => {
178+ let semaphore_acquire = self . meta_store . new_acquired (
179+ data. get_lock_key ( ) ,
180+ self . permits as u64 ,
181+ data. get_key ( ) , // ID of this acquirer
182+ data. lock_ttl ( ) ,
183+ ) ;
172184
173- return match future. await {
185+ AcquireQueueFuture :: create (
186+ Arc :: new ( data) ,
187+ tokio:: time:: timeout ( timeout, semaphore_acquire) ,
188+ self . clone ( ) ,
189+ )
190+ . await
191+ }
192+ false => {
193+ AcquireQueueFuture :: create (
194+ Arc :: new ( data) ,
195+ tokio:: time:: timeout ( timeout, self . semaphore . clone ( ) . acquire_owned ( ) ) ,
196+ self . clone ( ) ,
197+ )
198+ . await
199+ }
200+ } ;
201+
202+ return match acquire_res {
174203 Ok ( v) => {
175204 info ! ( "finished acquiring from queue, length: {}" , self . length( ) ) ;
176205
@@ -197,7 +226,7 @@ impl<Data: QueueData> QueueManager<Data> {
197226 } ;
198227 }
199228
200- Ok ( AcquireQueueGuard :: create ( None ) )
229+ Ok ( AcquireQueueGuard :: create_global ( None ) )
201230 }
202231
203232 pub ( crate ) fn add_entity ( & self , inner : Inner < Data > ) -> Data :: Key {
@@ -231,28 +260,35 @@ impl<Data: QueueData> QueueManager<Data> {
231260 }
232261}
233262
234- pub struct AcquireQueueGuard {
235- # [ allow ( dead_code ) ]
236- permit : Option < Permit > ,
263+ pub enum AcquireQueueGuard {
264+ Global ( Option < Permit > ) ,
265+ Local ( Option < OwnedSemaphorePermit > ) ,
237266}
238267
239268impl Drop for AcquireQueueGuard {
240269 fn drop ( & mut self ) {
241- if self . permit . is_some ( ) {
242- dec_session_running_acquired_queries ( ) ;
270+ match self {
271+ AcquireQueueGuard :: Local ( Some ( _) ) | AcquireQueueGuard :: Global ( Some ( _) ) => {
272+ dec_session_running_acquired_queries ( ) ;
273+ }
274+ _ => { }
243275 }
244276 }
245277}
246278
247279impl AcquireQueueGuard {
248- pub fn create ( permit : Option < Permit > ) -> Self {
249- AcquireQueueGuard { permit }
280+ pub fn create_global ( permit : Option < Permit > ) -> Self {
281+ AcquireQueueGuard :: Global ( permit)
282+ }
283+
284+ pub fn create_local ( permit : Option < OwnedSemaphorePermit > ) -> Self {
285+ AcquireQueueGuard :: Local ( permit)
250286 }
251287}
252288
253289pin_project ! {
254- pub struct AcquireQueueFuture <Data : QueueData , T >
255- where T : Future <Output = std:: result:: Result <std:: result:: Result <Permit , AcquireError >, Elapsed >>
290+ pub struct AcquireQueueFuture <Data : QueueData , T , Permit , E >
291+ where T : Future <Output = std:: result:: Result <std:: result:: Result <Permit , E >, Elapsed >>
256292{
257293 #[ pin]
258294 inner: T ,
@@ -266,8 +302,8 @@ where T: Future<Output = std::result::Result<std::result::Result<Permit, Acquir
266302}
267303}
268304
269- impl < Data : QueueData , T > AcquireQueueFuture < Data , T >
270- where T : Future < Output = std:: result:: Result < std:: result:: Result < Permit , AcquireError > , Elapsed > >
305+ impl < Data : QueueData , T , Permit , E > AcquireQueueFuture < Data , T , Permit , E >
306+ where T : Future < Output = std:: result:: Result < std:: result:: Result < Permit , E > , Elapsed > >
271307{
272308 pub fn create ( data : Arc < Data > , inner : T , mgr : Arc < QueueManager < Data > > ) -> Self {
273309 AcquireQueueFuture {
@@ -281,53 +317,60 @@ where T: Future<Output = std::result::Result<std::result::Result<Permit, Acquire
281317 }
282318}
283319
284- impl < Data : QueueData , T > Future for AcquireQueueFuture < Data , T >
285- where T : Future < Output = std:: result:: Result < std:: result:: Result < Permit , AcquireError > , Elapsed > >
286- {
287- type Output = Result < AcquireQueueGuard > ;
320+ macro_rules! impl_acquire_queue_future {
321+ ( $Permit: ty, $fn_name: ident, $Error: ty) => {
322+ impl <Data : QueueData , T > Future for AcquireQueueFuture <Data , T , $Permit, $Error>
323+ where T : Future <Output = std:: result:: Result <std:: result:: Result <$Permit, $Error>, Elapsed >>
324+ {
325+ type Output = Result <AcquireQueueGuard >;
288326
289- fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
290- let this = self . project ( ) ;
327+ fn poll( self : Pin <& mut Self >, cx: & mut Context <' _>) -> Poll <Self :: Output > {
328+ let this = self . project( ) ;
291329
292- if this. is_abort . load ( Ordering :: SeqCst ) {
293- return Poll :: Ready ( Err ( Data :: remove_error_message ( this. key . take ( ) ) ) ) ;
294- }
330+ if this. is_abort. load( Ordering :: SeqCst ) {
331+ return Poll :: Ready ( Err ( Data :: remove_error_message( this. key. take( ) ) ) ) ;
332+ }
333+
334+ match this. inner. poll( cx) {
335+ Poll :: Ready ( res) => {
336+ if let Some ( key) = this. key. take( ) {
337+ if this. manager. remove_entity( & key) . is_none( ) {
338+ return Poll :: Ready ( Err ( Data :: remove_error_message( Some ( key) ) ) ) ;
339+ }
340+ }
295341
296- match this . inner . poll ( cx ) {
297- Poll :: Ready ( res ) => {
298- if let Some ( key ) = this . key . take ( ) {
299- if this . manager . remove_entity ( & key ) . is_none ( ) {
300- return Poll :: Ready ( Err ( Data :: remove_error_message ( Some ( key ) ) ) ) ;
342+ Poll :: Ready ( match res {
343+ Ok ( Ok ( v ) ) => Ok ( AcquireQueueGuard :: $fn_name ( Some ( v ) ) ) ,
344+ Ok ( Err ( _ ) ) => Err ( ErrorCode :: TokioError ( "acquire queue failure." ) ) ,
345+ Err ( _elapsed ) => Err ( ErrorCode :: Timeout ( "query queuing timeout" ) ) ,
346+ } )
301347 }
302- }
348+ Poll :: Pending => {
349+ if !* this. has_pending {
350+ * this. has_pending = true ;
351+ }
303352
304- Poll :: Ready ( match res {
305- Ok ( Ok ( v) ) => Ok ( AcquireQueueGuard :: create ( Some ( v) ) ) ,
306- Ok ( Err ( _) ) => Err ( ErrorCode :: TokioError ( "acquire queue failure." ) ) ,
307- Err ( _elapsed) => Err ( ErrorCode :: Timeout ( "query queuing timeout" ) ) ,
308- } )
309- }
310- Poll :: Pending => {
311- if !* this. has_pending {
312- * this. has_pending = true ;
313- }
353+ if let Some ( data) = this. data. take( ) {
354+ let waker = cx. waker( ) . clone( ) ;
355+ * this. key = Some ( this. manager. add_entity( Inner {
356+ data,
357+ waker,
358+ instant: Instant :: now( ) ,
359+ is_abort: this. is_abort. clone( ) ,
360+ } ) ) ;
361+ }
314362
315- if let Some ( data) = this. data . take ( ) {
316- let waker = cx. waker ( ) . clone ( ) ;
317- * this. key = Some ( this. manager . add_entity ( Inner {
318- data,
319- waker,
320- instant : Instant :: now ( ) ,
321- is_abort : this. is_abort . clone ( ) ,
322- } ) ) ;
363+ Poll :: Pending
364+ }
323365 }
324-
325- Poll :: Pending
326366 }
327367 }
328- }
368+ } ;
329369}
330370
371+ impl_acquire_queue_future ! ( Permit , create_global, AcquireError ) ;
372+ impl_acquire_queue_future ! ( OwnedSemaphorePermit , create_local, TokioAcquireError ) ;
373+
331374pub struct QueryEntry {
332375 ctx : Arc < QueryContext > ,
333376 pub query_id : String ,
0 commit comments