Skip to content

Commit ce88012

Browse files
authored
chore(query): add max memory ratio quota for workload group (#18360)
* chore(query): add max memory ratio quota for workload group * chore(query): add max memory ratio quota for workload group * chore(query): add max memory ratio quota for workload group * chore(query): add max memory ratio quota for workload group
1 parent 7e232e1 commit ce88012

File tree

4 files changed

+54
-21
lines changed

4 files changed

+54
-21
lines changed

src/common/base/src/runtime/workload_group.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ pub const MEMORY_QUOTA_KEY: &str = "memory_quota";
2626
pub const QUERY_TIMEOUT_QUOTA_KEY: &str = "query_timeout";
2727
pub const MAX_CONCURRENCY_QUOTA_KEY: &str = "max_concurrency";
2828
pub const QUERY_QUEUED_TIMEOUT_QUOTA_KEY: &str = "query_queued_timeout";
29+
/// Quota key under which a workload group's maximum memory usage ratio is stored.
pub const MAX_MEMORY_USAGE_RATIO: &str = "max_memory_usage_ratio";
30+
31+
/// Ratio (in percent) that `get_max_memory_usage_ratio` falls back to when the
/// quota is missing, is not a percentage, or is zero.
pub const DEFAULT_MAX_MEMORY_USAGE_RATIO: usize = 25;
2932

3033
#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq, Debug)]
3134
pub enum QuotaValue {
@@ -57,6 +60,18 @@ impl WorkloadGroup {
5760
/// Looks up `key` in `self.quotas` and returns an owned clone of the stored
/// value, or `None` if the lookup finds nothing.
pub fn get_quota(&self, key: &'static str) -> Option<QuotaValue> {
5861
self.quotas.get(key).cloned()
5962
}
63+
64+
/// Returns the effective maximum memory usage ratio for this workload group,
/// expressed as a percentage.
///
/// Falls back to `DEFAULT_MAX_MEMORY_USAGE_RATIO` when the
/// `MAX_MEMORY_USAGE_RATIO` quota is missing, is not a
/// `QuotaValue::Percentage`, or is zero. Any other value is capped at 100.
pub fn get_max_memory_usage_ratio(&self) -> usize {
65+
// Missing quota or a non-percentage variant: use the default ratio.
let Some(QuotaValue::Percentage(v)) = self.quotas.get(MAX_MEMORY_USAGE_RATIO) else {
66+
return DEFAULT_MAX_MEMORY_USAGE_RATIO;
67+
};
68+
69+
// A zero ratio is treated the same as an unset quota.
if *v == 0 {
70+
return DEFAULT_MAX_MEMORY_USAGE_RATIO;
71+
}
72+
73+
// Cap the configured value so the returned ratio never exceeds 100.
std::cmp::min(*v, 100)
74+
}
6075
}
6176

6277
pub struct WorkloadGroupResource {

src/query/ast/src/ast/statements/workload.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ impl QuotaValueStmt {
133133
"query_timeout" => Self::parse_human_timeout(&v).ok_or("Invalid query timeout value, expected duration (e.g. '30s', '5min', '1h')"),
134134
"max_concurrency" => Self::parse_number(&v).filter(|x| !matches!(x, QuotaValueStmt::Number(0))).ok_or("Invalid max concurrency value, expected positive integer"),
135135
"query_queued_timeout" => Self::parse_human_timeout(&v).ok_or("Invalid queued query timeout value, expected duration (e.g. '30s', '5min', '1h')"),
136+
"max_memory_usage_ratio" => Self::parse_percentage(&v).ok_or("Invalid max_memory_usage_ratio value, expected percentage (e.g. '50%') between 0-100"),
136137
_ => Err("Unknown quota key"),
137138
}
138139
}

src/query/management/src/workload/workload_resource.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,8 @@ impl WorkloadGroupResourceManagerInner {
185185
(None, None) => {
186186
return Ok(workload_resource);
187187
}
188-
(None, Some(QuotaValue::Bytes(v))) => {
189-
workload_resource
190-
.max_memory_usage
191-
.store(*v, Ordering::Relaxed);
188+
(None, Some(QuotaValue::Bytes(_v))) => {
189+
self.update_mem_usage(&online_workload_group);
192190
return Ok(workload_resource);
193191
}
194192
(None, Some(QuotaValue::Percentage(v))) => {
@@ -218,10 +216,8 @@ impl WorkloadGroupResourceManagerInner {
218216
self.percent_normalizer.update(*v);
219217
self.update_mem_usage(&online_workload_group);
220218
}
221-
(Some(QuotaValue::Bytes(_old)), Some(QuotaValue::Bytes(new))) => {
222-
workload_resource
223-
.max_memory_usage
224-
.store(*new, Ordering::Relaxed);
219+
(Some(QuotaValue::Bytes(_old)), Some(QuotaValue::Bytes(_new))) => {
220+
self.update_mem_usage(&online_workload_group);
225221
return Ok(workload_resource);
226222
}
227223
_ => {}
@@ -239,16 +235,29 @@ impl WorkloadGroupResourceManagerInner {
239235
{
240236
if let Some(v) = self.percent_normalizer.get_normalized(*v) {
241237
let limit = self.global_mem_stat.get_limit();
238+
let usage_ratio = workload_group.meta.get_max_memory_usage_ratio();
242239
if limit > 0 {
243-
workload_group
244-
.max_memory_usage
245-
.store(limit as usize / 100 * v, Ordering::Relaxed);
240+
workload_group.max_memory_usage.store(
241+
limit as usize / 100 * usage_ratio / 100 * v,
242+
Ordering::Relaxed,
243+
);
246244
}
247245
}
248246
} else if let Some(QuotaValue::Bytes(v)) =
249247
workload_group.meta.quotas.get(MEMORY_QUOTA_KEY)
250248
{
251-
workload_group.max_memory_usage.store(*v, Ordering::Relaxed);
249+
let limit = self.global_mem_stat.get_limit();
250+
let usage_ratio = workload_group.meta.get_max_memory_usage_ratio();
251+
252+
let mut memory_usage = *v;
253+
if limit > 0 {
254+
let max_memory_usage = limit as usize / 100 * usage_ratio;
255+
memory_usage = std::cmp::min(max_memory_usage, memory_usage);
256+
}
257+
258+
workload_group
259+
.max_memory_usage
260+
.store(memory_usage, Ordering::Relaxed);
252261
} else {
253262
workload_group.max_memory_usage.store(0, Ordering::Relaxed)
254263
}
@@ -386,7 +395,7 @@ mod tests {
386395
// Check memory usage was calculated (100% since it's the only workload)
387396
assert_eq!(
388397
workload1.max_memory_usage.load(Ordering::Relaxed),
389-
(LIMIT / 100 * 100) as usize
398+
(LIMIT / 100 * 25 / 100 * 100) as usize
390399
);
391400
}
392401

@@ -423,11 +432,11 @@ mod tests {
423432
// Check memory allocations are calculated correctly
424433
assert_eq!(
425434
resource1.max_memory_usage.load(Ordering::Relaxed),
426-
(LIMIT / 100 * 30) as usize
435+
(LIMIT / 100 * 25 / 100 * 30) as usize
427436
); // 30% of total 100
428437
assert_eq!(
429438
resource2.max_memory_usage.load(Ordering::Relaxed),
430-
(LIMIT / 100 * 70) as usize
439+
(LIMIT / 100 * 25 / 100 * 70) as usize
431440
); // 70% of total 100
432441

433442
// Drop first workload
@@ -438,7 +447,7 @@ mod tests {
438447
assert_eq!(inner.percent_normalizer.sum.load(Ordering::Relaxed), 70);
439448
assert_eq!(
440449
resource2.max_memory_usage.load(Ordering::Relaxed),
441-
(LIMIT / 100 * 100) as usize
450+
(LIMIT / 100 * 25 / 100 * 100) as usize
442451
); // Now 100% of remaining 70
443452

444453
Ok(())
@@ -464,11 +473,11 @@ mod tests {
464473

465474
assert_eq!(
466475
resource1.max_memory_usage.load(Ordering::Relaxed),
467-
(LIMIT / 100 * 50) as usize
476+
(LIMIT / 100 * 25 / 100 * 50) as usize
468477
);
469478
assert_eq!(
470479
resource2.max_memory_usage.load(Ordering::Relaxed),
471-
(LIMIT / 100 * 50) as usize
480+
(LIMIT / 100 * 25 / 100 * 50) as usize
472481
);
473482

474483
workload_mgr
@@ -493,15 +502,15 @@ mod tests {
493502
// Memory usage should be recalculated
494503
assert_eq!(
495504
resource1.max_memory_usage.load(Ordering::Relaxed),
496-
(LIMIT / 100 * (70 * 100 / (70 + 50))) as usize
505+
(LIMIT / 100 * 25 / 100 * (70 * 100 / (70 + 50))) as usize
497506
);
498507
assert_eq!(
499508
resource2.max_memory_usage.load(Ordering::Relaxed),
500-
(LIMIT / 100 * (50 * 100 / (70 + 50))) as usize
509+
(LIMIT / 100 * 25 / 100 * (50 * 100 / (70 + 50))) as usize
501510
);
502511
assert_eq!(
503512
updated_resource.max_memory_usage.load(Ordering::Relaxed),
504-
(LIMIT / 100 * (70 * 100 / (70 + 50))) as usize
513+
(LIMIT / 100 * 25 / 100 * (70 * 100 / (70 + 50))) as usize
505514
);
506515

507516
Ok(())

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,14 @@ pub(crate) async fn query_handler(
531531
}
532532
};
533533

534+
log::info!(
535+
"[Workload-Group] attach workload group {}({}) for query {}, quotas: {:?}",
536+
workload_group.meta.name,
537+
workload_group.meta.id,
538+
ctx.query_id,
539+
workload_group.meta.quotas
540+
);
541+
534542
parent_mem_stat = ParentMemStat::Normal(workload_group.mem_stat.clone());
535543
tracking_workload_group = Some(workload_group);
536544
}

0 commit comments

Comments
 (0)