
refactor: analyze table #18514


Merged on Aug 13, 2025 (10 commits)
Changes from 9 commits
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -491,7 +491,7 @@ sha1 = "0.10.5"
sha2 = "0.10.8"
simdutf8 = "0.1.4"
similar = "2.7.0"
simple_hll = { version = "0.0.1", features = ["serde_borsh"] }
simple_hll = { version = "0.0.4", features = ["serde_borsh"] }
simsearch = "0.2"
siphasher = "0.3"
sled = { version = "0.34", default-features = false }
1 change: 1 addition & 0 deletions src/common/storage/Cargo.toml
@@ -37,6 +37,7 @@ prometheus-client = { workspace = true }
regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
simple_hll = { workspace = true, features = ["serde_borsh"] }
thiserror = { workspace = true }
url = { workspace = true }

1 change: 1 addition & 0 deletions src/common/storage/src/lib.rs
@@ -85,6 +85,7 @@ pub use histogram::HistogramBucket;
pub use histogram::DEFAULT_HISTOGRAM_BUCKETS;
pub use merge::MutationStatus;
pub use meta_hll::MetaHLL;
pub use meta_hll::MetaHLL12;
pub use multi_table_insert::MultiTableInsertStatus;
pub use statistics::Datum;
pub use statistics::F64;
39 changes: 38 additions & 1 deletion src/common/storage/src/meta_hll.rs
@@ -18,6 +18,8 @@ use std::hash::Hash;

use ahash::RandomState;

pub type MetaHLL12 = simple_hll::HyperLogLog<12>;

const P: usize = 7_usize;
const Q: usize = 64 - P;
const M: usize = 1 << P;
@@ -51,6 +53,11 @@ impl MetaHLL {
}
}

pub fn with_registers(registers: Vec<u8>) -> Self {
assert_eq!(registers.len(), M);
Self { registers }
}

/// Adds a hash to the MetaHLL.
/// The hash value is determined by the caller.
#[inline]
@@ -67,7 +74,7 @@ impl MetaHLL {

/// Adds an object to the MetaHLL.
/// Though we could pass different types into this method, caller should notice that
pub fn add_object<T: Hash>(&mut self, obj: &T) {
pub fn add_object<T: ?Sized + Hash>(&mut self, obj: &T) {
let hash = SEED.hash_one(obj);
self.add_hash(hash);
}
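The relaxed `T: ?Sized + Hash` bound lets add_object accept unsized values such as string or byte slices through a reference, instead of only sized types. A small usage sketch follows; it assumes the crate's existing MetaHLL::new() constructor, which is not shown in this hunk:

// With `T: ?Sized + Hash`, `T` may be an unsized type like `str` or `[u8]`;
// under the old `T: Hash` bound these calls would not compile.
let mut hll = MetaHLL::new();
hll.add_object("databend");        // &str  -> T = str (unsized)
hll.add_object(&[1u8, 2, 3][..]);  // &[u8] -> T = [u8] (unsized)
hll.add_object(&42u64);            // sized types still work as before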
@@ -161,6 +168,20 @@ fn hll_tau(x: f64) -> f64 {
}
}

impl From<MetaHLL12> for MetaHLL {
fn from(value: MetaHLL12) -> Self {
let registers = value.get_registers();
let mut new_registers = vec![0; M];
let group_size = registers.len() / M;
for i in 0..M {
for j in 0..group_size {
new_registers[i] = new_registers[i].max(registers[i * group_size + j]);
}
}
Self::with_registers(new_registers)
}
}

#[derive(serde::Serialize, borsh::BorshSerialize)]
enum MetaHLLVariantRef<'a> {
Empty,
@@ -366,4 +387,20 @@ mod tests {
}
compare_with_delta(hll.count(), 1000);
}

#[test]
fn test_from_hll() {
let mut hll = MetaHLL12::new();
for i in 0..100_000 {
hll.add_object(&i);
}

let hll = MetaHLL::from(hll);
let count = hll.count();
let error_rate = 1.04 / ((M as f64).sqrt());
let diff = count as f64 / 100_000f64;

assert!(diff >= 1.0 - error_rate);
assert!(diff <= 1.0 + error_rate);
}
}
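For context on the From&lt;MetaHLL12&gt; impl added above: MetaHLL12 is simple_hll's HyperLogLog at precision 12 (2^12 = 4096 registers), while MetaHLL keeps P = 7 (2^7 = 128 registers), so the conversion folds each group of 4096 / 128 = 32 consecutive registers into one by keeping the group maximum. A minimal standalone sketch of that fold, with illustrative names that are not part of the crate:

// Fold a dense HLL register array down to `to_len` registers by taking the
// maximum within each group of consecutive registers; `to_len` must divide
// the input length. This mirrors the indexing used by the From impl above.
fn fold_registers(registers: &[u8], to_len: usize) -> Vec<u8> {
    assert!(to_len > 0 && registers.len() % to_len == 0);
    let group_size = registers.len() / to_len; // 4096 / 128 = 32 for 12 -> 7
    (0..to_len)
        .map(|i| {
            registers[i * group_size..(i + 1) * group_size]
                .iter()
                .copied()
                .max()
                .unwrap_or(0)
        })
        .collect()
}

The fold is lossy: the discarded low index bits are not re-hashed, so the resulting 128-register sketch is only about as accurate as a native precision-7 sketch, which is why test_from_hll checks the count against the usual 1.04 / sqrt(M) error bound rather than expecting precision-12 accuracy.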
95 changes: 21 additions & 74 deletions src/query/service/src/interpreters/interpreter_table_analyze.rs
@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;

use chrono::Utc;
use databend_common_catalog::plan::PartitionsShuffleKind;
use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -26,20 +26,19 @@ use databend_common_sql::plans::Plan;
use databend_common_sql::BindContext;
use databend_common_sql::Planner;
use databend_common_storage::DEFAULT_HISTOGRAM_BUCKETS;
use databend_common_storages_factory::NavigationPoint;
use databend_common_storages_factory::Table;
use databend_common_storages_fuse::operations::AnalyzeLightMutator;
use databend_common_storages_fuse::operations::HistogramInfoSink;
use databend_common_storages_fuse::FuseLazyPartInfo;
use databend_common_storages_fuse::FuseTable;
use databend_storages_common_cache::Partitions;
use databend_storages_common_index::Index;
use databend_storages_common_index::RangeIndex;
use itertools::Itertools;
use log::info;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;
use crate::sessions::TableContext;

@@ -136,79 +135,30 @@ impl Interpreter for AnalyzeTableInterpreter {
let table_statistics = table
.read_table_snapshot_statistics(Some(&snapshot))
.await?;
if let Some(table_statistics) = &table_statistics {
let prev_snapshot_id = snapshot.prev_snapshot_id.map(|(id, _)| id);
if Some(table_statistics.snapshot_id) == prev_snapshot_id {
return Ok(PipelineBuildResult::create());
}
}

// plan sql
let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
let is_full = match table
.navigate_to_point(
&NavigationPoint::SnapshotID(table_statistics.snapshot_id.simple().to_string()),
self.ctx.clone().get_abort_checker(),
)
.await
{
Ok(t) => !t
.read_table_snapshot()
.await
.is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
Err(_) => true,
};

let temporal_str = if is_full {
format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
} else {
// analyze only needs to collect the added blocks.
let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
format!(
"CHANGES(INFORMATION => DEFAULT) AT (snapshot => '{}') END (snapshot => '{}') AS {table_alias}",
table_statistics.snapshot_id.simple(),
snapshot.snapshot_id.simple(),
)
};
(is_full, temporal_str)
} else {
(
true,
format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
)
};
let mut parts = Vec::with_capacity(snapshot.segments.len());
for (idx, segment_location) in snapshot.segments.iter().enumerate() {
parts.push(FuseLazyPartInfo::create(idx, segment_location.clone()));
}
self.ctx
.set_partitions(Partitions::create(PartitionsShuffleKind::Mod, parts))?;

let mut build_res = PipelineBuildResult::create();
// After profiling, computing the histogram is heavy and the bottleneck is the window function (90%).
// It's possible to OOM if the table is too large and spilling isn't enabled.
// We add a setting `enable_analyze_histogram` to control whether to compute the histogram (disabled by default).
let mut histogram_info_receivers = HashMap::new();
let quote = self
.ctx
.get_settings()
.get_sql_dialect()?
.default_ident_quote();

// error rate 0.01625 --> precision 12 (2^12 registers) --> ~4K of state per column
// 1.04 / sqrt(1 << 12) = 0.01625
const DISTINCT_ERROR_RATE: f64 = 0.01625;
let ndv_select_expr = snapshot
.schema
.fields()
.iter()
.filter(|f| RangeIndex::supported_type(&f.data_type().into()))
.map(|f| {
format!(
"approx_count_distinct_state({DISTINCT_ERROR_RATE})({quote}{}{quote}) as ndv_{}",
f.name,
f.column_id()
)
})
.join(", ");

let sql = format!(
"SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
plan.database, plan.table,
);

info!("Analyze via sql: {sql}");

let (physical_plan, bind_context) = self.plan_sql(sql, false).await?;
let mut build_res =
build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
// After profiling, computing the histogram is heavy and the bottleneck is the window function (90%).
// It's possible to OOM if the table is too large and spilling isn't enabled.
// We add a setting `enable_analyze_histogram` to control whether to compute the histogram (disabled by default).
let mut histogram_info_receivers = HashMap::new();
if self.ctx.get_settings().get_enable_analyze_histogram()? {
let histogram_sqls = table
.schema()
@@ -264,10 +214,7 @@
}
FuseTable::do_analyze(
self.ctx.clone(),
bind_context.output_schema(),
&self.plan.catalog,
&self.plan.database,
&self.plan.table,
table,
snapshot.snapshot_id,
&mut build_res.main_pipeline,
histogram_info_receivers,
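As a quick check of the arithmetic behind DISTINCT_ERROR_RATE above: the standard HyperLogLog relative error is 1.04 / sqrt(m), so m = 2^12 registers gives 1.04 / 64 = 0.01625, and 4096 one-byte registers is roughly 4 KiB of sketch state per column, matching the comment. A small self-contained check (hypothetical snippet, not part of the PR):

// Verify 1.04 / sqrt(1 << 12) == 0.01625 and the ~4 KiB per-column state size.
fn main() {
    let m = 1u32 << 12;                          // 4096 registers
    let error_rate = 1.04 / f64::from(m).sqrt(); // 1.04 / 64
    assert!((error_rate - 0.01625).abs() < 1e-12);
    println!("{} register bytes per column", m); // ~4 KiB
}

The histogram pass, by contrast, stays disabled unless the enable_analyze_histogram setting is turned on, since (per the comment above) its window function dominates the cost and can OOM on large tables when spilling is off.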
@@ -80,7 +80,7 @@ impl PipelineBuilder {
Vec::with_capacity(mutation_source.partitions.partitions.len());
for part in &mutation_source.partitions.partitions {
// Safe to downcast because we know the partition is lazy
let part: &FuseLazyPartInfo = FuseLazyPartInfo::from_part(part)?;
let part = FuseLazyPartInfo::from_part(part)?;
segment_locations.push(SegmentLocation {
segment_idx: part.segment_index,
location: part.segment_location.clone(),
12 changes: 11 additions & 1 deletion src/query/service/src/test_kits/check.rs
@@ -25,6 +25,7 @@ use databend_common_storages_fuse::operations::load_last_snapshot_hint;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SEGMENT_STATISTICS_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
@@ -79,7 +80,7 @@ pub async fn check_data_dir(
segment_count: u32,
block_count: u32,
index_count: u32,
_block_stat_count: u32,
segment_stats_count: u32,
check_last_snapshot: Option<()>,
check_table_statistic_file: Option<()>,
) -> Result<()> {
@@ -91,6 +92,7 @@
let mut ss_count = 0;
let mut ts_count = 0;
let mut sg_count = 0;
let mut hs_count = 0;
let mut b_count = 0;
let mut i_count = 0;
let mut table_statistic_files = vec![];
@@ -99,6 +101,7 @@
let prefix_segment = FUSE_TBL_SEGMENT_PREFIX;
let prefix_block = FUSE_TBL_BLOCK_PREFIX;
let prefix_index = FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
let prefix_segment_stats = FUSE_TBL_SEGMENT_STATISTICS_PREFIX;
for entry in WalkDir::new(root) {
let entry = entry.unwrap();
if entry.file_type().is_file() {
@@ -117,6 +120,8 @@
} else if path.starts_with(prefix_snapshot_statistics) {
ts_count += 1;
table_statistic_files.push(entry_path.to_string());
} else if path.starts_with(prefix_segment_stats) {
hs_count += 1;
}
}
}
@@ -136,6 +141,11 @@
"case [{}], check segment count",
case_name
);
assert_eq!(
hs_count, segment_stats_count,
"case [{}], check segment statistics count",
case_name
);

assert_eq!(
b_count, block_count,
25 changes: 7 additions & 18 deletions src/query/service/tests/it/storages/fuse/operations/analyze.rs
@@ -100,23 +100,12 @@ async fn test_fuse_snapshot_analyze_purge() -> Result<()> {
let case_name = "analyze_statistic_purge";
do_insertions(&fixture).await?;

// optimize statistics three times
for i in 0..3 {
analyze_table(&fixture).await?;
check_data_dir(
&fixture,
case_name,
3 + i,
1 + i,
2,
2,
2,
2,
Some(()),
None,
)
.await?;
}
analyze_table(&fixture).await?;
check_data_dir(&fixture, case_name, 3, 1, 2, 2, 2, 2, Some(()), None).await?;

append_sample_data(1, &fixture).await?;
analyze_table(&fixture).await?;
check_data_dir(&fixture, case_name, 5, 2, 3, 3, 3, 3, Some(()), None).await?;

// Purge will keep at least two snapshots.
let table = fixture.latest_default_table().await?;
@@ -126,7 +115,7 @@
fuse_table
.do_purge(&table_ctx, snapshot_files, None, true, false)
.await?;
check_data_dir(&fixture, case_name, 1, 1, 1, 1, 1, 1, Some(()), Some(())).await?;
check_data_dir(&fixture, case_name, 1, 1, 2, 2, 2, 2, Some(()), Some(())).await?;

Ok(())
}