Skip to content

Commit 6c6483d

Browse files
committed
x
1 parent 73a0fca commit 6c6483d

File tree

6 files changed

+57
-53
lines changed

6 files changed

+57
-53
lines changed

src/query/service/src/physical_plans/physical_recluster.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -350,6 +350,7 @@ impl IPhysicalPlan for HilbertPartition {
350350
self.num_partitions,
351351
window_spill_settings.clone(),
352352
disk_spill.clone(),
353+
false,
353354
CompactStrategy::new(self.rows_per_block, max_bytes_per_block),
354355
)?,
355356
)))

src/query/service/src/physical_plans/physical_window_partition.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -181,6 +181,7 @@ impl IPhysicalPlan for WindowPartition {
181181
num_partitions,
182182
window_spill_settings.clone(),
183183
disk_spill.clone(),
184+
true,
184185
strategy,
185186
)?,
186187
)))

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 41 additions & 31 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,7 @@ use std::any::Any;
2020
use std::collections::VecDeque;
2121
use std::sync::Arc;
2222

23+
use databend_common_base::runtime::GlobalIORuntime;
2324
use databend_common_exception::Result;
2425
use databend_common_expression::BlockMetaInfoDowncast;
2526
use databend_common_expression::DataBlock;
@@ -30,17 +31,19 @@ use databend_common_pipeline_core::processors::Processor;
3031
use databend_common_pipeline_transforms::MemorySettings;
3132
use databend_common_settings::Settings;
3233
use databend_common_storage::DataOperator;
34+
use either::Either;
3335

3436
use super::window_partition_buffer_v2::WindowPartitionBufferV2;
3537
use super::WindowPartitionBuffer;
3638
use super::WindowPartitionMeta;
3739
use crate::pipelines::processors::transforms::DataProcessorStrategy;
3840
use crate::sessions::QueryContext;
41+
use crate::spillers::BackpressureSpiller;
42+
use crate::spillers::BufferPool;
3943
use crate::spillers::Spiller;
4044
use crate::spillers::SpillerConfig;
4145
use crate::spillers::SpillerDiskConfig;
4246
use crate::spillers::SpillerType;
43-
use crate::spillers::WindowSpiller;
4447

4548
enum WindowBuffer {
4649
V1(WindowPartitionBuffer),
@@ -49,29 +52,30 @@ enum WindowBuffer {
4952

5053
impl WindowBuffer {
5154
fn new(
52-
is_v2: bool,
53-
partition_spiller: Spiller,
54-
writer_spiller: WindowSpiller,
55+
spiller: Either<Spiller, BackpressureSpiller>,
5556
num_partitions: usize,
5657
sort_block_size: usize,
5758
memory_settings: MemorySettings,
5859
) -> Result<Self> {
59-
if is_v2 {
60-
let inner = WindowPartitionBufferV2::new(
61-
writer_spiller,
62-
num_partitions,
63-
sort_block_size,
64-
memory_settings,
65-
)?;
66-
Ok(Self::V2(inner))
67-
} else {
68-
let inner = WindowPartitionBuffer::new(
69-
partition_spiller,
70-
num_partitions,
71-
sort_block_size,
72-
memory_settings,
73-
)?;
74-
Ok(Self::V1(inner))
60+
match spiller {
61+
Either::Left(spiller) => {
62+
let inner = WindowPartitionBuffer::new(
63+
spiller,
64+
num_partitions,
65+
sort_block_size,
66+
memory_settings,
67+
)?;
68+
Ok(Self::V1(inner))
69+
}
70+
Either::Right(spiller) => {
71+
let inner = WindowPartitionBufferV2::new(
72+
spiller,
73+
num_partitions,
74+
sort_block_size,
75+
memory_settings,
76+
)?;
77+
Ok(Self::V2(inner))
78+
}
7579
}
7680
}
7781

@@ -150,6 +154,7 @@ pub struct TransformWindowPartitionCollect<S: DataProcessorStrategy> {
150154
}
151155

152156
impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
157+
#[expect(clippy::too_many_arguments)]
153158
pub fn new(
154159
ctx: Arc<QueryContext>,
155160
input: Arc<InputPort>,
@@ -160,6 +165,7 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
160165
num_partitions: usize,
161166
memory_settings: MemorySettings,
162167
disk_spill: Option<SpillerDiskConfig>,
168+
enable_backpressure_spiller: bool,
163169
strategy: S,
164170
) -> Result<Self> {
165171
// Calculate the partition ids collected by the processor.
@@ -183,20 +189,24 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
183189

184190
// Create spillers for window operator.
185191
let operator = DataOperator::instance().spill_operator();
186-
let partition_spiller =
187-
Spiller::create(ctx.clone(), operator.clone(), spill_config.clone())?;
188-
let window_spiller = WindowSpiller::create(ctx, operator, spill_config)?;
192+
let spiller = if !enable_backpressure_spiller {
193+
Either::Left(Spiller::create(ctx, operator, spill_config)?)
194+
} else {
195+
let runtime = GlobalIORuntime::instance();
196+
let buffer_pool = BufferPool::create(runtime, 128 * 1024 * 1024, 3);
197+
Either::Right(BackpressureSpiller::create(
198+
ctx,
199+
operator,
200+
spill_config,
201+
buffer_pool,
202+
8 * 1024 * 1024,
203+
)?)
204+
};
189205

190206
// Create the window partition buffer.
191207
let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
192-
let buffer = WindowBuffer::new(
193-
true,
194-
partition_spiller,
195-
window_spiller,
196-
partitions.len(),
197-
sort_block_size,
198-
memory_settings,
199-
)?;
208+
let buffer =
209+
WindowBuffer::new(spiller, partitions.len(), sort_block_size, memory_settings)?;
200210

201211
Ok(Self {
202212
input,

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,9 +20,9 @@ use databend_common_expression::DataSchema;
2020
use databend_common_pipeline_transforms::MemorySettings;
2121

2222
use super::concat_data_blocks;
23+
use crate::spillers::BackpressureSpiller;
2324
use crate::spillers::SpillReader;
2425
use crate::spillers::SpillWriter;
25-
use crate::spillers::WindowSpiller;
2626

2727
#[async_trait::async_trait]
2828
pub trait Reader: Send {
@@ -46,7 +46,7 @@ pub trait Builder: Send + Sync {
4646
}
4747

4848
#[async_trait::async_trait]
49-
impl Builder for WindowSpiller {
49+
impl Builder for BackpressureSpiller {
5050
type Writer = SpillWriter;
5151

5252
async fn create(&self, schema: Arc<DataSchema>) -> Result<SpillWriter> {
@@ -181,7 +181,7 @@ where
181181
}
182182
}
183183

184-
pub(super) type WindowPartitionBufferV2 = PartitionBuffer<WindowSpiller>;
184+
pub(super) type WindowPartitionBufferV2 = PartitionBuffer<BackpressureSpiller>;
185185

186186
pub(super) struct PartitionBuffer<B>
187187
where B: Builder

src/query/service/src/spillers/adapter.rs

Lines changed: 10 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,6 @@ use std::time::Instant;
2424
use databend_common_base::base::dma_buffer_to_bytes;
2525
use databend_common_base::base::dma_read_file_range;
2626
use databend_common_base::base::ProgressValues;
27-
use databend_common_base::runtime::GlobalIORuntime;
2827
use databend_common_catalog::table_context::TableContext;
2928
use databend_common_exception::ErrorCode;
3029
use databend_common_exception::Result;
@@ -357,13 +356,13 @@ impl Spiller {
357356
}
358357

359358
#[derive(Clone)]
360-
pub struct WindowWriterAdapter {
359+
pub struct BackpressureAdapter {
361360
ctx: Arc<QueryContext>,
362361
buffer_pool: Arc<BufferPool>,
363362
chunk_size: usize,
364363
}
365364

366-
impl WindowWriterAdapter {
365+
impl BackpressureAdapter {
367366
fn add_spill_file(&self, location: Location, layout: Layout, size: usize) {
368367
if location.is_remote() {
369368
self.ctx.as_ref().incr_spill_progress(1, size);
@@ -374,25 +373,21 @@ impl WindowWriterAdapter {
374373
}
375374
}
376375

377-
pub type WindowSpiller = SpillerInner<WindowWriterAdapter>;
376+
pub type BackpressureSpiller = SpillerInner<BackpressureAdapter>;
378377

379-
impl WindowSpiller {
378+
impl BackpressureSpiller {
380379
pub fn create(
381380
ctx: Arc<QueryContext>,
382381
operator: Operator,
383382
config: SpillerConfig,
383+
buffer_pool: Arc<BufferPool>,
384+
chunk_size: usize,
384385
) -> Result<Self> {
385-
let runtime = GlobalIORuntime::instance();
386-
let buffer_pool = BufferPool::create(
387-
runtime,
388-
WINDOW_SPILL_BUFFER_MEMORY_BYTES,
389-
WINDOW_SPILL_BUFFER_WORKERS,
390-
);
391386
Self::new(
392-
WindowWriterAdapter {
387+
BackpressureAdapter {
393388
ctx,
394389
buffer_pool,
395-
chunk_size: WINDOW_SPILL_CHUNK_SIZE,
390+
chunk_size,
396391
},
397392
operator,
398393
config,
@@ -419,12 +414,8 @@ pub struct Chunk {
419414
pub layout: Layout,
420415
}
421416

422-
const WINDOW_SPILL_BUFFER_MEMORY_BYTES: usize = 64 * 1024 * 1024;
423-
const WINDOW_SPILL_BUFFER_WORKERS: usize = 2;
424-
const WINDOW_SPILL_CHUNK_SIZE: usize = 8 * 1024 * 1024;
425-
426417
pub struct SpillWriter {
427-
spiller: WindowSpiller,
418+
spiller: BackpressureSpiller,
428419
chunk_size: usize,
429420
schema: Arc<DataSchema>,
430421
file_writer: Option<FileWriter<UnionFileWriter>>,
@@ -517,7 +508,7 @@ impl SpillWriter {
517508
}
518509

519510
pub struct SpillReader {
520-
spiller: WindowSpiller,
511+
spiller: BackpressureSpiller,
521512
schema: Arc<DataSchema>,
522513
parquet_metadata: Arc<ParquetMetaData>,
523514
union_file: UnionFile,

src/query/service/src/spillers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -24,6 +24,7 @@ mod test_memory;
2424
mod union_file;
2525

2626
pub use adapter::*;
27+
pub use async_buffer::BufferPool;
2728
pub use block_writer::*;
2829
pub use databend_common_pipeline_transforms::traits::Location;
2930
pub use inner::*;

0 commit comments

Comments
 (0)