Commit fd45ae8

fix

1 parent 1fd6a01 commit fd45ae8

File tree: 1 file changed (+175 −4 lines)

datafusion/datasource-parquet/src/opener.rs

Lines changed: 175 additions & 4 deletions
@@ -824,11 +824,15 @@ fn should_enable_page_index(
 #[cfg(test)]
 mod test {
     use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};
-    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::compute::cast;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
-        record_batch, stats::Precision, ColumnStatistics, DataFusionError, ScalarValue,
-        Statistics,
+        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
+        DataFusionError, ScalarValue, Statistics,
+    };
+    use datafusion_datasource::schema_adapter::{
+        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
     };
     use datafusion_datasource::{
         file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
@@ -839,12 +843,13 @@ mod test {
         expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
     };
     use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
-    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
     use futures::{Stream, StreamExt};
     use object_store::{memory::InMemory, path::Path, ObjectStore};
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
     use std::sync::Arc;
+
     async fn count_batches_and_rows(
         mut stream: std::pin::Pin<
             Box<
@@ -1357,6 +1362,172 @@ mod test {
         assert_eq!(num_batches, 0);
         assert_eq!(num_rows, 0);
     }
+    async fn collect_batches(
+        mut stream: std::pin::Pin<
+            Box<
+                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
+                    + Send,
+            >,
+        >,
+    ) -> Vec<arrow::array::RecordBatch> {
+        let mut batches = vec![];
+        while let Some(Ok(batch)) = stream.next().await {
+            batches.push(batch);
+        }
+        batches
+    }
+
+    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
+        match metrics.sum_by_name(metric_name) {
+            Some(v) => v.as_usize(),
+            _ => {
+                panic!(
+                    "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
+                );
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_custom_schema_adapter_no_rewriter() {
+        // Make a hardcoded schema adapter that adds a new column "b" with default value 0.0
+        // and converts the first column "a" from Int32 to UInt64.
+        #[derive(Debug, Clone)]
+        struct CustomSchemaMapper;
+
+        impl SchemaMapper for CustomSchemaMapper {
+            fn map_batch(
+                &self,
+                batch: arrow::array::RecordBatch,
+            ) -> datafusion_common::Result<arrow::array::RecordBatch> {
+                let a_column = cast(batch.column(0), &DataType::UInt64)?;
+                // Add in a new column "b" with default value 0.0
+                let b_column =
+                    arrow::array::Float64Array::from(vec![Some(0.0); batch.num_rows()]);
+                let columns = vec![a_column, Arc::new(b_column)];
+                let new_schema = Arc::new(Schema::new(vec![
+                    Field::new("a", DataType::UInt64, false),
+                    Field::new("b", DataType::Float64, false),
+                ]));
+                Ok(arrow::record_batch::RecordBatch::try_new(
+                    new_schema, columns,
+                )?)
+            }
+
+            fn map_column_statistics(
+                &self,
+                file_col_statistics: &[ColumnStatistics],
+            ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
+                Ok(vec![
+                    file_col_statistics[0].clone(),
+                    ColumnStatistics::new_unknown(),
+                ])
+            }
+        }
+
+        #[derive(Debug, Clone)]
+        struct CustomSchemaAdapter;
+
+        impl SchemaAdapter for CustomSchemaAdapter {
+            fn map_schema(
+                &self,
+                _file_schema: &Schema,
+            ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>
+            {
+                let mapper = Arc::new(CustomSchemaMapper);
+                let projection = vec![0]; // We only need to read the first column "a" from the file
+                Ok((mapper, projection))
+            }
+
+            fn map_column_index(
+                &self,
+                index: usize,
+                file_schema: &Schema,
+            ) -> Option<usize> {
+                if index < file_schema.fields().len() {
+                    Some(index)
+                } else {
+                    None // The new column "b" is not in the original schema
+                }
+            }
+        }
+
+        #[derive(Debug, Clone)]
+        struct CustomSchemaAdapterFactory;
+
+        impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
+            fn create(
+                &self,
+                _projected_table_schema: SchemaRef,
+                _table_schema: SchemaRef,
+            ) -> Box<dyn SchemaAdapter> {
+                Box::new(CustomSchemaAdapter)
+            }
+        }
+
+        // Test that if no expression rewriter is provided, we use a SchemaAdapter to adapt the data to the expression.
+        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
+        // Write out the batch to a Parquet file
+        let data_size =
+            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
+        let file = PartitionedFile::new(
+            "test.parquet".to_string(),
+            u64::try_from(data_size).unwrap(),
+        );
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::UInt64, false),
+            Field::new("b", DataType::Float64, false),
+        ]));
+
+        let make_opener = |predicate| ParquetOpener {
+            partition_index: 0,
+            projection: Arc::new([0, 1]),
+            batch_size: 1024,
+            limit: None,
+            predicate: Some(predicate),
+            logical_file_schema: Arc::clone(&table_schema),
+            metadata_size_hint: None,
+            metrics: ExecutionPlanMetricsSet::new(),
+            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
+                Arc::clone(&store),
+            )),
+            partition_fields: vec![],
+            pushdown_filters: true,
+            reorder_filters: false,
+            enable_page_index: false,
+            enable_bloom_filter: false,
+            enable_limit_pruning: false,
+            schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
+            enable_row_group_stats_pruning: false,
+            coerce_int96: None,
+            #[cfg(feature = "parquet_encryption")]
+            file_decryption_properties: None,
+            expr_adapter_factory: None,
+            #[cfg(feature = "parquet_encryption")]
+            encryption_factory: None,
+            max_predicate_cache_size: None,
+            reverse_row_groups: false,
+        };
+
+        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
+        let opener = make_opener(predicate);
+        let stream = opener.open(file.clone()).unwrap().await.unwrap();
+        let batches = collect_batches(stream).await;
+
+        #[rustfmt::skip]
+        let expected = [
+            "+---+-----+",
+            "| a | b   |",
+            "+---+-----+",
+            "| 1 | 0.0 |",
+            "+---+-----+",
+        ];
+        assert_batches_eq!(expected, &batches);
+        let metrics = opener.metrics.clone_inner();
+        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
+        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
+    }
 
     #[tokio::test]
     async fn test_reverse_scan_row_groups() {
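
Aside: the core of the new test is CustomSchemaMapper::map_batch, which adapts each batch decoded in the file schema (a single Int32 column "a") to the table schema (UInt64 "a" plus Float64 "b"). The standalone sketch below shows the same adaptation using only the arrow crate; the main harness and variable names are illustrative and not part of this commit.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Float64Array, Int32Array, RecordBatch};
    use arrow::compute::cast;
    use arrow::datatypes::{DataType, Field, Schema};

    fn main() -> Result<(), arrow::error::ArrowError> {
        // File-side batch: one Int32 column "a", matching what the test writes to Parquet.
        let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let file_batch = RecordBatch::try_new(
            file_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;

        // Same work as CustomSchemaMapper::map_batch: cast "a" from Int32 to UInt64
        // and append "b" filled with the default value 0.0.
        let a = cast(file_batch.column(0), &DataType::UInt64)?;
        let b: ArrayRef = Arc::new(Float64Array::from(vec![0.0; file_batch.num_rows()]));
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::Float64, false),
        ]));
        let adapted = RecordBatch::try_new(table_schema, vec![a, b])?;

        assert_eq!(adapted.num_rows(), 3);
        assert_eq!(adapted.schema().field(1).name(), "b");
        Ok(())
    }

The test's final assertions follow from this setup: with pushdown_filters: true and the predicate a = 1 over three rows, the row filter prunes two rows, so pushdown_rows_pruned is 2, while statistics-based row group pruning is disabled (enable_row_group_stats_pruning: false), so row_groups_pruned_statistics stays 0.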
