
Commit 617b1ef

fix

1 parent 733db4c

10 files changed: +29 -117 lines

datafusion/core/tests/physical_optimizer/pushdown_sort.rs

Lines changed: 4 additions & 4 deletions
@@ -240,7 +240,7 @@ fn test_prefix_match_through_transparent_nodes() {
 OptimizationTest:
   input:
     - SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
-    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
     - CoalesceBatchesExec: target_batch_size=1024
     - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 DESC NULLS LAST, b@1 ASC, c@2 DESC NULLS LAST], file_type=parquet
   output:
@@ -365,7 +365,7 @@ fn test_sort_through_repartition() {
 OptimizationTest:
   input:
     - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
-    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
     - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
   output:
   Ok:
@@ -459,7 +459,7 @@ fn test_sort_through_coalesce_partitions() {
   input:
     - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
     - CoalescePartitionsExec
-    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
     - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
   output:
   Ok:
@@ -492,7 +492,7 @@ fn test_complex_plan_with_multiple_operators() {
   input:
     - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
     - CoalescePartitionsExec
-    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, maintains_sort_order=true
+    - RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1
     - CoalesceBatchesExec: target_batch_size=1024
     - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC], file_type=parquet
   output:
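All four snapshot updates in this file make the same change: the expected display of RepartitionExec no longer includes maintains_sort_order=true, presumably because that detail was dropped from the operator's display format. For intuition, a minimal plain-Rust sketch (not DataFusion's implementation) of why round-robin distribution from a single ordered input keeps each output partition internally ordered:

// Illustrative sketch only: each output partition receives every n-th batch,
// i.e. an ordered subsequence of the input, so order within a partition holds.
fn round_robin<T>(input: Vec<T>, partitions: usize) -> Vec<Vec<T>> {
    let mut out: Vec<Vec<T>> = (0..partitions).map(|_| Vec::new()).collect();
    for (i, batch) in input.into_iter().enumerate() {
        out[i % partitions].push(batch);
    }
    out
}

fn main() {
    // With one ordered input, every partition stays internally ordered.
    assert_eq!(round_robin(vec![1, 2, 3, 4, 5], 2), vec![vec![1, 3, 5], vec![2, 4]]);
}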

datafusion/datasource-parquet/src/opener.rs

Lines changed: 5 additions & 87 deletions
@@ -823,8 +823,7 @@ fn should_enable_page_index(
 
 #[cfg(test)]
 mod test {
-    use std::sync::Arc;
-    use crate::{DefaultParquetFileReaderFactory, opener::ParquetOpener};
+    use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use bytes::{BufMut, BytesMut};
     use datafusion_common::{
@@ -840,6 +839,8 @@ mod test {
         PartitionedFile,
     };
     use datafusion_expr::{col, lit};
+    use datafusion_physical_expr::expressions::{Column, Literal};
+    use datafusion_physical_expr::projection::ProjectionExprs;
     use datafusion_physical_expr::{
         expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
     };
@@ -849,8 +850,7 @@ mod test {
     use object_store::{memory::InMemory, path::Path, ObjectStore};
     use parquet::arrow::ArrowWriter;
     use parquet::file::properties::WriterProperties;
-    use datafusion_physical_expr::expressions::{Column, Literal};
-    use datafusion_physical_expr::projection::ProjectionExprs;
+    use std::sync::Arc;
 
     fn constant_int_stats() -> (Statistics, SchemaRef) {
         let schema = Arc::new(Schema::new(vec![
@@ -1398,88 +1398,6 @@ mod test {
         }
     }
 
-    #[tokio::test]
-    async fn test_reverse_scan_row_groups() {
-        use parquet::file::properties::WriterProperties;
-
-        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
-
-        // Create multiple batches to ensure multiple row groups
-        let batch1 =
-            record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
-        let batch2 =
-            record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap();
-        let batch3 =
-            record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap();
-
-        // Write parquet file with multiple row groups
-        // Force small row groups by setting max_row_group_size
-        let props = WriterProperties::builder()
-            .set_max_row_group_size(3) // Force each batch into its own row group
-            .build();
-
-        let data_len = write_parquet_batches(
-            Arc::clone(&store),
-            "test.parquet",
-            vec![batch1.clone(), batch2, batch3],
-            Some(props),
-        )
-        .await;
-
-        let schema = batch1.schema();
-        let file = PartitionedFile::new(
-            "test.parquet".to_string(),
-            u64::try_from(data_len).unwrap(),
-        );
-
-        let make_opener = |reverse_scan: bool| ParquetOpener {
-            projection: Arc::new([0]),
-            partition_index: 0,
-            batch_size: 1024,
-            limit: None,
-            predicate: None,
-            metadata_size_hint: None,
-            metrics: ExecutionPlanMetricsSet::new(),
-            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
-                Arc::clone(&store),
-            )),
-            partition_fields: vec![],
-            pushdown_filters: true,
-            reorder_filters: false,
-            enable_page_index: false,
-            enable_bloom_filter: false,
-            enable_limit_pruning: false,
-            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
-            enable_row_group_stats_pruning: false,
-            coerce_int96: None,
-            #[cfg(feature = "parquet_encryption")]
-            file_decryption_properties: None,
-            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
-            #[cfg(feature = "parquet_encryption")]
-            encryption_factory: None,
-            max_predicate_cache_size: None,
-            reverse_row_groups: reverse_scan,
-            logical_file_schema: schema.clone(),
-        };
-
-        // Test normal scan (forward)
-        let opener = make_opener(false);
-        let stream = opener.open(file.clone()).unwrap().await.unwrap();
-        let forward_values = collect_int32_values(stream).await;
-
-        // Test reverse scan
-        let opener = make_opener(true);
-        let stream = opener.open(file.clone()).unwrap().await.unwrap();
-        let reverse_values = collect_int32_values(stream).await;
-
-        // The forward scan should return data in the order written
-        assert_eq!(forward_values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]);
-
-        // With reverse scan, row groups are reversed, so we expect:
-        // Row group 3 (7,8,9), then row group 2 (4,5,6), then row group 1 (1,2,3)
-        assert_eq!(reverse_values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
-    }
-
     #[tokio::test]
     async fn test_reverse_scan_row_groups() {
         use parquet::file::properties::WriterProperties;
@@ -1506,7 +1424,7 @@ mod test {
             vec![batch1.clone(), batch2, batch3],
             Some(props),
         )
-            .await;
+        .await;
 
         let schema = batch1.schema();
         let file = PartitionedFile::new(
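The large hunk above deletes an accidental verbatim duplicate of test_reverse_scan_row_groups (the identical test body begins again right below the removed block), and the small hunk fixes the indentation of the trailing .await. The property the surviving test asserts is that a reverse scan reverses the order of row groups while leaving rows inside each group untouched; a self-contained sketch of that invariant (hypothetical helper, not the ParquetOpener API):

// Hypothetical helper for illustration: emit row groups last-to-first,
// keeping each group's rows in their written order.
fn reverse_row_groups(groups: Vec<Vec<i32>>) -> Vec<i32> {
    groups.into_iter().rev().flatten().collect()
}

fn main() {
    let groups = vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]];
    // Matches the test's expectation: group 3, then group 2, then group 1.
    assert_eq!(reverse_row_groups(groups), vec![7, 8, 9, 4, 5, 6, 1, 2, 3]);
}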

datafusion/datasource-parquet/src/source.rs

Lines changed: 2 additions & 3 deletions
@@ -44,14 +44,14 @@ use datafusion_datasource::TableSchema;
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::{fmt_sql, PhysicalExpr};
-use datafusion_physical_plan::SortOrderPushdownResult;
 use datafusion_physical_plan::filter_pushdown::PushedDown;
 use datafusion_physical_plan::filter_pushdown::{
     FilterPushdownPropagation, PushedDownPredicate,
 };
 use datafusion_physical_plan::metrics::Count;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_plan::SortOrderPushdownResult;
 
 #[cfg(feature = "parquet_encryption")]
 use datafusion_execution::parquet_encryption::EncryptionFactory;
@@ -60,7 +60,6 @@ use itertools::Itertools;
 use object_store::ObjectStore;
 #[cfg(feature = "parquet_encryption")]
 use parquet::encryption::decrypt::FileDecryptionProperties;
-use datafusion_physical_expr::projection::ProjectionExprs;
 
 /// Execution plan for reading one or more Parquet files.
 ///
@@ -308,7 +307,7 @@ impl ParquetSource {
     pub fn new(table_parquet_options: TableParquetOptions) -> Self {
         Self {
             table_schema: None,
-            table_parquet_options: TableParquetOptions::default(),
+            table_parquet_options,
             metrics: ExecutionPlanMetricsSet::new(),
             predicate: None,
             parquet_file_reader_factory: None,
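The final hunk is the one behavioral fix in this file: ParquetSource::new took a table_parquet_options argument but stored TableParquetOptions::default() in its place, silently discarding whatever the caller configured. The pattern in miniature (illustrative types, not the real struct):

// Illustrative reduction of the bug, not DataFusion's actual types.
#[derive(Default)]
struct Options {
    pushdown_filters: bool,
}

struct Source {
    options: Options,
}

impl Source {
    // Before the fix: the parameter is ignored in favor of Default::default(),
    // so any caller-supplied configuration is lost.
    fn new_buggy(_options: Options) -> Self {
        Source { options: Options::default() }
    }

    // After the fix: the caller's options are actually stored.
    fn new(options: Options) -> Self {
        Source { options }
    }
}

fn main() {
    let configured = Options { pushdown_filters: true };
    assert!(!Source::new_buggy(Options { pushdown_filters: true }).options.pushdown_filters);
    assert!(Source::new(configured).options.pushdown_filters);
}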

datafusion/datasource/src/file.rs

Lines changed: 2 additions & 2 deletions
@@ -30,14 +30,14 @@ use crate::TableSchema;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{not_impl_err, Result, Statistics};
 use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
-use datafusion_physical_plan::SortOrderPushdownResult;
 use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
+use datafusion_physical_plan::SortOrderPushdownResult;
 
+use datafusion_physical_expr::projection::ProjectionExprs;
 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
 use object_store::ObjectStore;
-use datafusion_physical_expr::projection::ProjectionExprs;
 
 /// Helper function to convert any type implementing FileSource to Arc<dyn FileSource>
 pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {

datafusion/datasource/src/file_scan_config.rs

Lines changed: 8 additions & 3 deletions
@@ -36,7 +36,10 @@ use arrow::{
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
 };
 use datafusion_common::config::ConfigOptions;
-use datafusion_common::{exec_datafusion_err, exec_err, internal_datafusion_err, ColumnStatistics, Constraints, Result, ScalarValue, Statistics};
+use datafusion_common::{
+    exec_datafusion_err, exec_err, internal_datafusion_err, ColumnStatistics,
+    Constraints, Result, ScalarValue, Statistics,
+};
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
@@ -60,12 +63,14 @@ use datafusion_physical_plan::{
 use datafusion_physical_expr::equivalence::project_orderings;
 use datafusion_physical_plan::coop::cooperative;
 use datafusion_physical_plan::execution_plan::SchedulingType;
+use datafusion_physical_plan::projection::{
+    all_alias_free_columns, new_projections_for_columns,
+};
 use log::{debug, warn};
-use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
 use std::borrow::Cow;
 use std::collections::HashMap;
 use std::marker::PhantomData;
-use datafusion_physical_plan::projection::{all_alias_free_columns, new_projections_for_columns};
+use std::{any::Any, fmt::Debug, fmt::Formatter, fmt::Result as FmtResult, sync::Arc};
 
 /// The base configurations for a [`DataSourceExec`], the a physical plan for
 /// any given file format.

datafusion/datasource/src/source.rs

Lines changed: 1 addition & 1 deletion
@@ -40,10 +40,10 @@ use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
-use datafusion_physical_plan::SortOrderPushdownResult;
 use datafusion_physical_plan::filter_pushdown::{
     ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
 };
+use datafusion_physical_plan::SortOrderPushdownResult;
 
 /// A source of data, typically a list of files or memory
 ///

datafusion/physical-optimizer/src/pushdown_sort.rs

Lines changed: 2 additions & 2 deletions
@@ -50,12 +50,12 @@
 //! - Complete Sort elimination when ordering is guaranteed
 
 use crate::PhysicalOptimizerRule;
-use datafusion_common::Result;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::Result;
+use datafusion_physical_plan::sorts::sort::SortExec;
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_physical_plan::SortOrderPushdownResult;
-use datafusion_physical_plan::sorts::sort::SortExec;
 use std::sync::Arc;
 
 /// A PhysicalOptimizerRule that attempts to push down sort requirements to data sources.
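Beyond the import reordering, this is the rule that the snapshot changes in pushdown_sort.rs exercise. A toy model of the rewrite it performs (simplified enum, not the ExecutionPlan API): if the data source can natively produce the ordering a SortExec requires, for example by reversing its row groups, the sort node is removed.

// Toy model of sort pushdown; types and logic are simplified for illustration.
#[derive(Clone, Debug, PartialEq)]
enum Plan {
    Sort { ordering: String, input: Box<Plan> },
    Scan { native_orderings: Vec<String> },
}

fn pushdown_sort(plan: Plan) -> Plan {
    if let Plan::Sort { ordering, input } = &plan {
        if let Plan::Scan { native_orderings } = input.as_ref() {
            if native_orderings.contains(ordering) {
                // The source delivers the ordering itself, so the sort is redundant.
                return (**input).clone();
            }
        }
    }
    plan
}

fn main() {
    let plan = Plan::Sort {
        ordering: "a DESC".to_string(),
        input: Box::new(Plan::Scan {
            native_orderings: vec!["a DESC".to_string()],
        }),
    };
    assert!(matches!(pushdown_sort(plan), Plan::Scan { .. }));
}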

datafusion/physical-plan/src/coalesce_partitions.rs

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@ use super::{
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
 use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
 
-use crate::projection::{ProjectionExec, make_with_child};
+use crate::projection::{make_with_child, ProjectionExec};
 use crate::sort_pushdown::SortOrderPushdownResult;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
 use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;

datafusion/physical-plan/src/repartition/mod.rs

Lines changed: 4 additions & 1 deletion
@@ -27,7 +27,10 @@ use std::{any::Any, vec};
 
 use super::common::SharedMemoryReservation;
 use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
-use super::{DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream, SortOrderPushdownResult};
+use super::{
+    DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
+    SortOrderPushdownResult,
+};
 use crate::execution_plan::{CardinalityEffect, EvaluationType, SchedulingType};
 use crate::hash_utils::create_hashes;
 use crate::metrics::{BaselineMetrics, SpillMetrics};

datafusion/proto-common/src/generated/pbjson.rs

Lines changed: 0 additions & 13 deletions
@@ -1107,9 +1107,6 @@ impl serde::Serialize for ColumnStats {
         if let Some(v) = self.distinct_count.as_ref() {
             struct_ser.serialize_field("distinctCount", v)?;
         }
-        if let Some(v) = self.byte_size.as_ref() {
-            struct_ser.serialize_field("byteSize", v)?;
-        }
         struct_ser.end()
     }
 }
@@ -1130,8 +1127,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
             "nullCount",
             "distinct_count",
             "distinctCount",
-            "byte_size",
-            "byteSize",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -1141,7 +1136,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
             SumValue,
             NullCount,
             DistinctCount,
-            ByteSize,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
            fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -1168,7 +1162,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
                             "sumValue" | "sum_value" => Ok(GeneratedField::SumValue),
                             "nullCount" | "null_count" => Ok(GeneratedField::NullCount),
                             "distinctCount" | "distinct_count" => Ok(GeneratedField::DistinctCount),
-                            "byteSize" | "byte_size" => Ok(GeneratedField::ByteSize),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -1225,12 +1218,6 @@ impl<'de> serde::Deserialize<'de> for ColumnStats {
                             }
                             distinct_count__ = map_.next_value()?;
                         }
-                        GeneratedField::ByteSize => {
-                            if byte_size__.is_some() {
-                                return Err(serde::de::Error::duplicate_field("byteSize"));
-                            }
-                            byte_size__ = map_.next_value()?;
-                        }
                     }
                 }
                 Ok(ColumnStats {
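These are regenerated pbjson bindings: with the byte_size field evidently removed from the ColumnStats message, its serialize guard, field-name aliases, GeneratedField variant, and deserialize arm all disappear together. The deserialize arm follows pbjson's usual set-once pattern, sketched here with stand-in types (not the generated code):

// Sketch of the duplicate-field guard pbjson generates per field.
fn set_once<T>(slot: &mut Option<T>, value: T, name: &'static str) -> Result<(), String> {
    if slot.is_some() {
        // Mirrors serde::de::Error::duplicate_field(name).
        return Err(format!("duplicate field `{name}`"));
    }
    *slot = Some(value);
    Ok(())
}

fn main() {
    let mut byte_size: Option<u64> = None;
    assert!(set_once(&mut byte_size, 42, "byteSize").is_ok());
    assert!(set_once(&mut byte_size, 7, "byteSize").is_err());
}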
