Skip to content

Commit 73a0fca

Browse files
committed
x
1 parent 46fe327 commit 73a0fca

File tree

5 files changed

+110
-75
lines changed

5 files changed

+110
-75
lines changed

src/query/service/src/pipelines/processors/transforms/window/partition/transform_window_partition_collect.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use crate::spillers::Spiller;
4040
use crate::spillers::SpillerConfig;
4141
use crate::spillers::SpillerDiskConfig;
4242
use crate::spillers::SpillerType;
43+
use crate::spillers::WindowSpiller;
4344

4445
enum WindowBuffer {
4546
V1(WindowPartitionBuffer),
@@ -49,22 +50,23 @@ enum WindowBuffer {
4950
impl WindowBuffer {
5051
fn new(
5152
is_v2: bool,
52-
spiller: Spiller,
53+
partition_spiller: Spiller,
54+
writer_spiller: WindowSpiller,
5355
num_partitions: usize,
5456
sort_block_size: usize,
5557
memory_settings: MemorySettings,
5658
) -> Result<Self> {
5759
if is_v2 {
5860
let inner = WindowPartitionBufferV2::new(
59-
spiller,
61+
writer_spiller,
6062
num_partitions,
6163
sort_block_size,
6264
memory_settings,
6365
)?;
6466
Ok(Self::V2(inner))
6567
} else {
6668
let inner = WindowPartitionBuffer::new(
67-
spiller,
69+
partition_spiller,
6870
num_partitions,
6971
sort_block_size,
7072
memory_settings,
@@ -179,15 +181,18 @@ impl<S: DataProcessorStrategy> TransformWindowPartitionCollect<S> {
179181
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
180182
};
181183

182-
// Create an inner `Spiller` to spill data.
184+
// Create spillers for window operator.
183185
let operator = DataOperator::instance().spill_operator();
184-
let spiller = Spiller::create(ctx, operator, spill_config)?;
186+
let partition_spiller =
187+
Spiller::create(ctx.clone(), operator.clone(), spill_config.clone())?;
188+
let window_spiller = WindowSpiller::create(ctx, operator, spill_config)?;
185189

186190
// Create the window partition buffer.
187191
let sort_block_size = settings.get_window_partition_sort_block_size()? as usize;
188192
let buffer = WindowBuffer::new(
189193
true,
190-
spiller,
194+
partition_spiller,
195+
window_spiller,
191196
partitions.len(),
192197
sort_block_size,
193198
memory_settings,

src/query/service/src/pipelines/processors/transforms/window/partition/window_partition_buffer_v2.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use databend_common_pipeline_transforms::MemorySettings;
2222
use super::concat_data_blocks;
2323
use crate::spillers::SpillReader;
2424
use crate::spillers::SpillWriter;
25-
use crate::spillers::Spiller;
25+
use crate::spillers::WindowSpiller;
2626

2727
#[async_trait::async_trait]
2828
pub trait Reader: Send {
@@ -46,7 +46,7 @@ pub trait Builder: Send + Sync {
4646
}
4747

4848
#[async_trait::async_trait]
49-
impl Builder for Spiller {
49+
impl Builder for WindowSpiller {
5050
type Writer = SpillWriter;
5151

5252
async fn create(&self, schema: Arc<DataSchema>) -> Result<SpillWriter> {
@@ -181,7 +181,7 @@ where
181181
}
182182
}
183183

184-
pub(super) type WindowPartitionBufferV2 = PartitionBuffer<Spiller>;
184+
pub(super) type WindowPartitionBufferV2 = PartitionBuffer<WindowSpiller>;
185185

186186
pub(super) struct PartitionBuffer<B>
187187
where B: Builder
@@ -196,9 +196,11 @@ where B: Builder
196196
next_to_restore_partition_id: isize,
197197
}
198198

199-
impl PartitionBuffer<Spiller> {
199+
impl<B> PartitionBuffer<B>
200+
where B: Builder
201+
{
200202
pub fn new(
201-
spiller: Spiller,
203+
spiller: B,
202204
num_partitions: usize,
203205
sort_block_size: usize,
204206
memory_settings: MemorySettings,

src/query/service/src/spillers/adapter.rs

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ impl Spiller {
277277
};
278278

279279
// Record statistics.
280-
record_read_profile(location, &instant, data.len());
280+
record_read_profile(location.is_local(), &instant, data.len());
281281

282282
// Deserialize partitioned data block.
283283
let mut partitioned_data = Vec::with_capacity(partitions.len());
@@ -311,7 +311,7 @@ impl Spiller {
311311
Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?,
312312
};
313313

314-
record_read_profile(location, &instant, data.len());
314+
record_read_profile(location.is_local(), &instant, data.len());
315315

316316
deserialize_block(layout, data)
317317
}
@@ -354,20 +354,55 @@ impl Spiller {
354354
.cloned()
355355
.collect()
356356
}
357+
}
357358

358-
pub fn new_spill_writer(&self, schema: Arc<DataSchema>) -> Result<SpillWriter> {
359+
#[derive(Clone)]
360+
pub struct WindowWriterAdapter {
361+
ctx: Arc<QueryContext>,
362+
buffer_pool: Arc<BufferPool>,
363+
chunk_size: usize,
364+
}
365+
366+
impl WindowWriterAdapter {
367+
fn add_spill_file(&self, location: Location, layout: Layout, size: usize) {
368+
if location.is_remote() {
369+
self.ctx.as_ref().incr_spill_progress(1, size);
370+
self.ctx
371+
.as_ref()
372+
.add_spill_file(location.clone(), layout.clone());
373+
}
374+
}
375+
}
376+
377+
pub type WindowSpiller = SpillerInner<WindowWriterAdapter>;
378+
379+
impl WindowSpiller {
380+
pub fn create(
381+
ctx: Arc<QueryContext>,
382+
operator: Operator,
383+
config: SpillerConfig,
384+
) -> Result<Self> {
359385
let runtime = GlobalIORuntime::instance();
360386
let buffer_pool = BufferPool::create(
361387
runtime,
362388
WINDOW_SPILL_BUFFER_MEMORY_BYTES,
363389
WINDOW_SPILL_BUFFER_WORKERS,
364390
);
391+
Self::new(
392+
WindowWriterAdapter {
393+
ctx,
394+
buffer_pool,
395+
chunk_size: WINDOW_SPILL_CHUNK_SIZE,
396+
},
397+
operator,
398+
config,
399+
)
400+
}
365401

402+
pub fn new_spill_writer(&self, schema: Arc<DataSchema>) -> Result<SpillWriter> {
366403
Ok(SpillWriter {
367404
spiller: self.clone(),
368-
buffer_pool,
369-
dio: self.temp_dir.is_some(),
370-
chunk_size: WINDOW_SPILL_CHUNK_SIZE,
405+
chunk_size: self.adapter.chunk_size,
371406
schema,
372407
file_writer: None,
373408
})
@@ -389,9 +424,7 @@ const WINDOW_SPILL_BUFFER_WORKERS: usize = 2;
389424
const WINDOW_SPILL_CHUNK_SIZE: usize = 8 * 1024 * 1024;
390425

391426
pub struct SpillWriter {
392-
spiller: Spiller,
393-
buffer_pool: Arc<BufferPool>,
394-
dio: bool,
427+
spiller: WindowSpiller,
395428
chunk_size: usize,
396429
schema: Arc<DataSchema>,
397430
file_writer: Option<FileWriter<UnionFileWriter>>,
@@ -405,7 +438,11 @@ impl SpillWriter {
405438

406439
let writer = self
407440
.spiller
408-
.new_file_writer(&self.schema, &self.buffer_pool, self.dio, self.chunk_size)
441+
.new_file_writer(
442+
&self.schema,
443+
&self.spiller.adapter.buffer_pool,
444+
self.chunk_size,
445+
)
409446
.await?;
410447
self.file_writer = Some(writer);
411448
Ok(())
@@ -475,17 +512,15 @@ impl SpillWriter {
475512
schema: self.schema,
476513
parquet_metadata: Arc::new(metadata),
477514
union_file,
478-
dio: self.dio,
479515
})
480516
}
481517
}
482518

483519
pub struct SpillReader {
484-
spiller: Spiller,
520+
spiller: WindowSpiller,
485521
schema: Arc<DataSchema>,
486522
parquet_metadata: Arc<ParquetMetaData>,
487523
union_file: UnionFile,
488-
dio: bool,
489524
}
490525

491526
impl SpillReader {
@@ -500,7 +535,6 @@ impl SpillReader {
500535
self.parquet_metadata.clone(),
501536
&self.schema,
502537
ordinals,
503-
self.dio,
504538
)
505539
.await
506540
}
@@ -613,9 +647,7 @@ impl LiteSpiller {
613647
Location::Local(_) => None,
614648
})
615649
.collect();
616-
let op = self.0.local_operator.as_ref().unwrap_or(&self.0.operator);
617-
618-
op.delete_iter(files).await?;
650+
self.0.operator.delete_iter(files).await?;
619651
Ok(())
620652
}
621653
}

src/query/service/src/spillers/inner.rs

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub trait SpillAdapter: Send + Sync + 'static {
100100
/// 3. Serialization and deserialization input data
101101
/// 4. Interact with the underlying storage engine to write and read spilled data
102102
#[derive(Clone)]
103-
pub struct SpillerInner<A: SpillAdapter> {
103+
pub struct SpillerInner<A> {
104104
pub(super) adapter: A,
105105
pub(super) operator: Operator,
106106
location_prefix: String,
@@ -111,7 +111,7 @@ pub struct SpillerInner<A: SpillAdapter> {
111111
_spiller_type: SpillerType,
112112
}
113113

114-
impl<A: SpillAdapter> SpillerInner<A> {
114+
impl<A> SpillerInner<A> {
115115
pub fn new(adapter: A, operator: Operator, config: SpillerConfig) -> Result<Self> {
116116
let SpillerConfig {
117117
location_prefix,
@@ -139,17 +139,6 @@ impl<A: SpillAdapter> SpillerInner<A> {
139139
})
140140
}
141141

142-
/// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
143-
#[fastrace::trace(name = "Spiller::spill")]
144-
pub async fn spill(&self, data_block: Vec<DataBlock>) -> Result<Location> {
145-
let (location, layout, data_size) = self.spill_unmanage(data_block).await?;
146-
147-
// Record columns layout for spilled data.
148-
self.adapter
149-
.add_spill_file(location.clone(), layout, data_size);
150-
Ok(location)
151-
}
152-
153142
async fn spill_unmanage(
154143
&self,
155144
data_block: Vec<DataBlock>,
@@ -179,13 +168,6 @@ impl<A: SpillAdapter> SpillerInner<A> {
179168
format!("{}/{}", self.location_prefix, GlobalUniqName::unique())
180169
}
181170

182-
/// Read a certain file to a [`DataBlock`].
183-
#[fastrace::trace(name = "Spiller::read_spilled_file")]
184-
pub async fn read_spilled_file(&self, location: &Location) -> Result<DataBlock> {
185-
let layout = self.adapter.get_spill_layout(location).unwrap();
186-
self.read_unmanage_spilled_file(location, &layout).await
187-
}
188-
189171
async fn read_unmanage_spilled_file(
190172
&self,
191173
location: &Location,
@@ -219,7 +201,7 @@ impl<A: SpillAdapter> SpillerInner<A> {
219201
Location::Remote(loc) => self.operator.read(loc).await?,
220202
};
221203

222-
record_read_profile(location, &instant, data.len());
204+
record_read_profile(location.is_local(), &instant, data.len());
223205

224206
deserialize_block(columns_layout, data)
225207
}
@@ -273,6 +255,26 @@ impl<A: SpillAdapter> SpillerInner<A> {
273255
}
274256
}
275257

258+
impl<A: SpillAdapter> SpillerInner<A> {
259+
/// Spill some [`DataBlock`] to storage. These blocks will be concat into one.
260+
#[fastrace::trace(name = "Spiller::spill")]
261+
pub async fn spill(&self, data_block: Vec<DataBlock>) -> Result<Location> {
262+
let (location, layout, data_size) = self.spill_unmanage(data_block).await?;
263+
264+
// Record columns layout for spilled data.
265+
self.adapter
266+
.add_spill_file(location.clone(), layout, data_size);
267+
Ok(location)
268+
}
269+
270+
/// Read a certain file to a [`DataBlock`].
271+
#[fastrace::trace(name = "Spiller::read_spilled_file")]
272+
pub async fn read_spilled_file(&self, location: &Location) -> Result<DataBlock> {
273+
let layout = self.adapter.get_spill_layout(location).unwrap();
274+
self.read_unmanage_spilled_file(location, &layout).await
275+
}
276+
}
277+
276278
pub(super) fn record_write_profile(is_local: bool, start: &Instant, write_bytes: usize) {
277279
if !is_local {
278280
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillWriteCount, 1);
@@ -291,23 +293,20 @@ pub(super) fn record_write_profile(is_local: bool, start: &Instant, write_bytes:
291293
}
292294
}
293295

294-
pub(super) fn record_read_profile(location: &Location, start: &Instant, read_bytes: usize) {
295-
match location {
296-
Location::Remote(_) => {
297-
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1);
298-
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes);
299-
Profile::record_usize_profile(
300-
ProfileStatisticsName::RemoteSpillReadTime,
301-
start.elapsed().as_millis() as usize,
302-
);
303-
}
304-
Location::Local(_) => {
305-
Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1);
306-
Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes);
307-
Profile::record_usize_profile(
308-
ProfileStatisticsName::LocalSpillReadTime,
309-
start.elapsed().as_millis() as usize,
310-
);
311-
}
296+
pub(super) fn record_read_profile(is_local: bool, start: &Instant, read_bytes: usize) {
297+
if is_local {
298+
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadCount, 1);
299+
Profile::record_usize_profile(ProfileStatisticsName::RemoteSpillReadBytes, read_bytes);
300+
Profile::record_usize_profile(
301+
ProfileStatisticsName::RemoteSpillReadTime,
302+
start.elapsed().as_millis() as usize,
303+
);
304+
} else {
305+
Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadCount, 1);
306+
Profile::record_usize_profile(ProfileStatisticsName::LocalSpillReadBytes, read_bytes);
307+
Profile::record_usize_profile(
308+
ProfileStatisticsName::LocalSpillReadTime,
309+
start.elapsed().as_millis() as usize,
310+
);
312311
}
313312
}

0 commit comments

Comments
 (0)