Skip to content

Commit cf6ddba

Browse files
authored
fix(query): allow the workload_group sort spill to be forcibly turned off (#18561)
fix
1 parent c98b2af commit cf6ddba

File tree

4 files changed

+152
-118
lines changed

4 files changed

+152
-118
lines changed

src/query/pipeline/transforms/src/processors/memory_settings.rs

Lines changed: 78 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,19 @@ use databend_common_base::runtime::ThreadTracker;
2222
use databend_common_base::runtime::GLOBAL_MEM_STAT;
2323

2424
#[derive(Clone)]
25+
#[non_exhaustive]
2526
pub struct MemorySettings {
26-
pub max_memory_usage: usize,
27+
pub spill_unit_size: usize,
28+
2729
pub enable_global_level_spill: bool,
30+
pub max_memory_usage: usize,
2831
pub global_memory_tracking: &'static MemStat,
2932

33+
pub enable_group_spill: bool,
34+
35+
pub enable_query_level_spill: bool,
3036
pub max_query_memory_usage: usize,
3137
pub query_memory_tracking: Option<Arc<MemStat>>,
32-
pub enable_query_level_spill: bool,
33-
34-
pub spill_unit_size: usize,
3538
}
3639

3740
impl Debug for MemorySettings {
@@ -71,28 +74,74 @@ impl Debug for MemorySettings {
7174
}
7275
}
7376

74-
impl MemorySettings {
75-
pub fn disable_spill() -> MemorySettings {
77+
pub struct MemorySettingsBuilder {
78+
enable_global_level_spill: bool,
79+
max_memory_usage: Option<usize>,
80+
81+
enable_group_spill: bool,
82+
83+
enable_query_level_spill: bool,
84+
max_query_memory_usage: Option<usize>,
85+
query_memory_tracking: Option<Arc<MemStat>>,
86+
87+
spill_unit_size: Option<usize>,
88+
}
89+
90+
impl MemorySettingsBuilder {
91+
pub fn with_max_memory_usage(mut self, max: usize) -> Self {
92+
self.enable_global_level_spill = true;
93+
self.max_memory_usage = Some(max);
94+
self
95+
}
96+
97+
pub fn with_max_query_memory_usage(
98+
mut self,
99+
max: usize,
100+
tracking: Option<Arc<MemStat>>,
101+
) -> Self {
102+
self.enable_query_level_spill = true;
103+
self.max_query_memory_usage = Some(max);
104+
self.query_memory_tracking = tracking;
105+
self
106+
}
107+
108+
pub fn with_workload_group(mut self, enable: bool) -> Self {
109+
self.enable_group_spill = enable;
110+
self
111+
}
112+
113+
pub fn with_spill_unit_size(mut self, spill_unit_size: usize) -> Self {
114+
self.spill_unit_size = Some(spill_unit_size);
115+
self
116+
}
117+
118+
pub fn build(self) -> MemorySettings {
76119
MemorySettings {
77-
spill_unit_size: 0,
78-
max_memory_usage: usize::MAX,
79-
enable_global_level_spill: false,
80-
max_query_memory_usage: usize::MAX,
81-
query_memory_tracking: None,
82-
enable_query_level_spill: false,
120+
enable_group_spill: self.enable_group_spill,
121+
max_memory_usage: self.max_memory_usage.unwrap_or(usize::MAX),
122+
enable_global_level_spill: self.enable_global_level_spill,
83123
global_memory_tracking: &GLOBAL_MEM_STAT,
124+
enable_query_level_spill: self.enable_query_level_spill,
125+
max_query_memory_usage: self.max_query_memory_usage.unwrap_or(usize::MAX),
126+
query_memory_tracking: self.query_memory_tracking,
127+
spill_unit_size: self.spill_unit_size.unwrap_or(0),
84128
}
85129
}
130+
}
86131

87-
pub fn always_spill(spill_unit_size: usize) -> MemorySettings {
88-
MemorySettings {
89-
spill_unit_size,
90-
max_memory_usage: 0,
91-
max_query_memory_usage: 0,
92-
enable_query_level_spill: true,
93-
enable_global_level_spill: true,
94-
global_memory_tracking: &GLOBAL_MEM_STAT,
132+
impl MemorySettings {
133+
pub fn builder() -> MemorySettingsBuilder {
134+
MemorySettingsBuilder {
135+
enable_global_level_spill: false,
136+
max_memory_usage: None,
137+
138+
enable_group_spill: true,
139+
140+
enable_query_level_spill: false,
141+
max_query_memory_usage: None,
95142
query_memory_tracking: None,
143+
144+
spill_unit_size: None,
96145
}
97146
}
98147

@@ -103,7 +152,9 @@ impl MemorySettings {
103152
return true;
104153
}
105154

106-
if let Some(workload_group) = ThreadTracker::workload_group() {
155+
if self.enable_group_spill
156+
&& let Some(workload_group) = ThreadTracker::workload_group()
157+
{
107158
let workload_group_memory_usage = workload_group.mem_stat.get_memory_usage();
108159
let max_memory_usage = workload_group.max_memory_usage.load(Ordering::Relaxed);
109160

@@ -132,6 +183,10 @@ impl MemorySettings {
132183
}
133184

134185
fn check_workload_group(&self) -> Option<isize> {
186+
if !self.enable_group_spill {
187+
return None;
188+
}
189+
135190
let workload_group = ThreadTracker::workload_group()?;
136191
let usage = workload_group.mem_stat.get_memory_usage();
137192
let max_memory_usage = workload_group.max_memory_usage.load(Ordering::Relaxed);
@@ -162,7 +217,7 @@ impl MemorySettings {
162217
})
163218
}
164219

165-
pub fn check_spill_remain(&self) -> isize {
220+
pub fn check_spill_remain(&self) -> Option<isize> {
166221
[
167222
self.check_global(),
168223
self.check_workload_group(),
@@ -171,7 +226,6 @@ impl MemorySettings {
171226
.into_iter()
172227
.flatten()
173228
.reduce(|a, b| a.min(b))
174-
.unwrap_or(1)
175229
}
176230
}
177231

@@ -186,6 +240,7 @@ mod tests {
186240
impl Default for MemorySettings {
187241
fn default() -> Self {
188242
Self {
243+
enable_group_spill: true,
189244
max_memory_usage: usize::MAX,
190245
enable_global_level_spill: false,
191246
global_memory_tracking: create_static_mem_stat(0),

src/query/service/src/pipelines/memory_settings.rs

Lines changed: 72 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use databend_common_base::runtime::GLOBAL_MEM_STAT;
1615
use databend_common_catalog::table_context::TableContext;
1716
use databend_common_exception::Result;
1817
use databend_common_pipeline_transforms::MemorySettings;
@@ -33,148 +32,128 @@ pub trait MemorySettingsExt: Sized {
3332
impl MemorySettingsExt for MemorySettings {
3433
fn from_join_settings(ctx: &QueryContext) -> Result<Self> {
3534
let settings = ctx.get_settings();
35+
let mut builder = MemorySettings::builder();
3636

3737
if settings.get_force_join_data_spill()? {
38-
return Ok(MemorySettings::always_spill(0));
38+
return Ok(builder.with_max_memory_usage(0).build());
3939
}
4040

41-
let mut enable_global_level_spill = false;
42-
let mut max_memory_usage = settings.get_max_memory_usage()? as usize;
43-
41+
let max_memory_usage = settings.get_max_memory_usage()? as usize;
4442
let max_memory_ratio = settings.get_join_spilling_memory_ratio()?;
4543
if max_memory_usage != 0 && max_memory_ratio != 0 {
46-
enable_global_level_spill = true;
47-
let max_memory_ratio = (max_memory_ratio as f64 / 100_f64).min(1_f64);
48-
max_memory_usage = (max_memory_usage as f64 * max_memory_ratio) as usize;
44+
let ratio = (max_memory_ratio as f64 / 100.0).min(1.0);
45+
let global_limit = (max_memory_usage as f64 * ratio) as usize;
46+
builder = builder.with_max_memory_usage(global_limit);
4947
}
5048

5149
let max_query_memory_usage = settings.get_max_query_memory_usage()? as usize;
52-
let enable_query_level_spill = match settings.get_query_out_of_memory_behavior()? {
53-
OutofMemoryBehavior::Throw => false,
54-
OutofMemoryBehavior::Spilling => max_query_memory_usage != 0,
55-
};
56-
57-
Ok(MemorySettings {
58-
max_memory_usage,
59-
max_query_memory_usage,
60-
enable_query_level_spill,
61-
enable_global_level_spill,
62-
spill_unit_size: 0,
63-
query_memory_tracking: ctx.get_query_memory_tracking(),
64-
global_memory_tracking: &GLOBAL_MEM_STAT,
65-
})
50+
let out_of_memory_behavior = settings.get_query_out_of_memory_behavior()?;
51+
if matches!(out_of_memory_behavior, OutofMemoryBehavior::Spilling)
52+
&& max_query_memory_usage != 0
53+
{
54+
builder = builder.with_max_query_memory_usage(
55+
max_query_memory_usage,
56+
ctx.get_query_memory_tracking(),
57+
);
58+
}
59+
60+
Ok(builder.build())
6661
}
6762

6863
fn from_sort_settings(ctx: &QueryContext) -> Result<Self> {
64+
let settings = ctx.get_settings();
65+
let mut builder = MemorySettings::builder()
66+
.with_spill_unit_size(settings.get_sort_spilling_batch_bytes()?);
67+
6968
if !ctx.get_enable_sort_spill() {
70-
return Ok(MemorySettings::disable_spill());
69+
return Ok(builder.with_workload_group(false).build());
7170
}
7271

73-
let settings = ctx.get_settings();
74-
7572
if settings.get_force_sort_data_spill()? {
76-
let spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?;
77-
return Ok(MemorySettings::always_spill(spilling_batch_bytes));
73+
return Ok(builder.with_max_memory_usage(0).build());
7874
}
7975

80-
let mut enable_global_level_spill = false;
81-
let mut max_memory_usage = settings.get_max_memory_usage()? as usize;
82-
76+
let max_memory_usage = settings.get_max_memory_usage()? as usize;
8377
let max_memory_ratio = settings.get_sort_spilling_memory_ratio()?;
8478
if max_memory_usage != 0 && max_memory_ratio != 0 {
85-
enable_global_level_spill = true;
86-
let max_memory_ratio = (max_memory_ratio as f64 / 100_f64).min(1_f64);
87-
max_memory_usage = (max_memory_usage as f64 * max_memory_ratio) as usize;
79+
let ratio = (max_memory_ratio as f64 / 100.0).min(1.0);
80+
let global_limit = (max_memory_usage as f64 * ratio) as usize;
81+
builder = builder.with_max_memory_usage(global_limit);
8882
}
8983

9084
let max_query_memory_usage = settings.get_max_query_memory_usage()? as usize;
91-
let enable_query_level_spill = match settings.get_query_out_of_memory_behavior()? {
92-
OutofMemoryBehavior::Throw => false,
93-
OutofMemoryBehavior::Spilling => max_query_memory_usage != 0,
94-
};
95-
96-
let spilling_batch_bytes = settings.get_sort_spilling_batch_bytes()?;
97-
98-
Ok(MemorySettings {
99-
max_memory_usage,
100-
max_query_memory_usage,
101-
enable_query_level_spill,
102-
enable_global_level_spill,
103-
query_memory_tracking: ctx.get_query_memory_tracking(),
104-
spill_unit_size: spilling_batch_bytes,
105-
global_memory_tracking: &GLOBAL_MEM_STAT,
106-
})
85+
let out_of_memory_behavior = settings.get_query_out_of_memory_behavior()?;
86+
if matches!(out_of_memory_behavior, OutofMemoryBehavior::Spilling)
87+
&& max_query_memory_usage != 0
88+
{
89+
builder = builder.with_max_query_memory_usage(
90+
max_query_memory_usage,
91+
ctx.get_query_memory_tracking(),
92+
);
93+
}
94+
95+
Ok(builder.build())
10796
}
10897

10998
fn from_window_settings(ctx: &QueryContext) -> Result<Self> {
11099
let settings = ctx.get_settings();
100+
let spill_unit_size = settings.get_window_spill_unit_size_mb()? * 1024 * 1024;
101+
let mut builder = MemorySettings::builder().with_spill_unit_size(spill_unit_size);
111102

112103
if settings.get_force_window_data_spill()? {
113-
let spill_unit_size = settings.get_window_spill_unit_size_mb()? * 1024 * 1024;
114-
return Ok(MemorySettings::always_spill(spill_unit_size));
104+
return Ok(builder.with_max_memory_usage(0).build());
115105
}
116106

117-
let mut enable_global_level_spill = false;
118-
let mut max_memory_usage = settings.get_max_memory_usage()? as usize;
119-
107+
let max_memory_usage = settings.get_max_memory_usage()? as usize;
120108
let max_memory_ratio = settings.get_window_partition_spilling_memory_ratio()?;
121109
if max_memory_usage != 0 && max_memory_ratio != 0 {
122-
enable_global_level_spill = true;
123-
let max_memory_ratio = (max_memory_ratio as f64 / 100_f64).min(1_f64);
124-
max_memory_usage = (max_memory_usage as f64 * max_memory_ratio) as usize;
110+
let ratio = (max_memory_ratio as f64 / 100.0).min(1.0);
111+
let global_limit = (max_memory_usage as f64 * ratio) as usize;
112+
builder = builder.with_max_memory_usage(global_limit);
125113
}
126114

127115
let max_query_memory_usage = settings.get_max_query_memory_usage()? as usize;
128-
let enable_query_level_spill = match settings.get_query_out_of_memory_behavior()? {
129-
OutofMemoryBehavior::Throw => false,
130-
OutofMemoryBehavior::Spilling => max_query_memory_usage != 0,
131-
};
132-
133-
let spill_unit_size = settings.get_window_spill_unit_size_mb()? * 1024 * 1024;
116+
let out_of_memory_behavior = settings.get_query_out_of_memory_behavior()?;
117+
if matches!(out_of_memory_behavior, OutofMemoryBehavior::Spilling)
118+
&& max_query_memory_usage != 0
119+
{
120+
builder = builder.with_max_query_memory_usage(
121+
max_query_memory_usage,
122+
ctx.get_query_memory_tracking(),
123+
);
124+
}
134125

135-
Ok(MemorySettings {
136-
spill_unit_size,
137-
max_memory_usage,
138-
max_query_memory_usage,
139-
enable_query_level_spill,
140-
enable_global_level_spill,
141-
query_memory_tracking: ctx.get_query_memory_tracking(),
142-
global_memory_tracking: &GLOBAL_MEM_STAT,
143-
})
126+
Ok(builder.build())
144127
}
145128

146129
fn from_aggregate_settings(ctx: &QueryContext) -> Result<Self> {
147130
let settings = ctx.get_settings();
131+
let mut builder = MemorySettings::builder();
148132

149133
if settings.get_force_aggregate_data_spill()? {
150-
return Ok(MemorySettings::always_spill(0));
134+
return Ok(builder.with_max_memory_usage(0).build());
151135
}
152136

153-
let mut enable_global_level_spill = false;
154-
let mut max_memory_usage = settings.get_max_memory_usage()? as usize;
155-
137+
let max_memory_usage = settings.get_max_memory_usage()? as usize;
156138
let max_memory_ratio = settings.get_aggregate_spilling_memory_ratio()?;
157139
if max_memory_usage != 0 && max_memory_ratio != 0 {
158-
enable_global_level_spill = true;
159-
let max_memory_ratio = (max_memory_ratio as f64 / 100_f64).min(1_f64);
160-
max_memory_usage = (max_memory_usage as f64 * max_memory_ratio) as usize;
140+
let ratio = (max_memory_ratio as f64 / 100.0).min(1.0);
141+
let global_limit = (max_memory_usage as f64 * ratio) as usize;
142+
builder = builder.with_max_memory_usage(global_limit);
161143
}
162144

163145
let max_query_memory_usage = settings.get_max_query_memory_usage()? as usize;
164-
let enable_query_level_spill = match settings.get_query_out_of_memory_behavior()? {
165-
OutofMemoryBehavior::Throw => false,
166-
OutofMemoryBehavior::Spilling => max_query_memory_usage != 0,
167-
};
168-
169-
Ok(MemorySettings {
170-
max_memory_usage,
171-
max_query_memory_usage,
172-
enable_query_level_spill,
173-
enable_global_level_spill,
174-
spill_unit_size: 0,
175-
query_memory_tracking: ctx.get_query_memory_tracking(),
176-
global_memory_tracking: &GLOBAL_MEM_STAT,
177-
})
146+
let out_of_memory_behavior = settings.get_query_out_of_memory_behavior()?;
147+
if matches!(out_of_memory_behavior, OutofMemoryBehavior::Spilling)
148+
&& max_query_memory_usage != 0
149+
{
150+
builder = builder.with_max_query_memory_usage(
151+
max_query_memory_usage,
152+
ctx.get_query_memory_tracking(),
153+
);
154+
}
155+
156+
Ok(builder.build())
178157
}
179158
}
180159

@@ -201,7 +180,6 @@ mod tests {
201180
let memory_settings = MemorySettings::from_join_settings(&ctx)?;
202181

203182
assert!(!memory_settings.enable_global_level_spill);
204-
assert_eq!(memory_settings.max_memory_usage, 0);
205183
assert!(memory_settings.enable_query_level_spill);
206184
Ok(())
207185
}

0 commit comments

Comments
 (0)