Skip to content

Commit d541143

Browse files
committed
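fix bug: restrict row-group fanout to plain unordered scans (no predicate, filter pushdown, or page-index pruning) and reuse the already-opened file reader for the first plan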
1 parent f6a1312 commit d541143

File tree

1 file changed

+26
-10
lines changed

1 file changed

+26
-10
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ impl RowGroupsPreparedParquetOpen {
659659
file_metrics,
660660
mut file_pruner,
661661
metadata_size_hint,
662-
async_file_reader: _,
662+
async_file_reader,
663663
logical_file_schema: _,
664664
output_schema,
665665
projection: _,
@@ -693,7 +693,7 @@ impl RowGroupsPreparedParquetOpen {
693693
// --------------------------------------------------------
694694
let mut access_plan = row_groups.build();
695695
if !access_plan.is_empty()
696-
&& let Some(p) = page_pruning_predicate
696+
&& let Some(ref p) = page_pruning_predicate
697697
{
698698
access_plan = p.prune_plan_with_page_index(
699699
access_plan,
@@ -719,14 +719,26 @@ impl RowGroupsPreparedParquetOpen {
719719
return Ok(vec![]);
720720
}
721721

722-
let split_by_row_group =
723-
limit.is_none() && file_pruner.is_none() && !preserve_order;
722+
// Row-group fanout is currently only safe for plain unordered scans.
723+
// Filter pushdown and page-index pruning tests compare full output
724+
// batches across scan modes, and allowing individual row-group planners
725+
// to overtake one another changes the observable row order within a
726+
// single file. Keep those scans on the single-planner path until the
727+
// scheduler has explicit within-file ordering support for child
728+
// planners.
729+
let split_by_row_group = limit.is_none()
730+
&& file_pruner.is_none()
731+
&& !preserve_order
732+
&& predicate.is_none()
733+
&& !pushdown_filters
734+
&& page_pruning_predicate.is_none();
724735
let prepared_plans = if split_by_row_group {
725736
prepared_plan.into_single_row_group_plans(file_metadata.as_ref())?
726737
} else {
727738
vec![prepared_plan]
728739
};
729740

741+
let mut reusable_reader = Some(async_file_reader);
730742
let mut reading_states = Vec::with_capacity(prepared_plans.len());
731743
for prepared_plan in prepared_plans {
732744
// ---------------------------------------------------------
@@ -811,12 +823,16 @@ impl RowGroupsPreparedParquetOpen {
811823
let projector = projection.make_projector(&stream_schema)?;
812824
let push_decoder_state = PushDecoderStreamState {
813825
decoder,
814-
reader: state.parquet_file_reader_factory.create_reader(
815-
state.partition_index,
816-
partitioned_file.clone(),
817-
metadata_size_hint,
818-
&state.metrics,
819-
)?,
826+
reader: if let Some(reader) = reusable_reader.take() {
827+
reader
828+
} else {
829+
state.parquet_file_reader_factory.create_reader(
830+
state.partition_index,
831+
partitioned_file.clone(),
832+
metadata_size_hint,
833+
&state.metrics,
834+
)?
835+
},
820836
projector,
821837
output_schema: Arc::clone(&output_schema),
822838
replace_schema,

0 commit comments

Comments
 (0)