Skip to content

Commit fe7622e

Browse files
authored
refactor(query): add lock for meta semaphore request (#18579)
1 parent a332371 commit fe7622e

File tree

4 files changed

+19
-0
lines changed

4 files changed

+19
-0
lines changed

src/common/base/src/runtime/workload_group.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::sync::atomic::AtomicUsize;
1919
use std::sync::Arc;
2020
use std::time::Duration;
2121

22+
use tokio::sync::Mutex;
2223
use tokio::sync::Semaphore;
2324

2425
use crate::runtime::MemStat;
@@ -80,6 +81,7 @@ pub struct WorkloadGroupResource {
8081
pub meta: WorkloadGroup,
8182
pub queue_key: String,
8283
pub permits: usize,
84+
pub mutex: Arc<Mutex<()>>,
8385
pub semaphore: Arc<Semaphore>,
8486
pub mem_stat: Arc<MemStat>,
8587
pub max_memory_usage: Arc<AtomicUsize>,

src/query/management/src/workload/workload_resource.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Mutex;
2020
use std::sync::PoisonError;
2121
use std::sync::Weak;
2222

23+
use databend_common_base::base::tokio::sync::Mutex as TokioMutex;
2324
use databend_common_base::runtime::workload_group::QuotaValue;
2425
use databend_common_base::runtime::workload_group::WorkloadGroupResource;
2526
use databend_common_base::runtime::workload_group::MAX_CONCURRENCY_QUOTA_KEY;
@@ -153,6 +154,7 @@ impl WorkloadGroupResourceManagerInner {
153154
}
154155
})),
155156
permits: Semaphore::MAX_PERMITS,
157+
mutex: Arc::new(TokioMutex::new(())),
156158
semaphore: Arc::new(Semaphore::new(Semaphore::MAX_PERMITS)),
157159
});
158160

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use std::time::SystemTime;
2929

3030
use databend_common_ast::ast::ExplainKind;
3131
use databend_common_base::base::escape_for_key;
32+
use databend_common_base::base::tokio::sync::Mutex as TokioMutex;
3233
use databend_common_base::base::GlobalInstance;
3334
use databend_common_base::base::WatchNotify;
3435
use databend_common_base::runtime::workload_group::QuotaValue;
@@ -100,6 +101,7 @@ pub(crate) struct Inner<Data: QueueData> {
100101
pub struct QueueManager<Data: QueueData> {
101102
permits: usize,
102103
meta_store: MetaStore,
104+
tokio_mutex: Arc<TokioMutex<()>>,
103105
semaphore: Arc<Semaphore>,
104106
global_statement_queue: bool,
105107
queue: Mutex<HashMap<Data::Key, Inner<Data>>>,
@@ -152,6 +154,7 @@ impl<Data: QueueData> QueueManager<Data> {
152154
global_statement_queue,
153155
queue: Mutex::new(HashMap::new()),
154156
semaphore: Arc::new(Semaphore::new(permits)),
157+
tokio_mutex: Arc::new(TokioMutex::new(())),
155158
})
156159
}
157160

@@ -241,6 +244,9 @@ impl<Data: QueueData> QueueManager<Data> {
241244

242245
timeout -= instant.elapsed();
243246

247+
// Prevent concurrent access to meta and serialize the submission of acquire requests.
248+
// This ensures that at most permits + nodes acquirers will be in the queue at any given time.
249+
let _guard = workload_group.mutex.clone().lock_owned().await;
244250
data.set_status("[QUERY-QUEUE] Waiting for global workload semaphore");
245251

246252
let workload_queue_guard = self
@@ -331,6 +337,9 @@ impl<Data: QueueData> QueueManager<Data> {
331337
data.lock_ttl(),
332338
);
333339

340+
// Prevent concurrent access to meta and serialize the submission of acquire requests.
341+
// This ensures that at most permits + nodes acquirers will be in the queue at any given time.
342+
let _guard = self.tokio_mutex.clone().lock_owned().await;
334343
let acquire = tokio::time::timeout(timeout, semaphore_acquire);
335344
let queue_future = AcquireQueueFuture::create(data, acquire, self.clone());
336345
queue_future.await

src/query/service/tests/it/sessions/queue_mgr.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::time::Instant;
1919
use std::time::SystemTime;
2020
use std::time::UNIX_EPOCH;
2121

22+
use databend_common_base::base::tokio::sync::Mutex;
2223
use databend_common_base::base::WatchNotify;
2324
use databend_common_base::runtime::workload_group::QuotaValue;
2425
use databend_common_base::runtime::workload_group::WorkloadGroup;
@@ -680,6 +681,7 @@ async fn test_workload_group_concurrency_control() -> Result<()> {
680681
},
681682
queue_key: "test_concurrency_queue".to_string(),
682683
permits: 2,
684+
mutex: Arc::new(Mutex::new(())),
683685
semaphore: Arc::new(Semaphore::new(2)),
684686
mem_stat: MemStat::create(String::new()),
685687
max_memory_usage: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
@@ -736,6 +738,7 @@ async fn test_workload_group_concurrent_queue_acquisition() -> Result<()> {
736738
},
737739
queue_key: "test_concurrent_acquisition".to_string(),
738740
permits: 1,
741+
mutex: Arc::new(Default::default()),
739742
semaphore: Arc::new(Semaphore::new(1)),
740743
mem_stat: MemStat::create(String::new()),
741744
max_memory_usage: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
@@ -798,6 +801,7 @@ async fn test_workload_group_multilevel_queue_guards() -> Result<()> {
798801
},
799802
queue_key: "test_multilevel_queue".to_string(),
800803
permits: 3,
804+
mutex: Arc::new(Default::default()),
801805
semaphore: Arc::new(Semaphore::new(3)),
802806
mem_stat: MemStat::create(String::new()),
803807
max_memory_usage: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
@@ -849,6 +853,7 @@ async fn test_workload_group_zero_concurrency() -> Result<()> {
849853
},
850854
queue_key: "test_zero_queue".to_string(),
851855
permits: 0,
856+
mutex: Arc::new(Default::default()),
852857
semaphore: Arc::new(Semaphore::new(0)),
853858
mem_stat: MemStat::create(String::new()),
854859
max_memory_usage: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
@@ -897,6 +902,7 @@ async fn test_workload_group_with_timeout() -> Result<()> {
897902
},
898903
queue_key: "test_timeout_queue".to_string(),
899904
permits: 1,
905+
mutex: Arc::new(Default::default()),
900906
semaphore: Arc::new(Semaphore::new(1)),
901907
mem_stat: MemStat::create(String::new()),
902908
max_memory_usage: Arc::new(std::sync::atomic::AtomicUsize::new(0)),

0 commit comments

Comments
 (0)