14
14
15
15
use std:: sync:: Arc ;
16
16
17
+ use databend_common_catalog:: catalog:: Catalog ;
18
+ use databend_common_catalog:: catalog:: CatalogManager ;
17
19
use databend_common_catalog:: catalog_kind:: CATALOG_DEFAULT ;
20
+ use databend_common_catalog:: database:: Database ;
18
21
use databend_common_catalog:: plan:: PushDownInfo ;
19
22
use databend_common_catalog:: table:: Table ;
20
23
use databend_common_catalog:: table_context:: TableContext ;
@@ -29,16 +32,20 @@ use databend_common_expression::TableField;
29
32
use databend_common_expression:: TableSchemaRefExt ;
30
33
use databend_common_functions:: BUILTIN_FUNCTIONS ;
31
34
use databend_common_meta_app:: schema:: database_name_ident:: DatabaseNameIdent ;
35
+ use databend_common_meta_app:: schema:: CatalogInfo ;
36
+ use databend_common_meta_app:: schema:: CatalogNameIdent ;
32
37
use databend_common_meta_app:: schema:: TableIdent ;
33
38
use databend_common_meta_app:: schema:: TableInfo ;
34
39
use databend_common_meta_app:: schema:: TableMeta ;
40
+ use databend_common_meta_app:: tenant:: Tenant ;
35
41
use databend_common_sql:: Planner ;
36
42
use databend_common_storages_stream:: stream_table:: StreamTable ;
37
43
use databend_common_storages_stream:: stream_table:: STREAM_ENGINE ;
38
44
use databend_common_storages_view:: view_table:: QUERY ;
39
45
use databend_common_storages_view:: view_table:: VIEW_ENGINE ;
40
46
use log:: warn;
41
47
48
+ use crate :: generate_catalog_meta;
42
49
use crate :: table:: AsyncOneBlockSystemTable ;
43
50
use crate :: table:: AsyncSystemTable ;
44
51
use crate :: util:: find_eq_filter;
@@ -61,7 +68,15 @@ impl AsyncSystemTable for ColumnsTable {
61
68
ctx : Arc < dyn TableContext > ,
62
69
push_downs : Option < PushDownInfo > ,
63
70
) -> Result < DataBlock > {
64
- let rows = self . dump_table_columns ( ctx, push_downs) . await ?;
71
+ let catalog_mgr = CatalogManager :: instance ( ) ;
72
+ let catalog = catalog_mgr
73
+ . get_catalog (
74
+ ctx. get_tenant ( ) . tenant_name ( ) ,
75
+ self . get_table_info ( ) . catalog ( ) ,
76
+ ctx. session_state ( ) ,
77
+ )
78
+ . await ?;
79
+ let rows = self . dump_table_columns ( ctx, push_downs, & catalog) . await ?;
65
80
let mut names: Vec < String > = Vec :: with_capacity ( rows. len ( ) ) ;
66
81
let mut tables: Vec < String > = Vec :: with_capacity ( rows. len ( ) ) ;
67
82
let mut databases: Vec < String > = Vec :: with_capacity ( rows. len ( ) ) ;
@@ -111,7 +126,7 @@ impl AsyncSystemTable for ColumnsTable {
111
126
}
112
127
113
128
impl ColumnsTable {
114
- pub fn create ( table_id : u64 ) -> Arc < dyn Table > {
129
+ pub fn create ( table_id : u64 , ctl_name : & str ) -> Arc < dyn Table > {
115
130
let schema = TableSchemaRefExt :: create ( vec ! [
116
131
TableField :: new( "name" , TableDataType :: String ) ,
117
132
TableField :: new( "database" , TableDataType :: String ) ,
@@ -135,6 +150,11 @@ impl ColumnsTable {
135
150
engine : "SystemColumns" . to_string ( ) ,
136
151
..Default :: default ( )
137
152
} ,
153
+ catalog_info : Arc :: new ( CatalogInfo {
154
+ name_ident : CatalogNameIdent :: new ( Tenant :: new_literal ( "dummy" ) , ctl_name) . into ( ) ,
155
+ meta : generate_catalog_meta ( ctl_name) ,
156
+ ..Default :: default ( )
157
+ } ) ,
138
158
..Default :: default ( )
139
159
} ;
140
160
@@ -146,8 +166,9 @@ impl ColumnsTable {
146
166
& self ,
147
167
ctx : Arc < dyn TableContext > ,
148
168
push_downs : Option < PushDownInfo > ,
169
+ catalog : & Arc < dyn Catalog > ,
149
170
) -> Result < Vec < ( String , String , String , TableField ) > > {
150
- let database_and_tables = dump_tables ( & ctx, push_downs) . await ?;
171
+ let database_and_tables = dump_tables ( & ctx, push_downs, catalog ) . await ?;
151
172
152
173
let mut rows: Vec < ( String , String , String , TableField ) > = vec ! [ ] ;
153
174
for ( database, tables) in database_and_tables {
@@ -237,24 +258,24 @@ impl ColumnsTable {
237
258
pub ( crate ) async fn dump_tables (
238
259
ctx : & Arc < dyn TableContext > ,
239
260
push_downs : Option < PushDownInfo > ,
261
+ catalog : & Arc < dyn Catalog > ,
240
262
) -> Result < Vec < ( String , Vec < Arc < dyn Table > > ) > > {
241
263
let tenant = ctx. get_tenant ( ) ;
242
264
243
265
// For performance considerations, we do not require the most up-to-date table information here:
244
266
// - for regular tables, the data is certainly fresh
245
267
// - for read-only attached tables, the data may be outdated
268
+ let catalog = catalog. clone ( ) . disable_table_info_refresh ( ) ?;
246
269
247
- let catalog = ctx
248
- . get_catalog ( CATALOG_DEFAULT )
249
- . await ?
250
- . disable_table_info_refresh ( ) ?;
251
-
252
- let mut tables: Vec < String > = Vec :: new ( ) ;
253
- let mut databases: Vec < String > = Vec :: new ( ) ;
270
+ let mut filtered_db_names: Option < Vec < String > > = None ;
271
+ let mut filtered_table_names: Option < Vec < String > > = None ;
254
272
255
273
if let Some ( push_downs) = push_downs {
256
274
if let Some ( filter) = push_downs. filters . as_ref ( ) . map ( |f| & f. filter ) {
257
275
let expr = filter. as_expr ( & BUILTIN_FUNCTIONS ) ;
276
+ let mut databases: Vec < String > = Vec :: new ( ) ;
277
+ let mut tables: Vec < String > = Vec :: new ( ) ;
278
+
258
279
find_eq_filter ( & expr, & mut |col_name, scalar| {
259
280
if col_name == "database" {
260
281
if let Scalar :: String ( database) = scalar {
@@ -271,102 +292,134 @@ pub(crate) async fn dump_tables(
271
292
}
272
293
Ok ( ( ) )
273
294
} ) ;
295
+ if !databases. is_empty ( ) {
296
+ filtered_db_names = Some ( databases) ;
297
+ }
298
+ if !tables. is_empty ( ) {
299
+ filtered_table_names = Some ( tables) ;
300
+ }
274
301
}
275
302
}
276
303
277
- let visibility_checker = ctx. get_visibility_checker ( false ) . await ?;
304
+ let visibility_checker = if catalog. is_external ( ) {
305
+ None
306
+ } else {
307
+ Some ( ctx. get_visibility_checker ( false ) . await ?)
308
+ } ;
278
309
279
- let mut final_dbs: Vec < ( String , u64 ) > = Vec :: new ( ) ;
310
+ let mut final_dbs: Vec < Arc < dyn Database > > = Vec :: new ( ) ;
280
311
281
- if !databases. is_empty ( ) {
282
- for db in databases {
283
- let db_id = catalog
284
- . get_database ( & tenant, & db)
285
- . await ?
286
- . get_db_info ( )
287
- . database_id
288
- . db_id ;
289
- if visibility_checker. check_database_visibility ( CATALOG_DEFAULT , & db, db_id) {
290
- final_dbs. push ( ( db. to_string ( ) , db_id) ) ;
312
+ match ( filtered_db_names, & visibility_checker) {
313
+ ( Some ( db_names) , Some ( checker) ) => {
314
+ // Filtered databases + Visibility check
315
+ for db_name in db_names {
316
+ let db = catalog. get_database ( & tenant, & db_name) . await ?;
317
+ let db_id = db. get_db_info ( ) . database_id . db_id ;
318
+ if checker. check_database_visibility ( CATALOG_DEFAULT , & db_name, db_id) {
319
+ final_dbs. push ( db) ;
320
+ }
291
321
}
292
322
}
293
- } else {
294
- let catalog_dbs = visibility_checker. get_visibility_database ( ) ;
295
- // None means has global level privileges
296
- if let Some ( catalog_dbs) = catalog_dbs {
297
- for ( catalog_name, dbs) in catalog_dbs {
298
- if catalog_name == CATALOG_DEFAULT {
299
- let mut catalog_db_ids = vec ! [ ] ;
300
- let mut catalog_db_names = vec ! [ ] ;
301
- catalog_db_names. extend (
302
- dbs. iter ( )
303
- . filter_map ( |( db_name, _) | * db_name)
304
- . map ( |db_name| db_name. to_string ( ) ) ,
305
- ) ;
306
- catalog_db_ids. extend ( dbs. iter ( ) . filter_map ( |( _, db_id) | * db_id) ) ;
307
- if let Ok ( databases) = catalog
308
- . mget_database_names_by_ids ( & tenant, & catalog_db_ids)
309
- . await
310
- {
311
- catalog_db_names. extend ( databases. into_iter ( ) . flatten ( ) ) ;
312
- } else {
313
- let msg = format ! ( "Failed to get database name by id: {}" , catalog. name( ) ) ;
314
- warn ! ( "{}" , msg) ;
315
- }
316
- let db_idents = catalog_db_names
323
+ ( Some ( db_names) , None ) => {
324
+ // Filtered databases + No visibility check
325
+ for db_name in db_names {
326
+ let db = catalog. get_database ( & tenant, & db_name) . await ?;
327
+ final_dbs. push ( db) ;
328
+ }
329
+ }
330
+ ( None , Some ( checker) ) => {
331
+ // All databases + Visibility check
332
+ let catalog_dbs = checker. get_visibility_database ( ) ;
333
+ if let Some ( catalog_dbs) = catalog_dbs {
334
+ if let Some ( dbs_in_default_catalog) =
335
+ catalog_dbs. get ( & ctx. get_default_catalog ( ) ?. name ( ) )
336
+ {
337
+ let db_idents = dbs_in_default_catalog
317
338
. iter ( )
318
- . map ( |name| DatabaseNameIdent :: new ( & tenant, name) )
339
+ . filter_map ( |( db_name, _) | * db_name) // Get only names provided by checker
340
+ . map ( |db_name| DatabaseNameIdent :: new ( & tenant, db_name) )
319
341
. collect :: < Vec < DatabaseNameIdent > > ( ) ;
320
- let dbs: Vec < ( String , u64 ) > = catalog
321
- . mget_databases ( & tenant, & db_idents)
322
- . await ?
323
- . iter ( )
324
- . map ( |db| ( db. name ( ) . to_string ( ) , db. get_db_info ( ) . database_id . db_id ) )
325
- . collect ( ) ;
326
- final_dbs. extend ( dbs) ;
342
+
343
+ let databases = catalog. mget_databases ( & tenant, & db_idents) . await ?;
344
+ // mget_databases returns Vec<Arc<dyn Database>>, checker already filtered by ID/Name
345
+ for db in databases {
346
+ // Double check visibility in case mget_databases returned something unexpected,
347
+ // although checker should be the source of truth here.
348
+ let db_id = db. get_db_info ( ) . database_id . db_id ;
349
+ if checker. check_database_visibility ( CATALOG_DEFAULT , db. name ( ) , db_id) {
350
+ final_dbs. push ( db) ;
351
+ } else {
352
+ // This case should ideally not happen if checker is correct, but good for safety
353
+ warn ! ( "Visibility checker returned database {} but check_database_visibility failed." , db. name( ) ) ;
354
+ }
355
+ }
327
356
}
328
- }
329
- } else {
330
- let all_databases = catalog. list_databases ( & tenant) . await ?;
331
- for db in all_databases {
332
- let db_id = db. get_db_info ( ) . database_id . db_id ;
333
- let db_name = db. name ( ) ;
334
- if visibility_checker. check_database_visibility ( CATALOG_DEFAULT , db_name, db_id) {
335
- final_dbs. push ( ( db_name. to_string ( ) , db_id) ) ;
357
+ } else {
358
+ // User has global privileges, check all
359
+ let all_databases = catalog. list_databases ( & tenant) . await ?;
360
+ for db in all_databases {
361
+ let db_id = db. get_db_info ( ) . database_id . db_id ;
362
+ let db_name = db. name ( ) ;
363
+ if checker. check_database_visibility ( CATALOG_DEFAULT , db_name, db_id) {
364
+ final_dbs. push ( db) ;
365
+ }
336
366
}
337
367
}
338
368
}
369
+ ( None , None ) => {
370
+ // All databases + No visibility check
371
+ final_dbs = catalog. list_databases ( & tenant) . await ?;
372
+ }
339
373
}
340
374
341
375
let mut final_tables: Vec < ( String , Vec < Arc < dyn Table > > ) > = Vec :: with_capacity ( final_dbs. len ( ) ) ;
342
- for ( database, db_id) in final_dbs {
343
- let tables = if tables. is_empty ( ) {
344
- catalog
345
- . list_tables ( & tenant, & database)
346
- . await
347
- . unwrap_or_default ( )
348
- } else {
349
- let mut res = Vec :: new ( ) ;
350
- for table in & tables {
351
- if let Ok ( table) = catalog. get_table ( & tenant, & database, table) . await {
352
- res. push ( table) ;
376
+
377
+ for db in final_dbs {
378
+ let db_name = db. name ( ) . to_string ( ) ;
379
+ let db_id = db. get_db_info ( ) . database_id . db_id ;
380
+
381
+ let tables_in_db = match & filtered_table_names {
382
+ Some ( table_names) => {
383
+ // Filtered tables
384
+ let mut res = Vec :: new ( ) ;
385
+ for table_name in table_names {
386
+ // Use get_table for specific names
387
+ if let Ok ( table) = catalog. get_table ( & tenant, & db_name, table_name) . await {
388
+ res. push ( table) ;
389
+ }
353
390
}
391
+ res
392
+ }
393
+ None => {
394
+ // All tables in database
395
+ // Use list_tables for all tables, handle error by returning empty vec
396
+ catalog
397
+ . list_tables ( & tenant, & db_name)
398
+ . await
399
+ . unwrap_or_default ( )
354
400
}
355
- res
356
401
} ;
357
- let mut filtered_tables = Vec :: with_capacity ( tables. len ( ) ) ;
358
- for table in tables {
359
- if visibility_checker. check_table_visibility (
360
- CATALOG_DEFAULT ,
361
- & database,
362
- table. name ( ) ,
363
- db_id,
364
- table. get_id ( ) ,
365
- ) {
402
+
403
+ let mut filtered_tables = Vec :: with_capacity ( tables_in_db. len ( ) ) ;
404
+ for table in tables_in_db {
405
+ // Apply table visibility check if checker exists
406
+ let is_visible = match & visibility_checker {
407
+ Some ( checker) => checker. check_table_visibility (
408
+ CATALOG_DEFAULT ,
409
+ & db_name,
410
+ table. name ( ) ,
411
+ db_id,
412
+ table. get_id ( ) ,
413
+ ) ,
414
+ None => true , // No checker, all tables are visible
415
+ } ;
416
+
417
+ if is_visible {
366
418
filtered_tables. push ( table) ;
367
419
}
368
420
}
369
- final_tables. push ( ( database , filtered_tables) ) ;
421
+ final_tables. push ( ( db_name , filtered_tables) ) ;
370
422
}
423
+
371
424
Ok ( final_tables)
372
425
}
0 commit comments