@@ -13,7 +13,7 @@ use datafusion::common;
1313use datafusion:: common:: { DFSchema , DFSchemaRef } ;
1414use datafusion:: config:: ConfigOptions ;
1515use datafusion:: logical_expr:: expr:: { Alias , ScalarFunction } ;
16- use datafusion:: logical_expr:: { Expr , Filter , LogicalPlan , Projection } ;
16+ use datafusion:: logical_expr:: { Expr , Filter , LogicalPlan , Projection , SubqueryAlias } ;
1717use datafusion:: physical_plan:: empty:: EmptyExec ;
1818use datafusion:: physical_plan:: { collect, ExecutionPlan } ;
1919use datafusion:: sql:: parser:: Statement as DFStatement ;
@@ -431,6 +431,14 @@ impl KafkaPostProcessPlanner {
431431 format ! ( "Only Projection > [Filter] > TableScan plans are allowed for streaming; got plan {}" , pp_plan_ext( plan, & PPOptions :: show_all( ) ) ) ,
432432 )
433433 }
434+ fn remove_subquery_alias_around_table_scan ( plan : & LogicalPlan ) -> & LogicalPlan {
435+ if let LogicalPlan :: SubqueryAlias ( SubqueryAlias { input, .. } ) = plan {
436+ if matches ! ( input. as_ref( ) , LogicalPlan :: TableScan { .. } ) {
437+ return input. as_ref ( ) ;
438+ }
439+ }
440+ return plan;
441+ }
434442
435443 let source_schema = Arc :: new ( Schema :: new (
436444 self . source_columns
@@ -445,8 +453,8 @@ impl KafkaPostProcessPlanner {
445453 expr,
446454 schema,
447455 ..
448- } ) => match projection_input. as_ref ( ) {
449- filter_plan @ LogicalPlan :: Filter ( Filter { input, .. } ) => match input. as_ref ( ) {
456+ } ) => match remove_subquery_alias_around_table_scan ( projection_input. as_ref ( ) ) {
457+ filter_plan @ LogicalPlan :: Filter ( Filter { input, .. } ) => match remove_subquery_alias_around_table_scan ( input. as_ref ( ) ) {
450458 LogicalPlan :: TableScan { .. } => {
451459 let projection_plan = self . make_projection_plan (
452460 expr,
0 commit comments