Skip to content

Commit dc47394

Browse files
committed
fix: Use table schema rather than file schema
1 parent 569e880 commit dc47394

File tree

1 file changed

+57
-2
lines changed

1 file changed

+57
-2
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -663,9 +663,10 @@ impl DataSource for FileScanConfig {
663663

664664
fn eq_properties(&self) -> EquivalenceProperties {
665665
let schema = self.file_source.table_schema().table_schema();
666+
let orderings = project_orderings(&self.output_ordering, schema);
666667
let mut eq_properties = EquivalenceProperties::new_with_orderings(
667668
Arc::clone(schema),
668-
self.output_ordering.clone(),
669+
orderings,
669670
)
670671
.with_constraints(self.constraints.clone());
671672

@@ -1349,7 +1350,7 @@ mod tests {
13491350
verify_sort_integrity,
13501351
};
13511352

1352-
use arrow::datatypes::Field;
1353+
use arrow::datatypes::{Field, Schema, TimeUnit};
13531354
use datafusion_common::stats::Precision;
13541355
use datafusion_common::{ColumnStatistics, internal_err};
13551356
use datafusion_expr::{Operator, SortExpr};
@@ -1845,6 +1846,60 @@ mod tests {
18451846
}
18461847
}
18471848

1849+
#[test]
1850+
fn equivalence_properties_reindex_output_ordering_for_partition_cols() {
1851+
let file_schema = Arc::new(Schema::new(vec![
1852+
Field::new("f", DataType::Float64, true),
1853+
Field::new(
1854+
"time",
1855+
DataType::Timestamp(TimeUnit::Nanosecond, None),
1856+
false,
1857+
),
1858+
]));
1859+
let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
1860+
1861+
let table_schema = TableSchema::new(
1862+
Arc::clone(&file_schema),
1863+
vec![Arc::new(Field::new(
1864+
"tag1",
1865+
wrap_partition_type_in_dict(DataType::Utf8),
1866+
true,
1867+
))],
1868+
);
1869+
1870+
let file_source: Arc<dyn FileSource> =
1871+
Arc::new(MockSource::new(table_schema.clone()));
1872+
1873+
let config = FileScanConfigBuilder::new(
1874+
object_store_url.clone(),
1875+
Arc::clone(&file_source),
1876+
)
1877+
.with_projection_indices(Some(vec![2, 0, 1]))
1878+
.unwrap()
1879+
.with_output_ordering(vec![[
1880+
PhysicalSortExpr::new_default(Arc::new(Column::new("tag1", 0))),
1881+
PhysicalSortExpr::new_default(Arc::new(Column::new("time", 1))),
1882+
]
1883+
.into()])
1884+
.build();
1885+
1886+
let eq_properties = config.eq_properties();
1887+
let ordering = eq_properties
1888+
.output_ordering()
1889+
.expect("expected output ordering");
1890+
1891+
let mut iter = ordering.iter();
1892+
let first = iter.next().unwrap();
1893+
let first_col = first.expr.as_any().downcast_ref::<Column>().unwrap();
1894+
assert_eq!(first_col.name(), "tag1");
1895+
assert_eq!(first_col.index(), 0);
1896+
1897+
let second = iter.next().unwrap();
1898+
let second_col = second.expr.as_any().downcast_ref::<Column>().unwrap();
1899+
assert_eq!(second_col.name(), "time");
1900+
assert_eq!(second_col.index(), 2);
1901+
}
1902+
18481903
#[test]
18491904
fn test_file_scan_config_builder_defaults() {
18501905
let file_schema = aggr_test_schema();

0 commit comments

Comments
 (0)