@@ -27,9 +27,11 @@ use databend_common_expression::types::TimestampType;
27
27
use databend_common_expression:: types:: UInt64Type ;
28
28
use databend_common_expression:: utils:: FromData ;
29
29
use databend_common_expression:: DataBlock ;
30
+ use databend_common_expression:: Scalar ;
30
31
use databend_common_expression:: TableDataType ;
31
32
use databend_common_expression:: TableField ;
32
33
use databend_common_expression:: TableSchemaRefExt ;
34
+ use databend_common_functions:: BUILTIN_FUNCTIONS ;
33
35
use databend_common_meta_app:: principal:: OwnershipObject ;
34
36
use databend_common_meta_app:: schema:: database_name_ident:: DatabaseNameIdent ;
35
37
use databend_common_meta_app:: schema:: TableIdent ;
@@ -41,6 +43,7 @@ use log::warn;
41
43
42
44
use crate :: table:: AsyncOneBlockSystemTable ;
43
45
use crate :: table:: AsyncSystemTable ;
46
+ use crate :: util:: find_eq_filter;
44
47
45
48
pub type DatabasesTableWithHistory = DatabasesTable < true > ;
46
49
pub type DatabasesTableWithoutHistory = DatabasesTable < false > ;
@@ -98,17 +101,40 @@ where DatabasesTable<WITH_HISTORY>: HistoryAware
98
101
async fn get_full_data (
99
102
& self ,
100
103
ctx : Arc < dyn TableContext > ,
101
- _push_downs : Option < PushDownInfo > ,
104
+ push_downs : Option < PushDownInfo > ,
102
105
) -> Result < DataBlock > {
103
106
let tenant = ctx. get_tenant ( ) ;
104
107
105
- let catalogs = CatalogManager :: instance ( ) ;
106
- let catalogs: Vec < ( String , Arc < dyn Catalog > ) > = catalogs
107
- . list_catalogs ( & tenant, ctx. session_state ( ) )
108
- . await ?
109
- . iter ( )
110
- . map ( |e| ( e. name ( ) , e. clone ( ) ) )
111
- . collect ( ) ;
108
+ // Check filters (catalog name)
109
+ let mut filter_catalog_name = None ;
110
+ if let Some ( push_downs) = push_downs {
111
+ if let Some ( filter) = push_downs. filters . as_ref ( ) . map ( |f| & f. filter ) {
112
+ let expr = filter. as_expr ( & BUILTIN_FUNCTIONS ) ;
113
+ find_eq_filter ( & expr, & mut |col_name, scalar| {
114
+ if col_name == "catalog" {
115
+ if let Scalar :: String ( catalog) = scalar {
116
+ filter_catalog_name = Some ( catalog. clone ( ) ) ;
117
+ }
118
+ }
119
+ Ok ( ( ) )
120
+ } ) ;
121
+ }
122
+ }
123
+
124
+ let catalogs = if let Some ( filter_catalog_name) = filter_catalog_name {
125
+ let mut res = vec ! [ ] ;
126
+ let ctl = ctx. get_catalog ( & filter_catalog_name) . await ?;
127
+ res. push ( ( filter_catalog_name, ctl) ) ;
128
+ res
129
+ } else {
130
+ let catalogs = CatalogManager :: instance ( ) ;
131
+ catalogs
132
+ . list_catalogs ( & tenant, ctx. session_state ( ) )
133
+ . await ?
134
+ . iter ( )
135
+ . map ( |e| ( e. name ( ) , e. clone ( ) ) )
136
+ . collect ( )
137
+ } ;
112
138
113
139
let user_api = UserApiProvider :: instance ( ) ;
114
140
let mut catalog_names = vec ! [ ] ;
0 commit comments