Skip to content

Commit edcf7f5

Browse files
authored
feat(query): table_statistics(<database_name>[, table_name]) table functions (#17781)
* feat(query): table_statistics(<database_name>[, table_name]) table functions * add table statistics logic test
1 parent 13146a5 commit edcf7f5

File tree

5 files changed

+325
-1
lines changed

5 files changed

+325
-1
lines changed

src/query/service/src/table_functions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod show_roles;
2626
mod show_variables;
2727
mod srf;
2828
mod sync_crash_me;
29+
mod system;
2930
mod table_function;
3031
mod table_function_factory;
3132

@@ -35,5 +36,6 @@ pub use numbers::NumbersTable;
3536
pub use openai::GPT2SQLTable;
3637
pub use others::LicenseInfoTable;
3738
pub use others::TenantQuotaTable;
39+
pub use system::TableStatisticsFunc;
3840
pub use table_function::TableFunction;
3941
pub use table_function_factory::TableFunctionFactory;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
mod table_statistics;
16+
pub use table_statistics::TableStatisticsFunc;
Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_catalog::catalog_kind::CATALOG_DEFAULT;
18+
use databend_common_catalog::plan::DataSourcePlan;
19+
use databend_common_catalog::table_args::string_value;
20+
use databend_common_catalog::table_args::TableArgs;
21+
use databend_common_exception::ErrorCode;
22+
use databend_common_exception::Result;
23+
use databend_common_expression::types::StringType;
24+
use databend_common_expression::DataBlock;
25+
use databend_common_expression::FromData;
26+
use databend_common_expression::TableDataType;
27+
use databend_common_expression::TableField;
28+
use databend_common_expression::TableSchema;
29+
use databend_common_expression::TableSchemaRefExt;
30+
use databend_common_storages_fuse::io::read::SnapshotHistoryReader;
31+
use databend_common_storages_fuse::io::MetaReaders;
32+
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
33+
use databend_common_storages_fuse::table_functions::string_literal;
34+
use databend_common_storages_fuse::table_functions::SimpleTableFunc;
35+
use databend_common_storages_fuse::FuseTable;
36+
use futures::stream::StreamExt;
37+
use log::warn;
38+
39+
use crate::sessions::TableContext;
40+
41+
/// Arguments accepted by the `table_statistics` table function.
#[derive(Debug, Clone)]
pub struct TableStatisticsArgs {
    /// Name of the database to inspect (first positional argument, required).
    database_name: String,
    /// Optional name of a single table (second positional argument). When
    /// absent, every table of `database_name` is inspected.
    table_name: Option<String>,
}

/// The `table_statistics(<database_name>[, table_name])` table function.
#[derive(Debug, Clone)]
pub struct TableStatisticsFunc {
    /// The parsed arguments this function instance was created with.
    args: TableStatisticsArgs,
}
49+
50+
impl From<&TableStatisticsArgs> for TableArgs {
51+
fn from(args: &TableStatisticsArgs) -> Self {
52+
let mut positional_args = vec![string_literal(&args.database_name)];
53+
54+
if let Some(tbl) = &args.table_name {
55+
positional_args.push(string_literal(tbl));
56+
}
57+
58+
TableArgs::new_positioned(positional_args)
59+
}
60+
}
61+
62+
#[async_trait::async_trait]
63+
impl SimpleTableFunc for TableStatisticsFunc {
64+
fn table_args(&self) -> Option<TableArgs> {
65+
Some((&self.args).into())
66+
}
67+
68+
fn schema(&self) -> Arc<TableSchema> {
69+
TableSchemaRefExt::create(vec![
70+
TableField::new("database", TableDataType::String),
71+
TableField::new("table", TableDataType::String),
72+
TableField::new("engine", TableDataType::String),
73+
TableField::new("statistics_json", TableDataType::String),
74+
])
75+
}
76+
77+
async fn apply(
78+
&self,
79+
ctx: &Arc<dyn TableContext>,
80+
_plan: &DataSourcePlan,
81+
) -> Result<Option<DataBlock>> {
82+
let tenant_id = ctx.get_tenant();
83+
let catalog = ctx.get_catalog(CATALOG_DEFAULT).await?;
84+
85+
let mut db_names = Vec::new();
86+
let mut table_names = Vec::new();
87+
let mut engines = Vec::new();
88+
let mut stats_jsons = Vec::new();
89+
90+
// If a specific table is specified
91+
if let Some(table_name) = &self.args.table_name {
92+
// Directly get the specified table
93+
if let Ok(tbl) = catalog
94+
.get_table(&tenant_id, &self.args.database_name, table_name)
95+
.await
96+
{
97+
let engine = tbl.get_table_info().engine().to_string();
98+
match engine.to_lowercase().as_str() {
99+
"fuse" => {
100+
if let Some(stats) = get_fuse_table_statistics(tbl).await? {
101+
db_names.push(self.args.database_name.clone());
102+
table_names.push(table_name.clone());
103+
engines.push(engine.clone());
104+
stats_jsons.push(stats);
105+
}
106+
}
107+
_ => {
108+
warn!(
109+
"TableStatistics: Database {} Table {} is not a Fuse table",
110+
self.args.database_name, table_name
111+
);
112+
}
113+
}
114+
}
115+
}
116+
// If only database is specified
117+
else {
118+
// Get all tables in the specified database
119+
if let Ok(db) = catalog
120+
.get_database(&tenant_id, &self.args.database_name)
121+
.await
122+
{
123+
let tables = db.list_tables().await?;
124+
125+
// Process each table
126+
for table_info in tables {
127+
let table_name = table_info.name().to_string();
128+
129+
// Get table object
130+
if let Ok(tbl) = catalog
131+
.get_table(&tenant_id, &self.args.database_name, &table_name)
132+
.await
133+
{
134+
let engine = tbl.get_table_info().engine().to_string();
135+
match engine.to_lowercase().as_str() {
136+
"fuse" => {
137+
if let Some(stats) = get_fuse_table_statistics(tbl).await? {
138+
db_names.push(self.args.database_name.clone());
139+
table_names.push(table_name.clone());
140+
engines.push(engine.clone());
141+
stats_jsons.push(stats);
142+
}
143+
}
144+
_ => {
145+
warn!(
146+
"TableStatistics: Database {} Table {} is not a Fuse table",
147+
self.args.database_name, table_name
148+
);
149+
}
150+
}
151+
}
152+
}
153+
}
154+
}
155+
156+
Ok(Some(DataBlock::new_from_columns(vec![
157+
StringType::from_data(db_names),
158+
StringType::from_data(table_names),
159+
StringType::from_data(engines),
160+
StringType::from_data(stats_jsons),
161+
])))
162+
}
163+
164+
fn create(func_name: &str, table_args: TableArgs) -> Result<Self>
165+
where Self: Sized {
166+
// Require at least database name parameter
167+
if table_args.positioned.is_empty() {
168+
return Err(ErrorCode::BadArguments(format!(
169+
"Function {} requires at least one argument: database name",
170+
func_name
171+
)));
172+
}
173+
174+
match table_args.positioned.len() {
175+
1 => {
176+
// Only database name is provided
177+
let database_name = string_value(&table_args.positioned[0])?;
178+
Ok(Self {
179+
args: TableStatisticsArgs {
180+
database_name,
181+
table_name: None,
182+
},
183+
})
184+
}
185+
2 => {
186+
// Both database and table name are provided
187+
let database_name = string_value(&table_args.positioned[0])?;
188+
let table_name = string_value(&table_args.positioned[1])?;
189+
Ok(Self {
190+
args: TableStatisticsArgs {
191+
database_name,
192+
table_name: Some(table_name),
193+
},
194+
})
195+
}
196+
_ => Err(ErrorCode::BadArguments(format!(
197+
"Function {} takes 1 or 2 arguments: database_name, [table_name]",
198+
func_name
199+
))),
200+
}
201+
}
202+
}
203+
204+
// Get Fuse table statistics, returns JSON string
205+
async fn get_fuse_table_statistics(
206+
table: Arc<dyn databend_common_catalog::table::Table>,
207+
) -> Result<Option<String>> {
208+
// Try to convert to FuseTable
209+
if let Ok(fuse_table) = FuseTable::try_from_table(table.as_ref()) {
210+
let meta_location_generator = fuse_table.meta_location_generator().clone();
211+
let snapshot_location = fuse_table.snapshot_loc();
212+
213+
if let Some(snapshot_location) = snapshot_location {
214+
let table_snapshot_reader =
215+
MetaReaders::table_snapshot_reader(fuse_table.get_operator());
216+
let format_version =
217+
TableMetaLocationGenerator::snapshot_version(snapshot_location.as_str());
218+
219+
let lite_snapshot_stream = table_snapshot_reader.snapshot_history(
220+
snapshot_location,
221+
format_version,
222+
meta_location_generator.clone(),
223+
);
224+
225+
if let Some(Ok((snapshot, _v))) = lite_snapshot_stream.take(1).next().await {
226+
// Convert snapshot to JSON, but filter out segments field
227+
let mut snapshot_json = serde_json::to_value(&snapshot)?;
228+
229+
// If it's an object and contains segments field, remove it
230+
if let serde_json::Value::Object(ref mut obj) = snapshot_json {
231+
obj.remove("segments");
232+
}
233+
234+
// Convert JSON to variant
235+
return Ok(Some(serde_json::to_string(&snapshot_json)?));
236+
}
237+
}
238+
}
239+
240+
Ok(None)
241+
}

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use crate::table_functions::show_roles::ShowRoles;
5959
use crate::table_functions::show_variables::ShowVariables;
6060
use crate::table_functions::srf::RangeTable;
6161
use crate::table_functions::sync_crash_me::SyncCrashMeTable;
62+
use crate::table_functions::system::TableStatisticsFunc;
6263
use crate::table_functions::GPT2SQLTable;
6364
use crate::table_functions::TableFunction;
6465
type TableFunctionCreators = RwLock<HashMap<String, (MetaId, Arc<dyn TableFunctionCreator>)>>;
@@ -342,7 +343,13 @@ impl TableFunctionFactory {
342343
"show_roles".to_string(),
343344
(next_id(), Arc::new(ShowRoles::create)),
344345
);
345-
346+
creators.insert(
347+
"table_statistics".to_string(),
348+
(
349+
next_id(),
350+
Arc::new(TableFunctionTemplate::<TableStatisticsFunc>::create),
351+
),
352+
);
346353
creators.insert(
347354
"iceberg_snapshot".to_string(),
348355
(next_id(), Arc::new(IcebergInspectTable::create)),
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
statement ok
2+
CREATE OR REPLACE DATABASE table_statistics;
3+
4+
statement ok
5+
USE table_statistics;
6+
7+
statement ok
8+
create table t1(c int);
9+
10+
statement ok
11+
create table t2(c int);
12+
13+
statement ok
14+
insert into t1 values (1);
15+
16+
statement ok
17+
insert into t2 values (1);
18+
19+
query I
20+
select count(*) from table_statistics('table_statistics', 't1');
21+
----
22+
1
23+
24+
query I
25+
select count(*) from table_statistics('table_statistics', 't2');
26+
----
27+
1
28+
29+
query I
30+
select count(*) from table_statistics('table_statistics');
31+
----
32+
2
33+
34+
query T
35+
select engine from table_statistics('table_statistics', 't1');
36+
----
37+
FUSE
38+
39+
query T
40+
select engine from table_statistics('table_statistics', 't2');
41+
----
42+
FUSE
43+
44+
query T
45+
select engine from table_statistics('table_statistics');
46+
----
47+
FUSE
48+
FUSE
49+
50+
query III
51+
select parse_json(statistics_json):summary:block_count from table_statistics('table_statistics');
52+
----
53+
1
54+
1
55+
56+
57+
statement error
58+
select * from table_statistics();

0 commit comments

Comments
 (0)