Skip to content

Commit 3dc7840

Browse files
authored
feat(query): support cluster level concurrent limit (#17778)
* feat(query): support cluster level concurrent limit * feat(query): support cluster level concurrent limit * feat(query): support cluster level concurrent limit * feat(query): support cluster level concurrent limit * feat(query): support cluster level concurrent limit
1 parent 9d8c6dd commit 3dc7840

File tree

9 files changed

+152
-37
lines changed

9 files changed

+152
-37
lines changed

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/meta/raft-store/src/mem_meta.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::HashMap;
1516
use std::future;
1617
use std::io;
1718
use std::sync::Arc;
@@ -32,6 +33,7 @@ use futures_util::StreamExt;
3233
use futures_util::TryStreamExt;
3334
use log::debug;
3435
use tokio::sync::Mutex;
36+
use tokio::sync::Semaphore;
3537

3638
use crate::applier::Applier;
3739
use crate::mem_state_machine::MemStateMachine;
@@ -40,6 +42,7 @@ use crate::state_machine_api_ext::StateMachineApiExt;
4042
/// In-memory meta store handle: a shared state machine plus a table of named semaphores.
///
/// Both fields are `Arc`s, so the derived `Clone` yields handles that share the same
/// state machine and the same semaphore table.
#[derive(Clone, Default)]
4143
pub struct MemMeta {
4244
// State machine guarded by an async (tokio) mutex.
sm: Arc<Mutex<MemStateMachine>>,
45+
// Tokio semaphores keyed by name, guarded by an async mutex. The field is public;
// `MetaStore::new_acquired` fills it lazily, using the caller's `prefix` as the key.
pub locks: Arc<Mutex<HashMap<String, Arc<Semaphore>>>>,
4346
}
4447

4548
impl MemMeta {

src/meta/semaphore/src/acquirer/permit.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::storage::PermitKey;
3838
/// the oneshot sender signals the lease extending task to stop,
3939
/// allowing for proper cleanup of the [`PermitEntry`] in the meta-service.
4040
pub struct Permit {
41-
pub(crate) fu: BoxFuture<'static, Result<(), ConnectionClosed>>,
41+
// Public (previously `pub(crate)`) so code outside this crate can build a `Permit`
// from its own future; `MetaStore::new_acquired` does so for the in-process store.
pub fu: BoxFuture<'static, Result<(), ConnectionClosed>>,
4242
}
4343

4444
impl std::fmt::Debug for Permit {

src/meta/store/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@ io-uring = []
1212

1313
[dependencies]
1414
async-trait = { workspace = true }
15+
databend-common-base = { workspace = true }
1516
databend-common-grpc = { workspace = true }
1617
databend-common-meta-client = { workspace = true }
1718
databend-common-meta-embedded = { workspace = true }
1819
databend-common-meta-kvapi = { workspace = true }
20+
databend-common-meta-semaphore = { workspace = true }
1921
databend-common-meta-types = { workspace = true }
2022
log = { workspace = true }
2123
tokio-stream = { workspace = true }

src/meta/store/src/lib.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::collections::hash_map::Entry;
1516
use std::pin::Pin;
1617
use std::sync::Arc;
1718
use std::task::Context;
1819
use std::task::Poll;
20+
use std::time::Duration;
1921

22+
use databend_common_base::base::tokio::sync::Semaphore as TokioSemaphore;
2023
use databend_common_grpc::RpcClientConf;
2124
use databend_common_meta_client::errors::CreationError;
2225
use databend_common_meta_client::ClientHandle;
@@ -25,6 +28,10 @@ use databend_common_meta_embedded::MemMeta;
2528
use databend_common_meta_kvapi::kvapi;
2629
use databend_common_meta_kvapi::kvapi::KVStream;
2730
use databend_common_meta_kvapi::kvapi::UpsertKVReply;
31+
use databend_common_meta_semaphore::acquirer::Permit;
32+
use databend_common_meta_semaphore::errors::AcquireError;
33+
use databend_common_meta_semaphore::errors::ConnectionClosed;
34+
use databend_common_meta_semaphore::Semaphore;
2835
use databend_common_meta_types::protobuf::WatchRequest;
2936
use databend_common_meta_types::protobuf::WatchResponse;
3037
use databend_common_meta_types::MetaError;
@@ -80,6 +87,42 @@ impl MetaStore {
8087
}
8188
}
8289
}
90+
91+
pub async fn new_acquired(
92+
&self,
93+
prefix: impl ToString,
94+
capacity: u64,
95+
id: impl ToString,
96+
lease: Duration,
97+
) -> Result<Permit, AcquireError> {
98+
match self {
99+
MetaStore::L(v) => {
100+
let mut local_lock_map = v.locks.lock().await;
101+
102+
let acquire_res = match local_lock_map.entry(prefix.to_string()) {
103+
Entry::Occupied(v) => v.get().clone(),
104+
Entry::Vacant(v) => v
105+
.insert(Arc::new(TokioSemaphore::new(capacity as usize)))
106+
.clone(),
107+
};
108+
109+
match acquire_res.acquire_owned().await {
110+
Ok(guard) => Ok(Permit {
111+
fu: Box::pin(async move {
112+
let _guard = guard;
113+
Ok(())
114+
}),
115+
}),
116+
Err(_e) => Err(AcquireError::ConnectionClosed(ConnectionClosed::new_str(
117+
"",
118+
))),
119+
}
120+
}
121+
MetaStore::R(grpc_client) => {
122+
Semaphore::new_acquired(grpc_client.clone(), prefix, capacity, id, lease).await
123+
}
124+
}
125+
}
83126
}
84127

85128
#[async_trait::async_trait]

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ databend-common-meta-api = { workspace = true }
6868
databend-common-meta-app = { workspace = true }
6969
databend-common-meta-app-types = { workspace = true }
7070
databend-common-meta-kvapi = { workspace = true }
71+
databend-common-meta-semaphore = { workspace = true }
7172
databend-common-meta-store = { workspace = true }
7273
databend-common-meta-types = { workspace = true }
7374
databend-common-metrics = { workspace = true }

src/query/service/src/global_services.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl GlobalServices {
111111
CatalogManager::init(config, Arc::new(default_catalog), catalog_creator).await?;
112112
}
113113

114-
QueriesQueueManager::init(config.query.max_running_queries as usize)?;
114+
QueriesQueueManager::init(config.query.max_running_queries as usize, config).await?;
115115
HttpQueryManager::init(config).await?;
116116
ClientSessionManager::init(config).await?;
117117
DataExchangeManager::init()?;

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,17 @@ use std::time::Instant;
2828
use std::time::SystemTime;
2929

3030
use databend_common_ast::ast::ExplainKind;
31+
use databend_common_base::base::escape_for_key;
3132
use databend_common_base::base::GlobalInstance;
3233
use databend_common_catalog::table_context::TableContext;
34+
use databend_common_config::InnerConfig;
3335
use databend_common_exception::ErrorCode;
3436
use databend_common_exception::Result;
3537
use databend_common_meta_app::principal::UserInfo;
38+
use databend_common_meta_semaphore::acquirer::Permit;
39+
use databend_common_meta_semaphore::errors::AcquireError;
40+
use databend_common_meta_store::MetaStore;
41+
use databend_common_meta_store::MetaStoreProvider;
3642
use databend_common_metrics::session::dec_session_running_acquired_queries;
3743
use databend_common_metrics::session::inc_session_running_acquired_queries;
3844
use databend_common_metrics::session::incr_session_queue_abort_count;
@@ -47,18 +53,17 @@ use databend_common_sql::PlanExtras;
4753
use log::info;
4854
use parking_lot::Mutex;
4955
use pin_project_lite::pin_project;
50-
use tokio::sync::AcquireError;
51-
use tokio::sync::OwnedSemaphorePermit;
52-
use tokio::sync::Semaphore;
5356
use tokio::time::error::Elapsed;
5457

5558
use crate::sessions::QueryContext;
5659

5760
pub trait QueueData: Send + Sync + 'static {
58-
type Key: Send + Sync + Eq + Hash + Display + Clone + 'static;
61+
type Key: Send + Sync + Eq + Hash + Display + Clone + ToString + 'static;
5962

6063
fn get_key(&self) -> Self::Key;
6164

65+
fn get_lock_key(&self) -> String;
66+
6267
fn remove_error_message(key: Option<Self::Key>) -> ErrorCode;
6368

6469
fn timeout(&self) -> Duration;
@@ -78,29 +83,39 @@ pub(crate) struct Inner<Data: QueueData> {
7883
}
7984

8085
/// Queue manager whose admission permits are obtained through a `MetaStore` semaphore.
pub struct QueueManager<Data: QueueData> {
81-
semaphore: Arc<Semaphore>,
86+
// Capacity handed to `MetaStore::new_acquired` each time a permit is requested.
permits: usize,
87+
// Store used to acquire permits (`new_acquired`).
meta_store: MetaStore,
8288
// Entries keyed by `Data::Key`.
// NOTE(review): presumably the entries currently waiting in the queue — confirm.
queue: Mutex<HashMap<Data::Key, Inner<Data>>>,
8389
}
8490

8591
impl<Data: QueueData> QueueManager<Data> {
86-
pub fn init(permits: usize) -> Result<()> {
92+
/// Creates the queue manager and registers it with `GlobalInstance`.
///
/// A meta store is created from `conf.meta` first; the manager is then built with
/// `Self::create(permits, metastore)`.
///
/// # Errors
///
/// Returns `ErrorCode::MetaServiceError` when the meta store cannot be created.
pub async fn init(permits: usize, conf: &InnerConfig) -> Result<()> {
93+
let metastore = {
94+
let provider = Arc::new(MetaStoreProvider::new(conf.meta.to_meta_grpc_client_conf()));
95+
96+
// Wrap the creation error so callers see a meta-service error with context.
provider.create_meta_store().await.map_err(|e| {
97+
ErrorCode::MetaServiceError(format!("Failed to create meta store: {}", e))
98+
})?
99+
};
100+
87101
info!("queue manager permits: {:?}", permits);
88-
GlobalInstance::set(Self::create(permits));
102+
GlobalInstance::set(Self::create(permits, metastore));
89103
Ok(())
90104
}
91105

92106
/// Returns the manager stored in `GlobalInstance` (registered by `init`).
pub fn instance() -> Arc<Self> {
93107
GlobalInstance::get::<Arc<Self>>()
94108
}
95109

96-
pub fn create(mut permits: usize) -> Arc<QueueManager<Data>> {
110+
/// Builds a manager with an empty queue that acquires permits through `meta_store`.
///
/// `permits == 0` is replaced by `usize::MAX >> 4`, i.e. a very large capacity.
pub fn create(mut permits: usize, meta_store: MetaStore) -> Arc<QueueManager<Data>> {
97111
if permits == 0 {
98112
// Zero is mapped to a huge capacity instead of a semaphore with no permits.
permits = usize::MAX >> 4;
99113
}
100114

101115
Arc::new(QueueManager {
116+
permits,
117+
meta_store,
102118
queue: Mutex::new(HashMap::new()),
103-
semaphore: Arc::new(Semaphore::new(permits)),
104119
})
105120
}
106121

@@ -139,9 +154,16 @@ impl<Data: QueueData> QueueManager<Data> {
139154
);
140155

141156
let timeout = data.timeout();
157+
let semaphore_acquire = self.meta_store.new_acquired(
158+
data.get_lock_key(),
159+
self.permits as u64,
160+
data.get_key(), // ID of this acquirer
161+
Duration::from_secs(3),
162+
);
163+
142164
let future = AcquireQueueFuture::create(
143165
Arc::new(data),
144-
tokio::time::timeout(timeout, self.semaphore.clone().acquire_owned()),
166+
tokio::time::timeout(timeout, semaphore_acquire),
145167
self.clone(),
146168
);
147169
let start_time = SystemTime::now();
@@ -209,9 +231,12 @@ impl<Data: QueueData> QueueManager<Data> {
209231

210232
pub struct AcquireQueueGuard {
211233
#[allow(dead_code)]
212-
permit: Option<OwnedSemaphorePermit>,
234+
permit: Option<Permit>,
213235
}
214236

237+
// NOTE(review): `Send`/`Sync` are asserted by hand here instead of being derived by the
// compiler, and no safety invariant is stated. The guard's only field is
// `permit: Option<Permit>`, and `Permit` wraps a boxed future (`fu`).
// Presumably that boxed future is `Send` but not `Sync`, which would make the `Send`
// impl redundant and the `Sync` impl the one that needs justification — TODO confirm.
// SAFETY (to be confirmed): the field is private and marked `#[allow(dead_code)]`, so
// the visible code never polls or reads the future through a shared reference.
unsafe impl Send for AcquireQueueGuard {}
238+
unsafe impl Sync for AcquireQueueGuard {}
239+
215240
impl Drop for AcquireQueueGuard {
216241
fn drop(&mut self) {
217242
if self.permit.is_some() {
@@ -221,14 +246,14 @@ impl Drop for AcquireQueueGuard {
221246
}
222247

223248
impl AcquireQueueGuard {
224-
pub fn create(permit: Option<OwnedSemaphorePermit>) -> Self {
249+
/// Wraps an optional `Permit` in a guard; the permit is stored as-is.
///
/// The `Drop` impl checks `permit.is_some()` before acting.
pub fn create(permit: Option<Permit>) -> Self {
225250
AcquireQueueGuard { permit }
226251
}
227252
}
228253

229254
pin_project! {
230255
pub struct AcquireQueueFuture<Data: QueueData, T>
231-
where T: Future<Output = std::result::Result< std::result::Result<OwnedSemaphorePermit, AcquireError>, Elapsed>>
256+
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
232257
{
233258
#[pin]
234259
inner: T,
@@ -243,12 +268,7 @@ where T: Future<Output = std::result::Result< std::result::Result<OwnedSemaphor
243268
}
244269

245270
impl<Data: QueueData, T> AcquireQueueFuture<Data, T>
246-
where T: Future<
247-
Output = std::result::Result<
248-
std::result::Result<OwnedSemaphorePermit, AcquireError>,
249-
Elapsed,
250-
>,
251-
>
271+
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
252272
{
253273
pub fn create(data: Arc<Data>, inner: T, mgr: Arc<QueueManager<Data>>) -> Self {
254274
AcquireQueueFuture {
@@ -263,12 +283,7 @@ where T: Future<
263283
}
264284

265285
impl<Data: QueueData, T> Future for AcquireQueueFuture<Data, T>
266-
where T: Future<
267-
Output = std::result::Result<
268-
std::result::Result<OwnedSemaphorePermit, AcquireError>,
269-
Elapsed,
270-
>,
271-
>
286+
where T: Future<Output = std::result::Result<std::result::Result<Permit, AcquireError>, Elapsed>>
272287
{
273288
type Output = Result<AcquireQueueGuard>;
274289

@@ -443,6 +458,22 @@ impl QueueData for QueryEntry {
443458
self.query_id.clone()
444459
}
445460

461+
/// Builds the semaphore key under which this query competes for a queue slot.
///
/// If the cluster's node list contains the local node, the key is
/// `__fd_queries_queue/queue/{warehouse_id}/{cluster_id}` built from that node.
/// Otherwise the key falls back to `__fd_queries_queue/lost_contact/{local_id}`.
/// All ids are passed through `escape_for_key`.
///
/// # Panics
///
/// Panics if `escape_for_key` fails for any of the ids (the results are unwrapped).
fn get_lock_key(&self) -> String {
    let cluster = self.ctx.get_cluster();
    let escaped_local_id = escape_for_key(&cluster.local_id).unwrap();
    let fallback_key = format!("__fd_queries_queue/lost_contact/{}", escaped_local_id);

    // Every node matching the local id overwrites the key, so the last match wins.
    cluster
        .nodes
        .iter()
        .filter(|node| node.id == cluster.local_id)
        .fold(fallback_key, |_, node| {
            let cluster_id = escape_for_key(&node.cluster_id).unwrap();
            let warehouse_id = escape_for_key(&node.warehouse_id).unwrap();
            format!("__fd_queries_queue/queue/{}/{}", warehouse_id, cluster_id)
        })
}
476+
446477
fn remove_error_message(key: Option<Self::Key>) -> ErrorCode {
447478
match key {
448479
None => ErrorCode::AbortedQuery("The query has be kill while in queries queue"),

0 commit comments

Comments
 (0)