Skip to content

Commit 636d4bb

Browse files
feat(query): support result set spilling (#18679)
* refine * cleanup * x * x * SpillableBlock 1 * x * x * SpillerRef * fix * move * apply_settings * settings * fix * fix * fix * fix * PortableSpiller * x * x * x * x * fix * test * disable * fix * fix * test * test * update * format * doc * test * feat(query): add comprehensive logging for result set spilling operations --------- Co-authored-by: BohuTANG <[email protected]>
1 parent 602a040 commit 636d4bb

File tree

19 files changed

+1131
-439
lines changed

19 files changed

+1131
-439
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ databend-storages-common-stage = { workspace = true }
120120
databend-storages-common-table-meta = { workspace = true }
121121
derive-visitor = { workspace = true }
122122
dyn-clone = { workspace = true }
123+
either = { workspace = true }
123124
enum-as-inner = { workspace = true }
124125
ethnum = { workspace = true }
125126
fastrace = { workspace = true }
@@ -191,6 +192,7 @@ maplit = { workspace = true }
191192
mysql_async = { workspace = true }
192193
p256 = { workspace = true }
193194
pretty_assertions = { workspace = true }
195+
proptest = { workspace = true }
194196
reqwest = { workspace = true }
195197
serde_json.workspace = true
196198
serde_yaml = { workspace = true }

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ use databend_common_exception::ErrorCode;
3131
use databend_common_expression::DataSchemaRef;
3232
use databend_common_management::WorkloadGroupResourceManager;
3333
use databend_common_metrics::http::metrics_incr_http_response_errors_count;
34-
use fastrace::func_path;
3534
use fastrace::prelude::*;
3635
use http::HeaderMap;
3736
use http::HeaderValue;
@@ -296,7 +295,7 @@ async fn query_final_handler(
296295
Path(query_id): Path<String>,
297296
) -> PoemResult<impl IntoResponse> {
298297
ctx.check_node_id(&query_id)?;
299-
let root = get_http_tracing_span(func_path!(), ctx, &query_id);
298+
let root = get_http_tracing_span("http::query_final_handler", ctx, &query_id);
300299
let _t = SlowRequestLogTracker::new(ctx);
301300
async {
302301
info!(
@@ -337,7 +336,7 @@ async fn query_cancel_handler(
337336
Path(query_id): Path<String>,
338337
) -> PoemResult<impl IntoResponse> {
339338
ctx.check_node_id(&query_id)?;
340-
let root = get_http_tracing_span(func_path!(), ctx, &query_id);
339+
let root = get_http_tracing_span("http::query_cancel_handler", ctx, &query_id);
341340
let _t = SlowRequestLogTracker::new(ctx);
342341
async {
343342
info!(
@@ -369,7 +368,7 @@ async fn query_state_handler(
369368
Path(query_id): Path<String>,
370369
) -> PoemResult<Response> {
371370
ctx.check_node_id(&query_id)?;
372-
let root = get_http_tracing_span(func_path!(), ctx, &query_id);
371+
let root = get_http_tracing_span("http::query_state_handler", ctx, &query_id);
373372

374373
async {
375374
let http_query_manager = HttpQueryManager::instance();
@@ -457,7 +456,7 @@ async fn query_page_handler(
457456
};
458457

459458
let query_page_handle = {
460-
let root = get_http_tracing_span(func_path!(), ctx, &query_id);
459+
let root = get_http_tracing_span("http::query_page_handler", ctx, &query_id);
461460
let _t = SlowRequestLogTracker::new(ctx);
462461
query_page_handle.in_span(root)
463462
};
@@ -556,7 +555,7 @@ pub(crate) async fn query_handler(
556555
};
557556

558557
let query_handle = {
559-
let root = get_http_tracing_span(func_path!(), ctx, &ctx.query_id);
558+
let root = get_http_tracing_span("http::query_handler", ctx, &ctx.query_id);
560559
let _t = SlowRequestLogTracker::new(ctx);
561560
query_handle.in_span(root)
562561
};

src/query/service/src/servers/http/v1/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub use http_query_handlers::query_route;
3131
pub use http_query_handlers::QueryResponse;
3232
pub use http_query_handlers::QueryResponseField;
3333
pub use http_query_handlers::QueryStats;
34+
pub use query::blocks_serializer::BlocksCollector;
3435
pub use query::blocks_serializer::BlocksSerializer;
3536
pub use query::ExecuteStateKind;
3637
pub use query::ExpiringMap;

src/query/service/src/servers/http/v1/query/blocks_serializer.rs

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ use std::ops::DerefMut;
1818
use databend_common_expression::types::date::date_to_string;
1919
use databend_common_expression::types::interval::interval_to_string;
2020
use databend_common_expression::types::timestamp::timestamp_to_string;
21+
use databend_common_expression::BlockEntry;
2122
use databend_common_expression::Column;
23+
use databend_common_expression::DataBlock;
2224
use databend_common_formats::field_encoder::FieldEncoderValues;
2325
use databend_common_io::ewkb_to_geo;
2426
use databend_common_io::geo_to_ewkb;
@@ -40,38 +42,55 @@ fn data_is_null(column: &Column, row_index: usize) -> bool {
4042
}
4143
}
4244

43-
#[derive(Debug, Clone)]
44-
pub struct BlocksSerializer {
45+
#[derive(Debug, Clone, Default)]
46+
pub struct BlocksCollector {
4547
// Vec<Column> for a Block
4648
columns: Vec<(Vec<Column>, usize)>,
47-
pub(crate) format: Option<FormatSettings>,
4849
}
4950

50-
impl BlocksSerializer {
51-
pub fn empty() -> Self {
52-
Self {
53-
columns: vec![],
54-
format: None,
55-
}
51+
impl BlocksCollector {
52+
pub fn new() -> Self {
53+
Self { columns: vec![] }
5654
}
5755

58-
pub fn new(format: Option<FormatSettings>) -> Self {
59-
Self {
60-
columns: vec![],
61-
format,
56+
pub fn append_columns(&mut self, columns: Vec<Column>, num_rows: usize) {
57+
self.columns.push((columns, num_rows));
58+
}
59+
60+
pub fn append_block(&mut self, block: DataBlock) {
61+
if block.is_empty() {
62+
return;
6263
}
64+
let columns = block.columns().iter().map(BlockEntry::to_column).collect();
65+
let num_rows = block.num_rows();
66+
self.append_columns(columns, num_rows);
6367
}
6468

65-
pub fn has_format(&self) -> bool {
66-
self.format.is_some()
69+
pub fn num_rows(&self) -> usize {
70+
self.columns.iter().map(|(_, num_rows)| *num_rows).sum()
6771
}
6872

69-
pub fn set_format(&mut self, format: FormatSettings) {
70-
self.format = Some(format);
73+
pub fn into_serializer(self, format: FormatSettings) -> BlocksSerializer {
74+
BlocksSerializer {
75+
columns: self.columns,
76+
format: Some(format),
77+
}
7178
}
79+
}
7280

73-
pub fn append(&mut self, columns: Vec<Column>, num_rows: usize) {
74-
self.columns.push((columns, num_rows));
81+
#[derive(Debug, Clone)]
82+
pub struct BlocksSerializer {
83+
// Vec<Column> for a Block
84+
columns: Vec<(Vec<Column>, usize)>,
85+
format: Option<FormatSettings>,
86+
}
87+
88+
impl BlocksSerializer {
89+
pub fn empty() -> Self {
90+
Self {
91+
columns: vec![],
92+
format: None,
93+
}
7594
}
7695

7796
pub fn is_empty(&self) -> bool {

src/query/service/src/servers/http/v1/query/execute_state.rs

Lines changed: 106 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ use databend_common_exception::ResultExt;
2525
use databend_common_expression::DataBlock;
2626
use databend_common_expression::DataSchemaRef;
2727
use databend_common_expression::Scalar;
28-
use databend_common_io::prelude::FormatSettings;
2928
use databend_common_settings::Settings;
29+
use databend_common_storage::DataOperator;
30+
use databend_storages_common_cache::TempDirManager;
3031
use databend_storages_common_session::TxnManagerRef;
3132
use futures::StreamExt;
3233
use log::debug;
@@ -49,6 +50,12 @@ use crate::sessions::QueryAffect;
4950
use crate::sessions::QueryContext;
5051
use crate::sessions::Session;
5152
use crate::sessions::TableContext;
53+
use crate::spillers::LiteSpiller;
54+
use crate::spillers::SpillerConfig;
55+
use crate::spillers::SpillerDiskConfig;
56+
use crate::spillers::SpillerType;
57+
58+
type Sender = SizedChannelSender<LiteSpiller>;
5259

5360
pub struct ExecutionError;
5461

@@ -114,7 +121,7 @@ impl ExecuteState {
114121

115122
pub struct ExecuteStarting {
116123
pub(crate) ctx: Arc<QueryContext>,
117-
pub(crate) sender: SizedChannelSender<DataBlock>,
124+
pub(crate) sender: Option<Sender>,
118125
}
119126

120127
pub struct ExecuteRunning {
@@ -355,8 +362,7 @@ impl ExecuteState {
355362
sql: String,
356363
session: Arc<Session>,
357364
ctx: Arc<QueryContext>,
358-
block_sender: SizedChannelSender<DataBlock>,
359-
format_settings: Arc<parking_lot::RwLock<Option<FormatSettings>>>,
365+
mut block_sender: Sender,
360366
) -> Result<(), ExecutionError> {
361367
let make_error = || format!("failed to start query: {sql}");
362368

@@ -367,11 +373,8 @@ impl ExecuteState {
367373
.await
368374
.map_err(|err| err.display_with_sql(&sql))
369375
.with_context(make_error)?;
370-
{
371-
// set_var may change settings
372-
let mut guard = format_settings.write();
373-
*guard = Some(ctx.get_format_settings().with_context(make_error)?);
374-
}
376+
377+
Self::apply_settings(&ctx, &mut block_sender).with_context(make_error)?;
375378

376379
let interpreter = InterpreterFactory::get(ctx.clone(), &plan)
377380
.await
@@ -397,7 +400,7 @@ impl ExecuteState {
397400
let ctx_clone = ctx.clone();
398401
let block_sender_closer = block_sender.closer();
399402

400-
let res = execute(
403+
let res = Self::execute(
401404
interpreter,
402405
plan.schema(),
403406
ctx_clone,
@@ -407,60 +410,111 @@ impl ExecuteState {
407410
match CatchUnwindFuture::create(res).await {
408411
Ok(Err(err)) => {
409412
Executor::stop(&executor_clone, Err(err.clone()));
410-
block_sender_closer.close();
413+
block_sender_closer.abort();
411414
}
412415
Err(e) => {
413416
Executor::stop(&executor_clone, Err(e));
414-
block_sender_closer.close();
417+
block_sender_closer.abort();
415418
}
416419
_ => {}
417420
}
418421

419422
Ok(())
420423
}
421-
}
422424

423-
async fn execute(
424-
interpreter: Arc<dyn Interpreter>,
425-
schema: DataSchemaRef,
426-
ctx: Arc<QueryContext>,
427-
block_sender: SizedChannelSender<DataBlock>,
428-
executor: Arc<Mutex<Executor>>,
429-
) -> Result<(), ExecutionError> {
430-
let make_error = || format!("failed to execute {}", interpreter.name());
431-
432-
let mut data_stream = interpreter
433-
.execute(ctx.clone())
434-
.await
435-
.with_context(make_error)?;
436-
match data_stream.next().await {
437-
None => {
438-
let block = DataBlock::empty_with_schema(schema);
439-
block_sender.send(block, 0).await;
440-
Executor::stop::<()>(&executor, Ok(()));
441-
block_sender.close();
442-
}
443-
Some(Err(err)) => {
444-
Executor::stop(&executor, Err(err));
445-
block_sender.close();
425+
#[fastrace::trace(name = "ExecuteState::execute")]
426+
async fn execute(
427+
interpreter: Arc<dyn Interpreter>,
428+
schema: DataSchemaRef,
429+
ctx: Arc<QueryContext>,
430+
mut sender: Sender,
431+
executor: Arc<Mutex<Executor>>,
432+
) -> Result<(), ExecutionError> {
433+
let make_error = || format!("failed to execute {}", interpreter.name());
434+
435+
let mut data_stream = interpreter
436+
.execute(ctx.clone())
437+
.await
438+
.with_context(make_error)?;
439+
match data_stream.next().await {
440+
None => {
441+
Self::send_data_block(&mut sender, &executor, DataBlock::empty_with_schema(schema))
442+
.await
443+
.with_context(make_error)?;
444+
Executor::stop::<()>(&executor, Ok(()));
445+
sender.finish();
446+
}
447+
Some(Err(err)) => {
448+
Executor::stop(&executor, Err(err));
449+
sender.abort();
450+
}
451+
Some(Ok(block)) => {
452+
Self::send_data_block(&mut sender, &executor, block)
453+
.await
454+
.with_context(make_error)?;
455+
while let Some(block) = data_stream.next().await {
456+
match block {
457+
Ok(block) => {
458+
Self::send_data_block(&mut sender, &executor, block)
459+
.await
460+
.with_context(make_error)?;
461+
}
462+
Err(err) => {
463+
sender.abort();
464+
return Err(err.with_context(make_error()));
465+
}
466+
};
467+
}
468+
Executor::stop::<()>(&executor, Ok(()));
469+
sender.finish();
470+
}
446471
}
447-
Some(Ok(block)) => {
448-
let size = block.num_rows();
449-
block_sender.send(block, size).await;
450-
while let Some(block_r) = data_stream.next().await {
451-
match block_r {
452-
Ok(block) => {
453-
block_sender.send(block.clone(), block.num_rows()).await;
454-
}
455-
Err(err) => {
456-
block_sender.close();
457-
return Err(err.with_context(make_error()));
458-
}
459-
};
472+
Ok(())
473+
}
474+
475+
async fn send_data_block(
476+
sender: &mut Sender,
477+
executor: &Arc<Mutex<Executor>>,
478+
block: DataBlock,
479+
) -> Result<bool> {
480+
match sender.send(block).await {
481+
Ok(ok) => Ok(ok),
482+
Err(err) => {
483+
Executor::stop(executor, Err(err.clone()));
484+
sender.abort();
485+
Err(err)
460486
}
461-
Executor::stop::<()>(&executor, Ok(()));
462-
block_sender.close();
463487
}
464488
}
465-
Ok(())
489+
490+
fn apply_settings(ctx: &Arc<QueryContext>, block_sender: &mut Sender) -> Result<()> {
491+
let settings = ctx.get_settings();
492+
493+
let spiller = if settings.get_enable_result_set_spilling()? {
494+
let temp_dir_manager = TempDirManager::instance();
495+
let disk_bytes_limit = settings.get_result_set_spilling_to_disk_bytes_limit()?;
496+
let enable_dio = settings.get_enable_dio()?;
497+
let disk_spill = temp_dir_manager
498+
.get_disk_spill_dir(disk_bytes_limit, &ctx.get_id())
499+
.map(|temp_dir| SpillerDiskConfig::new(temp_dir, enable_dio))
500+
.transpose()?;
501+
502+
let location_prefix = ctx.query_id_spill_prefix();
503+
let config = SpillerConfig {
504+
spiller_type: SpillerType::ResultSet,
505+
location_prefix,
506+
disk_spill,
507+
use_parquet: settings.get_spilling_file_format()?.is_parquet(),
508+
};
509+
let op = DataOperator::instance().spill_operator();
510+
Some(LiteSpiller::new(op, config)?)
511+
} else {
512+
None
513+
};
514+
515+
// set_var may change settings
516+
let format_settings = ctx.get_format_settings()?;
517+
block_sender.plan_ready(format_settings, spiller);
518+
Ok(())
519+
}
466520
}

0 commit comments

Comments
 (0)