diff --git a/Cargo.lock b/Cargo.lock index f1b0b6ed7954b..46ace238cdcbc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1817,6 +1817,12 @@ version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e93abca9e28e0a1b9877922aacb20576e05d4679ffa78c3d6dc22a26a216659" +[[package]] +name = "bytesize" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3c8f83209414aacf0eeae3cf730b18d6981697fba62f200fcfb92b9f082acba" + [[package]] name = "bzip2" version = "0.4.4" @@ -3079,7 +3085,7 @@ dependencies = [ "borsh", "bytemuck", "bytes", - "bytesize", + "bytesize 2.0.1", "chrono", "concurrent-queue", "crc32fast", @@ -4051,6 +4057,7 @@ version = "0.1.0" dependencies = [ "async-backtrace", "async-trait", + "bytesize 2.0.1", "databend-common-base", "databend-common-exception", "databend-common-expression", @@ -4755,9 +4762,9 @@ dependencies = [ "log", "logforth", "opendal", - "opentelemetry 0.26.0", - "opentelemetry-otlp 0.26.0", - "opentelemetry_sdk 0.26.0", + "opentelemetry 0.29.1", + "opentelemetry-otlp 0.29.0", + "opentelemetry_sdk 0.29.0", "parquet", "serde", "serde_json", @@ -5247,6 +5254,7 @@ dependencies = [ "bumpalo", "byteorder", "bytes", + "bytesize 2.0.1", "chrono", "chrono-tz 0.8.6", "concurrent-queue", @@ -5352,8 +5360,8 @@ dependencies = [ "num_cpus", "opendal", "opensrv-mysql", - "opentelemetry 0.26.0", - "opentelemetry_sdk 0.26.0", + "opentelemetry 0.29.1", + "opentelemetry_sdk 0.29.0", "p256", "parking_lot 0.12.3", "parquet", @@ -6512,9 +6520,9 @@ checksum = "a2a2b11eda1d40935b26cf18f6833c526845ae8c41e58d09af6adeb6f0269183" [[package]] name = "fastrace" -version = "0.7.9" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c5a9b2e56fac2bf32bca26fdc509f674d0f2bdd15404b629ccee9c642453bb7" +checksum = "318783b9fefe06130ab664ff1779215657586b004c0c7f3d6ece16d658936d06" dependencies = [ "fastant", "fastrace-macro", @@ -6527,9 +6535,9 @@ dependencies = [ [[package]] name = "fastrace-macro" -version = "0.7.9" +version = "0.7.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9cdabd2b113942d0f771c11a7baf0edd24098b923ac546fd39b9811c82b4220" +checksum = "c7079009cf129d63c850dee732b58d7639d278a47ad99c607954ac94cfd57ef4" dependencies = [ "proc-macro-error2", "proc-macro2", @@ -6539,15 +6547,15 @@ dependencies = [ [[package]] name = "fastrace-opentelemetry" -version = "0.7.4" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b9ddbe3c21df04560ef1dfd29f426f7fb8ee1814e5c4b4260e7162c6c67d359" +checksum = "1c1cc020aea54a15f53ca1d4e42321d40f9e767f42d222ea1fdbe9c43ea49d15" dependencies = [ "fastrace", - "futures", "log", - "opentelemetry 0.26.0", - "opentelemetry_sdk 0.26.0", + "opentelemetry 0.29.1", + "opentelemetry_sdk 0.29.0", + "pollster", ] [[package]] @@ -7492,7 +7500,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac7045ac9fe5f9c727f38799d002a7ed3583cd777e3322a7c4b43e3cf437dc69" dependencies = [ "bytes", - "bytesize", + "bytesize 1.3.3", "crc32fast", "crossbeam-channel", "flate2", @@ -11096,23 +11104,23 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.26.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" dependencies = [ "futures-core", "futures-sink", 
"js-sys", - "once_cell", "pin-project-lite", - "thiserror 1.0.69", + "thiserror 2.0.12", + "tracing", ] [[package]] name = "opentelemetry" -version = "0.28.0" +version = "0.29.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "236e667b670a5cdf90c258f5a55794ec5ac5027e960c224bff8367a59e1e6426" +checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" dependencies = [ "futures-core", "futures-sink", @@ -11122,19 +11130,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-http" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6351496aeaa49d7c267fb480678d85d1cd30c5edb20b497c48c56f62a8c14b99" -dependencies = [ - "async-trait", - "bytes", - "http 1.3.1", - "opentelemetry 0.26.0", - "reqwest", -] - [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -11150,23 +11145,17 @@ dependencies = [ ] [[package]] -name = "opentelemetry-otlp" -version = "0.26.0" +name = "opentelemetry-http" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e1f9c8b032d4f635c730c0efcf731d5e2530ea13fa8bef7939ddc8420696bd" +checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" dependencies = [ "async-trait", - "futures-core", + "bytes", "http 1.3.1", - "opentelemetry 0.26.0", - "opentelemetry-http 0.26.0", - "opentelemetry-proto 0.26.1", - "opentelemetry_sdk 0.26.0", - "prost", + "opentelemetry 0.29.1", "reqwest", - "thiserror 1.0.69", - "tokio", - "tonic", + "tracing", ] [[package]] @@ -11192,15 +11181,23 @@ dependencies = [ ] [[package]] -name = "opentelemetry-proto" -version = "0.26.1" +name = "opentelemetry-otlp" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9d3968ce3aefdcca5c27e3c4ea4391b37547726a70893aab52d3de95d5f8b34" +checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" dependencies = [ - "opentelemetry 0.26.0", - "opentelemetry_sdk 0.26.0", + "futures-core", + "http 1.3.1", + "opentelemetry 0.29.1", + "opentelemetry-http 0.29.0", + "opentelemetry-proto 0.29.0", + "opentelemetry_sdk 0.29.0", "prost", + "reqwest", + "thiserror 2.0.12", + "tokio", "tonic", + "tracing", ] [[package]] @@ -11218,41 +11215,52 @@ dependencies = [ "tonic", ] +[[package]] +name = "opentelemetry-proto" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +dependencies = [ + "opentelemetry 0.29.1", + "opentelemetry_sdk 0.29.0", + "prost", + "tonic", +] + [[package]] name = "opentelemetry_sdk" -version = "0.26.0" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" dependencies = [ "async-trait", "futures-channel", "futures-executor", "futures-util", "glob", - "once_cell", - "opentelemetry 0.26.0", + "opentelemetry 0.28.0", "percent-encoding", "rand 0.8.5", "serde_json", - "thiserror 1.0.69", + "thiserror 2.0.12", "tokio", "tokio-stream", + "tracing", ] [[package]] name = "opentelemetry_sdk" -version = "0.28.0" +version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84dfad6042089c7fc1f6118b7040dc2eb4ab520abbf410b79dc481032af39570" +checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" dependencies = [ - 
"async-trait", "futures-channel", "futures-executor", "futures-util", "glob", - "opentelemetry 0.28.0", + "opentelemetry 0.29.1", "percent-encoding", - "rand 0.8.5", + "rand 0.9.1", "serde_json", "thiserror 2.0.12", "tokio", @@ -11770,6 +11778,12 @@ dependencies = [ "syn 2.0.101", ] +[[package]] +name = "pollster" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f3a9f18d041e6d0e102a0a46750538147e5e8992d3b4873aaafee2520b00ce3" + [[package]] name = "polyval" version = "0.6.2" @@ -12058,7 +12072,7 @@ version = "28.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "744a264d26b88a6a7e37cbad97953fa233b94d585236310bcbc88474b4092d79" dependencies = [ - "bytesize", + "bytesize 1.3.3", "human_format", ] @@ -15621,6 +15635,7 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", + "flate2", "h2 0.4.10", "http 1.3.1", "http-body 1.0.1", diff --git a/Cargo.toml b/Cargo.toml index feb53100b2081..10e5dd2b15ecd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -274,7 +274,7 @@ byte-unit = "5.1.6" bytemuck = { version = "1", features = ["derive"] } byteorder = "1.4.3" bytes = "1.5.0" -bytesize = "1.1.0" +bytesize = "2" cbordata = { version = "0.6.0" } cfg-if = "1.0.0" chrono = { version = "0.4.40", features = ["serde"] } @@ -568,19 +568,22 @@ vergen = { version = "8.3.1", default-features = false, features = ["build", "ca # Observability env_logger = "0.11" -fastrace = { version = "0.7.4", features = ["enable"] } -fastrace-opentelemetry = "0.7.4" +fastrace = { version = "0.7.14", features = ["enable"] } +fastrace-opentelemetry = "0.10" log = { version = "0.4.27", features = ["serde", "kv_unstable_std"] } logcall = "0.1.9" -opentelemetry = { version = "0.26.0", features = ["trace", "logs"] } -opentelemetry-otlp = { version = "0.26.0", features = [ +opentelemetry = { version = "0.29", features = ["trace", "logs"] } +opentelemetry-otlp = { version = "0.29", default-features = false, features = [ "trace", "logs", + "metrics", + "internal-logs", "grpc-tonic", + "gzip-tonic", "http-proto", "reqwest-client", ] } -opentelemetry_sdk = { version = "0.26.0", features = ["trace", "logs", "rt-tokio"] } +opentelemetry_sdk = { version = "0.29", features = ["trace", "logs", "rt-tokio"] } prometheus-client = "0.22" prometheus-parse = "0.2.3" tracing = "0.1.40" diff --git a/scripts/ci/deploy/databend-query-cluster-tracing.sh b/scripts/ci/deploy/databend-query-cluster-tracing.sh index 76c89ec1eb741..18651376388c4 100755 --- a/scripts/ci/deploy/databend-query-cluster-tracing.sh +++ b/scripts/ci/deploy/databend-query-cluster-tracing.sh @@ -29,21 +29,22 @@ python3 scripts/ci/wait_tcp.py --timeout 30 --port 9191 export RUST_BACKTRACE=1 export LOG_TRACING_ON=true +export LOG_TRACING_CAPTURE_LOG_LEVEL="Warn,databend_=Debug" echo 'Start databend-query node-1' -nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-1.toml --internal-enable-sandbox-tenant --log-tracing-level Warn,databend_=Debug,openraft=Info >./.databend/query-1.out 2>&1 & +nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-1.toml --internal-enable-sandbox-tenant >./.databend/query-1.out 2>&1 & echo "Waiting on node-1..." 
python3 scripts/ci/wait_tcp.py --timeout 30 --port 9091 echo 'Start databend-query node-2' -nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-2.toml --internal-enable-sandbox-tenant --log-tracing-level Warn,databend_=Debug,openraft=Info >./.databend/query-2.out 2>&1 & +nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-2.toml --internal-enable-sandbox-tenant >./.databend/query-2.out 2>&1 & echo "Waiting on node-2..." python3 scripts/ci/wait_tcp.py --timeout 30 --port 9092 echo 'Start databend-query node-3' -nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-3.toml --internal-enable-sandbox-tenant --log-tracing-level Warn,databend_=Debug,openraft=Info >./.databend/query-3.out 2>&1 & +nohup target/${BUILD_PROFILE}/databend-query -c scripts/ci/deploy/config/databend-query-node-3.toml --internal-enable-sandbox-tenant >./.databend/query-3.out 2>&1 & echo "Waiting on node-3..." python3 scripts/ci/wait_tcp.py --timeout 30 --port 9093 diff --git a/src/common/tracing/Cargo.toml b/src/common/tracing/Cargo.toml index df1cca567b9ed..6ff7d6f14b696 100644 --- a/src/common/tracing/Cargo.toml +++ b/src/common/tracing/Cargo.toml @@ -26,7 +26,7 @@ log = { workspace = true } logforth = { workspace = true } opendal = { workspace = true } opentelemetry = { workspace = true } -opentelemetry-otlp = { workspace = true, features = ["reqwest-client"] } +opentelemetry-otlp = { workspace = true } opentelemetry_sdk = { workspace = true } parquet = { workspace = true } serde = { workspace = true } diff --git a/src/common/tracing/src/init.rs b/src/common/tracing/src/init.rs index 5ad9635219619..d88ce303351ad 100644 --- a/src/common/tracing/src/init.rs +++ b/src/common/tracing/src/init.rs @@ -15,7 +15,6 @@ use std::borrow::Cow; use std::collections::BTreeMap; use std::sync::Arc; -use std::time::Duration; use databend_common_base::base::tokio; use databend_common_base::base::tokio::sync::RwLock; @@ -26,7 +25,9 @@ use log::LevelFilter; use logforth::filter::env_filter::EnvFilterBuilder; use logforth::filter::EnvFilter; use opendal::Operator; +use opentelemetry_otlp::Compression; use opentelemetry_otlp::WithExportConfig; +use opentelemetry_otlp::WithTonicConfig; use crate::config::OTLPProtocol; use crate::filter::ThreadTrackerFilter; @@ -141,41 +142,40 @@ pub fn init_logging( // initialize tracing a reporter if cfg.tracing.on { let endpoint = cfg.tracing.otlp.endpoint.clone(); - let mut kvs = cfg - .tracing - .otlp - .labels - .iter() - .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())) - .collect::>(); - kvs.push(opentelemetry::KeyValue::new( - "service.name", - trace_name.clone(), - )); - for (k, v) in &labels { - kvs.push(opentelemetry::KeyValue::new(k.to_string(), v.to_string())); - } let exporter = match cfg.tracing.otlp.protocol { - OTLPProtocol::Grpc => opentelemetry_otlp::new_exporter() - .tonic() + OTLPProtocol::Grpc => opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_compression(Compression::Gzip) .with_endpoint(endpoint) .with_protocol(opentelemetry_otlp::Protocol::Grpc) - .with_timeout(Duration::from_secs( - opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, - )) - .build_span_exporter() + .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT) + .build() .expect("initialize oltp grpc exporter"), - OTLPProtocol::Http => opentelemetry_otlp::new_exporter() - .http() + OTLPProtocol::Http => 
opentelemetry_otlp::SpanExporter::builder() + .with_http() .with_endpoint(endpoint) .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) - .with_timeout(Duration::from_secs( - opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT, - )) - .build_span_exporter() + .with_timeout(opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT) + .build() .expect("initialize oltp http exporter"), }; - let (reporter_rt, otlp_reporter) = Thread::spawn(|| { + + let resource = opentelemetry_sdk::Resource::builder() + .with_service_name(trace_name.clone()) + .with_attributes( + cfg.tracing + .otlp + .labels + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())), + ) + .with_attributes( + labels + .iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.to_string(), v.to_string())), + ) + .build(); + let (reporter_rt, otlp_reporter) = Thread::spawn(move || { // init runtime with 2 threads let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(2) @@ -186,8 +186,8 @@ pub fn init_logging( fastrace_opentelemetry::OpenTelemetryReporter::new( exporter, opentelemetry::trace::SpanKind::Server, - Cow::Owned(opentelemetry_sdk::Resource::new(kvs)), - opentelemetry::InstrumentationLibrary::builder(trace_name).build(), + Cow::Owned(resource), + opentelemetry::InstrumentationScope::builder(trace_name).build(), ) }); (rt, reporter) @@ -195,11 +195,12 @@ pub fn init_logging( .join() .unwrap(); + let trace_config = fastrace::collector::Config::default(); if cfg.structlog.on { let reporter = StructLogReporter::wrap(otlp_reporter); - fastrace::set_reporter(reporter, fastrace::collector::Config::default()); + fastrace::set_reporter(reporter, trace_config); } else { - fastrace::set_reporter(otlp_reporter, fastrace::collector::Config::default()); + fastrace::set_reporter(otlp_reporter, trace_config); } _drop_guards.push(Box::new(defer::defer(fastrace::flush))); @@ -228,8 +229,8 @@ pub fn init_logging( .filter(env_filter(&cfg.file.level)) .filter(ThreadTrackerFilter) .append(match cfg.file.format.as_str() { - "text" => normal_log_file.with_layout(TextLayout), - "json" => normal_log_file.with_layout(JsonLayout), + "text" => normal_log_file.with_layout(TextLayout::), + "json" => normal_log_file.with_layout(JsonLayout::), "identical" => normal_log_file.with_layout(IdenticalLayout), _ => { unimplemented!("file logging format {} is not supported", &cfg.file.format) @@ -245,8 +246,8 @@ pub fn init_logging( .filter(env_filter(&cfg.stderr.level)) .filter(ThreadTrackerFilter) .append(match cfg.stderr.format.as_str() { - "text" => logforth::append::Stderr::default().with_layout(TextLayout), - "json" => logforth::append::Stderr::default().with_layout(JsonLayout), + "text" => logforth::append::Stderr::default().with_layout(TextLayout::), + "json" => logforth::append::Stderr::default().with_layout(JsonLayout::), "identical" => logforth::append::Stderr::default().with_layout(IdenticalLayout), _ => { unimplemented!( @@ -371,17 +372,14 @@ pub fn init_logging( .chain(&endpoint.labels) .map(|(k, v)| (Cow::from(k.clone()), Cow::from(v.clone()))) .chain([(Cow::from("category"), Cow::from("profile"))]); - let mut otel_builder = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( + let otel = logforth::append::opentelemetry::OpentelemetryLogBuilder::new( log_name, format!("{}/v1/logs", &endpoint.endpoint), ) - .protocol(endpoint.protocol.into()); - for (k, v) in labels { - otel_builder = otel_builder.label(k, v); - } - let otel = otel_builder - .build() - .expect("initialize opentelemetry 
logger"); + .protocol(endpoint.protocol.into()) + .labels(labels) + .build() + .expect("initialize opentelemetry logger"); logger = logger.dispatch(|dispatch| { dispatch .filter(EnvFilter::new( diff --git a/src/common/tracing/src/loggers.rs b/src/common/tracing/src/loggers.rs index a2f4156bb7319..c66a253bc5b92 100644 --- a/src/common/tracing/src/loggers.rs +++ b/src/common/tracing/src/loggers.rs @@ -82,20 +82,27 @@ impl Layout for IdenticalLayout { } #[derive(Debug)] +pub struct TextLayout; -pub struct TextLayout; - -impl Layout for TextLayout { +impl Layout for TextLayout { fn format(&self, record: &Record, _diagnostics: &[Diagnostic]) -> anyhow::Result> { let mut buf = Vec::new(); if let Some(query_id) = ThreadTracker::query_id() { write!(buf, "{query_id} ")?; } + let timestamp = if FIXED_TIME { + chrono::DateTime::from_timestamp(1431648000, 123456789) + .unwrap() + .with_timezone(&chrono::Local) + } else { + chrono::Local::now() + }; + write!( buf, "{} {:>5} {}: {}:{} {}", - chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), record.level(), record.module_path().unwrap_or(""), Path::new(record.file().unwrap_or_default()) @@ -112,9 +119,9 @@ impl Layout for TextLayout { } #[derive(Debug)] -pub struct JsonLayout; +pub struct JsonLayout; -impl Layout for JsonLayout { +impl Layout for JsonLayout { fn format(&self, record: &Record, _diagnostics: &[Diagnostic]) -> anyhow::Result> { let mut fields = Map::new(); fields.insert("message".to_string(), format!("{}", record.args()).into()); @@ -122,11 +129,19 @@ impl Layout for JsonLayout { fields.insert(k, v.into()); } + let timestamp = if FIXED_TIME { + chrono::DateTime::from_timestamp(1431648000, 123456789) + .unwrap() + .with_timezone(&chrono::Local) + } else { + chrono::Local::now() + }; + let s = match ThreadTracker::query_id() { None => { format!( r#"{{"timestamp":"{}","level":"{}","fields":{}}}"#, - chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), record.level(), serde_json::to_string(&fields).unwrap_or_default(), ) @@ -134,7 +149,7 @@ impl Layout for JsonLayout { Some(query_id) => { format!( r#"{{"timestamp":"{}","level":"{}","query_id":"{}","fields":{}}}"#, - chrono::Local::now().to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + timestamp.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), record.level(), query_id, serde_json::to_string(&fields).unwrap_or_default(), @@ -179,3 +194,207 @@ impl<'kvs> log::kv::VisitSource<'kvs> for KvCollector { Ok(()) } } + +#[cfg(test)] +mod tests { + use log::Level; + use log::Record; + + use super::*; + + // Test case structure for shared test data + struct TestCase { + name: &'static str, + message: &'static str, + record: Record<'static>, + expected_text_output: String, + expected_json_output: String, + } + + // Helper function to create test cases with records + fn create_test_cases() -> Vec { + // Generate the expected timestamp string with current timezone + let fixed_timestamp = chrono::DateTime::from_timestamp(1431648000, 123456789) + .unwrap() + .with_timezone(&chrono::Local); + let timestamp_str = fixed_timestamp.to_rfc3339_opts(chrono::SecondsFormat::Micros, true); + + vec![ + TestCase { + name: "basic message", + message: "test message", + record: Record::builder() + .args(format_args!("test message")) + .level(Level::Info) + .target("test_target") + .module_path(Some("test::module")) + 
.file(Some("test_file.rs")) + .line(Some(42)) + .build(), + expected_text_output: format!( + "{} INFO test::module: test_file.rs:42 test message", + timestamp_str + ), + expected_json_output: format!( + r#"{{"timestamp":"{}","level":"INFO","fields":{{"message":"test message"}}}}"#, + timestamp_str + ), + }, + TestCase { + name: "empty message", + message: "", + record: Record::builder() + .args(format_args!("")) + .level(Level::Info) + .target("test_target") + .module_path(Some("test::module")) + .file(Some("test_file.rs")) + .line(Some(42)) + .build(), + expected_text_output: format!( + "{} INFO test::module: test_file.rs:42 ", + timestamp_str + ), + expected_json_output: format!( + r#"{{"timestamp":"{}","level":"INFO","fields":{{"message":""}}}}"#, + timestamp_str + ), + }, + TestCase { + name: "error level", + message: "error occurred", + record: Record::builder() + .args(format_args!("error occurred")) + .level(Level::Error) + .target("test_target") + .module_path(Some("test::module")) + .file(Some("test_file.rs")) + .line(Some(42)) + .build(), + expected_text_output: format!( + "{} ERROR test::module: test_file.rs:42 error occurred", + timestamp_str + ), + expected_json_output: format!( + r#"{{"timestamp":"{}","level":"ERROR","fields":{{"message":"error occurred"}}}}"#, + timestamp_str + ), + }, + TestCase { + name: "warn level", + message: "warning message", + record: Record::builder() + .args(format_args!("warning message")) + .level(Level::Warn) + .target("test_target") + .module_path(Some("test::module")) + .file(Some("test_file.rs")) + .line(Some(42)) + .build(), + expected_text_output: format!( + "{} WARN test::module: test_file.rs:42 warning message", + timestamp_str + ), + expected_json_output: format!( + r#"{{"timestamp":"{}","level":"WARN","fields":{{"message":"warning message"}}}}"#, + timestamp_str + ), + }, + TestCase { + name: "debug level", + message: "debug info", + record: Record::builder() + .args(format_args!("debug info")) + .level(Level::Debug) + .target("test_target") + .module_path(Some("test::module")) + .file(Some("test_file.rs")) + .line(Some(42)) + .build(), + expected_text_output: format!( + "{} DEBUG test::module: test_file.rs:42 debug info", + timestamp_str + ), + expected_json_output: format!( + r#"{{"timestamp":"{}","level":"DEBUG","fields":{{"message":"debug info"}}}}"#, + timestamp_str + ), + }, + TestCase { + name: "trace level", + message: "trace data", + record: Record::builder() + .args(format_args!("trace data")) + .level(Level::Trace) + .target("test_target") + .module_path(Some("test::module")) + .file(Some("test_file.rs")) + .line(Some(42)) + .build(), + expected_text_output: format!( + "{} TRACE test::module: test_file.rs:42 trace data", + timestamp_str + ), + expected_json_output: format!( + r#"{{"timestamp":"{}","level":"TRACE","fields":{{"message":"trace data"}}}}"#, + timestamp_str + ), + }, + ] + } + + #[test] + fn test_identical_layout() { + let layout = IdenticalLayout; + let diagnostics = []; + let test_cases = create_test_cases(); + + for test_case in &test_cases { + let result = layout.format(&test_case.record, &diagnostics).unwrap(); + let output = String::from_utf8(result).unwrap(); + + // IdenticalLayout should return the message as-is + assert_eq!( + output, test_case.message, + "Failed test case '{}': expected '{}', got '{}'", + test_case.name, test_case.message, output + ); + } + } + + #[test] + fn test_text_layout() { + let layout = TextLayout::; // Use fixed time + let diagnostics = []; + let test_cases = 
create_test_cases();
+
+ for test_case in &test_cases {
+ let result = layout.format(&test_case.record, &diagnostics).unwrap();
+ let output = String::from_utf8(result).unwrap();
+
+ assert_eq!(
+ output, test_case.expected_text_output,
+ "Failed test case '{}': expected '{}', got '{}'",
+ test_case.name, test_case.expected_text_output, output
+ );
+ }
+ }
+
+ #[test]
+ fn test_json_layout() {
+ let layout = JsonLayout::<true>; // Use fixed time
+ let diagnostics = [];
+ let test_cases = create_test_cases();
+
+ for test_case in &test_cases {
+ let result = layout.format(&test_case.record, &diagnostics).unwrap();
+ let output = String::from_utf8(result).unwrap();
+
+ assert_eq!(
+ output, test_case.expected_json_output,
+ "Failed test case '{}': expected '{}', got '{}'",
+ test_case.name, test_case.expected_json_output, output
+ );
+ }
+ }
+}
diff --git a/src/query/pipeline/transforms/Cargo.toml b/src/query/pipeline/transforms/Cargo.toml
index f423beba00929..44c8238882517 100644
--- a/src/query/pipeline/transforms/Cargo.toml
+++ b/src/query/pipeline/transforms/Cargo.toml
@@ -9,6 +9,7 @@ edition = { workspace = true }
[dependencies]
async-backtrace = { workspace = true }
async-trait = { workspace = true }
+bytesize = { workspace = true }
databend-common-base = { workspace = true }
databend-common-exception = { workspace = true }
databend-common-expression = { workspace = true }
diff --git a/src/query/pipeline/transforms/src/processors/memory_settings.rs b/src/query/pipeline/transforms/src/processors/memory_settings.rs
index 64e246e7bb53c..f6e39173885d2 100644
--- a/src/query/pipeline/transforms/src/processors/memory_settings.rs
+++ b/src/query/pipeline/transforms/src/processors/memory_settings.rs
@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
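// A minimal, self-contained sketch of the const-generic layout pattern the loggers.rs
// tests above rely on. The parameter name FIXED_TIME is an assumption inferred from the
// `if FIXED_TIME { ... }` branches and the `// Use fixed time` test comments: a `::<true>`
// layout pins the timestamp so the expected strings are deterministic, while `::<false>`
// keeps using the wall clock in production.
struct DemoLayout<const FIXED_TIME: bool>;

impl<const FIXED_TIME: bool> DemoLayout<FIXED_TIME> {
    fn timestamp(&self) -> chrono::DateTime<chrono::Local> {
        if FIXED_TIME {
            // The same pinned instant the tests use: 2015-05-15T00:00:00Z plus 123456789 ns.
            chrono::DateTime::from_timestamp(1431648000, 123456789)
                .unwrap()
                .with_timezone(&chrono::Local)
        } else {
            chrono::Local::now()
        }
    }
}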
+use std::fmt::Debug; use std::sync::atomic::Ordering; use std::sync::Arc; +use bytesize::ByteSize; use databend_common_base::runtime::MemStat; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::GLOBAL_MEM_STAT; @@ -32,6 +34,43 @@ pub struct MemorySettings { pub spill_unit_size: usize, } +impl Debug for MemorySettings { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + struct Tracking<'a>(&'a MemStat); + + impl Debug for Tracking<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemStat") + .field("used", &ByteSize(self.0.get_memory_usage() as _)) + .field("peak_used", &ByteSize(self.0.get_peak_memory_usage() as _)) + .field("memory_limit", &self.0.get_limit()) + .finish() + } + } + + let mut f = f.debug_struct("MemorySettings"); + let mut f = f + .field("max_memory_usage", &ByteSize(self.max_memory_usage as _)) + .field("enable_global_level_spill", &self.enable_global_level_spill) + .field( + "global_memory_tracking", + &Tracking(self.global_memory_tracking), + ) + .field( + "max_query_memory_usage", + &ByteSize(self.max_query_memory_usage as _), + ); + + if let Some(tracking) = &self.query_memory_tracking { + f = f.field("query_memory_tracking", &Tracking(tracking)); + } + + f.field("enable_query_level_spill", &self.enable_query_level_spill) + .field("spill_unit_size", &self.spill_unit_size) + .finish() + } +} + impl MemorySettings { pub fn disable_spill() -> MemorySettings { MemorySettings { @@ -80,6 +119,60 @@ impl MemorySettings { self.enable_query_level_spill && query_memory_tracking.get_memory_usage() >= self.max_query_memory_usage } + + fn check_global(&self) -> Option { + self.enable_global_level_spill.then(|| { + let usage = self.global_memory_tracking.get_memory_usage(); + if usage >= self.max_memory_usage { + -((usage - self.max_memory_usage) as isize) + } else { + (self.max_memory_usage - usage) as isize + } + }) + } + + fn check_workload_group(&self) -> Option { + let workload_group = ThreadTracker::workload_group()?; + let usage = workload_group.mem_stat.get_memory_usage(); + let max_memory_usage = workload_group.max_memory_usage.load(Ordering::Relaxed); + + if max_memory_usage == 0 { + return None; + } + + Some(if usage >= max_memory_usage { + -((usage - max_memory_usage) as isize) + } else { + (max_memory_usage - usage) as isize + }) + } + + fn check_query(&self) -> Option { + if !self.enable_query_level_spill { + return None; + } + + let query_memory_tracking = self.query_memory_tracking.as_ref()?; + let usage = query_memory_tracking.get_memory_usage(); + + Some(if usage >= self.max_query_memory_usage { + -((usage - self.max_query_memory_usage) as isize) + } else { + (self.max_query_memory_usage - usage) as isize + }) + } + + pub fn check_spill_remain(&self) -> isize { + [ + self.check_global(), + self.check_workload_group(), + self.check_query(), + ] + .into_iter() + .flatten() + .reduce(|a, b| a.min(b)) + .unwrap_or(1) + } } #[cfg(test)] diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index f769fcb8e7c86..a60630eb3d977 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -18,6 +18,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; +use bytesize::ByteSize; use 
databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::row::RowConverter as CommonConverter; @@ -50,7 +51,7 @@ pub struct TransformSortMerge { aborting: Arc, /// Record current memory usage. - num_bytes: usize, + num_bytes: ByteSize, num_rows: usize, _r: PhantomData, @@ -70,7 +71,7 @@ impl TransformSortMerge { block_size, buffer: vec![], aborting: Arc::new(AtomicBool::new(false)), - num_bytes: 0, + num_bytes: ByteSize(0), num_rows: 0, _r: PhantomData, } @@ -89,7 +90,7 @@ impl MergeSort for TransformSortMerge { return Ok(()); } - self.num_bytes += block.memory_size(); + self.num_bytes += block.memory_size() as u64; self.num_rows += block.num_rows(); self.buffer.push(Some((block, init_rows.to_column()))); @@ -109,7 +110,7 @@ impl MergeSort for TransformSortMerge { } #[inline(always)] - fn num_bytes(&self) -> usize { + fn num_bytes(&self) -> ByteSize { self.num_bytes } @@ -122,7 +123,7 @@ impl MergeSort for TransformSortMerge { let blocks = self.merge_sort(spill_batch_size)?; self.num_rows = 0; - self.num_bytes = 0; + self.num_bytes = ByteSize(0); debug_assert!(self.buffer.is_empty()); diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs index c3c4a8180099f..693532d38235f 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs @@ -15,6 +15,7 @@ use std::marker::PhantomData; use std::sync::Arc; +use bytesize::ByteSize; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; @@ -31,6 +32,35 @@ pub struct SortSpillParams { pub num_merge: usize, } +impl SortSpillParams { + pub fn max_rows(&self) -> usize { + self.batch_rows * self.num_merge + } + + pub fn determine( + bytes: ByteSize, + rows: usize, + spill_unit_size: ByteSize, + max_block_rows: usize, + ) -> Self { + // We use the first memory calculation to estimate the batch size and the number of merge. + let block = usize::max( + (bytes.0).div_ceil(spill_unit_size.0) as _, + rows.div_ceil(max_block_rows), + ); + let batch_rows = (rows / block).max(1); + + /// The memory will be doubled during merging. + const MERGE_RATIO: usize = 2; + let num_merge = (rows / MERGE_RATIO / batch_rows).max(2); + log::info!(buffer_bytes:? = bytes, buffer_rows = rows, spill_unit_size:?, batch_rows, batch_num_merge = num_merge; "determine sort spill params"); + SortSpillParams { + batch_rows, + num_merge, + } + } +} + pub trait MergeSort { const NAME: &'static str; @@ -40,7 +70,7 @@ pub trait MergeSort { fn add_block(&mut self, block: DataBlock, init_rows: R) -> Result<()>; /// Return buffered data size. - fn num_bytes(&self) -> usize; + fn num_bytes(&self) -> ByteSize; /// Return buffered rows. 
fn num_rows(&self) -> usize; diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs index 6af796f82a110..f995fdb8ccfb7 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_limit.rs @@ -17,6 +17,7 @@ use std::cmp::Reverse; use std::collections::HashMap; use std::intrinsics::unlikely; +use bytesize::ByteSize; use databend_common_base::containers::FixedHeap; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -32,7 +33,7 @@ pub struct TransformSortMergeLimit { buffer: HashMap, /// Record current memory usage. - num_bytes: usize, + num_bytes: ByteSize, num_rows: usize, /// The index for the next input block. @@ -54,7 +55,7 @@ impl MergeSort for TransformSortMergeLimit { self.next_index += 1; let mut cursor = Cursor::new(input_index, init_rows); - self.num_bytes += block.memory_size(); + self.num_bytes += block.memory_size() as u64; self.num_rows += block.num_rows(); let cur_index = input_index; self.buffer.insert(cur_index, block); @@ -65,7 +66,7 @@ impl MergeSort for TransformSortMergeLimit { // Evict the first row of the block, // which means the block must not appear in the Top-N result. if let Some(block) = self.buffer.remove(&evict.input_index) { - self.num_bytes -= block.memory_size(); + self.num_bytes -= block.memory_size() as u64; self.num_rows -= block.num_rows(); } } @@ -90,7 +91,7 @@ impl MergeSort for TransformSortMergeLimit { } #[inline(always)] - fn num_bytes(&self) -> usize { + fn num_bytes(&self) -> ByteSize { self.num_bytes } @@ -138,7 +139,7 @@ impl TransformSortMergeLimit { buffer: HashMap::with_capacity(limit), block_size, next_index: 0, - num_bytes: 0, + num_bytes: ByteSize(0), num_rows: 0, } } @@ -184,7 +185,7 @@ impl TransformSortMergeLimit { } self.buffer.clear(); - self.num_bytes = 0; + self.num_bytes = ByteSize(0); self.num_rows = 0; output_blocks diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 40387b96d4a28..42540c2b6f02b 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -41,6 +41,7 @@ buf-list = { workspace = true } bumpalo = { workspace = true } byteorder = { workspace = true } bytes = { workspace = true } +bytesize = { workspace = true } chrono = { workspace = true } chrono-tz = { workspace = true } concurrent-queue = { workspace = true } diff --git a/src/query/service/src/interpreters/interpreter.rs b/src/query/service/src/interpreters/interpreter.rs index 3ab8936ea8705..65187ea3c3a09 100644 --- a/src/query/service/src/interpreters/interpreter.rs +++ b/src/query/service/src/interpreters/interpreter.rs @@ -220,6 +220,8 @@ fn log_query_finished(ctx: &QueryContext, error: Option, has_profiles ); } + log::info!(memory:? 
= ctx.get_node_peek_memory_usage(); "total memory usage"); + if let Err(error) = InterpreterQueryLog::log_finish(ctx, now, error, has_profiles) { error!("[INTERPRETER] Failed to log query finish: {:?}", error) } @@ -338,6 +340,7 @@ fn attach_query_hash(ctx: &Arc, stmt: &mut Option, sql: ctx.attach_query_hash(query_hash, query_parameterized_hash); } +#[fastrace::trace] pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc) -> Result<()> { let mut has_profiles = false; query_ctx.add_query_profiles(&info.profiling); diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 8b2df0a3e10d7..7f8dbc613d533 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -167,6 +167,7 @@ impl PipelineExecutor { Ok(()) } + #[fastrace::trace(name = "PipelineExecutor::execute")] pub fn execute(&self) -> Result<()> { let instants = Instant::now(); let _guard = defer(move || { diff --git a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs index 4837ea4053d95..82cc7df08412c 100644 --- a/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_pulling_executor.rs @@ -197,6 +197,7 @@ impl PipelinePullingExecutor { } } + #[fastrace::trace] pub fn finish(&self, cause: Option) { let _guard = ThreadTracker::tracking(self.tracking_payload.clone()); diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index 9f8122eb19dd8..b9899d0ce17e6 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -102,7 +102,7 @@ impl QueryPipelineExecutor { } } - #[fastrace::trace] + #[fastrace::trace(name = "QueryPipelineExecutor::from_pipelines")] pub fn from_pipelines( mut pipelines: Vec, settings: ExecutorSettings, @@ -192,6 +192,7 @@ impl QueryPipelineExecutor { })) } + #[fastrace::trace(name = "QueryPipelineExecutor::on_finished")] fn on_finished(&self, info: ExecutionInfo) -> Result<()> { let mut on_finished_chain = self.on_finished_chain.lock(); @@ -201,6 +202,7 @@ impl QueryPipelineExecutor { on_finished_chain.apply(info) } + #[fastrace::trace(name = "QueryPipelineExecutor::finish")] pub fn finish(&self, cause: Option>) { let cause = cause.map(|err| err.with_context("[PIPELINE-EXECUTOR] Pipeline executor finished")); @@ -223,7 +225,7 @@ impl QueryPipelineExecutor { self.global_tasks_queue.is_finished() } - #[fastrace::trace] + #[fastrace::trace(name = "QueryPipelineExecutor::execute")] pub fn execute(self: &Arc) -> Result<()> { self.init(self.graph.clone())?; diff --git a/src/query/service/src/pipelines/memory_settings.rs b/src/query/service/src/pipelines/memory_settings.rs index c7bea0e7922a9..b6b760e17f5e9 100644 --- a/src/query/service/src/pipelines/memory_settings.rs +++ b/src/query/service/src/pipelines/memory_settings.rs @@ -316,11 +316,11 @@ mod tests { let settings = ctx.get_settings(); ctx.set_enable_sort_spill(true); - settings.set_setting("sort_spilling_batch_bytes".into(), "8192".into())?; + settings.set_setting("sort_spilling_batch_bytes".into(), "1048576".into())?; let memory_settings = MemorySettings::from_sort_settings(&ctx)?; - assert_eq!(memory_settings.spill_unit_size, 8192); 
+ assert_eq!(memory_settings.spill_unit_size, 1048576); Ok(()) } diff --git a/src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs b/src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs index c63491eca5392..a050845caadc7 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/merge_sort.rs @@ -19,6 +19,7 @@ use std::sync::atomic; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use bytesize::ByteSize; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; @@ -40,7 +41,7 @@ use super::sort_spill::MemoryMerger; use super::sort_spill::OutputData; use super::sort_spill::SortSpill; use super::Base; -use super::MemoryRows; +use super::RowsStat; use crate::spillers::Spiller; #[derive(Debug)] @@ -164,8 +165,8 @@ where fn collect_trans_to_spill(&mut self, input_data: Vec) { let (num_rows, num_bytes) = input_data .iter() - .map(|block| (block.num_rows(), block.memory_size())) - .fold((0, 0), |(acc_rows, acc_bytes), (rows, bytes)| { + .map(|block| (block.num_rows(), ByteSize(block.memory_size() as _))) + .fold((0, ByteSize(0)), |(acc_rows, acc_bytes), (rows, bytes)| { (acc_rows + rows, acc_bytes + bytes) }); let params = self.determine_params(num_bytes, num_rows); @@ -186,20 +187,13 @@ where } } - fn determine_params(&self, bytes: usize, rows: usize) -> SortSpillParams { - // We use the first memory calculation to estimate the batch size and the number of merge. - let unit_size = self.memory_settings.spill_unit_size; - let num_merge = bytes.div_ceil(unit_size).max(2); - let batch_rows = rows.div_ceil(num_merge); - - /// The memory will be doubled during merging. 
- const MERGE_RATIO: usize = 2; - let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2); - log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}"); - SortSpillParams { - batch_rows, - num_merge, - } + fn determine_params(&self, bytes: ByteSize, rows: usize) -> SortSpillParams { + SortSpillParams::determine( + bytes, + rows, + ByteSize(self.memory_settings.spill_unit_size as _), + self.max_block_size, + ) } fn collect_block(&mut self, block: DataBlock) -> Result<()> { @@ -275,28 +269,32 @@ where self.output.push_data(Ok(block)); } - fn input_rows(&self) -> usize { + fn input_rows(&self) -> (usize, usize) { match &self.inner { - Inner::Collect(input_data) | Inner::Spill(input_data, _) => input_data.in_memory_rows(), - _ => 0, + Inner::Collect(input_data) | Inner::Spill(input_data, _) => { + (input_data.len(), input_data.in_memory_rows()) + } + _ => (0, 0), } } fn check_spill(&self) -> bool { - if !self.memory_settings.check_spill() { - return false; - } - match &self.inner { Inner::Limit(limit_sort) => { - limit_sort.num_bytes() > self.memory_settings.spill_unit_size * 2 + self.memory_settings.check_spill() + && limit_sort.num_bytes() + >= ByteSize(self.memory_settings.spill_unit_size as _) * 2_u64 } Inner::Collect(input_data) => { - input_data.iter().map(|b| b.memory_size()).sum::() - > self.memory_settings.spill_unit_size * 2 + self.memory_settings.check_spill() + && input_data.iter().map(|b| b.memory_size()).sum::() + >= self.memory_settings.spill_unit_size * 2 } Inner::Spill(input_data, sort_spill) => { - input_data.in_memory_rows() > sort_spill.max_rows() + let rows = input_data.in_memory_rows(); + let params = sort_spill.params(); + self.memory_settings.check_spill() && rows >= params.batch_rows * 2 + || input_data.in_memory_rows() >= params.max_rows() } _ => unreachable!(), } @@ -422,22 +420,16 @@ where let finished = self.input.is_finished(); self.trans_to_spill()?; - let input = self.input_rows(); + let (incoming_block, incoming) = self.input_rows(); let Inner::Spill(input_data, spill_sort) = &mut self.inner else { unreachable!() }; - let memory_rows = spill_sort.collect_memory_rows(); - let max = spill_sort.max_rows(); - if memory_rows > 0 && memory_rows + input > max { - spill_sort - .collect_spill_last(memory_rows + input - max) - .await?; - } - let need_spill = input > max; - if need_spill || finished && input > 0 { + if incoming > 0 { + let total_rows = spill_sort.collect_total_rows(); + log::debug!(incoming_block, incoming_rows = incoming, total_rows, finished; "sort_input_data"); spill_sort - .sort_input_data(std::mem::take(input_data), need_spill, &self.aborting) + .sort_input_data(std::mem::take(input_data), !finished, &self.aborting) .await?; } if finished { @@ -450,7 +442,8 @@ where unreachable!() }; assert!(input_data.is_empty()); - let OutputData { block, finish, .. } = spill_sort.on_restore().await?; + let OutputData { block, finish, .. 
} = + spill_sort.on_restore(&self.memory_settings).await?; self.output_data.extend(block); if finish { self.state = State::Finish diff --git a/src/query/service/src/pipelines/processors/transforms/sort/mod.rs b/src/query/service/src/pipelines/processors/transforms/sort/mod.rs index 3cbaf81b28a22..de16eec22901c 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/mod.rs @@ -36,6 +36,8 @@ mod sort_merge_stream; mod sort_restore; mod sort_route; mod sort_spill; +#[cfg(test)] +mod test_memory; pub use merge_sort::*; pub use sort_broadcast::*; @@ -66,13 +68,19 @@ local_block_meta_serde!(SortCollectedMeta); #[typetag::serde(name = "sort_collected")] impl BlockMetaInfo for SortCollectedMeta {} -trait MemoryRows { +trait RowsStat { + fn total_rows(&self) -> usize; + fn in_memory_rows(&self) -> usize; } -impl MemoryRows for Vec { +impl RowsStat for Vec { + fn total_rows(&self) -> usize { + self.iter().map(|b| b.num_rows()).sum::() + } + fn in_memory_rows(&self) -> usize { - self.iter().map(|s| s.num_rows()).sum::() + self.total_rows() } } diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs index 9642f82e903c7..66593c708f0df 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_builder.rs @@ -340,6 +340,7 @@ impl Build<'_> { self.output.clone(), self.params.new_base(), self.params.output_order_col, + self.params.memory_settings.clone(), )?)) } diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs index e569e5e0e6d7a..9c6e389f6d3f4 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_collect.rs @@ -17,6 +17,7 @@ use std::sync::atomic; use std::sync::atomic::AtomicBool; use std::sync::Arc; +use bytesize::ByteSize; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_expression::SortColumnDescription; @@ -34,7 +35,7 @@ use databend_common_pipeline_transforms::TransformSortMergeLimit; use super::sort_spill::SortSpill; use super::Base; -use super::MemoryRows; +use super::RowsStat; enum Inner { Collect(Vec), @@ -141,8 +142,8 @@ where fn collect_trans_to_spill(&mut self, input_data: Vec, no_spill: bool) { let (num_rows, num_bytes) = input_data .iter() - .map(|block| (block.num_rows(), block.memory_size())) - .fold((0, 0), |(acc_rows, acc_bytes), (rows, bytes)| { + .map(|block| (block.num_rows(), ByteSize(block.memory_size() as _))) + .fold((0, ByteSize(0)), |(acc_rows, acc_bytes), (rows, bytes)| { (acc_rows + rows, acc_bytes + bytes) }); assert!(num_rows > 0); @@ -171,20 +172,13 @@ where } } - fn determine_params(&self, bytes: usize, rows: usize) -> SortSpillParams { - // We use the first memory calculation to estimate the batch size and the number of merge. - let unit_size = self.memory_settings.spill_unit_size; - let num_merge = bytes.div_ceil(unit_size).max(2); - let batch_rows = rows.div_ceil(num_merge); - - /// The memory will be doubled during merging. 
- const MERGE_RATIO: usize = 2; - let num_merge = num_merge.div_ceil(MERGE_RATIO).max(2); - log::info!("determine sort spill params, buffer_bytes: {bytes}, buffer_rows: {rows}, spill_unit_size: {unit_size}, batch_rows: {batch_rows}, batch_num_merge {num_merge}"); - SortSpillParams { - batch_rows, - num_merge, - } + fn determine_params(&self, bytes: ByteSize, rows: usize) -> SortSpillParams { + SortSpillParams::determine( + bytes, + rows, + ByteSize(self.memory_settings.spill_unit_size as _), + self.max_block_size, + ) } fn collect_block(&mut self, block: DataBlock) -> Result<()> { @@ -214,20 +208,22 @@ where } fn check_spill(&self) -> bool { - if !self.memory_settings.check_spill() { - return false; - } - match &self.inner { Inner::Limit(limit_sort) => { - limit_sort.num_bytes() > self.memory_settings.spill_unit_size * 2 + self.memory_settings.check_spill() + && limit_sort.num_bytes() + >= ByteSize(self.memory_settings.spill_unit_size as _) * 2_u64 } Inner::Collect(input_data) => { - input_data.iter().map(|b| b.memory_size()).sum::() - > self.memory_settings.spill_unit_size * 2 + self.memory_settings.check_spill() + && input_data.iter().map(|b| b.memory_size()).sum::() + >= self.memory_settings.spill_unit_size * 2 } Inner::Spill(input_data, sort_spill) => { - input_data.in_memory_rows() > sort_spill.max_rows() + let rows = input_data.in_memory_rows(); + let params = sort_spill.params(); + self.memory_settings.check_spill() && rows >= params.batch_rows * 2 + || input_data.in_memory_rows() >= params.max_rows() } _ => unreachable!(), } @@ -335,20 +331,12 @@ where }; let incoming = input_data.in_memory_rows(); - let memory_rows = spill_sort.collect_memory_rows(); - let max = spill_sort.max_rows(); - - if memory_rows > 0 && memory_rows + incoming > max { - log::debug!(incoming_rows = incoming, memory_rows, max_rows = max; "collect_spill_last"); - spill_sort - .collect_spill_last(memory_rows + incoming - max) - .await?; - } - let need_spill = incoming > max; - if need_spill || finished && incoming > 0 { - log::debug!(incoming_rows = incoming, memory_rows, max_rows = max, finished; "sort_input_data"); + let incoming_block = input_data.len(); + if incoming > 0 { + let total_rows = spill_sort.collect_total_rows(); + log::debug!(incoming_block, incoming_rows = incoming, total_rows, finished; "sort_input_data"); spill_sort - .sort_input_data(std::mem::take(input_data), need_spill, &self.aborting) + .sort_input_data(std::mem::take(input_data), !finished, &self.aborting) .await?; } if finished { diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs index 51a8bc88e32d5..32595ac728bb0 100644 --- a/src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs +++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_restore.rs @@ -25,6 +25,7 @@ use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; use databend_common_pipeline_transforms::HookTransform; use databend_common_pipeline_transforms::HookTransformer; +use databend_common_pipeline_transforms::MemorySettings; use super::sort_spill::OutputData; use super::sort_spill::SortSpill; @@ -40,6 +41,7 @@ pub struct TransformSortRestore { /// If the next transform of current transform is [`super::transform_multi_sort_merge::MultiSortMergeProcessor`], /// we can generate and output the order column to avoid the extra converting in the next 
transform.
     remove_order_col: bool,
+    memory_settings: MemorySettings,
     base: Base,
     inner: Option>,
@@ -53,6 +55,7 @@ where A: SortAlgorithm + Send + 'static
         output: Arc,
         base: Base,
         output_order_col: bool,
+        memory_settings: MemorySettings,
     ) -> Result> {
         Ok(HookTransformer::new(input, output, Self {
             input: Vec::new(),
@@ -60,6 +63,7 @@ where A: SortAlgorithm + Send + 'static
             remove_order_col: !output_order_col,
             base,
             inner: None,
+            memory_settings,
         }))
     }
 }
@@ -120,7 +124,7 @@ where
                     block,
                     bound: (bound_index, _),
                     finish,
-                } = spill_sort.on_restore().await?;
+                } = spill_sort.on_restore(&self.memory_settings).await?;
                 if let Some(block) = block {
                     let mut block =
                         block.add_meta(Some(SortBound::create(bound_index, SortBoundNext::More)))?;
diff --git a/src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs
index ccb10d44c2777..1fe9b977a54aa 100644
--- a/src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs
+++ b/src/query/service/src/pipelines/processors/transforms/sort/sort_spill.rs
@@ -36,12 +36,13 @@ use databend_common_pipeline_transforms::processors::sort::Merger;
 use databend_common_pipeline_transforms::processors::sort::Rows;
 use databend_common_pipeline_transforms::processors::sort::SortedStream;
 use databend_common_pipeline_transforms::processors::SortSpillParams;
+use databend_common_pipeline_transforms::MemorySettings;
 use rand::rngs::StdRng;
 use rand::SeedableRng;
 
 use super::bounds::Bounds;
 use super::Base;
-use super::MemoryRows;
+use super::RowsStat;
 use super::SortCollectedMeta;
 use crate::spillers::Location;
 use crate::spillers::Spiller;
@@ -139,21 +140,14 @@ where A: SortAlgorithm
             .await
     }
 
-    pub async fn collect_spill_last(&mut self, target_rows: usize) -> Result<()> {
-        let Step::Collect(collect) = &mut self.step else {
-            unreachable!()
-        };
-        collect.spill_last(&self.base, target_rows).await
-    }
-
-    pub fn collect_memory_rows(&self) -> usize {
+    pub fn collect_total_rows(&self) -> usize {
         match &self.step {
-            Step::Collect(step_collect) => step_collect.streams.in_memory_rows(),
+            Step::Collect(step_collect) => step_collect.streams.total_rows(),
             _ => unreachable!(),
         }
     }
 
-    pub async fn on_restore(&mut self) -> Result {
+    pub async fn on_restore(&mut self, _memory_settings: &MemorySettings) -> Result {
         match &mut self.step {
             Step::Collect(collect) => self.step = Step::Sort(collect.next_step(&self.base)?),
             Step::Sort(_) => (),
@@ -164,35 +158,40 @@ where A: SortAlgorithm
         };
 
         if sort.output_merger.is_some() {
-            return sort.restore_and_output(&self.base).await;
+            return sort
+                .restore_and_output(&self.base, sort.params.num_merge)
+                .await;
         }
 
         while sort.current.is_empty() {
            sort.choice_streams_by_bound();
         }
+        let num_merge = sort.recalculate_num_merge();
+        assert!(num_merge >= 2);
 
         log::debug!(
             current_len = sort.current.len(),
             subsequent_len = sort.subsequent.len(),
-            params:? = sort.params;
-            "restore status");
-        if sort.current.len() <= sort.params.num_merge {
-            return sort.restore_and_output(&self.base).await;
+            num_merge,
+            batch_rows = sort.params.batch_rows;
+            "restore params");
+        if sort.current.len() <= num_merge {
+            sort.restore_and_output(&self.base, num_merge).await
+        } else {
+            sort.merge_current(&self.base, num_merge).await?;
+            Ok(OutputData {
+                block: None,
+                bound: (u32::MAX, None),
+                finish: false,
+            })
         }
-
-        sort.merge_current(&self.base).await?;
-        Ok(OutputData {
-            block: None,
-            bound: (u32::MAX, None),
-            finish: false,
-        })
     }
 
-    pub fn max_rows(&self) -> usize {
-        let params = match &self.step {
+    pub fn params(&self) -> SortSpillParams {
+        match &self.step {
             Step::Collect(collect) => collect.params,
             Step::Sort(sort) => sort.params,
-        };
-        params.num_merge * params.batch_rows
+        }
     }
 
     #[expect(unused)]
@@ -250,7 +249,11 @@ impl StepCollect {
         let sorted = if input_data.len() == 1 {
             let data = input_data.pop().unwrap();
-            vec![base.new_block(data)].into()
+            let mut block = base.new_block(data);
+            if need_spill {
+                block.spill(&base.spiller).await?;
+            }
+            vec![block].into()
         } else {
             // todo: using multi-threaded cascade two-way merge sorting algorithm to obtain the best performance
             // also see https://arxiv.org/pdf/1406.2628
@@ -283,6 +286,7 @@
         Ok(())
     }
 
+    #[allow(dead_code)]
     #[fastrace::trace(name = "StepCollect::spill_last")]
     async fn spill_last(&mut self, base: &Base, target_rows: usize) -> Result<()> {
         let Some(s) = self.streams.last_mut() else {
@@ -336,14 +340,11 @@
     }
 
     #[fastrace::trace(name = "StepSort::merge_current")]
-    async fn merge_current(&mut self, base: &Base) -> Result<()> {
+    async fn merge_current(&mut self, base: &Base, num_merge: usize) -> Result<()> {
         for s in &mut self.subsequent {
             s.spill(0).await?;
         }
-        let SortSpillParams {
-            batch_rows,
-            num_merge,
-        } = self.params;
+        let batch_rows = self.params.batch_rows;
         for (i, s) in self.current.iter_mut().rev().enumerate() {
             if i < num_merge {
                 s.spill(1).await?;
@@ -375,7 +376,7 @@
     }
 
     #[fastrace::trace(name = "StepSort::restore_and_output")]
-    async fn restore_and_output(&mut self, base: &Base) -> Result {
+    async fn restore_and_output(&mut self, base: &Base, num_merge: usize) -> Result {
         let merger = match self.output_merger.as_mut() {
             Some(merger) => merger,
             None => {
@@ -406,7 +407,8 @@
             });
         }
 
-        self.sort_spill(base, self.params).await?;
+        self.sort_spill(base, self.params.batch_rows, num_merge)
+            .await?;
 
         let streams = mem::take(&mut self.current);
         let merger = Merger::::create(
@@ -471,14 +473,7 @@
     }
 
     #[fastrace::trace(name = "StepSort::sort_spill")]
-    async fn sort_spill(
-        &mut self,
-        base: &Base,
-        SortSpillParams {
-            batch_rows,
-            num_merge,
-        }: SortSpillParams,
-    ) -> Result<()> {
+    async fn sort_spill(&mut self, base: &Base, batch_rows: usize, num_merge: usize) -> Result<()> {
         let need = self
             .current
             .iter()
@@ -547,6 +542,24 @@
         self.current.sort_by_key(|s| s.blocks[0].data.is_some());
     }
+
+    fn recalculate_num_merge(&self) -> usize {
+        if self.current.len() <= 2 {
+            return self.params.num_merge;
+        }
+        let mut max_rows = self.params.max_rows();
+        let batch_rows = self.params.batch_rows;
+        let mut num_merge = 0;
+        for s in self.current.iter().rev() {
+            // Edge case: rows may not always equal batch_rows, recalculate num_merge to mitigate risk
+            let rows = s.blocks[0].rows.max(batch_rows);
+            if max_rows >= rows || num_merge < 2 {
+                num_merge += 1;
+            }
+            max_rows = max_rows.saturating_sub(rows);
+        }
+        num_merge
+    }
 }
 
 impl Base {
@@ -613,7 +626,11 @@ impl Base {
     }
 }
 
-impl MemoryRows for Vec> {
+impl RowsStat for Vec> {
+    fn total_rows(&self) -> usize {
+        self.iter().map(|s| s.total_rows()).sum::()
+    }
+
     fn in_memory_rows(&self) -> usize {
         self.iter().map(|s| s.in_memory_rows()).sum::()
     }
@@ -804,6 +821,10 @@ impl BoundBlockStream {
         self.len() == 0
     }
 
+    fn total_rows(&self) -> usize {
+        self.blocks.iter().map(|b| b.rows).sum()
+    }
+
     fn in_memory_rows(&self) -> usize {
         self.blocks
             .iter()
diff --git a/src/query/service/src/pipelines/processors/transforms/sort/test_memory.rs b/src/query/service/src/pipelines/processors/transforms/sort/test_memory.rs
new file mode 100644
index 0000000000000..0b0ac50367e5c
--- /dev/null
+++ b/src/query/service/src/pipelines/processors/transforms/sort/test_memory.rs
@@ -0,0 +1,271 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use bytesize::ByteSize;
+use databend_common_base::mem_allocator::TrackingGlobalAllocator;
+use databend_common_base::runtime::GLOBAL_MEM_STAT;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_config::SpillConfig;
+use databend_common_exception::Result;
+use databend_common_expression::types::decimal::DecimalSize;
+use databend_common_expression::types::number::Int32Type;
+use databend_common_expression::types::number::Int64Type;
+use databend_common_expression::types::DateType;
+use databend_common_expression::types::Decimal128Type;
+use databend_common_expression::types::StringType;
+use databend_common_expression::Column;
+use databend_common_expression::DataBlock;
+use databend_common_expression::FromData;
+use databend_common_storage::DataOperator;
+use databend_storages_common_cache::TempDirManager;
+use rand::Rng;
+
+use crate::sessions::QueryContext;
+use crate::spillers::Location;
+use crate::spillers::Spiller;
+use crate::spillers::SpillerConfig;
+use crate::spillers::SpillerDiskConfig;
+use crate::spillers::SpillerType;
+use crate::test_kits::ConfigBuilder;
+use crate::test_kits::TestFixture;
+
+#[global_allocator]
+pub static GLOBAL_ALLOCATOR: TrackingGlobalAllocator = TrackingGlobalAllocator::create();
+
+pub struct MockDataGen;
+
+impl MockDataGen {
+    /// Generate a complete TPC-H orders table DataBlock with specified number of rows
+    pub fn generate_orders_table(rows: usize) -> DataBlock {
+        let columns = vec![
+            Self::o_orderkey(rows),
+            Self::o_custkey(rows),
+            Self::o_orderstatus(rows),
+            Self::o_totalprice(rows).unwrap(),
+            Self::o_orderdate(rows).unwrap(),
+            Self::o_orderpriority(rows),
+            Self::o_clerk(rows),
+            Self::o_shippriority(rows),
+            Self::o_comment(rows),
+        ];
+
+        DataBlock::new_from_columns(columns)
+    }
+
+    pub fn o_orderkey(rows: usize) -> Column {
+        Int64Type::from_data((0..rows).map(|i| 215898241 + i as i64).collect())
+    }
+
+    pub fn o_custkey(rows: usize) -> Column {
+        let mut rng = rand::thread_rng();
+        Int64Type::from_data((0..rows).map(|_| rng.gen_range(1..15000000)).collect())
+    }
+
+    pub fn o_orderstatus(rows: usize) -> Column {
+        let mut rng = rand::thread_rng();
+        let statuses = ["O", "F", "P"];
+        StringType::from_data(
+            (0..rows)
+                .map(|_| statuses[rng.gen_range(0..statuses.len())].to_string())
+                .collect(),
+        )
+    }
+
+    pub fn o_totalprice(rows: usize) -> Result {
+        let mut rng = rand::thread_rng();
+
+        // Generate random decimal values between 1000.00 and 500000.00
+        let decimal_values: Vec = (0..rows)
+            .map(|_| {
+                let random_price = rng.gen_range(1000.0..500000.0);
+                // Scale by 2 decimal places (multiply by 100)
+                (random_price * 100.0) as i128
+            })
+            .collect();
+
+        let size = DecimalSize::new_unchecked(15, 2);
+        Ok(Decimal128Type::from_data_with_size(
+            decimal_values,
+            Some(size),
+        ))
+    }
+
+    pub fn o_orderdate(rows: usize) -> Result {
+        let mut rng = rand::thread_rng();
+
+        // Generate random dates between 1992-01-01 and 1998-12-31
+        // Date range: from day 8036 (1992-01-01) to day 10592 (1998-12-31)
+        let start_day = 8036; // 1992-01-01
+        let end_day = 10592; // 1998-12-31
+
+        let date_values: Vec = (0..rows)
+            .map(|_| rng.gen_range(start_day..=end_day))
+            .collect();
+
+        Ok(DateType::from_data(date_values))
+    }
+
+    pub fn o_orderpriority(rows: usize) -> Column {
+        let mut rng = rand::thread_rng();
+        let priorities = ["1-URGENT", "2-HIGH", "3-MEDIUM", "4-NOT SPECIFIED", "5-LOW"];
+        StringType::from_data(
+            (0..rows)
+                .map(|_| priorities[rng.gen_range(0..priorities.len())].to_string())
+                .collect(),
+        )
+    }
+
+    pub fn o_clerk(rows: usize) -> Column {
+        let mut rng = rand::thread_rng();
+        StringType::from_data(
+            (0..rows)
+                .map(|_| {
+                    let clerk_id = rng.gen_range(1..1000000);
+                    format!("Clerk#{:09}", clerk_id)
+                })
+                .collect(),
+        )
+    }
+
+    pub fn o_shippriority(rows: usize) -> Column {
+        Int32Type::from_data(vec![0; rows])
+    }
+
+    pub fn o_comment(rows: usize) -> Column {
+        let mut rng = rand::thread_rng();
+        let comment_parts = [
+            "carefully",
+            "quickly",
+            "furiously",
+            "fluffily",
+            "blithely",
+            "express",
+            "ironic",
+            "special",
+            "regular",
+            "final",
+            "deposits",
+            "packages",
+            "instructions",
+            "requests",
+            "accounts",
+            "above",
+            "across",
+            "against",
+            "along",
+            "around",
+            "the",
+            "some",
+            "all",
+            "many",
+            "few",
+        ];
+
+        StringType::from_data(
+            (0..rows)
+                .map(|_| {
+                    let num_words = rng.gen_range(3..8);
+                    let words: Vec<&str> = (0..num_words)
+                        .map(|_| comment_parts[rng.gen_range(0..comment_parts.len())])
+                        .collect();
+                    words.join(" ")
+                })
+                .collect(),
+        )
+    }
+}
+
+async fn init() -> Result<(TestFixture, Arc, Spiller)> {
+    let mut config = ConfigBuilder::create().config();
+    config.spill = SpillConfig::new_for_test("test_data".to_string(), 0.01, 1 << 30);
+    let fixture = TestFixture::setup_with_config(&config).await?;
+
+    let ctx = fixture.new_query_ctx().await?;
+
+    let temp_dir_manager = TempDirManager::instance();
+    let disk_spill = temp_dir_manager
+        .get_disk_spill_dir(1024 * 1024 * 1024 * 10, &ctx.get_id())
+        .map(|temp_dir| SpillerDiskConfig::new(temp_dir, true))
+        .transpose()?;
+
+    // Create spiller configuration (using remote storage)
+    let location_prefix = ctx.query_id_spill_prefix();
+    let spiller_config = SpillerConfig {
+        spiller_type: SpillerType::OrderBy,
+        location_prefix,
+        disk_spill,
+        use_parquet: true,
+    };
+    let operator = DataOperator::instance().spill_operator();
+    let spiller = Spiller::create(ctx.clone(), operator, spiller_config)?;
+    Ok((fixture, ctx, spiller))
+}
+
+#[derive(Debug)]
+#[allow(dead_code)]
+pub struct TestResult {
+    pub rows: usize,
+    pub memory_size: ByteSize,
+    pub spill_size: ByteSize,
+    pub memory_usage_real: ByteSize,
+}
+
+async fn run_spill_test(spiller: &Spiller, rows: usize) -> Result {
+    let data_block = MockDataGen::generate_orders_table(rows);
+    let memory_size = ByteSize(data_block.memory_size() as _);
+
+    // Spill the data block
+    let Location::Local(path) = spiller.spill(vec![data_block]).await? else {
+        unreachable!()
+    };
+
+    let spill_size = ByteSize(path.metadata().unwrap().len());
+
+    let before = GLOBAL_MEM_STAT.get_memory_usage();
+    let data_blocks = (0..100)
+        .map(|_| MockDataGen::generate_orders_table(rows))
+        .collect::>();
+    let real_usage = (GLOBAL_MEM_STAT.get_memory_usage() - before) / 100;
+    drop(data_blocks);
+
+    Ok(TestResult {
+        rows,
+        memory_size,
+        spill_size,
+        memory_usage_real: ByteSize(real_usage as _),
+    })
+}
+
+// #[tokio::test] manual test only
+#[allow(dead_code)]
+async fn test_block_spill_sizes() -> Result<()> {
+    let (fixture, _, spiller) = init().await?;
+    fixture.keep_alive();
+
+    // Test different row counts
+    let row_counts = vec![1000, 10000, 50000, 65535];
+    for rows in row_counts {
+        let result = run_spill_test(&spiller, rows).await?;
+        println!("{result:?}");
+    }
+
+    // TestResult { rows: 1000, memory_size: 145.9 KiB (149387 bytes), spill_size: 51.4 KiB (52645 bytes), memory_usage_real: 164.2 KiB (168147 bytes) }
+    // TestResult { rows: 10000, memory_size: 1.4 MiB (1490811 bytes), spill_size: 470.3 KiB (481620 bytes), memory_usage_real: 1.8 MiB (1854174 bytes) }
+    // TestResult { rows: 50000, memory_size: 7.1 MiB (7448980 bytes), spill_size: 2.2 MiB (2348600 bytes), memory_usage_real: 8.2 MiB (8582166 bytes) }
+    // TestResult { rows: 65535, memory_size: 9.3 MiB (9761210 bytes), spill_size: 2.9 MiB (3045927 bytes), memory_usage_real: 11.7 MiB (12298628 bytes) }
+
+    Ok(())
+}
diff --git a/src/query/service/src/test_kits/fixture.rs b/src/query/service/src/test_kits/fixture.rs
index 9603f86704f8f..e772288c89088 100644
--- a/src/query/service/src/test_kits/fixture.rs
+++ b/src/query/service/src/test_kits/fixture.rs
@@ -141,6 +141,10 @@ impl Setup for OSSSetup {
 }
 
 impl TestFixture {
+    /// Extends the lifetime of TestFixture to prevent GlobalInstance from being used after drop,
+    /// typically used across await points
+    pub fn keep_alive(&self) {}
+
     /// Create a new TestFixture with default config.
     pub async fn setup() -> Result {
         let config = ConfigBuilder::create().config();
diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs
index 1f974c749493b..0c2b882b32f32 100644
--- a/src/query/settings/src/settings_default.rs
+++ b/src/query/settings/src/settings_default.rs
@@ -676,11 +676,11 @@
                 range: Some(SettingRange::Numeric(0..=u64::MAX)),
             }),
             ("sort_spilling_batch_bytes", DefaultSettingValue {
-                value: UserSettingValue::UInt64(8 * 1024 * 1024),
+                value: UserSettingValue::UInt64(20 * 1024 * 1024),
                 desc: "Sets the uncompressed size that merge sorter will spill to storage",
                 mode: SettingMode::Both,
                 scope: SettingScope::Both,
-                range: Some(SettingRange::Numeric(4 * 1024..=u64::MAX)),
+                range: Some(SettingRange::Numeric(1024 * 1024..=u64::MAX)),
             }),
             ("enable_shuffle_sort", DefaultSettingValue {
                 value: UserSettingValue::UInt64(0),