11use crate :: metastore:: Column ;
22use crate :: queryplanner:: metadata_cache:: MetadataCacheFactory ;
3+ use crate :: queryplanner:: pretty_printers:: { pp_plan_ext, PPOptions } ;
34use crate :: queryplanner:: { sql_to_rel_options, QueryPlannerImpl } ;
45use crate :: sql:: MySqlDialectWithBackTicks ;
56use crate :: streaming:: topic_table_provider:: TopicTableProvider ;
@@ -425,6 +426,12 @@ impl KafkaPostProcessPlanner {
425426 & self ,
426427 plan : & LogicalPlan ,
427428 ) -> Result < ( Arc < dyn ExecutionPlan > , Option < Arc < dyn ExecutionPlan > > ) , CubeError > {
429+ fn only_certain_plans_allowed_error ( plan : & LogicalPlan ) -> CubeError {
430+ CubeError :: user (
431+ format ! ( "Only Projection > [Filter] > TableScan plans are allowed for streaming; got plan {}" , pp_plan_ext( plan, & PPOptions :: show_all( ) ) ) ,
432+ )
433+ }
434+
428435 let source_schema = Arc :: new ( Schema :: new (
429436 self . source_columns
430437 . iter ( )
@@ -465,10 +472,7 @@ impl KafkaPostProcessPlanner {
465472
466473 Ok ( ( projection_phys_plan. clone ( ) , Some ( filter_phys_plan) ) )
467474 }
468- _ => Err ( CubeError :: user (
469- "Only Projection > [Filter] > TableScan plans are allowed for streaming"
470- . to_string ( ) ,
471- ) ) ,
475+ _ => Err ( only_certain_plans_allowed_error ( plan) ) ,
472476 } ,
473477 LogicalPlan :: TableScan { .. } => {
474478 let projection_plan =
@@ -484,15 +488,9 @@ impl KafkaPostProcessPlanner {
484488 . with_new_children ( vec ! [ empty_exec. clone( ) ] ) ?;
485489 Ok ( ( projection_phys_plan, None ) )
486490 }
487- _ => Err ( CubeError :: user (
488- "Only Projection > [Filter] > TableScan plans are allowed for streaming"
489- . to_string ( ) ,
490- ) ) ,
491+ _ => Err ( only_certain_plans_allowed_error ( plan) ) ,
491492 } ,
492- _ => Err ( CubeError :: user (
493- "Only Projection > [Filter] > TableScan plans are allowed for streaming"
494- . to_string ( ) ,
495- ) ) ,
493+ _ => Err ( only_certain_plans_allowed_error ( plan) ) ,
496494 }
497495 }
498496
0 commit comments