diff --git a/Cargo.lock b/Cargo.lock index f7c7e6f8c1994..2816f38e03659 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4250,6 +4250,7 @@ dependencies = [ "regex", "reqwest", "serde", + "simple_hll", "thiserror 1.0.69", "url", ] @@ -5635,7 +5636,6 @@ dependencies = [ "rmp-serde", "serde", "serde_json", - "simple_hll", "snap", "typetag", "zstd 0.12.4", @@ -14021,9 +14021,9 @@ dependencies = [ [[package]] name = "simple_hll" -version = "0.0.1" +version = "0.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbdc537413bd6a291f57e2cc0a17579beb5ccaeea534e9c3001e39d9a07fa14f" +checksum = "23fffa79a74515ff36cfed9899bafa88ea5eeae07512a130aa7c50a870d96e02" dependencies = [ "ahash 0.8.12", "borsh", diff --git a/Cargo.toml b/Cargo.toml index db3e6706dcda0..2b0e11c0e2df2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -491,7 +491,7 @@ sha1 = "0.10.5" sha2 = "0.10.8" simdutf8 = "0.1.4" similar = "2.7.0" -simple_hll = { version = "0.0.1", features = ["serde_borsh"] } +simple_hll = { version = "0.0.4", features = ["serde_borsh"] } simsearch = "0.2" siphasher = "0.3" sled = { version = "0.34", default-features = false } diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index 4d4d0e4f205d5..0333ffa588500 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -37,6 +37,7 @@ prometheus-client = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } +simple_hll = { workspace = true, features = ["serde_borsh"] } thiserror = { workspace = true } url = { workspace = true } diff --git a/src/common/storage/src/lib.rs b/src/common/storage/src/lib.rs index aef8bc4cc86b6..62e7462f35235 100644 --- a/src/common/storage/src/lib.rs +++ b/src/common/storage/src/lib.rs @@ -85,6 +85,7 @@ pub use histogram::HistogramBucket; pub use histogram::DEFAULT_HISTOGRAM_BUCKETS; pub use merge::MutationStatus; pub use meta_hll::MetaHLL; +pub use meta_hll::MetaHLL12; pub use multi_table_insert::MultiTableInsertStatus; pub use statistics::Datum; pub use statistics::F64; diff --git a/src/common/storage/src/meta_hll.rs b/src/common/storage/src/meta_hll.rs index dff041f9b155e..6f347bb808d30 100644 --- a/src/common/storage/src/meta_hll.rs +++ b/src/common/storage/src/meta_hll.rs @@ -18,6 +18,8 @@ use std::hash::Hash; use ahash::RandomState; +pub type MetaHLL12 = simple_hll::HyperLogLog<12>; + const P: usize = 7_usize; const Q: usize = 64 - P; const M: usize = 1 << P; @@ -51,6 +53,11 @@ impl MetaHLL { } } + pub fn with_registers(registers: Vec) -> Self { + assert_eq!(registers.len(), M); + Self { registers } + } + /// Adds an hash to the MetaHLL. /// hash value is dertermined by caller #[inline] @@ -67,7 +74,7 @@ impl MetaHLL { /// Adds an object to the MetaHLL. /// Though we could pass different types into this method, caller should notice that - pub fn add_object(&mut self, obj: &T) { + pub fn add_object(&mut self, obj: &T) { let hash = SEED.hash_one(obj); self.add_hash(hash); } @@ -161,6 +168,20 @@ fn hll_tau(x: f64) -> f64 { } } +impl From for MetaHLL { + fn from(value: MetaHLL12) -> Self { + let registers = value.get_registers(); + let mut new_registers = vec![0; M]; + let group_size = registers.len() / M; + for i in 0..M { + for j in 0..group_size { + new_registers[i] = new_registers[i].max(registers[i * group_size + j]); + } + } + Self::with_registers(new_registers) + } +} + #[derive(serde::Serialize, borsh::BorshSerialize)] enum MetaHLLVariantRef<'a> { Empty, @@ -366,4 +387,20 @@ mod tests { } compare_with_delta(hll.count(), 1000); } + + #[test] + fn test_from_hll() { + let mut hll = MetaHLL12::new(); + for i in 0..100_000 { + hll.add_object(&i); + } + + let hll = MetaHLL::from(hll); + let count = hll.count(); + let error_rate = 1.04 / ((M as f64).sqrt()); + let diff = count as f64 / 100_000f64; + + assert!(diff >= 1.0 - error_rate); + assert!(diff <= 1.0 + error_rate); + } } diff --git a/src/query/service/src/interpreters/interpreter_table_analyze.rs b/src/query/service/src/interpreters/interpreter_table_analyze.rs index 0609ca305a83d..d4bfdf092f82f 100644 --- a/src/query/service/src/interpreters/interpreter_table_analyze.rs +++ b/src/query/service/src/interpreters/interpreter_table_analyze.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use chrono::Utc; +use databend_common_catalog::plan::PartitionsShuffleKind; use databend_common_catalog::table::TableExt; use databend_common_exception::Result; use databend_common_pipeline_core::processors::ProcessorPtr; @@ -26,20 +26,19 @@ use databend_common_sql::plans::Plan; use databend_common_sql::BindContext; use databend_common_sql::Planner; use databend_common_storage::DEFAULT_HISTOGRAM_BUCKETS; -use databend_common_storages_factory::NavigationPoint; use databend_common_storages_factory::Table; use databend_common_storages_fuse::operations::AnalyzeLightMutator; use databend_common_storages_fuse::operations::HistogramInfoSink; +use databend_common_storages_fuse::FuseLazyPartInfo; use databend_common_storages_fuse::FuseTable; +use databend_storages_common_cache::Partitions; use databend_storages_common_index::Index; use databend_storages_common_index::RangeIndex; -use itertools::Itertools; use log::info; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline; -use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; use crate::sessions::TableContext; @@ -133,82 +132,23 @@ impl Interpreter for AnalyzeTableInterpreter { return Ok(PipelineBuildResult::create()); } - let table_statistics = table - .read_table_snapshot_statistics(Some(&snapshot)) - .await?; - - // plan sql - let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics { - let is_full = match table - .navigate_to_point( - &NavigationPoint::SnapshotID(table_statistics.snapshot_id.simple().to_string()), - self.ctx.clone().get_abort_checker(), - ) - .await - { - Ok(t) => !t - .read_table_snapshot() - .await - .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())), - Err(_) => true, - }; - - let temporal_str = if is_full { - format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()) - } else { - // analyze only need to collect the added blocks. - let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp()); - format!( - "CHANGES(INFORMATION => DEFAULT) AT (snapshot => '{}') END (snapshot => '{}') AS {table_alias}", - table_statistics.snapshot_id.simple(), - snapshot.snapshot_id.simple(), - ) - }; - (is_full, temporal_str) - } else { - ( - true, - format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()), - ) - }; + let mut parts = Vec::with_capacity(snapshot.segments.len()); + for (idx, segment_location) in snapshot.segments.iter().enumerate() { + parts.push(FuseLazyPartInfo::create(idx, segment_location.clone())); + } + self.ctx + .set_partitions(Partitions::create(PartitionsShuffleKind::Mod, parts))?; + let mut build_res = PipelineBuildResult::create(); + // After profiling, computing histogram is heavy and the bottleneck is window function(90%). + // It's possible to OOM if the table is too large and spilling isn't enabled. + // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed). + let mut histogram_info_receivers = HashMap::new(); let quote = self .ctx .get_settings() .get_sql_dialect()? .default_ident_quote(); - - // 0.01625 --> 12 buckets --> 4K size per column - // 1.04 / math.sqrt(1<<12) --> 0.01625 - const DISTINCT_ERROR_RATE: f64 = 0.01625; - let ndv_select_expr = snapshot - .schema - .fields() - .iter() - .filter(|f| RangeIndex::supported_type(&f.data_type().into())) - .map(|f| { - format!( - "approx_count_distinct_state({DISTINCT_ERROR_RATE})({quote}{}{quote}) as ndv_{}", - f.name, - f.column_id() - ) - }) - .join(", "); - - let sql = format!( - "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}", - plan.database, plan.table, - ); - - info!("Analyze via sql: {sql}"); - - let (physical_plan, bind_context) = self.plan_sql(sql, false).await?; - let mut build_res = - build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; - // After profiling, computing histogram is heavy and the bottleneck is window function(90%). - // It's possible to OOM if the table is too large and spilling isn't enabled. - // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed). - let mut histogram_info_receivers = HashMap::new(); if self.ctx.get_settings().get_enable_analyze_histogram()? { let histogram_sqls = table .schema() @@ -262,12 +202,8 @@ impl Interpreter for AnalyzeTableInterpreter { histogram_info_receivers.insert(col_id, rx); } } - FuseTable::do_analyze( + table.do_analyze( self.ctx.clone(), - bind_context.output_schema(), - &self.plan.catalog, - &self.plan.database, - &self.plan.table, snapshot.snapshot_id, &mut build_res.main_pipeline, histogram_info_receivers, diff --git a/src/query/service/src/pipelines/builders/builder_mutation_source.rs b/src/query/service/src/pipelines/builders/builder_mutation_source.rs index 69ab3ef6fa316..223b418a1a684 100644 --- a/src/query/service/src/pipelines/builders/builder_mutation_source.rs +++ b/src/query/service/src/pipelines/builders/builder_mutation_source.rs @@ -80,7 +80,7 @@ impl PipelineBuilder { Vec::with_capacity(mutation_source.partitions.partitions.len()); for part in &mutation_source.partitions.partitions { // Safe to downcast because we know the partition is lazy - let part: &FuseLazyPartInfo = FuseLazyPartInfo::from_part(part)?; + let part = FuseLazyPartInfo::from_part(part)?; segment_locations.push(SegmentLocation { segment_idx: part.segment_index, location: part.segment_location.clone(), diff --git a/src/query/service/src/test_kits/check.rs b/src/query/service/src/test_kits/check.rs index 5a0ebf2296757..119349af52d18 100644 --- a/src/query/service/src/test_kits/check.rs +++ b/src/query/service/src/test_kits/check.rs @@ -25,6 +25,7 @@ use databend_common_storages_fuse::operations::load_last_snapshot_hint; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX; use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX; +use databend_common_storages_fuse::FUSE_TBL_SEGMENT_STATISTICS_PREFIX; use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX; use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX; use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX; @@ -79,7 +80,7 @@ pub async fn check_data_dir( segment_count: u32, block_count: u32, index_count: u32, - _block_stat_count: u32, + segment_stats_count: u32, check_last_snapshot: Option<()>, check_table_statistic_file: Option<()>, ) -> Result<()> { @@ -91,6 +92,7 @@ pub async fn check_data_dir( let mut ss_count = 0; let mut ts_count = 0; let mut sg_count = 0; + let mut hs_count = 0; let mut b_count = 0; let mut i_count = 0; let mut table_statistic_files = vec![]; @@ -99,6 +101,7 @@ pub async fn check_data_dir( let prefix_segment = FUSE_TBL_SEGMENT_PREFIX; let prefix_block = FUSE_TBL_BLOCK_PREFIX; let prefix_index = FUSE_TBL_XOR_BLOOM_INDEX_PREFIX; + let prefix_segment_stats = FUSE_TBL_SEGMENT_STATISTICS_PREFIX; for entry in WalkDir::new(root) { let entry = entry.unwrap(); if entry.file_type().is_file() { @@ -117,6 +120,8 @@ pub async fn check_data_dir( } else if path.starts_with(prefix_snapshot_statistics) { ts_count += 1; table_statistic_files.push(entry_path.to_string()); + } else if path.starts_with(prefix_segment_stats) { + hs_count += 1; } } } @@ -136,6 +141,11 @@ pub async fn check_data_dir( "case [{}], check segment count", case_name ); + assert_eq!( + hs_count, segment_stats_count, + "case [{}], check segment statistics count", + case_name + ); assert_eq!( b_count, block_count, diff --git a/src/query/service/tests/it/storages/fuse/operations/analyze.rs b/src/query/service/tests/it/storages/fuse/operations/analyze.rs index 0b5ee07035bc5..310fbad458e26 100644 --- a/src/query/service/tests/it/storages/fuse/operations/analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/analyze.rs @@ -100,23 +100,12 @@ async fn test_fuse_snapshot_analyze_purge() -> Result<()> { let case_name = "analyze_statistic_purge"; do_insertions(&fixture).await?; - // optimize statistics three times - for i in 0..3 { - analyze_table(&fixture).await?; - check_data_dir( - &fixture, - case_name, - 3 + i, - 1 + i, - 2, - 2, - 2, - 2, - Some(()), - None, - ) - .await?; - } + analyze_table(&fixture).await?; + check_data_dir(&fixture, case_name, 3, 1, 2, 2, 2, 2, Some(()), None).await?; + + append_sample_data(1, &fixture).await?; + analyze_table(&fixture).await?; + check_data_dir(&fixture, case_name, 5, 2, 3, 3, 3, 3, Some(()), None).await?; // Purge will keep at least two snapshots. let table = fixture.latest_default_table().await?; @@ -126,7 +115,7 @@ async fn test_fuse_snapshot_analyze_purge() -> Result<()> { fuse_table .do_purge(&table_ctx, snapshot_files, None, true, false) .await?; - check_data_dir(&fixture, case_name, 1, 1, 1, 1, 1, 1, Some(()), Some(())).await?; + check_data_dir(&fixture, case_name, 1, 1, 2, 2, 2, 2, Some(()), Some(())).await?; Ok(()) } diff --git a/src/query/service/tests/it/storages/fuse/operations/gc.rs b/src/query/service/tests/it/storages/fuse/operations/gc.rs index bb60f0dbf240f..0a69e421e84ea 100644 --- a/src/query/service/tests/it/storages/fuse/operations/gc.rs +++ b/src/query/service/tests/it/storages/fuse/operations/gc.rs @@ -252,7 +252,7 @@ async fn test_fuse_purge_orphan_retention() -> Result<()> { let expected_num_of_segment = 3; let expected_num_of_blocks = 3; let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_block_stats = expected_num_of_blocks; + let expected_num_of_segment_stats = expected_num_of_segment; check_data_dir( &fixture, "do_gc: verify retention period", @@ -261,7 +261,7 @@ async fn test_fuse_purge_orphan_retention() -> Result<()> { expected_num_of_segment, expected_num_of_blocks, expected_num_of_index, - expected_num_of_block_stats, + expected_num_of_segment_stats, Some(()), None, ) @@ -300,7 +300,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { let expected_num_of_segment = 3; let expected_num_of_blocks = 6; let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_block_stats = expected_num_of_blocks; + let expected_num_of_segment_stats = expected_num_of_segment; check_data_dir( &fixture, "do_gc: navigate to time point", @@ -309,7 +309,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { expected_num_of_segment, expected_num_of_blocks, expected_num_of_index, - expected_num_of_block_stats, + expected_num_of_segment_stats, Some(()), None, ) @@ -323,7 +323,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { { let table = fixture.latest_default_table().await?; compact_segment(ctx.clone(), &table).await?; - check_data_dir(&fixture, "", 4, 0, 5, 7, 7, 7, Some(()), None).await?; + check_data_dir(&fixture, "", 4, 0, 5, 7, 7, 5, Some(()), None).await?; } let table = fixture.latest_default_table().await?; @@ -339,7 +339,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { let expected_num_of_segment = 1; let expected_num_of_blocks = 7; let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_block_stats = expected_num_of_blocks; + let expected_num_of_segment_stats = expected_num_of_segment; check_data_dir( &fixture, "do_gc: with older version", @@ -348,7 +348,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { expected_num_of_segment, expected_num_of_blocks, expected_num_of_index, - expected_num_of_block_stats, + expected_num_of_segment_stats, Some(()), None, ) @@ -365,7 +365,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { let expected_num_of_segment = 0; let expected_num_of_blocks = 0; let expected_num_of_index = expected_num_of_blocks; - let expected_num_of_block_stats = expected_num_of_blocks; + let expected_num_of_segment_stats = expected_num_of_segment; check_data_dir( &fixture, "do_gc: purge last snapshot", @@ -374,7 +374,7 @@ async fn test_fuse_purge_older_version() -> Result<()> { expected_num_of_segment, expected_num_of_blocks, expected_num_of_index, - expected_num_of_block_stats, + expected_num_of_segment_stats, Some(()), None, ) diff --git a/src/query/service/tests/it/storages/fuse/operations/purge_drop.rs b/src/query/service/tests/it/storages/fuse/operations/purge_drop.rs index 54b4a535c9946..c79b3f958d989 100644 --- a/src/query/service/tests/it/storages/fuse/operations/purge_drop.rs +++ b/src/query/service/tests/it/storages/fuse/operations/purge_drop.rs @@ -54,7 +54,7 @@ async fn test_fuse_snapshot_truncate_in_drop_all_stmt() -> Result<()> { 1, // 0 segments 1, // 0 blocks 1, // 0 index - 1, // 0 block statistic + 1, // 0 segment statistic None, None, ) @@ -72,7 +72,7 @@ async fn test_fuse_snapshot_truncate_in_drop_all_stmt() -> Result<()> { 0, // 0 segments 0, // 0 blocks 0, // 0 index - 0, // 0 block statistic + 0, // 0 segment statistic None, None, ) diff --git a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs index 125b3ded06e39..92603d4ad6cd0 100644 --- a/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs +++ b/src/query/service/tests/it/storages/fuse/operations/table_analyze.rs @@ -23,6 +23,7 @@ use databend_common_expression::types::number::NumberScalar; use databend_common_expression::ColumnId; use databend_common_expression::Scalar; use databend_common_io::prelude::borsh_deserialize_from_slice; +use databend_common_storage::MetaHLL12; use databend_common_storages_fuse::io::MetaReaders; use databend_common_storages_fuse::io::MetaWriter; use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut; @@ -33,7 +34,7 @@ use databend_query::sql::plans::Plan; use databend_query::sql::Planner; use databend_query::test_kits::*; use databend_storages_common_cache::LoadParams; -use databend_storages_common_table_meta::meta::MetaHLL12; +use databend_storages_common_table_meta::meta::testing::TableSnapshotStatisticsV3; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::TableSnapshot; @@ -218,8 +219,9 @@ async fn test_table_analyze_without_prev_table_seq() -> Result<()> { let col: Vec = vec![1, 3, 0, 0, 0, 118, 5, 1, 21, 6, 3, 229, 13, 3]; let hll: HashMap = HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]); - let table_statistics = - TableSnapshotStatistics::new(hll, HashMap::new(), snapshot_1.snapshot_id); + let table_statistics_v3 = + TableSnapshotStatisticsV3::new(hll, HashMap::new(), snapshot_1.snapshot_id); + let table_statistics = TableSnapshotStatistics::from(table_statistics_v3); let table_statistics_location = location_gen.snapshot_statistics_location_from_uuid( &table_statistics.snapshot_id, table_statistics.format_version(), diff --git a/src/query/service/tests/it/storages/fuse/utils.rs b/src/query/service/tests/it/storages/fuse/utils.rs index 2dbf9d36de9dc..7ebb753ab17ab 100644 --- a/src/query/service/tests/it/storages/fuse/utils.rs +++ b/src/query/service/tests/it/storages/fuse/utils.rs @@ -54,7 +54,7 @@ pub async fn do_purge_test( segment_count: u32, block_count: u32, index_count: u32, - block_stat_count: u32, + segment_stat_count: u32, ) -> Result<()> { let fixture = TestFixture::setup().await?; fixture.create_default_database().await?; @@ -82,7 +82,7 @@ pub async fn do_purge_test( segment_count, block_count, index_count, - block_stat_count, + segment_stat_count, Some(()), None, ) diff --git a/src/query/storages/common/table_meta/Cargo.toml b/src/query/storages/common/table_meta/Cargo.toml index bba25e6f0f4dd..3a45db5535f46 100644 --- a/src/query/storages/common/table_meta/Cargo.toml +++ b/src/query/storages/common/table_meta/Cargo.toml @@ -30,7 +30,6 @@ parquet = { workspace = true } rmp-serde = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } -simple_hll = { workspace = true, features = ["serde_borsh"] } snap = { workspace = true, optional = true } typetag = { workspace = true } zstd = { workspace = true } diff --git a/src/query/storages/common/table_meta/src/meta/current/mod.rs b/src/query/storages/common/table_meta/src/meta/current/mod.rs index cdb11e4b13e02..4484ee7e81e0d 100644 --- a/src/query/storages/common/table_meta/src/meta/current/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/current/mod.rs @@ -21,19 +21,17 @@ pub use v2::ColumnStatistics; pub use v2::DraftVirtualBlockMeta; pub use v2::DraftVirtualColumnMeta; pub use v2::ExtendedBlockMeta; -pub use v2::MetaHLL12; pub use v2::SegmentStatistics; pub use v2::Statistics; pub use v2::VirtualBlockMeta; pub use v2::VirtualColumnMeta; -pub use v3::TableSnapshotStatistics; pub use v4::CompactSegmentInfo; pub use v4::RawBlockMeta; pub use v4::SegmentInfo; pub use v4::TableSnapshot; pub use v4::TableSnapshotLite; +pub use v4::TableSnapshotStatistics; use super::v0; use super::v2; -use super::v3; use super::v4; diff --git a/src/query/storages/common/table_meta/src/meta/mod.rs b/src/query/storages/common/table_meta/src/meta/mod.rs index 5e1e67a68507f..e3e7c5b7491fe 100644 --- a/src/query/storages/common/table_meta/src/meta/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/mod.rs @@ -66,5 +66,6 @@ pub mod testing { pub use super::v2::TableSnapshot as TableSnapshotV2; pub use super::v3::SegmentInfo as SegmentInfoV3; pub use super::v3::TableSnapshot as TableSnapshotV3; + pub use super::v3::TableSnapshotStatistics as TableSnapshotStatisticsV3; pub use super::v4::TableSnapshot as TableSnapshotV4; } diff --git a/src/query/storages/common/table_meta/src/meta/v2/mod.rs b/src/query/storages/common/table_meta/src/meta/v2/mod.rs index 26e79eab9bc32..8025672d549fe 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/mod.rs @@ -32,5 +32,4 @@ pub use statistics::AdditionalStatsMeta; pub use statistics::ClusterStatistics; pub use statistics::ColumnStatistics; pub use statistics::Statistics; -pub use table_snapshot_statistics::MetaHLL12; pub use table_snapshot_statistics::TableSnapshotStatistics; diff --git a/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs b/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs index 86e4d06cab518..ecaf619b983f4 100644 --- a/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs +++ b/src/query/storages/common/table_meta/src/meta/v2/table_snapshot_statistics.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use databend_common_expression::ColumnId; +use databend_common_storage::MetaHLL12; use serde::Deserialize; use serde::Serialize; @@ -23,8 +24,6 @@ use crate::meta::FormatVersion; use crate::meta::SnapshotId; use crate::meta::Versioned; -pub type MetaHLL12 = simple_hll::HyperLogLog<12>; - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct TableSnapshotStatistics { /// format version of snapshot diff --git a/src/query/storages/common/table_meta/src/meta/v3/table_snapshot_statistics.rs b/src/query/storages/common/table_meta/src/meta/v3/table_snapshot_statistics.rs index 7cb19c1455421..8bcd744487e9f 100644 --- a/src/query/storages/common/table_meta/src/meta/v3/table_snapshot_statistics.rs +++ b/src/query/storages/common/table_meta/src/meta/v3/table_snapshot_statistics.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use databend_common_expression::ColumnId; use databend_common_storage::Histogram; +use databend_common_storage::MetaHLL12; use serde::Deserialize; use serde::Serialize; @@ -25,8 +26,6 @@ use crate::meta::FormatVersion; use crate::meta::SnapshotId; use crate::meta::Versioned; -pub type MetaHLL12 = simple_hll::HyperLogLog<12>; - #[derive(Serialize, Deserialize, Clone, Debug)] pub struct TableSnapshotStatistics { /// format version of statistics @@ -69,7 +68,7 @@ impl From for TableSnapshotStatistics { Self { format_version: TableSnapshotStatistics::VERSION, snapshot_id: value.snapshot_id, - hll: HashMap::new(), + hll: value.hll, histograms: HashMap::new(), } } diff --git a/src/query/storages/common/table_meta/src/meta/v4/mod.rs b/src/query/storages/common/table_meta/src/meta/v4/mod.rs index 6a596b9ec8807..dd538b7524877 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/mod.rs @@ -14,9 +14,11 @@ mod segment; mod snapshot; +mod table_snapshot_statistics; pub use segment::CompactSegmentInfo; pub use segment::RawBlockMeta; pub use segment::SegmentInfo; pub use snapshot::TableSnapshot; pub use snapshot::TableSnapshotLite; +pub use table_snapshot_statistics::TableSnapshotStatistics; diff --git a/src/query/storages/common/table_meta/src/meta/v4/table_snapshot_statistics.rs b/src/query/storages/common/table_meta/src/meta/v4/table_snapshot_statistics.rs new file mode 100644 index 0000000000000..0e359bb14e014 --- /dev/null +++ b/src/query/storages/common/table_meta/src/meta/v4/table_snapshot_statistics.rs @@ -0,0 +1,103 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use databend_common_expression::ColumnId; +use databend_common_storage::Histogram; +use databend_common_storage::MetaHLL; + +use crate::meta::v1; +use crate::meta::v2; +use crate::meta::v3; +use crate::meta::FormatVersion; +use crate::meta::SnapshotId; +use crate::meta::Versioned; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)] +pub struct TableSnapshotStatistics { + /// format version of snapshot + pub format_version: FormatVersion, + + pub snapshot_id: SnapshotId, + pub row_count: u64, + pub hll: HashMap, + pub histograms: HashMap, +} + +impl TableSnapshotStatistics { + pub fn new( + hll: HashMap, + histograms: HashMap, + snapshot_id: SnapshotId, + row_count: u64, + ) -> Self { + Self { + format_version: TableSnapshotStatistics::VERSION, + snapshot_id, + hll, + histograms, + row_count, + } + } + + pub fn format_version(&self) -> u64 { + self.format_version + } + + pub fn column_distinct_values(&self) -> HashMap { + self.hll + .iter() + .map(|hll| (*hll.0, hll.1.count() as u64)) + .collect() + } +} + +impl From for TableSnapshotStatistics { + fn from(value: v1::TableSnapshotStatistics) -> Self { + Self { + format_version: TableSnapshotStatistics::VERSION, + snapshot_id: value.snapshot_id, + row_count: 0, + hll: HashMap::new(), + histograms: HashMap::new(), + } + } +} + +impl From for TableSnapshotStatistics { + fn from(value: v2::TableSnapshotStatistics) -> Self { + let hll = value.hll.into_iter().map(|(k, v)| (k, v.into())).collect(); + Self { + format_version: TableSnapshotStatistics::VERSION, + snapshot_id: value.snapshot_id, + row_count: 0, + hll, + histograms: HashMap::new(), + } + } +} + +impl From for TableSnapshotStatistics { + fn from(value: v3::TableSnapshotStatistics) -> Self { + let hll = value.hll.into_iter().map(|(k, v)| (k, v.into())).collect(); + Self { + format_version: TableSnapshotStatistics::VERSION, + snapshot_id: value.snapshot_id, + row_count: 0, + hll, + histograms: value.histograms, + } + } +} diff --git a/src/query/storages/common/table_meta/src/meta/versions.rs b/src/query/storages/common/table_meta/src/meta/versions.rs index 98411980f7762..dac9276237d5e 100644 --- a/src/query/storages/common/table_meta/src/meta/versions.rs +++ b/src/query/storages/common/table_meta/src/meta/versions.rs @@ -105,6 +105,7 @@ impl SnapshotVersion { impl Versioned<0> for v1::TableSnapshotStatistics {} impl Versioned<2> for v2::TableSnapshotStatistics {} impl Versioned<3> for v3::TableSnapshotStatistics {} +impl Versioned<4> for v4::TableSnapshotStatistics {} impl Versioned<2> for DataBlock {} @@ -112,6 +113,7 @@ pub enum TableSnapshotStatisticsVersion { V0(PhantomData), V2(PhantomData), V3(PhantomData), + V4(PhantomData), } impl TableSnapshotStatisticsVersion { @@ -120,6 +122,7 @@ impl TableSnapshotStatisticsVersion { TableSnapshotStatisticsVersion::V0(a) => Self::ver(a), TableSnapshotStatisticsVersion::V2(a) => Self::ver(a), TableSnapshotStatisticsVersion::V3(a) => Self::ver(a), + TableSnapshotStatisticsVersion::V4(a) => Self::ver(a), } } @@ -206,8 +209,11 @@ mod converters { 3 => Ok(TableSnapshotStatisticsVersion::V3(testify_version::<_, 3>( PhantomData, ))), + 4 => Ok(TableSnapshotStatisticsVersion::V4(testify_version::<_, 4>( + PhantomData, + ))), _ => Err(ErrorCode::Internal(format!( - "unknown table snapshot statistics version {value}, versions supported: 0, 2, 3" + "unknown table snapshot statistics version {value}, versions supported: 0, 2, 3, 4" ))), } } diff --git a/src/query/storages/common/table_meta/src/readers/versioned_reader.rs b/src/query/storages/common/table_meta/src/readers/versioned_reader.rs index 7ce87d65e098e..525b1932a9fae 100644 --- a/src/query/storages/common/table_meta/src/readers/versioned_reader.rs +++ b/src/query/storages/common/table_meta/src/readers/versioned_reader.rs @@ -43,7 +43,11 @@ impl VersionedReader for TableSnapshotStatisticsVersion let ts = load_json(reader, v)?; TableSnapshotStatistics::from(ts) } - TableSnapshotStatisticsVersion::V3(v) => load_json(reader, v)?, + TableSnapshotStatisticsVersion::V3(v) => { + let ts = load_json(reader, v)?; + TableSnapshotStatistics::from(ts) + } + TableSnapshotStatisticsVersion::V4(v) => load_json(reader, v)?, }; Ok(r) } diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index e389b9023927b..a98de49cdb430 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -55,6 +55,8 @@ static SNAPSHOT_STATISTICS_V2: TableSnapshotStatisticsVersion = static SNAPSHOT_STATISTICS_V3: TableSnapshotStatisticsVersion = TableSnapshotStatisticsVersion::V3(PhantomData); +static SNAPSHOT_STATISTICS_V4: TableSnapshotStatisticsVersion = + TableSnapshotStatisticsVersion::V4(PhantomData); #[derive(Clone)] pub struct TableMetaLocationGenerator { @@ -225,19 +227,30 @@ impl TableMetaLocationGenerator { } pub fn table_statistics_version(table_statistics_location: impl AsRef) -> u64 { - if table_statistics_location - .as_ref() - .ends_with(SNAPSHOT_STATISTICS_V0.suffix().as_str()) - { - SNAPSHOT_STATISTICS_V0.version() - } else if table_statistics_location - .as_ref() - .ends_with(SNAPSHOT_STATISTICS_V2.suffix().as_str()) - { - SNAPSHOT_STATISTICS_V2.version() - } else { - SNAPSHOT_STATISTICS_V3.version() - } + let version_map = [ + ( + SNAPSHOT_STATISTICS_V0.suffix(), + SNAPSHOT_STATISTICS_V0.version(), + ), + ( + SNAPSHOT_STATISTICS_V2.suffix(), + SNAPSHOT_STATISTICS_V2.version(), + ), + ( + SNAPSHOT_STATISTICS_V3.suffix(), + SNAPSHOT_STATISTICS_V3.version(), + ), + ( + SNAPSHOT_STATISTICS_V4.suffix(), + SNAPSHOT_STATISTICS_V4.version(), + ), + ]; + + version_map + .iter() + .find(|(suffix, _)| table_statistics_location.as_ref().ends_with(suffix)) + .map(|(_, version)| *version) + .unwrap_or(SNAPSHOT_STATISTICS_V4.version()) } pub fn gen_agg_index_location_from_block_location(loc: &str, index_id: u64) -> String { @@ -375,6 +388,7 @@ impl SnapshotLocationCreator for TableSnapshotStatisticsVersion { TableSnapshotStatisticsVersion::V0(_) => "_ts_v0.json".to_string(), TableSnapshotStatisticsVersion::V2(_) => "_ts_v2.json".to_string(), TableSnapshotStatisticsVersion::V3(_) => "_ts_v3.json".to_string(), + TableSnapshotStatisticsVersion::V4(_) => "_ts_v4.json".to_string(), } } } diff --git a/src/query/storages/fuse/src/io/read/meta/mod.rs b/src/query/storages/fuse/src/io/read/meta/mod.rs index 48800d39064d4..258b61bbb6650 100644 --- a/src/query/storages/fuse/src/io/read/meta/mod.rs +++ b/src/query/storages/fuse/src/io/read/meta/mod.rs @@ -17,4 +17,5 @@ mod meta_readers; pub use meta_readers::bytes_reader; pub use meta_readers::CompactSegmentInfoReader; pub use meta_readers::MetaReaders; +pub use meta_readers::SegmentStatsReader; pub use meta_readers::TableSnapshotReader; diff --git a/src/query/storages/fuse/src/operations/analyze.rs b/src/query/storages/fuse/src/operations/analyze.rs index 324446fa56231..1fc7464717e59 100644 --- a/src/query/storages/fuse/src/operations/analyze.rs +++ b/src/query/storages/fuse/src/operations/analyze.rs @@ -13,24 +13,36 @@ // limitations under the License. use std::any::Any; +use std::collections::BTreeMap; use std::collections::HashMap; +use std::fmt::Debug; +use std::fmt::Formatter; use std::sync::Arc; use std::time::Instant; use async_channel::Receiver; use async_channel::Sender; use async_trait::async_trait; -use databend_common_catalog::catalog::CatalogManager; +use databend_common_base::base::tokio::sync::Semaphore; +use databend_common_base::runtime::GlobalIORuntime; +use databend_common_base::runtime::TrySpawn; +use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::plan::Projection; use databend_common_catalog::table::Table; +use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_expression::local_block_meta_serde; +use databend_common_expression::BlockMetaInfo; +use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; -use databend_common_expression::DataSchema; -use databend_common_io::prelude::borsh_deserialize_from_slice; +use databend_common_expression::FieldIndex; +use databend_common_expression::TableField; use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_core::Pipeline; @@ -39,17 +51,42 @@ use databend_common_pipeline_sinks::AsyncSinker; use databend_common_storage::Datum; use databend_common_storage::Histogram; use databend_common_storage::HistogramBucket; +use databend_common_storage::MetaHLL; +use databend_storages_common_cache::BlockMeta; +use databend_storages_common_cache::CompactSegmentInfo; +use databend_storages_common_cache::LoadParams; +use databend_storages_common_cache::SegmentStatistics; +use databend_storages_common_index::RangeIndex; +use databend_storages_common_io::ReadSettings; +use databend_storages_common_table_meta::meta::decode_column_hll; +use databend_storages_common_table_meta::meta::encode_column_hll; +use databend_storages_common_table_meta::meta::AdditionalStatsMeta; +use databend_storages_common_table_meta::meta::BlockHLL; use databend_storages_common_table_meta::meta::ClusterStatistics; -use databend_storages_common_table_meta::meta::MetaHLL12; +use databend_storages_common_table_meta::meta::Location; +use databend_storages_common_table_meta::meta::RawBlockHLL; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::SnapshotId; +use databend_storages_common_table_meta::meta::Statistics; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; - +use databend_storages_common_table_meta::meta::Versioned; +use opendal::Operator; + +use crate::io::build_column_hlls; +use crate::io::read::meta::SegmentStatsReader; +use crate::io::BlockReader; +use crate::io::CachedMetaWriter; +use crate::io::CompactSegmentInfoReader; +use crate::io::MetaReaders; use crate::io::SegmentsIO; +use crate::io::TableMetaLocationGenerator; +use crate::operations::acquire_task_permit; use crate::statistics::reduce_block_statistics; use crate::statistics::reduce_cluster_statistics; +use crate::FuseLazyPartInfo; +use crate::FuseStorageFormat; use crate::FuseTable; #[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq)] @@ -62,22 +99,21 @@ enum AnalyzeStep { impl FuseTable { #[allow(clippy::too_many_arguments)] pub fn do_analyze( + &self, ctx: Arc, - output_schema: Arc, - catalog: &str, - database: &str, - table: &str, snapshot_id: SnapshotId, pipeline: &mut Pipeline, histogram_info_receivers: HashMap>, ) -> Result<()> { + pipeline.add_source( + |output| AnalyzeCollectNDVSource::try_create(output, self, ctx.clone()), + ctx.get_settings().get_max_threads()? as usize, + )?; + pipeline.try_resize(1)?; pipeline.add_sink(|input| { SinkAnalyzeState::create( ctx.clone(), - output_schema.clone(), - catalog, - database, - table, + self, snapshot_id, input, histogram_info_receivers.clone(), @@ -89,17 +125,15 @@ impl FuseTable { struct SinkAnalyzeState { ctx: Arc, - output_schema: Arc, input_port: Arc, - catalog: String, - database: String, - table: String, + table: Arc, snapshot_id: SnapshotId, histogram_info_receivers: HashMap>, input_data: Option, committed: bool, - ndv_states: HashMap, + row_count: u64, + ndv_states: HashMap, histograms: HashMap, step: AnalyzeStep, } @@ -108,101 +142,26 @@ impl SinkAnalyzeState { #[allow(clippy::too_many_arguments)] pub fn create( ctx: Arc, - output_schema: Arc, - catalog: &str, - database: &str, - table: &str, + table: &FuseTable, snapshot_id: SnapshotId, - input: Arc, + input_port: Arc, histogram_info_receivers: HashMap>, ) -> Result { Ok(ProcessorPtr::create(Box::new(SinkAnalyzeState { ctx, - output_schema, - input_port: input, - catalog: catalog.to_string(), - database: database.to_string(), - table: table.to_string(), + input_port, + table: Arc::new(table.clone()), snapshot_id, histogram_info_receivers, input_data: None, committed: false, + row_count: 0, ndv_states: Default::default(), histograms: Default::default(), step: AnalyzeStep::CollectNDV, }))) } - async fn get_table(&self) -> Result> { - // always use the latest table - let tenant = self.ctx.get_tenant(); - let catalog = CatalogManager::instance() - .get_catalog( - tenant.tenant_name(), - &self.catalog, - self.ctx.session_state(), - ) - .await?; - let table = catalog - .get_table(&tenant, &self.database, &self.table) - .await?; - Ok(table) - } - - #[async_backtrace::framed] - pub async fn merge_analyze_states(&mut self, data_block: DataBlock) -> Result<()> { - if data_block.num_rows() == 0 { - return Ok(()); - } - let table = self.get_table().await?; - let table = FuseTable::try_from_table(table.as_ref())?; - let snapshot = table.read_table_snapshot().await?; - - if snapshot.is_none() { - return Ok(()); - } - let table_statistics = table - .read_table_snapshot_statistics(snapshot.as_ref()) - .await?; - - let is_full = data_block.columns()[self.output_schema.num_fields() - 1] - .index(0) - .unwrap(); - - let is_full = is_full.as_boolean().unwrap(); - - let mut ndv_states = table_statistics.map(|s| s.hll.clone()).unwrap_or_default(); - - let index_num = self.output_schema.num_fields() - 1; - - for (f, col) in self - .output_schema - .fields() - .iter() - .take(index_num) - .zip(data_block.columns()) - { - let name = f.name(); - let index: u32 = name.strip_prefix("ndv_").unwrap().parse().unwrap(); - - let col = col.index(0).unwrap(); - let data = col.as_tuple().unwrap()[0].as_binary().unwrap(); - let hll: MetaHLL12 = borsh_deserialize_from_slice(data)?; - - if !is_full { - ndv_states - .entry(index) - .and_modify(|c| c.merge(&hll)) - .or_insert(hll); - } else { - ndv_states.insert(index, hll); - } - } - - self.ndv_states = ndv_states; - Ok(()) - } - #[async_backtrace::framed] async fn create_histogram(&mut self, col_id: u32, data_block: DataBlock) -> Result<()> { if data_block.num_rows() == 0 { @@ -242,19 +201,23 @@ impl SinkAnalyzeState { Ok(()) } - async fn commit_statistics(&self) -> Result<()> { - let table = self.get_table().await?; + async fn commit_statistics(&mut self) -> Result<()> { + let table = self.table.refresh(self.ctx.as_ref()).await?; let table = FuseTable::try_from_table(table.as_ref())?; let snapshot = table.read_table_snapshot().await?; if snapshot.is_none() { return Ok(()); } let snapshot = snapshot.unwrap(); + let column_ids = snapshot.schema.to_leaf_column_id_set(); + self.ndv_states.retain(|k, _| column_ids.contains(k)); + // 3. Generate new table statistics let table_statistics = TableSnapshotStatistics::new( self.ndv_states.clone(), self.histograms.clone(), self.snapshot_id, + snapshot.summary.row_count, ); let table_statistics_location = table .meta_location_generator @@ -304,7 +267,7 @@ impl Processor for SinkAnalyzeState { if !self.input_port.has_data() { self.input_port.set_need_data(); } - return Ok(Event::Async); + return Ok(Event::Sync); } if self.input_port.is_finished() { @@ -328,21 +291,42 @@ impl Processor for SinkAnalyzeState { if self.input_port.has_data() { self.input_data = Some(self.input_port.pull_data().unwrap()?); - return Ok(Event::Async); + return Ok(Event::Sync); } self.input_port.set_need_data(); Ok(Event::NeedData) } - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { + fn process(&mut self) -> Result<()> { match self.step { AnalyzeStep::CollectNDV => { - if let Some(data_block) = self.input_data.take() { - self.merge_analyze_states(data_block.clone()).await?; + if let Some(mut data_block) = self.input_data.take() { + assert!(data_block.is_empty()); + if let Some(meta) = data_block + .take_meta() + .and_then(AnalyzeNDVMeta::downcast_from) + { + if meta.row_count > 0 { + for (column_id, column_hll) in meta.column_hlls.iter() { + self.ndv_states + .entry(*column_id) + .and_modify(|hll| hll.merge(column_hll)) + .or_insert_with(|| column_hll.clone()); + } + self.row_count += meta.row_count; + } + } } } + _ => unreachable!(), + } + Ok(()) + } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + match self.step { AnalyzeStep::CollectHistogram => { let mut finished_count = 0; let receivers = self.histogram_info_receivers.clone(); @@ -372,8 +356,9 @@ impl Processor for SinkAnalyzeState { } self.committed = true; } + _ => unreachable!(), } - return Ok(()); + Ok(()) } } @@ -461,3 +446,318 @@ impl AsyncSink for HistogramInfoSink { Ok(false) } } + +struct SegmentWithHLL { + segment_location: Location, + block_metas: Vec>, + origin_summary: Statistics, + raw_block_hlls: Vec, + + new_block_hlls: Vec>, + block_indexes: Vec, +} + +enum State { + ReadData(Option), + CollectNDV { + segment_location: Location, + segment_info: Arc, + block_hlls: Vec, + }, + BuildHLL, + MergeHLL, + WriteMeta, + Finish, +} + +pub struct AnalyzeCollectNDVSource { + state: State, + output: Arc, + column_hlls: HashMap, + row_count: u64, + + segment_with_hll: Option, + + ctx: Arc, + block_reader: Arc, + dal: Operator, + settings: ReadSettings, + storage_format: FuseStorageFormat, + segment_reader: CompactSegmentInfoReader, + stats_reader: SegmentStatsReader, + ndv_columns_map: BTreeMap, +} + +impl AnalyzeCollectNDVSource { + pub fn try_create( + output: Arc, + table: &FuseTable, + ctx: Arc, + ) -> Result { + let table_schema = table.schema(); + let ndv_columns_map = table + .approx_distinct_cols + .distinct_column_fields(table_schema.clone(), RangeIndex::supported_table_type)?; + let field_indices = ndv_columns_map.keys().cloned().collect(); + let projection = Projection::Columns(field_indices); + let block_reader = + table.create_block_reader(ctx.clone(), projection, false, false, false)?; + + let dal = table.get_operator(); + let settings = ReadSettings::from_ctx(&ctx)?; + let segment_reader = MetaReaders::segment_info_reader(dal.clone(), table_schema.clone()); + let stats_reader = MetaReaders::segment_stats_reader(dal.clone()); + Ok(ProcessorPtr::create(Box::new(Self { + state: State::ReadData(None), + output, + column_hlls: HashMap::new(), + row_count: 0, + segment_with_hll: None, + ctx, + block_reader, + dal, + settings, + storage_format: table.get_storage_format(), + segment_reader, + stats_reader, + ndv_columns_map, + }))) + } +} + +#[async_trait::async_trait] +impl Processor for AnalyzeCollectNDVSource { + fn name(&self) -> String { + "AnalyzeCollectNDVSource".to_string() + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if matches!(self.state, State::Finish) { + self.output.finish(); + return Ok(Event::Finished); + } + + if self.output.is_finished() { + return Ok(Event::Finished); + } + + if !self.output.can_push() { + return Ok(Event::NeedConsume); + } + + if matches!(self.state, State::ReadData(None)) { + if let Some(part) = self.ctx.get_partition() { + self.state = State::ReadData(Some(part)); + } else { + self.output + .push_data(Ok(DataBlock::empty_with_meta(Box::new(AnalyzeNDVMeta { + row_count: self.row_count, + column_hlls: std::mem::take(&mut self.column_hlls), + })))); + self.state = State::Finish; + return Ok(Event::NeedConsume); + } + } + + if matches!( + self.state, + State::ReadData(_) | State::BuildHLL | State::WriteMeta + ) { + Ok(Event::Async) + } else { + Ok(Event::Sync) + } + } + + fn process(&mut self) -> Result<()> { + match std::mem::replace(&mut self.state, State::Finish) { + State::CollectNDV { + segment_location, + segment_info, + block_hlls, + } => { + let mut indexes = vec![]; + for (idx, data) in block_hlls.iter().enumerate() { + let block_hll = decode_column_hll(data)?; + if let Some(column_hlls) = &block_hll { + for (column_id, column_hll) in column_hlls.iter() { + self.column_hlls + .entry(*column_id) + .and_modify(|hll| hll.merge(column_hll)) + .or_insert_with(|| column_hll.clone()); + } + } else { + indexes.push(idx); + } + } + + self.row_count += segment_info.summary.row_count; + if !indexes.is_empty() { + assert!(self.segment_with_hll.is_none()); + let new_hlls = Vec::with_capacity(indexes.len()); + self.segment_with_hll = Some(SegmentWithHLL { + segment_location, + block_metas: segment_info.block_metas()?, + origin_summary: segment_info.summary.clone(), + raw_block_hlls: block_hlls, + new_block_hlls: new_hlls, + block_indexes: indexes, + }); + self.state = State::BuildHLL; + } else { + self.state = State::ReadData(None); + } + } + State::MergeHLL => { + let segment_with_hll = self.segment_with_hll.as_mut().unwrap(); + let new_hlls = std::mem::take(&mut segment_with_hll.new_block_hlls); + for (idx, new) in new_hlls.into_iter().enumerate() { + if let Some(column_hlls) = new { + for (column_id, column_hll) in column_hlls.iter() { + self.column_hlls + .entry(*column_id) + .and_modify(|hll| hll.merge(column_hll)) + .or_insert_with(|| column_hll.clone()); + } + segment_with_hll.raw_block_hlls[idx] = encode_column_hll(&column_hlls)?; + } + } + self.state = State::WriteMeta; + } + _ => return Err(ErrorCode::Internal("It's a bug.")), + } + Ok(()) + } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + match std::mem::replace(&mut self.state, State::Finish) { + State::ReadData(Some(part)) => { + let part = FuseLazyPartInfo::from_part(&part)?; + let (path, ver) = &part.segment_location; + if *ver < 2 { + self.state = State::ReadData(None); + return Ok(()); + } + let load_param = LoadParams { + location: path.clone(), + len_hint: None, + ver: *ver, + put_cache: true, + }; + let compact_segment_info = self.segment_reader.read(&load_param).await?; + + let block_count = compact_segment_info.summary.block_count as usize; + let block_hlls = + if let Some(meta) = &compact_segment_info.summary.additional_stats_meta { + let (path, ver) = &meta.location; + let load_param = LoadParams { + location: path.clone(), + len_hint: None, + ver: *ver, + put_cache: true, + }; + let stats = self.stats_reader.read(&load_param).await?; + stats.block_hlls.clone() + } else { + vec![vec![]; block_count] + }; + self.state = State::CollectNDV { + segment_location: part.segment_location.clone(), + segment_info: compact_segment_info, + block_hlls, + }; + } + State::BuildHLL => { + let segment_with_hll = self.segment_with_hll.as_mut().unwrap(); + let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let max_concurrency = std::cmp::max(max_threads * 2, 10); + let semaphore = Arc::new(Semaphore::new(max_concurrency)); + let runtime = GlobalIORuntime::instance(); + let mut handlers = Vec::with_capacity(segment_with_hll.block_indexes.len()); + for &idx in &segment_with_hll.block_indexes { + let permit = acquire_task_permit(semaphore.clone()).await?; + let block_reader = self.block_reader.clone(); + let settings = self.settings; + let storage_format = self.storage_format; + let block_meta = segment_with_hll.block_metas[idx].clone(); + let ndv_columns_map = self.ndv_columns_map.clone(); + let handler = runtime.spawn(async move { + let block = block_reader + .read_by_meta(&settings, &block_meta, &storage_format) + .await?; + let column_hlls = build_column_hlls(&block, &ndv_columns_map)?; + drop(permit); + Ok::<_, ErrorCode>(column_hlls) + }); + handlers.push(handler); + } + + let joint = futures::future::try_join_all(handlers).await.map_err(|e| { + ErrorCode::StorageOther(format!( + "[BLOCK-COMPACT] Failed to deserialize segment blocks: {}", + e + )) + })?; + let new_hlls = joint.into_iter().collect::>>()?; + if new_hlls.iter().all(|v| v.is_none()) { + self.segment_with_hll = None; + self.state = State::ReadData(None); + } else { + segment_with_hll.new_block_hlls = new_hlls; + self.state = State::MergeHLL; + } + } + State::WriteMeta => { + let SegmentWithHLL { + segment_location, + block_metas, + mut origin_summary, + raw_block_hlls, + .. + } = std::mem::take(&mut self.segment_with_hll).unwrap(); + + let segment_loc = segment_location.0.as_str(); + let data = SegmentStatistics::new(raw_block_hlls).to_bytes()?; + let segment_stats_location = + TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location( + segment_loc, + ); + let additional_stats_meta = AdditionalStatsMeta { + size: data.len() as u64, + location: (segment_stats_location.clone(), SegmentStatistics::VERSION), + }; + self.dal.write(&segment_stats_location, data).await?; + origin_summary.additional_stats_meta = Some(additional_stats_meta); + let new_segment = SegmentInfo::new(block_metas, origin_summary); + new_segment + .write_meta_through_cache(&self.dal, segment_loc) + .await?; + self.state = State::ReadData(None); + } + _ => return Err(ErrorCode::Internal("It's a bug.")), + } + Ok(()) + } +} + +#[derive(Clone)] +pub struct AnalyzeNDVMeta { + row_count: u64, + column_hlls: HashMap, +} + +impl Debug for AnalyzeNDVMeta { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + f.debug_struct("AnalyzeNDVMeta").finish() + } +} + +local_block_meta_serde!(AnalyzeNDVMeta); + +#[typetag::serde(name = "analyze_ndv")] +impl BlockMetaInfo for AnalyzeNDVMeta {} diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test index c77d1813d9da1..42523dca475d3 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0020_analyze.test @@ -129,7 +129,7 @@ analyze table `t` query TI select * from fuse_statistic('db_09_0020', 't') ---- -a 3 (empty) +a 2 (empty) statement ok create or replace table t1 as select number as a from numbers(10); @@ -243,5 +243,49 @@ select count() from fuse_snapshot('db_09_0020','t2'); statement ok DROP TABLE t2; +statement ok +CREATE TABLE t3(a int, b string) approx_distinct_columns = ''; + +statement ok +insert into t3 values(1, 'a'), (2, 'b'); + +statement ok +alter table t3 set options(approx_distinct_columns = 'a, b'); + +statement ok +insert into t3 values(4, 'c'), (3, 'b'); + +query B +select segment_stats_size is null from fuse_segment('db_09_0020','t3') order by file_location; +---- +1 +0 + +statement ok +analyze table t3; + +query B +select segment_stats_size is null from fuse_segment('db_09_0020','t3') order by file_location; +---- +0 +0 + +query TI +select column_name, distinct_count from fuse_statistic('db_09_0020', 't3') order by column_name; +---- +a 4 +b 3 + +statement ok +analyze table t3; + +query I +select count() from fuse_snapshot('db_09_0020', 't3'); +---- +4 + +statement ok +DROP TABLE t3; + statement ok DROP DATABASE db_09_0020 diff --git a/tests/sqllogictests/suites/tpch/join_order.test b/tests/sqllogictests/suites/tpch/join_order.test index 2e7d4f8486bb2..e13de3722ff44 100644 --- a/tests/sqllogictests/suites/tpch/join_order.test +++ b/tests/sqllogictests/suites/tpch/join_order.test @@ -312,21 +312,21 @@ HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build -│ │ └── Scan: default.tpch_test.nation (#4) (read rows: 25) +│ │ └── HashJoin: INNER +│ │ ├── Build +│ │ │ └── Scan: default.tpch_test.nation (#5) (read rows: 25) +│ │ └── Probe +│ │ └── Scan: default.tpch_test.customer (#3) (read rows: 150000) │ └── Probe -│ └── Scan: default.tpch_test.supplier (#0) (read rows: 10000) +│ └── Scan: default.tpch_test.orders (#2) (read rows: 1500000) └── Probe └── HashJoin: INNER ├── Build │ └── HashJoin: INNER │ ├── Build - │ │ └── HashJoin: INNER - │ │ ├── Build - │ │ │ └── Scan: default.tpch_test.nation (#5) (read rows: 25) - │ │ └── Probe - │ │ └── Scan: default.tpch_test.customer (#3) (read rows: 150000) + │ │ └── Scan: default.tpch_test.nation (#4) (read rows: 25) │ └── Probe - │ └── Scan: default.tpch_test.orders (#2) (read rows: 1500000) + │ └── Scan: default.tpch_test.supplier (#0) (read rows: 10000) └── Probe └── Scan: default.tpch_test.lineitem (#1) (read rows: 6001215) diff --git a/tests/suites/1_stateful/09_http_handler/09_0007_session.py b/tests/suites/1_stateful/09_http_handler/09_0007_session.py index 426e56ccd024f..32b22f574c96d 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0007_session.py +++ b/tests/suites/1_stateful/09_http_handler/09_0007_session.py @@ -70,7 +70,10 @@ def login(self): response = self.client.post( login_url, auth=auth, - headers={"Content-Type": "application/json", "X-DATABEND-CLIENT-CAPS": "session_header"}, + headers={ + "Content-Type": "application/json", + "X-DATABEND-CLIENT-CAPS": "session_header", + }, json=payload, ) return response diff --git a/tests/suites/1_stateful/09_http_handler/09_0008_forward.py b/tests/suites/1_stateful/09_http_handler/09_0008_forward.py index 7c249e467e520..5aa28f9c17443 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0008_forward.py +++ b/tests/suites/1_stateful/09_http_handler/09_0008_forward.py @@ -29,9 +29,12 @@ def do_query(query, port=8000, session=None, node_id=None, wait=100): def get_txn_state(resp): - return (resp.get("state") == "Succeeded", - resp.get("session").get("need_sticky"), - resp.get("session").get("txn_state")) + return ( + resp.get("state") == "Succeeded", + resp.get("session").get("need_sticky"), + resp.get("session").get("txn_state"), + ) + def test_txn_success(): resp = do_query("create or replace table t1(a int)").json() @@ -63,22 +66,16 @@ def test_txn_fail(): session = resp.get("session") # fail - resp = do_query( - "select 1/0", session=session - ).json() + resp = do_query("select 1/0", session=session).json() assert get_txn_state(resp) == (False, False, "Fail"), resp # fail because wrong node - resp = do_query( - "select 1", port=8002, session=session - ).json() + resp = do_query("select 1", port=8002, session=session).json() session = resp.get("session") assert get_txn_state(resp) == (False, False, "Fail"), resp # keep fail state until commit/abort - resp = do_query( - "select 1", session=session, node_id=node_id - ).json() + resp = do_query("select 1", session=session, node_id=node_id).json() assert get_txn_state(resp) == (False, False, "Fail"), resp session = resp.get("session") diff --git a/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py b/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py index fcb7e87522b39..4747c7d728310 100755 --- a/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py +++ b/tests/suites/1_stateful/09_http_handler/09_0009_cookie.py @@ -34,7 +34,6 @@ def do_query(session_client, query, session_state=None, enable_cookie=True): if enable_cookie: headers["X-DATABEND-CLIENT-CAPS"] = "session_cookie" - response = session_client.post(url, headers=headers, json=query_payload, auth=auth) return response @@ -85,7 +84,7 @@ def test_temp_table(): resp = do_query(client, "select * from t1", session_state) assert resp.status_code == 200, resp.text - assert resp.json()["data"] == [["3"], ["4"]],resp.json() + assert resp.json()["data"] == [["3"], ["4"]], resp.json() session_state = resp.json()["session"] assert session_state["need_sticky"], resp.text assert session_state["need_keep_alive"]