@@ -111,7 +111,26 @@ impl ExprSchemable for Expr {
                 _ => expr.get_type(schema),
             },
             Expr::Negative(expr) => expr.get_type(schema),
-            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
+            Expr::Column(c) => {
+                // First try to resolve the column as-is
+                match schema.data_type(c) {
+                    Ok(data_type) => Ok(data_type.clone()),
+                    Err(e) => {
+                        // If the column has a qualifier but wasn't found, try without the qualifier
+                        // This handles cases where aggregations produce unqualified schemas
+                        // but subsequent operations still reference the qualified names
+                        if c.relation.is_some() {
+                            let unqualified = Column::new_unqualified(&c.name);
+                            match schema.data_type(&unqualified) {
+                                Ok(data_type) => Ok(data_type.clone()),
+                                Err(_) => Err(e), // Return the original error
+                            }
+                        } else {
+                            Err(e)
+                        }
+                    }
+                }
+            }
             Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
             Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
             Expr::Literal(l, _) => Ok(l.data_type()),
@@ -275,7 +294,26 @@ impl ExprSchemable for Expr {
                 || low.nullable(input_schema)?
                 || high.nullable(input_schema)?),

-            Expr::Column(c) => input_schema.nullable(c),
+            Expr::Column(c) => {
+                // First try to resolve the column as-is
+                match input_schema.nullable(c) {
+                    Ok(nullable) => Ok(nullable),
+                    Err(e) => {
+                        // If the column has a qualifier but wasn't found, try without the qualifier
+                        // This handles cases where aggregations produce unqualified schemas
+                        // but subsequent operations still reference the qualified names
+                        if c.relation.is_some() {
+                            let unqualified = Column::new_unqualified(&c.name);
+                            match input_schema.nullable(&unqualified) {
+                                Ok(nullable) => Ok(nullable),
+                                Err(_) => Err(e), // Return the original error
+                            }
+                        } else {
+                            Err(e)
+                        }
+                    }
+                }
+            }
             Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
             Expr::Literal(value, _) => Ok(value.is_null()),
             Expr::Case(case) => {
@@ -777,9 +815,12 @@ pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::test::function_stub::avg;
     use crate::{col, lit, out_ref_col_with_metadata};

-    use datafusion_common::{internal_err, DFSchema, HashMap, ScalarValue};
+    use datafusion_common::{
+        internal_err, Column, DFSchema, HashMap, ScalarValue, TableReference,
+    };

     macro_rules! test_is_expr_nullable {
         ($EXPR_TYPE:ident) => {{
@@ -881,6 +922,126 @@ mod tests {
         );
     }

+    #[test]
+    fn test_qualified_column_after_aggregation() {
+        // Test for qualified column reference resolution after aggregation
+        // This test verifies the fix for the issue where binary expressions
+        // fail when referencing qualified column names after aggregation
+        // produces unqualified schemas.
+
+        // Create a schema that simulates the result of an aggregation
+        // where the output field is unqualified (just "value")
+        let unqualified_schema = DFSchema::from_unqualified_fields(
+            vec![Field::new("value", DataType::Float64, false)].into(),
+            std::collections::HashMap::new(),
+        )
+        .unwrap();
+
+        // Create a qualified column reference as would be produced
+        // in a query like: avg(memory_usage_bytes) / 1024
+        // where the aggregation produces "value" but the binary expression
+        // still references the original qualified name
+        let qualified_col = col("memory_usage_bytes.value");
+
+        // Before the fix, this would fail with:
+        // "No field named memory_usage_bytes.value. Valid fields are value."
+        // After the fix, it should successfully resolve to the unqualified "value" field
+        let data_type = qualified_col.get_type(&unqualified_schema).unwrap();
+        assert_eq!(data_type, DataType::Float64);
+
+        // Test nullable resolution as well
+        let nullable = qualified_col.nullable(&unqualified_schema).unwrap();
+        assert!(!nullable);
+
+        // Test with a binary expression
+        let expr = qualified_col / lit(1024);
+        let data_type = expr.get_type(&unqualified_schema).unwrap();
+        assert_eq!(data_type, DataType::Float64);
+    }
+
+    #[test]
+    fn test_qualified_column_fallback_behavior() {
+        // Test that the fallback only happens for qualified columns and preserves error messages
+        let unqualified_schema = DFSchema::from_unqualified_fields(
+            vec![Field::new("existing_col", DataType::Int32, true)].into(),
+            std::collections::HashMap::new(),
+        )
+        .unwrap();
+
+        // Test 1: Qualified column that exists unqualified should work
+        let qualified_existing = col("table.existing_col");
+        assert!(qualified_existing.get_type(&unqualified_schema).is_ok());
+        assert!(qualified_existing.nullable(&unqualified_schema).is_ok());
+
+        // Test 2: Qualified column that doesn't exist should return the original error
+        let qualified_nonexistent = col("table.nonexistent_col");
+        let error = qualified_nonexistent
+            .get_type(&unqualified_schema)
+            .unwrap_err();
+        assert!(error.to_string().contains("table.nonexistent_col"));
+
+        // Test 3: Unqualified column that doesn't exist should return the original error (no fallback)
+        let unqualified_nonexistent = col("nonexistent_col");
+        let error = unqualified_nonexistent
+            .get_type(&unqualified_schema)
+            .unwrap_err();
+        assert!(error.to_string().contains("nonexistent_col"));
+        // Make sure it's not mentioning a qualified table prefix
+        assert!(!error.to_string().contains("table.nonexistent_col"));
+    }
+
+    #[test]
+    fn test_aggregation_scenario() {
+        // Test a realistic aggregation scenario
+        use crate::logical_plan::builder::LogicalPlanBuilder;
+        use crate::logical_plan::builder::LogicalTableSource;
+        use arrow::datatypes::Schema;
+        use std::sync::Arc;
+
+        // Create an input table schema with qualified columns
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("usage_bytes", DataType::Int64, false),
+            Field::new(
+                "timestamp",
+                DataType::Timestamp(arrow::datatypes::TimeUnit::Second, None),
+                false,
+            ),
+        ]));
+
+        // Build a plan that does an aggregation
+        let plan = LogicalPlanBuilder::scan(
+            "metrics",
+            Arc::new(LogicalTableSource::new(table_schema)),
+            None,
+        )
+        .unwrap()
+        .aggregate(
+            Vec::<Expr>::new(), // no group by
+            vec![avg(col("metrics.usage_bytes"))], // avg with a qualified column
+        )
+        .unwrap()
+        .build()
+        .unwrap();
+
+        // Get the output schema from the aggregation
+        let agg_schema = plan.schema();
+
+        // The aggregation output should have unqualified column names,
+        // so create a qualified reference to test the fallback mechanism
+        let actual_column_name = agg_schema.field(0).name();
+        let qualified_ref =
+            Column::new(Some(TableReference::bare("metrics")), actual_column_name);
+
+        // This should work due to the fallback mechanism
+        let result = Expr::Column(qualified_ref).get_type(agg_schema);
+        assert!(
+            result.is_ok(),
+            "Failed to resolve qualified column after aggregation: {:?}",
+            result.err()
+        );
+        assert_eq!(result.unwrap(), DataType::Float64);
+    }
+
     #[test]
     fn test_expr_metadata() {
         let mut meta = HashMap::new();