Commit b2b4ca2

fix: Use table schema rather than file schema
1 parent f35a475 commit b2b4ca2

File tree

1 file changed: +61 -6 lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 61 additions & 6 deletions
@@ -663,11 +663,10 @@ impl DataSource for FileScanConfig {
 
     fn eq_properties(&self) -> EquivalenceProperties {
         let schema = self.file_source.table_schema().table_schema();
-        let mut eq_properties = EquivalenceProperties::new_with_orderings(
-            Arc::clone(schema),
-            self.output_ordering.clone(),
-        )
-        .with_constraints(self.constraints.clone());
+        let orderings = project_orderings(&self.output_ordering, schema);
+        let mut eq_properties =
+            EquivalenceProperties::new_with_orderings(Arc::clone(schema), orderings)
+                .with_constraints(self.constraints.clone());
 
         if let Some(filter) = self.file_source.filter() {
             // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
@@ -1349,7 +1348,7 @@ mod tests {
         verify_sort_integrity,
     };
 
-    use arrow::datatypes::Field;
+    use arrow::datatypes::{Field, Schema, TimeUnit};
     use datafusion_common::stats::Precision;
     use datafusion_common::{ColumnStatistics, internal_err};
     use datafusion_expr::{Operator, SortExpr};
@@ -1845,6 +1844,62 @@ mod tests {
         }
     }
 
+    #[test]
+    fn equivalence_properties_reindex_output_ordering_for_partition_cols() {
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("f", DataType::Float64, true),
+            Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+        ]));
+        let object_store_url = ObjectStoreUrl::parse("test:///").unwrap();
+
+        let table_schema = TableSchema::new(
+            Arc::clone(&file_schema),
+            vec![Arc::new(Field::new(
+                "tag1",
+                wrap_partition_type_in_dict(DataType::Utf8),
+                true,
+            ))],
+        );
+
+        let file_source: Arc<dyn FileSource> =
+            Arc::new(MockSource::new(table_schema.clone()));
+
+        let config = FileScanConfigBuilder::new(
+            object_store_url.clone(),
+            Arc::clone(&file_source),
+        )
+        .with_projection_indices(Some(vec![2, 0, 1]))
+        .unwrap()
+        .with_output_ordering(vec![
+            [
+                PhysicalSortExpr::new_default(Arc::new(Column::new("tag1", 0))),
+                PhysicalSortExpr::new_default(Arc::new(Column::new("time", 1))),
+            ]
+            .into(),
+        ])
+        .build();
+
+        let eq_properties = config.eq_properties();
+        let ordering = eq_properties
+            .output_ordering()
+            .expect("expected output ordering");
+
+        let mut iter = ordering.iter();
+        let first = iter.next().unwrap();
+        let first_col = first.expr.as_any().downcast_ref::<Column>().unwrap();
+        assert_eq!(first_col.name(), "tag1");
+        assert_eq!(first_col.index(), 0);
+
+        let second = iter.next().unwrap();
+        let second_col = second.expr.as_any().downcast_ref::<Column>().unwrap();
+        assert_eq!(second_col.name(), "time");
+        assert_eq!(second_col.index(), 2);
+    }
+
     #[test]
     fn test_file_scan_config_builder_defaults() {
         let file_schema = aggr_test_schema();
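The substance of the fix is the eq_properties change above: the output orderings are now passed through project_orderings against the table schema (the file schema plus partition columns such as tag1) before being handed to EquivalenceProperties, and the new test asserts that the resulting sort columns land at the expected positions (tag1 at index 0, time at index 2). As a rough, standalone illustration of the underlying idea that sort keys must have their column indices re-resolved by name when the schema they are evaluated against changes, here is a minimal sketch in plain Rust; SortKey and remap_sort_keys are hypothetical names made up for this example, not DataFusion's project_orderings API:

// Standalone sketch (hypothetical names, not DataFusion APIs): a sort key
// carries a column name plus that column's positional index in some schema.
// If the schema the keys are evaluated against changes (columns reordered,
// partition columns appended), the indices must be re-resolved by name or
// they silently point at the wrong columns.

#[derive(Debug, PartialEq)]
struct SortKey {
    name: String,
    index: usize,
}

/// Re-resolve each sort key's index by looking its name up in `schema`
/// (a list of column names). Keys whose name is missing are dropped,
/// since an ordering on an unknown column cannot be expressed.
fn remap_sort_keys(keys: &[SortKey], schema: &[&str]) -> Vec<SortKey> {
    keys.iter()
        .filter_map(|k| {
            schema
                .iter()
                .position(|col| *col == k.name.as_str())
                .map(|index| SortKey { name: k.name.clone(), index })
        })
        .collect()
}

fn main() {
    // Keys were built against a schema where "tag1" sat at column 0
    // and "time" at column 1.
    let keys = vec![
        SortKey { name: "tag1".into(), index: 0 },
        SortKey { name: "time".into(), index: 1 },
    ];
    // The schema actually used for equivalence reasoning orders its
    // columns differently, so the indices must be recomputed.
    let schema = ["tag1", "f", "time"];
    let remapped = remap_sort_keys(&keys, &schema);
    assert_eq!(remapped[0], SortKey { name: "tag1".into(), index: 0 });
    assert_eq!(remapped[1], SortKey { name: "time".into(), index: 2 });
    println!("{remapped:?}");
}

The indices in the sketch mirror the assertions in the new test; the real helper works on PhysicalSortExpr and Arrow schemas rather than bare strings.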
