Skip to content

Commit f7085f5

Browse files
committed
try to fix
1 parent faca92d commit f7085f5

File tree

1 file changed

+14
-0
lines changed

1 file changed

+14
-0
lines changed

datafusion/physical-optimizer/src/limit_pushdown.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
2929
use datafusion_common::utils::combine_limit;
3030
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
3131
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
32+
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
33+
use datafusion_physical_plan::projection::ProjectionExec;
3234
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
3335
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3436
/// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
@@ -159,6 +161,18 @@ pub fn pushdown_limit_helper(
159161
));
160162
}
161163

164+
if let Some(projection) = pushdown_plan.as_any().downcast_ref::<ProjectionExec>() {
165+
// If `PlaceholderRowExec`, it's possible that the projection is from aggregate_statistics,
166+
// So to be conservative, we should not push down the limit.
167+
// Like this:
168+
// LimitExec(fetch=1)
169+
// ProjectionExec(expr=[10 as count(*)])
170+
// PlaceholderRowExec
171+
if projection.input().as_any().is::<PlaceholderRowExec>() {
172+
return Ok((Transformed::no(pushdown_plan), global_state));
173+
}
174+
}
175+
162176
// If we have a non-limit operator with fetch capability, update global
163177
// state as necessary:
164178
if pushdown_plan.fetch().is_some() {

0 commit comments

Comments
 (0)