File tree Expand file tree Collapse file tree 4 files changed +17
-5
lines changed
pipelines/processors/transforms/window/partition Expand file tree Collapse file tree 4 files changed +17
-5
lines changed Original file line number Diff line number Diff line change @@ -161,6 +161,7 @@ impl IPhysicalPlan for WindowPartition {
161
161
_ => unimplemented ! ( ) ,
162
162
} ;
163
163
let window_spill_settings = MemorySettings :: from_window_settings ( & builder. ctx ) ?;
164
+ let enable_backpressure_spiller = settings. get_enable_backpressure_spiller ( ) ?;
164
165
165
166
let processor_id = AtomicUsize :: new ( 0 ) ;
166
167
builder. main_pipeline . add_transform ( |input, output| {
@@ -181,7 +182,7 @@ impl IPhysicalPlan for WindowPartition {
181
182
num_partitions,
182
183
window_spill_settings. clone ( ) ,
183
184
disk_spill. clone ( ) ,
184
- true ,
185
+ enable_backpressure_spiller ,
185
186
strategy,
186
187
) ?,
187
188
) ) )
Original file line number Diff line number Diff line change @@ -107,10 +107,10 @@ where
107
107
{
108
108
fn default ( ) -> Self {
109
109
Self {
110
- state : PartitionSpillState :: Empty ,
111
- spilled_ordinals : Vec :: new ( ) ,
112
- buffered_blocks : Vec :: new ( ) ,
113
- buffered_size : 0 ,
110
+ state : Default :: default ( ) ,
111
+ spilled_ordinals : Default :: default ( ) ,
112
+ buffered_blocks : Default :: default ( ) ,
113
+ buffered_size : Default :: default ( ) ,
114
114
}
115
115
}
116
116
}
Original file line number Diff line number Diff line change @@ -228,6 +228,13 @@ impl DefaultSettings {
228
228
scope : SettingScope :: Both ,
229
229
range : Some ( SettingRange :: Numeric ( 0 ..=500 ) ) ,
230
230
} ) ,
231
+ ( "enable_backpressure_spiller" , DefaultSettingValue {
232
+ value : UserSettingValue :: UInt64 ( 1 ) ,
233
+ desc : "Use new backpressure spiller." ,
234
+ mode : SettingMode :: Both ,
235
+ scope : SettingScope :: Both ,
236
+ range : Some ( SettingRange :: Numeric ( 0 ..=1 ) ) ,
237
+ } ) ,
231
238
( "max_spill_io_requests" , DefaultSettingValue {
232
239
value : UserSettingValue :: UInt64 ( default_max_spill_io_requests) ,
233
240
desc : "Sets the maximum number of concurrent spill I/O requests." ,
Original file line number Diff line number Diff line change @@ -897,6 +897,10 @@ impl Settings {
897
897
self . try_get_u64 ( "dynamic_sample_time_budget_ms" )
898
898
}
899
899
900
+ pub fn get_enable_backpressure_spiller ( & self ) -> Result < bool > {
901
+ Ok ( self . try_get_u64 ( "enable_backpressure_spiller" ) ? != 0 )
902
+ }
903
+
900
904
pub fn get_max_spill_io_requests ( & self ) -> Result < u64 > {
901
905
self . try_get_u64 ( "max_spill_io_requests" )
902
906
}
You can’t perform that action at this time.
0 commit comments