Skip to content

Commit 946c718

Browse files
authored
feat(query): enable workload groups for resource control over the http handler (#17994)
* feat(query): enable workload groups for resource control over the http handler * feat(query): enable workload groups for resource control over the http handler * feat(query): enable workload groups for resource control over the http handler * feat(query): enable workload groups for resource control over the http handler * feat(query): enable workload groups for resource control over the http handler * feat(query): enable workload groups for resource control over the http handler
1 parent 2203b11 commit 946c718

File tree

24 files changed

+462
-86
lines changed

24 files changed

+462
-86
lines changed

src/common/base/src/mem_allocator/tracker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ mod tests {
553553
) -> R {
554554
{
555555
let mem_stat = MemStat::create_child(
556-
GlobalUniqName::unique(),
556+
Some(GlobalUniqName::unique()),
557557
0,
558558
ParentMemStat::Normal(global.clone()),
559559
);

src/common/base/src/runtime/memory/mem_stat.rs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,15 @@ impl MemStat {
9898
}
9999

100100
pub fn create(name: String) -> Arc<MemStat> {
101-
MemStat::create_child(name, 0, ParentMemStat::StaticRef(&GLOBAL_MEM_STAT))
101+
MemStat::create_child(Some(name), 0, ParentMemStat::StaticRef(&GLOBAL_MEM_STAT))
102+
}
103+
104+
pub fn create_workload_group() -> Arc<MemStat> {
105+
MemStat::create_child(None, 0, ParentMemStat::StaticRef(&GLOBAL_MEM_STAT))
102106
}
103107

104108
pub fn create_child(
105-
name: String,
109+
name: Option<String>,
106110
priority: usize,
107111
parent_memory_stat: ParentMemStat,
108112
) -> Arc<MemStat> {
@@ -120,7 +124,7 @@ impl MemStat {
120124
exceeded_mutex: Mutex::new(false),
121125
memory_limit: MemoryLimit::default(),
122126
exceeded_memory: AtomicBool::new(false),
123-
memory_requester: Some(name),
127+
memory_requester: name,
124128
queries_memory_manager: &GLOBAL_QUERIES_MANAGER,
125129
})
126130
}
@@ -138,8 +142,25 @@ impl MemStat {
138142
.store(water_height, Ordering::Relaxed);
139143
}
140144

145+
let used = self.used.load(Ordering::Relaxed);
141146
self.memory_limit.limit.store(size, Ordering::Relaxed);
142147
self.memory_limit.set_limit.store(size, Ordering::Relaxed);
148+
149+
#[allow(clippy::collapsible_if)]
150+
if used < size && self.memory_requester.is_some() {
151+
if self.exceeded_memory.fetch_and(false, Ordering::SeqCst) {
152+
let _guard = LimitMemGuard::enter_unlimited();
153+
154+
let mutex = self.exceeded_mutex.lock();
155+
let mut mutex = mutex.unwrap_or_else(PoisonError::into_inner);
156+
157+
if *mutex {
158+
*mutex = false;
159+
self.queries_memory_manager
160+
.release_memory(self.memory_requester.as_ref());
161+
}
162+
}
163+
}
143164
}
144165

145166
/// Feed memory usage stat to MemStat and return if it exceeds the limit.
@@ -471,7 +492,7 @@ mod tests {
471492
fn test_multiple_level_mem_stat() -> Result<()> {
472493
let mem_stat = MemStat::create("TEST".to_string());
473494
let child_mem_stat = MemStat::create_child(
474-
"TEST_CHILD".to_string(),
495+
Some("TEST_CHILD".to_string()),
475496
0,
476497
ParentMemStat::Normal(mem_stat.clone()),
477498
);
@@ -498,7 +519,7 @@ mod tests {
498519
let mem_stat = MemStat::create("TEST".to_string());
499520
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2, false);
500521
let child_mem_stat = MemStat::create_child(
501-
"TEST_CHILD".to_string(),
522+
Some("TEST_CHILD".to_string()),
502523
0,
503524
ParentMemStat::Normal(mem_stat.clone()),
504525
);
@@ -531,7 +552,7 @@ mod tests {
531552
let mem_stat = MemStat::create("TEST".to_string());
532553
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT, false);
533554
let child_mem_stat = MemStat::create_child(
534-
"TEST_CHILD".to_string(),
555+
Some("TEST_CHILD".to_string()),
535556
0,
536557
ParentMemStat::Normal(mem_stat.clone()),
537558
);
@@ -547,7 +568,7 @@ mod tests {
547568
let mem_stat = MemStat::create("TEST".to_string());
548569
mem_stat.set_limit(MINIMUM_MEMORY_LIMIT * 2, false);
549570
let child_mem_stat = MemStat::create_child(
550-
"TEST_CHILD".to_string(),
571+
Some("TEST_CHILD".to_string()),
551572
0,
552573
ParentMemStat::Normal(mem_stat.clone()),
553574
);

src/common/base/src/runtime/memory/memory_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ mod tests {
445445
parent_memory_stat: ParentMemStat,
446446
manager: &'static QueriesMemoryManager,
447447
) -> Arc<MemStat> {
448-
let mut mem_stat = MemStat::create_child(name, priority, parent_memory_stat);
448+
let mut mem_stat = MemStat::create_child(Some(name), priority, parent_memory_stat);
449449
let mut_mem_stat = Arc::get_mut(&mut mem_stat).unwrap();
450450
mut_mem_stat.queries_memory_manager = manager;
451451
mem_stat

src/common/base/src/runtime/memory/stat_buffer_mem_stat.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ mod tests {
246246
let mut buffer = MemStatBuffer::empty(&TEST_GLOBAL);
247247

248248
let mem_stat = MemStat::create_child(
249-
String::from("test"),
249+
Some(String::from("test")),
250250
0,
251251
ParentMemStat::StaticRef(&TEST_GLOBAL),
252252
);
@@ -275,12 +275,12 @@ mod tests {
275275
let mut buffer = MemStatBuffer::empty(&TEST_GLOBAL);
276276

277277
let mem_stat_1 = MemStat::create_child(
278-
String::from("test"),
278+
Some(String::from("test")),
279279
0,
280280
ParentMemStat::StaticRef(&TEST_GLOBAL),
281281
);
282282
let mem_stat_2 = MemStat::create_child(
283-
String::from("test"),
283+
Some(String::from("test")),
284284
0,
285285
ParentMemStat::StaticRef(&TEST_GLOBAL),
286286
);
@@ -308,7 +308,7 @@ mod tests {
308308
let mut buffer = MemStatBuffer::empty(&TEST_GLOBAL);
309309

310310
let mem_stat = MemStat::create_child(
311-
String::from("test"),
311+
Some(String::from("test")),
312312
0,
313313
ParentMemStat::StaticRef(&TEST_GLOBAL),
314314
);
@@ -339,12 +339,12 @@ mod tests {
339339
let mut buffer = MemStatBuffer::empty(&TEST_GLOBAL);
340340

341341
let mem_stat_1 = MemStat::create_child(
342-
String::from("test"),
342+
Some(String::from("test")),
343343
0,
344344
ParentMemStat::StaticRef(&TEST_GLOBAL),
345345
);
346346
let mem_stat_2 = MemStat::create_child(
347-
String::from("test"),
347+
Some(String::from("test")),
348348
0,
349349
ParentMemStat::StaticRef(&TEST_GLOBAL),
350350
);

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod runtime;
2525
mod runtime_tracker;
2626
mod thread;
2727
mod time_series;
28+
pub mod workload_group;
2829

2930
pub use backtrace::dump_backtrace;
3031
pub use backtrace::get_all_tasks;
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::collections::HashMap;
16+
use std::fmt::Display;
17+
use std::fmt::Formatter;
18+
use std::time::Duration;
19+
20+
pub const CPU_QUOTA_KEY: &str = "cpu_quota";
21+
pub const MEMORY_QUOTA_KEY: &str = "memory_quota";
22+
pub const QUERY_TIMEOUT_QUOTA_KEY: &str = "query_timeout";
23+
pub const MAX_CONCURRENCY_QUOTA_KEY: &str = "max_concurrency";
24+
pub const QUERY_QUEUED_TIMEOUT_QUOTA_KEY: &str = "query_queued_timeout";
25+
26+
#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Debug)]
27+
pub enum QuotaValue {
28+
Duration(Duration),
29+
Percentage(usize),
30+
Bytes(usize),
31+
Number(usize),
32+
}
33+
34+
impl Display for QuotaValue {
35+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
36+
match self {
37+
QuotaValue::Percentage(v) => write!(f, "{}%", v),
38+
QuotaValue::Duration(v) => write!(f, "{:?}", v),
39+
QuotaValue::Bytes(v) => write!(f, "{}", v),
40+
QuotaValue::Number(v) => write!(f, "{}", v),
41+
}
42+
}
43+
}
44+
45+
#[derive(serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
46+
pub struct WorkloadGroup {
47+
pub id: String,
48+
pub name: String,
49+
pub quotas: HashMap<String, QuotaValue>,
50+
}
51+
52+
impl WorkloadGroup {
53+
pub fn get_quota(&self, key: &'static str) -> Option<QuotaValue> {
54+
self.quotas.get(key).cloned()
55+
}
56+
}

src/query/management/src/workload/mod.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,9 @@
1414

1515
mod workload_api;
1616
mod workload_mgr;
17+
mod workload_resource;
1718

18-
pub use workload_api::QuotaValue;
1919
pub use workload_api::WorkloadApi;
20-
pub use workload_api::WorkloadGroup;
21-
pub use workload_api::CPU_QUOTA_KEY;
22-
pub use workload_api::MAX_CONCURRENCY_QUOTA_KEY;
23-
pub use workload_api::MEMORY_QUOTA_KEY;
24-
pub use workload_api::QUERY_QUEUED_TIMEOUT_QUOTA_KEY;
25-
pub use workload_api::QUERY_TIMEOUT_QUOTA_KEY;
2620
pub use workload_mgr::WorkloadMgr;
21+
pub use workload_resource::WorkloadGroupResource;
22+
pub use workload_resource::WorkloadGroupResourceManager;

src/query/management/src/workload/workload_api.rs

Lines changed: 2 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,50 +13,11 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16-
use std::fmt::Display;
17-
use std::fmt::Formatter;
18-
use std::time::Duration;
1916

17+
use databend_common_base::runtime::workload_group::QuotaValue;
18+
use databend_common_base::runtime::workload_group::WorkloadGroup;
2019
use databend_common_exception::Result;
2120

22-
pub const CPU_QUOTA_KEY: &str = "cpu_quota";
23-
pub const MEMORY_QUOTA_KEY: &str = "memory_quota";
24-
pub const QUERY_TIMEOUT_QUOTA_KEY: &str = "query_timeout";
25-
pub const MAX_CONCURRENCY_QUOTA_KEY: &str = "max_concurrency";
26-
pub const QUERY_QUEUED_TIMEOUT_QUOTA_KEY: &str = "query_queued_timeout";
27-
28-
#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Debug)]
29-
pub enum QuotaValue {
30-
Duration(Duration),
31-
Percentage(usize),
32-
Bytes(usize),
33-
Number(usize),
34-
}
35-
36-
impl Display for QuotaValue {
37-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
38-
match self {
39-
QuotaValue::Percentage(v) => write!(f, "{}%", v),
40-
QuotaValue::Duration(v) => write!(f, "{:?}", v),
41-
QuotaValue::Bytes(v) => write!(f, "{}", v),
42-
QuotaValue::Number(v) => write!(f, "{}", v),
43-
}
44-
}
45-
}
46-
47-
#[derive(serde::Serialize, serde::Deserialize, Eq, PartialEq, Debug)]
48-
pub struct WorkloadGroup {
49-
pub id: String,
50-
pub name: String,
51-
pub quotas: HashMap<String, QuotaValue>,
52-
}
53-
54-
impl WorkloadGroup {
55-
pub fn get_quota(&self, key: &'static str) -> Option<QuotaValue> {
56-
self.quotas.get(key).cloned()
57-
}
58-
}
59-
6021
#[async_trait::async_trait]
6122
pub trait WorkloadApi: Sync + Send {
6223
async fn create(&self, group: WorkloadGroup) -> Result<WorkloadGroup>;

src/query/management/src/workload/workload_mgr.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use std::collections::HashMap;
1616

1717
use databend_common_base::base::escape_for_key;
1818
use databend_common_base::base::GlobalUniqName;
19+
use databend_common_base::runtime::workload_group::QuotaValue;
20+
use databend_common_base::runtime::workload_group::WorkloadGroup;
1921
use databend_common_exception::ErrorCode;
2022
use databend_common_exception::Result;
2123
use databend_common_meta_kvapi::kvapi::KVApi;
@@ -26,9 +28,7 @@ use databend_common_meta_types::TxnCondition;
2628
use databend_common_meta_types::TxnOp;
2729
use databend_common_meta_types::TxnRequest;
2830

29-
use crate::workload::workload_api::QuotaValue;
3031
use crate::workload::workload_api::WorkloadApi;
31-
use crate::workload::workload_api::WorkloadGroup;
3232
pub static WORKLOAD_META_KEY_PREFIX: &str = "__fd_workloads";
3333

3434
pub struct WorkloadMgr {

0 commit comments

Comments
 (0)