@@ -19,6 +19,8 @@ use std::path::Path;
1919use std:: path:: PathBuf ;
2020use std:: sync:: Arc ;
2121
22+ use databend_common_base:: base:: GlobalUniqName ;
23+ use databend_common_catalog:: cluster_info:: Cluster ;
2224use databend_common_catalog:: table_context:: TableContext ;
2325use databend_common_catalog:: BasicColumnStatistics ;
2426use databend_common_catalog:: TableStatistics ;
@@ -29,6 +31,7 @@ use databend_common_expression::types::Number;
2931use databend_common_expression:: types:: NumberScalar ;
3032use databend_common_expression:: types:: F64 ;
3133use databend_common_expression:: Scalar ;
34+ use databend_common_meta_types:: NodeInfo ;
3235use databend_common_sql:: executor:: PhysicalPlanBuilder ;
3336use databend_common_sql:: optimize;
3437use databend_common_sql:: optimizer:: ir:: SExpr ;
@@ -45,6 +48,7 @@ use databend_common_sql::IndexType;
4548use databend_common_sql:: Metadata ;
4649use databend_common_sql:: MetadataRef ;
4750use databend_common_storage:: Datum ;
51+ use databend_query:: clusters:: ClusterHelper ;
4852use databend_query:: sessions:: QueryContext ;
4953use databend_query:: test_kits:: TestFixture ;
5054use goldenfile:: Mint ;
@@ -66,7 +70,11 @@ struct TestSpec {
6670 #[ serde( default ) ]
6771 statistics_file : Option < String > ,
6872 #[ serde( default ) ]
73+ tables : HashMap < String , String > ,
74+ #[ serde( default ) ]
6975 auto_statistics : bool ,
76+ #[ serde( default ) ]
77+ node_num : Option < u64 > ,
7078 good_plan : Option < String > ,
7179}
7280
@@ -102,6 +110,8 @@ struct TestCase {
102110 auto_stats : bool ,
103111 stem : String ,
104112 subdir : Option < String > ,
113+ node_num : Option < u64 > ,
114+ tables : HashMap < String , String > ,
105115}
106116
107117struct TestSuite {
@@ -173,24 +183,6 @@ impl TestSuite {
173183 }
174184 }
175185
176- async fn setup_tables ( & self , ctx : & Arc < QueryContext > ) -> Result < ( ) > {
177- for file in self . find_files ( "tables" , & [ "sql" ] ) {
178- let sql = fs:: read_to_string ( & file)
179- . map_err ( |e| ErrorCode :: Internal ( format ! ( "Failed to read file: {}" , e) ) ) ?;
180- for statement in sql. split ( ';' ) . filter ( |s| !s. trim ( ) . is_empty ( ) ) {
181- match execute_sql ( ctx, statement) . await {
182- Ok ( _) => { }
183- Err ( e) if e. code ( ) == ErrorCode :: TABLE_ALREADY_EXISTS => {
184- // Ignore table already exists errors
185- continue ;
186- }
187- Err ( e) => return Err ( e) ,
188- }
189- }
190- }
191- Ok ( ( ) )
192- }
193-
194186 fn load_cases ( & self ) -> Result < Vec < TestCase > > {
195187 let cases_dir = self . base_path . join ( "cases" ) ;
196188 self . find_files ( "cases" , & [ "yaml" , "yml" ] )
@@ -218,6 +210,8 @@ impl TestSuite {
218210 let name = spec. name . clone ( ) ;
219211 let sql = spec. sql . clone ( ) ;
220212 let auto_stats = spec. auto_statistics ;
213+ let node_num = spec. node_num ;
214+ let tables = self . resolve_tables ( & spec) ?;
221215 let ( table_stats, column_stats) = self . resolve_stats ( spec) ?;
222216 let stem = path
223217 . file_stem ( )
@@ -235,14 +229,26 @@ impl TestSuite {
235229 Ok ( TestCase {
236230 name : Box :: leak ( full_name. into_boxed_str ( ) ) ,
237231 sql : Box :: leak ( sql. into_boxed_str ( ) ) ,
232+ tables,
238233 table_stats,
239234 column_stats,
240235 auto_stats,
241236 stem,
242237 subdir,
238+ node_num,
243239 } )
244240 }
245241
242+ fn resolve_tables ( & self , spec : & TestSpec ) -> Result < HashMap < String , String > > {
243+ let mut tables = HashMap :: with_capacity ( spec. tables . len ( ) ) ;
244+ for ( table_name, file_ref) in & spec. tables {
245+ let table_define_sql = self . load_table_file ( file_ref) ?;
246+ tables. insert ( table_name. clone ( ) , table_define_sql) ;
247+ }
248+
249+ Ok ( tables)
250+ }
251+
246252 fn resolve_stats (
247253 & self ,
248254 mut spec : TestSpec ,
@@ -255,6 +261,33 @@ impl TestSuite {
255261 Ok ( ( spec. table_statistics , spec. column_statistics ) )
256262 }
257263
264+ fn load_table_file ( & self , file_ref : & str ) -> Result < String > {
265+ let stats_files = self . find_files ( "tables" , & [ "sql" ] ) ;
266+ let target_stem = Path :: new ( file_ref)
267+ . file_stem ( )
268+ . and_then ( |s| s. to_str ( ) )
269+ . unwrap_or ( file_ref) ;
270+
271+ stats_files
272+ . iter ( )
273+ . find ( |path| {
274+ path. file_stem ( )
275+ . and_then ( |s| s. to_str ( ) )
276+ . map ( |s| s == target_stem || s. ends_with ( target_stem) )
277+ . unwrap_or ( false )
278+ } )
279+ . ok_or_else ( || {
280+ ErrorCode :: Internal ( format ! (
281+ "Tables file not found: {}, in {:?}" ,
282+ file_ref, stats_files
283+ ) )
284+ } )
285+ . and_then ( |path| {
286+ fs:: read_to_string ( path)
287+ . map_err ( |e| ErrorCode :: Internal ( format ! ( "Failed to read file: {}" , e) ) )
288+ } )
289+ }
290+
258291 fn load_stats_file ( & self , file_ref : & str ) -> Result < StatsFile > {
259292 let stats_files = self . find_files ( "statistics" , & [ "yaml" , "yml" ] ) ;
260293 let target_stem = Path :: new ( file_ref)
@@ -424,10 +457,6 @@ async fn test_optimizer() -> Result<()> {
424457
425458 let suite = TestSuite :: new ( base_path. clone ( ) , subdir) ;
426459 let fixture = TestFixture :: setup ( ) . await ?;
427- let ctx = fixture. new_query_ctx ( ) . await ?;
428- ctx. get_settings ( ) . set_enable_auto_materialize_cte ( 0 ) ?;
429-
430- suite. setup_tables ( & ctx) . await ?;
431460
432461 let cases = suite. load_cases ( ) ?;
433462 if cases. is_empty ( ) {
@@ -438,17 +467,79 @@ async fn test_optimizer() -> Result<()> {
438467 let mut root_mint = Mint :: new ( & results_dir) ;
439468 let mut subdir_mints: HashMap < String , Mint > = HashMap :: new ( ) ;
440469
470+ let local_id = GlobalUniqName :: unique ( ) ;
471+ let standalone_cluster = Cluster :: create ( vec ! [ create_node( & local_id) ] , local_id) ;
472+
441473 for case in cases {
442474 println ! ( "\n ========== Testing: {} ==========" , case. name) ;
443475
476+ let ctx = fixture. new_query_ctx ( ) . await ?;
477+ ctx. get_settings ( ) . set_enable_auto_materialize_cte ( 0 ) ?;
478+
479+ ctx. set_cluster ( standalone_cluster. clone ( ) ) ;
480+
481+ if let Some ( nodes) = case. node_num {
482+ let mut nodes_info = Vec :: with_capacity ( nodes as usize ) ;
483+ for _ in 0 ..nodes - 1 {
484+ nodes_info. push ( create_node ( & GlobalUniqName :: unique ( ) ) ) ;
485+ }
486+
487+ let local_id = GlobalUniqName :: unique ( ) ;
488+ nodes_info. push ( create_node ( & local_id) ) ;
489+ ctx. set_cluster ( Cluster :: create ( nodes_info, local_id) )
490+ }
491+
492+ setup_tables ( & ctx, & case) . await ?;
444493 run_test_case ( & ctx, & case, & mut root_mint, & mut subdir_mints, & results_dir) . await ?;
445494
495+ clean_tables ( & ctx, & case) . await ?;
496+ ctx. set_cluster ( standalone_cluster. clone ( ) ) ;
446497 println ! ( "✅ {} test passed!" , case. name) ;
447498 }
448499
449500 Ok ( ( ) )
450501}
451502
503+ fn create_node ( local_id : & str ) -> Arc < NodeInfo > {
504+ let mut node_info = NodeInfo :: create (
505+ local_id. to_string ( ) ,
506+ String :: new ( ) ,
507+ 0 ,
508+ String :: new ( ) ,
509+ String :: new ( ) ,
510+ String :: new ( ) ,
511+ String :: new ( ) ,
512+ String :: new ( ) ,
513+ ) ;
514+ node_info. cluster_id = "cluster_id" . to_string ( ) ;
515+ node_info. warehouse_id = "warehouse_id" . to_string ( ) ;
516+ Arc :: new ( node_info)
517+ }
518+
519+ async fn setup_tables ( ctx : & Arc < QueryContext > , case : & TestCase ) -> Result < ( ) > {
520+ for sql in case. tables . values ( ) {
521+ for statement in sql. split ( ';' ) . filter ( |s| !s. trim ( ) . is_empty ( ) ) {
522+ match execute_sql ( ctx, statement) . await {
523+ Ok ( _) => { }
524+ Err ( e) if e. code ( ) == ErrorCode :: TABLE_ALREADY_EXISTS => {
525+ // Ignore table already exists errors
526+ continue ;
527+ }
528+ Err ( e) => return Err ( e) ,
529+ }
530+ }
531+ }
532+
533+ Ok ( ( ) )
534+ }
535+
536+ async fn clean_tables ( ctx : & Arc < QueryContext > , case : & TestCase ) -> Result < ( ) > {
537+ for table_name in case. tables . keys ( ) {
538+ execute_sql ( ctx, & format ! ( "DROP TABLE {}" , table_name) ) . await ?;
539+ }
540+ Ok ( ( ) )
541+ }
542+
452543async fn run_test_case (
453544 ctx : & Arc < QueryContext > ,
454545 case : & TestCase ,
0 commit comments