
Commit 322d17d
Commit message: refactor
Parent: 6e3713f

File tree: 4 files changed (+213 / -255 lines)


src/query/service/src/interpreters/interpreter_table_analyze.rs

Lines changed: 14 additions & 82 deletions
@@ -15,7 +15,7 @@
 use std::collections::HashMap;
 use std::sync::Arc;

-use chrono::Utc;
+use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_pipeline_core::processors::ProcessorPtr;
@@ -26,20 +26,19 @@ use databend_common_sql::plans::Plan;
 use databend_common_sql::BindContext;
 use databend_common_sql::Planner;
 use databend_common_storage::DEFAULT_HISTOGRAM_BUCKETS;
-use databend_common_storages_factory::NavigationPoint;
 use databend_common_storages_factory::Table;
 use databend_common_storages_fuse::operations::AnalyzeLightMutator;
 use databend_common_storages_fuse::operations::HistogramInfoSink;
+use databend_common_storages_fuse::FuseLazyPartInfo;
 use databend_common_storages_fuse::FuseTable;
+use databend_storages_common_cache::Partitions;
 use databend_storages_common_index::Index;
 use databend_storages_common_index::RangeIndex;
-use itertools::Itertools;
 use log::info;

 use crate::interpreters::Interpreter;
 use crate::pipelines::PipelineBuildResult;
 use crate::schedulers::build_query_pipeline;
-use crate::schedulers::build_query_pipeline_without_render_result_set;
 use crate::sessions::QueryContext;
 use crate::sessions::TableContext;

@@ -133,87 +132,23 @@ impl Interpreter for AnalyzeTableInterpreter {
             return Ok(PipelineBuildResult::create());
         }

-        let table_statistics = table
-            .read_table_snapshot_statistics(Some(&snapshot))
-            .await?;
-        if let Some(table_statistics) = &table_statistics {
-            if table_statistics.snapshot_id == snapshot.snapshot_id {
-                return Ok(PipelineBuildResult::create());
-            }
+        let mut parts = Vec::with_capacity(snapshot.segments.len());
+        for (idx, segment_location) in snapshot.segments.iter().enumerate() {
+            parts.push(FuseLazyPartInfo::create(idx, segment_location.clone()));
         }
+        self.ctx
+            .set_partitions(Partitions::create(PartitionsShuffleKind::Mod, parts))?;

-        // plan sql
-        let (is_full, temporal_str) = if let Some(table_statistics) = &table_statistics {
-            let is_full = match table
-                .navigate_to_point(
-                    &NavigationPoint::SnapshotID(table_statistics.snapshot_id.simple().to_string()),
-                    self.ctx.clone().get_abort_checker(),
-                )
-                .await
-            {
-                Ok(t) => !t
-                    .read_table_snapshot()
-                    .await
-                    .is_ok_and(|s| s.is_some_and(|s| s.prev_table_seq.is_some())),
-                Err(_) => true,
-            };
-
-            let temporal_str = if is_full {
-                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple())
-            } else {
-                // analyze only need to collect the added blocks.
-                let table_alias = format!("_change_insert${:08x}", Utc::now().timestamp());
-                format!(
-                    "CHANGES(INFORMATION => DEFAULT) AT (snapshot => '{}') END (snapshot => '{}') AS {table_alias}",
-                    table_statistics.snapshot_id.simple(),
-                    snapshot.snapshot_id.simple(),
-                )
-            };
-            (is_full, temporal_str)
-        } else {
-            (
-                true,
-                format!("AT (snapshot => '{}')", snapshot.snapshot_id.simple()),
-            )
-        };
-
+        let mut build_res = PipelineBuildResult::create();
+        // After profiling, computing histogram is heavy and the bottleneck is window function(90%).
+        // It's possible to OOM if the table is too large and spilling isn't enabled.
+        // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed).
+        let mut histogram_info_receivers = HashMap::new();
         let quote = self
             .ctx
             .get_settings()
             .get_sql_dialect()?
             .default_ident_quote();
-
-        // 0.01625 --> 12 buckets --> 4K size per column
-        // 1.04 / math.sqrt(1<<12) --> 0.01625
-        const DISTINCT_ERROR_RATE: f64 = 0.01625;
-        let ndv_select_expr = snapshot
-            .schema
-            .fields()
-            .iter()
-            .filter(|f| RangeIndex::supported_type(&f.data_type().into()))
-            .map(|f| {
-                format!(
-                    "approx_count_distinct_state({DISTINCT_ERROR_RATE})({quote}{}{quote}) as ndv_{}",
-                    f.name,
-                    f.column_id()
-                )
-            })
-            .join(", ");
-
-        let sql = format!(
-            "SELECT {ndv_select_expr}, {is_full} as is_full from {}.{} {temporal_str}",
-            plan.database, plan.table,
-        );
-
-        info!("Analyze via sql: {sql}");
-
-        let (physical_plan, bind_context) = self.plan_sql(sql, false).await?;
-        let mut build_res =
-            build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?;
-        // After profiling, computing histogram is heavy and the bottleneck is window function(90%).
-        // It's possible to OOM if the table is too large and spilling isn't enabled.
-        // We add a setting `enable_analyze_histogram` to control whether to compute histogram(default is closed).
-        let mut histogram_info_receivers = HashMap::new();
         if self.ctx.get_settings().get_enable_analyze_histogram()? {
             let histogram_sqls = table
                 .schema()
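Note on the removed DISTINCT_ERROR_RATE constant: the deleted comment derives 0.01625 from the HyperLogLog relative standard error formula 1.04 / sqrt(m) with m = 2^12 registers (the "12 buckets" in the comment). A quick standalone check of that arithmetic in plain Rust, independent of Databend's code:

fn main() {
    // 2^12 = 4096 HyperLogLog registers, per the removed comment.
    let registers = f64::from(1u32 << 12);
    // Standard HLL relative standard error: 1.04 / sqrt(m).
    let error_rate = 1.04 / registers.sqrt();
    println!("{error_rate}"); // 1.04 / 64 = 0.01625
}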
@@ -269,10 +204,7 @@ impl Interpreter for AnalyzeTableInterpreter {
         }
         FuseTable::do_analyze(
             self.ctx.clone(),
-            bind_context.output_schema(),
-            &self.plan.catalog,
-            &self.plan.database,
-            &self.plan.table,
+            table,
             snapshot.snapshot_id,
             &mut build_res.main_pipeline,
             histogram_info_receivers,
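Taken together, the hunks above drop the generated "SELECT approx_count_distinct_state(...)" query (and its snapshot/CHANGES navigation) and feed the analyze pipeline directly: every segment of the snapshot becomes a FuseLazyPartInfo, the parts are registered with PartitionsShuffleKind::Mod, and FuseTable::do_analyze now receives the table handle instead of catalog/database/table names plus the query's output schema. Assuming the Mod shuffle assigns lazy parts roughly by part index modulo executor count (the usual reading of the name, not something this diff states), a minimal standalone sketch of that distribution with a hypothetical distribute helper:

// Hedged sketch: spread segment indices across executors the way an
// index-modulo ("Mod") shuffle would. `distribute` is a hypothetical helper,
// not Databend code.
fn distribute(num_segments: usize, num_executors: usize) -> Vec<Vec<usize>> {
    let mut buckets = vec![Vec::new(); num_executors];
    for segment_idx in 0..num_segments {
        buckets[segment_idx % num_executors].push(segment_idx);
    }
    buckets
}

fn main() {
    // 10 segments over 3 executors -> [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
    println!("{:?}", distribute(10, 3));
}

Whether the production scheduler assigns parts exactly this way is an implementation detail of PartitionsShuffleKind::Mod; the sketch only illustrates the intent behind shuffling the lazy parts rather than running one big query.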

src/query/service/tests/it/storages/fuse/operations/table_analyze.rs

Lines changed: 2 additions & 3 deletions
@@ -23,7 +23,7 @@ use databend_common_expression::types::number::NumberScalar;
 use databend_common_expression::ColumnId;
 use databend_common_expression::Scalar;
 use databend_common_io::prelude::borsh_deserialize_from_slice;
-use databend_common_storage::MetaHLL12;
+use databend_common_storage::MetaHLL;
 use databend_common_storages_fuse::io::MetaReaders;
 use databend_common_storages_fuse::io::MetaWriter;
 use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
@@ -216,8 +216,7 @@ async fn test_table_analyze_without_prev_table_seq() -> Result<()> {

     // generate table statistics.
     let col: Vec<u8> = vec![1, 3, 0, 0, 0, 118, 5, 1, 21, 6, 3, 229, 13, 3];
-    let hll: HashMap<ColumnId, MetaHLL12> =
-        HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]);
+    let hll: HashMap<ColumnId, MetaHLL> = HashMap::from([(0, borsh_deserialize_from_slice(&col)?)]);
     let table_statistics =
         TableSnapshotStatistics::new(hll, HashMap::new(), snapshot_1.snapshot_id, 14);
     let table_statistics_location = location_gen.snapshot_statistics_location_from_uuid(
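The test keeps the same hard-coded byte payload and only retypes the map's values from MetaHLL12 to MetaHLL; with borsh-style deserialization the target type is taken from the annotation on the binding, so no payload change is needed as long as the two encodings line up. A small self-contained illustration of that round-trip pattern using the upstream borsh crate (1.x API; the SketchStats struct is hypothetical, not Databend's MetaHLL):

// Requires the `borsh` crate with its derive feature enabled.
use borsh::{BorshDeserialize, BorshSerialize};

#[derive(BorshSerialize, BorshDeserialize, Debug, PartialEq)]
struct SketchStats {
    column_id: u32,
    ndv: u64,
}

fn main() -> std::io::Result<()> {
    let original = SketchStats { column_id: 0, ndv: 14 };
    // Encode to a byte payload, analogous to the hard-coded `col` vector in the test.
    let bytes = borsh::to_vec(&original)?;
    // The target type comes from the annotation on the binding, which is why the
    // test only had to change `MetaHLL12` to `MetaHLL` and not the bytes themselves.
    let decoded: SketchStats = borsh::from_slice(&bytes)?;
    assert_eq!(original, decoded);
    Ok(())
}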

src/query/storages/common/table_meta/src/meta/v4/table_snapshot_statistics.rs

Lines changed: 2 additions & 2 deletions
@@ -31,7 +31,7 @@ pub struct TableSnapshotStatistics {
     pub format_version: FormatVersion,

     pub snapshot_id: SnapshotId,
-    pub row_count: usize,
+    pub row_count: u64,
     pub hll: HashMap<ColumnId, MetaHLL>,
     pub histograms: HashMap<ColumnId, Histogram>,
 }
@@ -41,7 +41,7 @@ impl TableSnapshotStatistics {
         hll: HashMap<ColumnId, MetaHLL>,
         histograms: HashMap<ColumnId, Histogram>,
         snapshot_id: SnapshotId,
-        row_count: usize,
+        row_count: u64,
     ) -> Self {
         Self {
             format_version: TableSnapshotStatistics::VERSION,
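Switching row_count from usize to u64 presumably keeps this persisted metadata field at a fixed width: usize follows the target's pointer size, while u64 is always 64 bits. A tiny check in plain Rust:

use std::mem::size_of;

fn main() {
    // usize is pointer-sized: 4 bytes on a 32-bit target, 8 bytes on a 64-bit one.
    println!("usize: {} bytes", size_of::<usize>());
    // u64 is always 8 bytes, so the serialized statistics keep a stable layout.
    println!("u64:   {} bytes", size_of::<u64>());
}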
