Skip to content

Commit 899a762

Browse files
adriangb and martin-g
authored
Populate partition column statistics for PartitionedFile (#19284)
Supersedes #15865. Part of #16800. The idea here was to remove usage of `SchemaAdapter` and at the same time actually populate the partition column statistics. --------- Co-authored-by: Martin Grigorov <[email protected]>
1 parent 2bea796 commit 899a762

File tree

7 files changed

+296
-311
lines changed

7 files changed

+296
-311
lines changed

datafusion/catalog-listing/src/table.rs

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,13 @@ use async_trait::async_trait;
2323
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
2424
use datafusion_common::stats::Precision;
2525
use datafusion_common::{
26-
Constraints, DataFusionError, SchemaExt, Statistics, internal_datafusion_err,
27-
plan_err, project_schema,
26+
Constraints, SchemaExt, Statistics, internal_datafusion_err, plan_err, project_schema,
2827
};
2928
use datafusion_datasource::file::FileSource;
3029
use datafusion_datasource::file_groups::FileGroup;
3130
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
3231
use datafusion_datasource::file_sink_config::FileSinkConfig;
33-
use datafusion_datasource::schema_adapter::{
34-
DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
35-
};
32+
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
3633
use datafusion_datasource::{
3734
ListingTableUrl, PartitionedFile, TableSchema, compute_all_files_statistics,
3835
};
@@ -331,20 +328,6 @@ impl ListingTable {
331328
self.schema_adapter_factory.as_ref()
332329
}
333330

334-
/// Creates a schema adapter for mapping between file and table schemas
335-
///
336-
/// Uses the configured schema adapter factory if available, otherwise falls back
337-
/// to the default implementation.
338-
fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
339-
let table_schema = self.schema();
340-
match &self.schema_adapter_factory {
341-
Some(factory) => {
342-
factory.create_with_projected_schema(Arc::clone(&table_schema))
343-
}
344-
None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
345-
}
346-
}
347-
348331
/// Creates a file source and applies schema adapter factory if available
349332
fn create_file_source_with_schema_adapter(
350333
&self,
@@ -359,10 +342,8 @@ impl ListingTable {
359342
);
360343

361344
let mut source = self.options.format.file_source(table_schema);
362-
// Apply schema adapter to source if available
363-
//
345+
// Apply schema adapter to source if available.
364346
// The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
365-
// Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics.
366347
if let Some(factory) = &self.schema_adapter_factory {
367348
source = source.with_schema_adapter_factory(Arc::clone(factory))?;
368349
}
@@ -709,25 +690,17 @@ impl ListingTable {
709690
)
710691
};
711692

712-
let (mut file_groups, mut stats) = compute_all_files_statistics(
693+
let (file_groups, stats) = compute_all_files_statistics(
713694
file_groups,
714695
self.schema(),
715696
self.options.collect_stat,
716697
inexact_stats,
717698
)?;
718699

719-
let schema_adapter = self.create_schema_adapter();
720-
let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
721-
722-
stats.column_statistics =
723-
schema_mapper.map_column_statistics(&stats.column_statistics)?;
724-
file_groups.iter_mut().try_for_each(|file_group| {
725-
if let Some(stat) = file_group.statistics_mut() {
726-
stat.column_statistics =
727-
schema_mapper.map_column_statistics(&stat.column_statistics)?;
728-
}
729-
Ok::<_, DataFusionError>(())
730-
})?;
700+
// Note: Statistics already include both file columns and partition columns.
701+
// PartitionedFile::with_statistics automatically appends exact partition column
702+
// statistics (min=max=partition_value, null_count=0, distinct_count=1) computed
703+
// from partition_values.
731704
Ok(ListFilesResult {
732705
file_groups,
733706
statistics: stats,

datafusion/core/src/datasource/listing/table.rs

Lines changed: 2 additions & 228 deletions
Original file line numberDiff line numberDiff line change
@@ -129,16 +129,13 @@ mod tests {
129129
ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
130130
};
131131
use datafusion_common::{
132-
assert_contains, plan_err,
132+
assert_contains,
133133
stats::Precision,
134134
test_util::{batches_to_string, datafusion_test_data},
135-
ColumnStatistics, DataFusionError, Result, ScalarValue,
135+
DataFusionError, Result, ScalarValue,
136136
};
137137
use datafusion_datasource::file_compression_type::FileCompressionType;
138138
use datafusion_datasource::file_format::FileFormat;
139-
use datafusion_datasource::schema_adapter::{
140-
SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
141-
};
142139
use datafusion_datasource::ListingTableUrl;
143140
use datafusion_expr::dml::InsertOp;
144141
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
@@ -147,15 +144,12 @@ mod tests {
147144
use datafusion_physical_expr_common::sort_expr::LexOrdering;
148145
use datafusion_physical_plan::empty::EmptyExec;
149146
use datafusion_physical_plan::{collect, ExecutionPlanProperties};
150-
use rstest::rstest;
151147
use std::collections::HashMap;
152148
use std::io::Write;
153149
use std::sync::Arc;
154150
use tempfile::TempDir;
155151
use url::Url;
156152

157-
const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);
158-
159153
/// Creates a test schema with standard field types used in tests
160154
fn create_test_schema() -> SchemaRef {
161155
Arc::new(Schema::new(vec![
@@ -1448,31 +1442,6 @@ mod tests {
14481442
Ok(())
14491443
}
14501444

1451-
#[tokio::test]
1452-
async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
1453-
let ctx = SessionContext::new();
1454-
let table = create_test_listing_table_with_json_and_adapter(
1455-
&ctx,
1456-
false,
1457-
// NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT
1458-
Arc::new(NullStatsAdapterFactory {}),
1459-
)?;
1460-
1461-
let result = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1462-
1463-
assert_eq!(
1464-
result.statistics.column_statistics[0].null_count,
1465-
DUMMY_NULL_COUNT
1466-
);
1467-
for g in result.file_groups {
1468-
if let Some(s) = g.file_statistics(None) {
1469-
assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
1470-
}
1471-
}
1472-
1473-
Ok(())
1474-
}
1475-
14761445
#[tokio::test]
14771446
async fn test_statistics_mapping_with_default_factory() -> Result<()> {
14781447
let ctx = SessionContext::new();
@@ -1513,199 +1482,4 @@ mod tests {
15131482

15141483
Ok(())
15151484
}
1516-
1517-
#[rstest]
1518-
#[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
1519-
#[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
1520-
#[case(
1521-
MapSchemaError::InvalidProjection,
1522-
"Invalid projection in schema mapping"
1523-
)]
1524-
#[tokio::test]
1525-
async fn test_schema_adapter_map_schema_errors(
1526-
#[case] error_type: MapSchemaError,
1527-
#[case] expected_error_msg: &str,
1528-
) -> Result<()> {
1529-
let ctx = SessionContext::new();
1530-
let table = create_test_listing_table_with_json_and_adapter(
1531-
&ctx,
1532-
false,
1533-
Arc::new(FailingMapSchemaAdapterFactory { error_type }),
1534-
)?;
1535-
1536-
// The error should bubble up from the scan operation when schema mapping fails
1537-
let scan_result = table.scan(&ctx.state(), None, &[], None).await;
1538-
1539-
assert!(scan_result.is_err());
1540-
let error_msg = scan_result.unwrap_err().to_string();
1541-
assert!(
1542-
error_msg.contains(expected_error_msg),
1543-
"Expected error containing '{expected_error_msg}', got: {error_msg}"
1544-
);
1545-
1546-
Ok(())
1547-
}
1548-
1549-
// Test that errors during file listing also bubble up correctly
1550-
#[tokio::test]
1551-
async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
1552-
let ctx = SessionContext::new();
1553-
let table = create_test_listing_table_with_json_and_adapter(
1554-
&ctx,
1555-
true,
1556-
Arc::new(FailingMapSchemaAdapterFactory {
1557-
error_type: MapSchemaError::TypeIncompatible,
1558-
}),
1559-
)?;
1560-
1561-
// The error should bubble up from list_files_for_scan when collecting statistics
1562-
let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
1563-
1564-
assert!(list_result.is_err());
1565-
let error_msg = list_result.unwrap_err().to_string();
1566-
assert!(
1567-
error_msg.contains("Cannot map incompatible types"),
1568-
"Expected type incompatibility error during file listing, got: {error_msg}"
1569-
);
1570-
1571-
Ok(())
1572-
}
1573-
1574-
#[derive(Debug, Copy, Clone)]
1575-
enum MapSchemaError {
1576-
TypeIncompatible,
1577-
GeneralFailure,
1578-
InvalidProjection,
1579-
}
1580-
1581-
#[derive(Debug)]
1582-
struct FailingMapSchemaAdapterFactory {
1583-
error_type: MapSchemaError,
1584-
}
1585-
1586-
impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
1587-
fn create(
1588-
&self,
1589-
projected_table_schema: SchemaRef,
1590-
_table_schema: SchemaRef,
1591-
) -> Box<dyn SchemaAdapter> {
1592-
Box::new(FailingMapSchemaAdapter {
1593-
schema: projected_table_schema,
1594-
error_type: self.error_type,
1595-
})
1596-
}
1597-
}
1598-
1599-
#[derive(Debug)]
1600-
struct FailingMapSchemaAdapter {
1601-
schema: SchemaRef,
1602-
error_type: MapSchemaError,
1603-
}
1604-
1605-
impl SchemaAdapter for FailingMapSchemaAdapter {
1606-
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
1607-
let field = self.schema.field(index);
1608-
file_schema.fields.find(field.name()).map(|(i, _)| i)
1609-
}
1610-
1611-
fn map_schema(
1612-
&self,
1613-
_file_schema: &Schema,
1614-
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
1615-
// Always fail with different error types based on the configured error_type
1616-
match self.error_type {
1617-
MapSchemaError::TypeIncompatible => {
1618-
plan_err!(
1619-
"Cannot map incompatible types: Boolean cannot be cast to Utf8"
1620-
)
1621-
}
1622-
MapSchemaError::GeneralFailure => {
1623-
plan_err!("Schema adapter mapping failed due to internal error")
1624-
}
1625-
MapSchemaError::InvalidProjection => {
1626-
plan_err!("Invalid projection in schema mapping: column index out of bounds")
1627-
}
1628-
}
1629-
}
1630-
}
1631-
1632-
#[derive(Debug)]
1633-
struct NullStatsAdapterFactory;
1634-
1635-
impl SchemaAdapterFactory for NullStatsAdapterFactory {
1636-
fn create(
1637-
&self,
1638-
projected_table_schema: SchemaRef,
1639-
_table_schema: SchemaRef,
1640-
) -> Box<dyn SchemaAdapter> {
1641-
Box::new(NullStatsAdapter {
1642-
schema: projected_table_schema,
1643-
})
1644-
}
1645-
}
1646-
1647-
#[derive(Debug)]
1648-
struct NullStatsAdapter {
1649-
schema: SchemaRef,
1650-
}
1651-
1652-
impl SchemaAdapter for NullStatsAdapter {
1653-
fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
1654-
let field = self.schema.field(index);
1655-
file_schema.fields.find(field.name()).map(|(i, _)| i)
1656-
}
1657-
1658-
fn map_schema(
1659-
&self,
1660-
file_schema: &Schema,
1661-
) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
1662-
let projection = (0..file_schema.fields().len()).collect();
1663-
Ok((Arc::new(NullStatsMapper {}), projection))
1664-
}
1665-
}
1666-
1667-
#[derive(Debug)]
1668-
struct NullStatsMapper;
1669-
1670-
impl SchemaMapper for NullStatsMapper {
1671-
fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
1672-
Ok(batch)
1673-
}
1674-
1675-
fn map_column_statistics(
1676-
&self,
1677-
stats: &[ColumnStatistics],
1678-
) -> Result<Vec<ColumnStatistics>> {
1679-
Ok(stats
1680-
.iter()
1681-
.map(|s| {
1682-
let mut s = s.clone();
1683-
s.null_count = DUMMY_NULL_COUNT;
1684-
s
1685-
})
1686-
.collect())
1687-
}
1688-
}
1689-
1690-
/// Helper function to create a test ListingTable with JSON format and custom schema adapter factory
1691-
fn create_test_listing_table_with_json_and_adapter(
1692-
ctx: &SessionContext,
1693-
collect_stat: bool,
1694-
schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
1695-
) -> Result<ListingTable> {
1696-
let path = "table/file.json";
1697-
register_test_store(ctx, &[(path, 10)]);
1698-
1699-
let format = JsonFormat::default();
1700-
let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
1701-
let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1702-
let table_path = ListingTableUrl::parse("test:///table/")?;
1703-
1704-
let config = ListingTableConfig::new(table_path)
1705-
.with_listing_options(opt)
1706-
.with_schema(Arc::new(schema))
1707-
.with_schema_adapter_factory(schema_adapter_factory);
1708-
1709-
ListingTable::try_new(config)
1710-
}
17111485
}

0 commit comments

Comments
 (0)