
Commit 2ba98f2

refactor: analyze table (#18514)
* update
* update
* update
* refactor
* fix test
* fix test
* fix test
* fix test
* fix test
* fix
1 parent 4a23deb commit 2ba98f2

31 files changed (+720 / -276 lines)


Cargo.lock

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -491,7 +491,7 @@ sha1 = "0.10.5"
 sha2 = "0.10.8"
 simdutf8 = "0.1.4"
 similar = "2.7.0"
-simple_hll = { version = "0.0.1", features = ["serde_borsh"] }
+simple_hll = { version = "0.0.4", features = ["serde_borsh"] }
 simsearch = "0.2"
 siphasher = "0.3"
 sled = { version = "0.34", default-features = false }

src/common/storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@ prometheus-client = { workspace = true }
 regex = { workspace = true }
 reqwest = { workspace = true }
 serde = { workspace = true }
+simple_hll = { workspace = true, features = ["serde_borsh"] }
 thiserror = { workspace = true }
 url = { workspace = true }

src/common/storage/src/lib.rs

Lines changed: 1 addition & 0 deletions
@@ -85,6 +85,7 @@ pub use histogram::HistogramBucket;
 pub use histogram::DEFAULT_HISTOGRAM_BUCKETS;
 pub use merge::MutationStatus;
 pub use meta_hll::MetaHLL;
+pub use meta_hll::MetaHLL12;
 pub use multi_table_insert::MultiTableInsertStatus;
 pub use statistics::Datum;
 pub use statistics::F64;

src/common/storage/src/meta_hll.rs

Lines changed: 38 additions & 1 deletion
@@ -18,6 +18,8 @@ use std::hash::Hash;
 
 use ahash::RandomState;
 
+pub type MetaHLL12 = simple_hll::HyperLogLog<12>;
+
 const P: usize = 7_usize;
 const Q: usize = 64 - P;
 const M: usize = 1 << P;
@@ -51,6 +53,11 @@ impl MetaHLL {
         }
     }
 
+    pub fn with_registers(registers: Vec<u8>) -> Self {
+        assert_eq!(registers.len(), M);
+        Self { registers }
+    }
+
     /// Adds an hash to the MetaHLL.
     /// hash value is dertermined by caller
     #[inline]
@@ -67,7 +74,7 @@
 
     /// Adds an object to the MetaHLL.
     /// Though we could pass different types into this method, caller should notice that
-    pub fn add_object<T: Hash>(&mut self, obj: &T) {
+    pub fn add_object<T: ?Sized + Hash>(&mut self, obj: &T) {
        let hash = SEED.hash_one(obj);
        self.add_hash(hash);
    }
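A quick note on the relaxed bound: with T: Hash alone, T must be sized, so an unsized payload such as str or [u8] cannot be passed directly and callers have to go through an extra reference type. A minimal sketch of what ?Sized enables (the constructor goes through with_registers, added above, only to keep the example self-contained; 128 is M = 1 << 7):

    // Sketch: what `T: ?Sized + Hash` buys callers of add_object.
    let mut hll = MetaHLL::with_registers(vec![0u8; 128]);

    // Under the old `T: Hash` bound these lines failed to compile,
    // because T would be inferred as the unsized types `str` / `[u8]`.
    hll.add_object("hello");          // T = str
    hll.add_object(&[1u8, 2, 3][..]); // T = [u8]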
@@ -161,6 +168,20 @@ fn hll_tau(x: f64) -> f64 {
     }
 }
 
+impl From<MetaHLL12> for MetaHLL {
+    fn from(value: MetaHLL12) -> Self {
+        let registers = value.get_registers();
+        let mut new_registers = vec![0; M];
+        let group_size = registers.len() / M;
+        for i in 0..M {
+            for j in 0..group_size {
+                new_registers[i] = new_registers[i].max(registers[i * group_size + j]);
+            }
+        }
+        Self::with_registers(new_registers)
+    }
+}
+
 #[derive(serde::Serialize, borsh::BorshSerialize)]
 enum MetaHLLVariantRef<'a> {
     Empty,
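For orientation, some standard HyperLogLog arithmetic the commit leaves implicit: MetaHLL12 holds 2^12 = 4096 registers while MetaHLL holds M = 2^7 = 128, so group_size is 4096 / 128 = 32 and the conversion folds each group of 32 fine registers into one coarse register by taking the max (each register stores a maximum rank, so max is the meaningful combiner). The accuracy trade-off follows the usual HLL error bound:

    \sigma \approx \frac{1.04}{\sqrt{m}}, \qquad
    m = 2^{12}:\ \sigma = \frac{1.04}{64} = 0.01625, \qquad
    m = 2^{7}:\ \sigma = \frac{1.04}{\sqrt{128}} \approx 0.0919

The first figure is exactly the DISTINCT_ERROR_RATE = 0.01625 constant deleted from the analyze interpreter below; the second is the 1.04 / sqrt(M) tolerance that test_from_hll asserts in the next hunk.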
@@ -366,4 +387,20 @@ mod tests {
         }
         compare_with_delta(hll.count(), 1000);
     }
+
+    #[test]
+    fn test_from_hll() {
+        let mut hll = MetaHLL12::new();
+        for i in 0..100_000 {
+            hll.add_object(&i);
+        }
+
+        let hll = MetaHLL::from(hll);
+        let count = hll.count();
+        let error_rate = 1.04 / ((M as f64).sqrt());
+        let diff = count as f64 / 100_000f64;
+
+        assert!(diff >= 1.0 - error_rate);
+        assert!(diff <= 1.0 + error_rate);
+    }
 }

src/query/service/src/interpreters/interpreter_table_analyze.rs

Lines changed: 15 additions & 79 deletions
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;
 
-use chrono::Utc;
+use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -26,20 +26,19 @@ use databend_common_sql::plans::Plan;
 use databend_common_sql::BindContext;
 use databend_common_sql::Planner;
 use databend_common_storage::DEFAULT_HISTOGRAM_BUCKETS;
-use databend_common_storages_factory::NavigationPoint;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::operations::AnalyzeLightMutator;
 use databend_common_storages_fuse::operations::HistogramInfoSink;
+use databend_common_storages_fuse::FuseLazyPartInfo;
 use databend_common_storages_fuse::FuseTable;
+use databend_storages_common_cache::Partitions;
 use databend_storages_common_index::Index;
 use databend_storages_common_index::RangeIndex;
-use itertools::Itertools;
 use log::info;
 
 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline;
-use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
 use crate::sessions::TableContext;
 
@@ -133,82 +132,23 @@ impl Interpreter for AnalyzeTableInterpreter {
             return Ok(PipelineBuildResult::create());
         }
 
-        let table_statistics = table
-            .read_table_snapshot_statistics(Some(&snapshot))
-            .await?;
-
-        // plan sql
-        let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
-            let is_full = match table
-                .navigate_to_point(
-                    &NavigationPoint::SnapshotID(table_statistics.snapshot_id.simple().to_string()),
-                    self.ctx.clone().get_abort_checker(),
-                )
-                .await
-            {
-                Ok(t) => !t
-                    .read_table_snapshot()
-                    .await
-                    .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
-                Err(_) => true,
-            };
-
-            let temporal_str = if is_full {
-                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
-            } else {
-                // analyze only need to collect the added blocks.
-                let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
-                format!(
-                    "CHANGES(INFORMATION => DEFAULT) AT (snapshot => '{}') END (snapshot => '{}') AS {table_alias}",
-                    table_statistics.snapshot_id.simple(),
-                    snapshot.snapshot_id.simple(),
-                )
-            };
-            (is_full, temporal_str)
-        } else {
-            (
-                true,
-                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
-            )
-        };
+        let mut parts = Vec::with_capacity(snapshot.segments.len());
+        for (idx, segment_location) in snapshot.segments.iter().enumerate() {
+            parts.push(FuseLazyPartInfo::create(idx, segment_location.clone()));
+        }
+        self.ctx
+            .set_partitions(Partitions::create(PartitionsShuffleKind::Mod, parts))?;
 
+        let mut build_res = PipelineBuildResult::create();
+        // After profiling, computing histogram is heavy and the bottleneck is window function(90%).
+        // It's possible to OOM if the table is too large and spilling isn't enabled.
+        // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed).
+        let mut histogram_info_receivers = HashMap::new();
         let quote = self
             .ctx
             .get_settings()
            .get_sql_dialect()?
            .default_ident_quote();
-
-        // 0.01625 --> 12 buckets --> 4K size per column
-        // 1.04 / math.sqrt(1<<12) --> 0.01625
-        const DISTINCT_ERROR_RATE: f64 = 0.01625;
-        let ndv_select_expr = snapshot
-            .schema
-            .fields()
-            .iter()
-            .filter(|f| RangeIndex::supported_type(&f.data_type().into()))
-            .map(|f| {
-                format!(
-                    "approx_count_distinct_state({DISTINCT_ERROR_RATE})({quote}{}{quote}) as ndv_{}",
-                    f.name,
-                    f.column_id()
-                )
-            })
-            .join(", ");
-
-        let sql = format!(
-            "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
-            plan.database, plan.table,
-        );
-
-        info!("Analyze via sql: {sql}");
-
-        let (physical_plan, bind_context) = self.plan_sql(sql, false).await?;
-        let mut build_res =
-            build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
-        // After profiling, computing histogram is heavy and the bottleneck is window function(90%).
-        // It's possible to OOM if the table is too large and spilling isn't enabled.
-        // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed).
-        // let mut histogram_info_receivers = HashMap::new();
         if self.ctx.get_settings().get_enable_analyze_histogram()? {
            let histogram_sqls = table
                .schema()
@@ -262,12 +202,8 @@ impl Interpreter for AnalyzeTableInterpreter {
                 histogram_info_receivers.insert(col_id, rx);
             }
         }
-        FuseTable::do_analyze(
+        table.do_analyze(
             self.ctx.clone(),
-            bind_context.output_schema(),
-            &self.plan.catalog,
-            &self.plan.database,
-            &self.plan.table,
             snapshot.snapshot_id,
             &mut build_res.main_pipeline,
             histogram_info_receivers,
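To make the deleted path concrete: it used to materialize NDV through a generated query run on the regular query pipeline. For a hypothetical table db1.t whose eligible columns are a (column id 0) and b (column id 1), a full analyze would have produced roughly

    SELECT approx_count_distinct_state(0.01625)("a") as ndv_0,
           approx_count_distinct_state(0.01625)("b") as ndv_1,
           true as is_full
    from db1.t AT (snapshot => '<snapshot id>')

with an incremental CHANGES(INFORMATION => DEFAULT) ... END (...) clause in place of the plain AT when previous statistics existed. The refactor removes that round-trip entirely: each segment of the snapshot becomes a FuseLazyPartInfo, the set is registered on the query context with PartitionsShuffleKind::Mod so segments spread across executors, and table.do_analyze collects the statistics while scanning.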

src/query/service/src/pipelines/builders/builder_mutation_source.rs

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ impl PipelineBuilder {
             Vec::with_capacity(mutation_source.partitions.partitions.len());
         for part in &mutation_source.partitions.partitions {
             // Safe to downcast because we know the partition is lazy
-            let part: &FuseLazyPartInfo = FuseLazyPartInfo::from_part(part)?;
+            let part = FuseLazyPartInfo::from_part(part)?;
             segment_locations.push(SegmentLocation {
                 segment_idx: part.segment_index,
                 location: part.segment_location.clone(),

src/query/service/src/test_kits/check.rs

Lines changed: 11 additions & 1 deletion
@@ -25,6 +25,7 @@ use databend_common_storages_fuse::operations::load_last_snapshot_hint;
 use databend_common_storages_fuse::FuseTable;
 use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX;
 use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
+use databend_common_storages_fuse::FUSE_TBL_SEGMENT_STATISTICS_PREFIX;
 use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
 use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_STATISTICS_PREFIX;
 use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
@@ -79,7 +80,7 @@ pub async fn check_data_dir(
     segment_count: u32,
     block_count: u32,
     index_count: u32,
-    _block_stat_count: u32,
+    segment_stats_count: u32,
     check_last_snapshot: Option<()>,
     check_table_statistic_file: Option<()>,
 ) -> Result<()> {
@@ -91,6 +92,7 @@ pub async fn check_data_dir(
     let mut ss_count = 0;
     let mut ts_count = 0;
     let mut sg_count = 0;
+    let mut hs_count = 0;
     let mut b_count = 0;
     let mut i_count = 0;
     let mut table_statistic_files = vec![];
@@ -99,6 +101,7 @@ pub async fn check_data_dir(
     let prefix_segment = FUSE_TBL_SEGMENT_PREFIX;
     let prefix_block = FUSE_TBL_BLOCK_PREFIX;
     let prefix_index = FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
+    let prefix_segment_stats = FUSE_TBL_SEGMENT_STATISTICS_PREFIX;
     for entry in WalkDir::new(root) {
         let entry = entry.unwrap();
         if entry.file_type().is_file() {
@@ -117,6 +120,8 @@ pub async fn check_data_dir(
             } else if path.starts_with(prefix_snapshot_statistics) {
                 ts_count += 1;
                 table_statistic_files.push(entry_path.to_string());
+            } else if path.starts_with(prefix_segment_stats) {
+                hs_count += 1;
             }
         }
     }
@@ -136,6 +141,11 @@ pub async fn check_data_dir(
         "case [{}], check segment count",
         case_name
     );
+    assert_eq!(
+        hs_count, segment_stats_count,
+        "case [{}], check segment statistics count",
+        case_name
+    );
 
     assert_eq!(
         b_count, block_count,

src/query/service/tests/it/storages/fuse/operations/analyze.rs

Lines changed: 7 additions & 18 deletions
@@ -100,23 +100,12 @@ async fn test_fuse_snapshot_analyze_purge() -> Result<()> {
     let case_name = "analyze_statistic_purge";
     do_insertions(&fixture).await?;
 
-    // optimize statistics three times
-    for i in 0..3 {
-        analyze_table(&fixture).await?;
-        check_data_dir(
-            &fixture,
-            case_name,
-            3 + i,
-            1 + i,
-            2,
-            2,
-            2,
-            2,
-            Some(()),
-            None,
-        )
-        .await?;
-    }
+    analyze_table(&fixture).await?;
+    check_data_dir(&fixture, case_name, 3, 1, 2, 2, 2, 2, Some(()), None).await?;
+
+    append_sample_data(1, &fixture).await?;
+    analyze_table(&fixture).await?;
+    check_data_dir(&fixture, case_name, 5, 2, 3, 3, 3, 3, Some(()), None).await?;
 
     // Purge will keep at least two snapshots.
     let table = fixture.latest_default_table().await?;
@@ -126,7 +115,7 @@ async fn test_fuse_snapshot_analyze_purge() -> Result<()> {
     fuse_table
         .do_purge(&table_ctx, snapshot_files, None, true, false)
         .await?;
-    check_data_dir(&fixture, case_name, 1, 1, 1, 1, 1, 1, Some(()), Some(())).await?;
+    check_data_dir(&fixture, case_name, 1, 1, 2, 2, 2, 2, Some(()), Some(())).await?;
 
     Ok(())
 }
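The bare numbers in these calls are positional counts, so a legend helps. Annotating the final call against check_data_dir's signature in check.rs above (the first two parameter names are outside the visible hunk, so they are inferred from the ss_count/ts_count counters and should be treated as a best guess):

    check_data_dir(
        &fixture,
        case_name,
        1,        // snapshot files (ss_count; inferred name)
        1,        // snapshot statistics files (ts_count; inferred name)
        2,        // segment files (sg_count)
        2,        // block files (b_count)
        2,        // bloom index files (i_count)
        2,        // segment statistics files (hs_count, new in this commit)
        Some(()), // run the check_last_snapshot verification
        Some(()), // run the check_table_statistic_file verification
    )
    .await?;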
