@@ -663,11 +663,10 @@ impl DataSource for FileScanConfig {
663663
664664 fn eq_properties ( & self ) -> EquivalenceProperties {
665665 let schema = self . file_source . table_schema ( ) . table_schema ( ) ;
666- let mut eq_properties = EquivalenceProperties :: new_with_orderings (
667- Arc :: clone ( schema) ,
668- self . output_ordering . clone ( ) ,
669- )
670- . with_constraints ( self . constraints . clone ( ) ) ;
666+ let orderings = project_orderings ( & self . output_ordering , schema) ;
667+ let mut eq_properties =
668+ EquivalenceProperties :: new_with_orderings ( Arc :: clone ( schema) , orderings)
669+ . with_constraints ( self . constraints . clone ( ) ) ;
671670
672671 if let Some ( filter) = self . file_source . filter ( ) {
673672 // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
@@ -1349,7 +1348,7 @@ mod tests {
13491348 verify_sort_integrity,
13501349 } ;
13511350
1352- use arrow:: datatypes:: Field ;
1351+ use arrow:: datatypes:: { Field , Schema , TimeUnit } ;
13531352 use datafusion_common:: stats:: Precision ;
13541353 use datafusion_common:: { ColumnStatistics , internal_err} ;
13551354 use datafusion_expr:: { Operator , SortExpr } ;
@@ -1845,6 +1844,62 @@ mod tests {
18451844 }
18461845 }
18471846
1847+ #[ test]
1848+ fn equivalence_properties_reindex_output_ordering_for_partition_cols ( ) {
1849+ let file_schema = Arc :: new ( Schema :: new ( vec ! [
1850+ Field :: new( "f" , DataType :: Float64 , true ) ,
1851+ Field :: new(
1852+ "time" ,
1853+ DataType :: Timestamp ( TimeUnit :: Nanosecond , None ) ,
1854+ false ,
1855+ ) ,
1856+ ] ) ) ;
1857+ let object_store_url = ObjectStoreUrl :: parse ( "test:///" ) . unwrap ( ) ;
1858+
1859+ let table_schema = TableSchema :: new (
1860+ Arc :: clone ( & file_schema) ,
1861+ vec ! [ Arc :: new( Field :: new(
1862+ "tag1" ,
1863+ wrap_partition_type_in_dict( DataType :: Utf8 ) ,
1864+ true ,
1865+ ) ) ] ,
1866+ ) ;
1867+
1868+ let file_source: Arc < dyn FileSource > =
1869+ Arc :: new ( MockSource :: new ( table_schema. clone ( ) ) ) ;
1870+
1871+ let config = FileScanConfigBuilder :: new (
1872+ object_store_url. clone ( ) ,
1873+ Arc :: clone ( & file_source) ,
1874+ )
1875+ . with_projection_indices ( Some ( vec ! [ 2 , 0 , 1 ] ) )
1876+ . unwrap ( )
1877+ . with_output_ordering ( vec ! [
1878+ [
1879+ PhysicalSortExpr :: new_default( Arc :: new( Column :: new( "tag1" , 0 ) ) ) ,
1880+ PhysicalSortExpr :: new_default( Arc :: new( Column :: new( "time" , 1 ) ) ) ,
1881+ ]
1882+ . into( ) ,
1883+ ] )
1884+ . build ( ) ;
1885+
1886+ let eq_properties = config. eq_properties ( ) ;
1887+ let ordering = eq_properties
1888+ . output_ordering ( )
1889+ . expect ( "expected output ordering" ) ;
1890+
1891+ let mut iter = ordering. iter ( ) ;
1892+ let first = iter. next ( ) . unwrap ( ) ;
1893+ let first_col = first. expr . as_any ( ) . downcast_ref :: < Column > ( ) . unwrap ( ) ;
1894+ assert_eq ! ( first_col. name( ) , "tag1" ) ;
1895+ assert_eq ! ( first_col. index( ) , 0 ) ;
1896+
1897+ let second = iter. next ( ) . unwrap ( ) ;
1898+ let second_col = second. expr . as_any ( ) . downcast_ref :: < Column > ( ) . unwrap ( ) ;
1899+ assert_eq ! ( second_col. name( ) , "time" ) ;
1900+ assert_eq ! ( second_col. index( ) , 2 ) ;
1901+ }
1902+
18481903 #[ test]
18491904 fn test_file_scan_config_builder_defaults ( ) {
18501905 let file_schema = aggr_test_schema ( ) ;
0 commit comments