Skip to content

Commit 0139549

Browse files
committed
optimizer: Check ProjectionPushdown against Demand
1 parent 9bacb1a commit 0139549

File tree

3 files changed

+87
-16
lines changed

3 files changed

+87
-16
lines changed

src/expr/src/linear.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -126,19 +126,23 @@ impl RustType<ProtoPredicate> for (usize, MirScalarExpr) {
126126

127127
impl Display for MapFilterProject {
128128
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129-
writeln!(f, "MapFilterProject(")?;
130-
writeln!(f, " expressions:")?;
131-
self.expressions
132-
.iter()
133-
.enumerate()
134-
.try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
135-
writeln!(f, " predicates:")?;
136-
self.predicates
137-
.iter()
138-
.try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
139-
writeln!(f, " projection: {:?}", self.projection)?;
140-
writeln!(f, " input_arity: {}", self.input_arity)?;
141-
writeln!(f, ")")
129+
if !mz_ore::assert::soft_assertions_enabled() {
130+
writeln!(f, "<redacted MFP; see `impl Display for MapFilterProject`>")
131+
} else {
132+
writeln!(f, "MapFilterProject(")?;
133+
writeln!(f, " expressions:")?;
134+
self.expressions
135+
.iter()
136+
.enumerate()
137+
.try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
138+
writeln!(f, " predicates:")?;
139+
self.predicates
140+
.iter()
141+
.try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
142+
writeln!(f, " projection: {:?}", self.projection)?;
143+
writeln!(f, " input_arity: {}", self.input_arity)?;
144+
writeln!(f, ")")
145+
}
142146
}
143147
}
144148

src/transform/src/dataflow.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use mz_repr::GlobalId;
2929
use proptest_derive::Arbitrary;
3030
use serde::{Deserialize, Serialize};
3131

32+
use crate::demand::Demand;
3233
use crate::monotonic::MonotonicFlag;
3334
use crate::notice::RawOptimizerNotice;
3435
use crate::{IndexOracle, Optimizer, TransformCtx, TransformError};
@@ -91,6 +92,10 @@ pub fn optimize_dataflow(
9192
)?;
9293

9394
optimize_dataflow_monotonic(dataflow, transform_ctx)?;
95+
96+
for object in dataflow.objects_to_build.iter() {
97+
Demand::soft_assert_no_more_projection_pushdown(&object.plan.0)?;
98+
}
9499
}
95100

96101
prune_and_annotate_dataflow_index_imports(

src/transform/src/demand.rs

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@
1010
//! Transformation based on pushing demand information about columns toward sources.
1111
1212
use itertools::Itertools;
13-
use mz_ore::assert_none;
1413
use std::collections::{BTreeMap, BTreeSet};
1514

15+
use mz_expr::visit::Visit;
1616
use mz_expr::{
17-
AggregateExpr, AggregateFunc, Id, JoinInputMapper, MirRelationExpr, MirScalarExpr,
18-
RECURSION_LIMIT,
17+
AggregateExpr, AggregateFunc, Id, JoinInputMapper, MapFilterProject, MirRelationExpr,
18+
MirScalarExpr, RECURSION_LIMIT,
1919
};
2020
use mz_ore::stack::{CheckedRecursion, RecursionGuard};
21+
use mz_ore::{assert_none, soft_assert_or_log};
2122
use mz_repr::{Datum, Row};
2223

2324
use crate::TransformCtx;
@@ -365,4 +366,65 @@ impl Demand {
365366
}
366367
})
367368
}
369+
370+
/// Checks the optimizer invariant that there are no more opportunities for projection pushdown,
371+
/// comparing against Demand.
372+
///
373+
/// More specifically, we check that the MFPs directly on top of global Gets project at most
374+
/// those columns that `Demand` determines are demanded from that global Get. An exception is
375+
/// when there is an `ArrangeBy` on top of a global `Get`, in which case the relevant MFP might
376+
/// have been lifted away by `JoinImplementation` to enable index reuse. We skip checking the
377+
/// MFP in this case.
378+
///
379+
/// This is meant to be called at the end of the MIR pipeline, where it can catch 3 types of
380+
/// issues:
381+
/// 1. If `ProjectionPushdown` is not as smart as `Demand`.
382+
/// 2. If some transform after the last run of `ProjectionPushdown` undoes something that
383+
/// `ProjectionPushdown` did.
384+
/// 3. If some transform after the last run of `ProjectionPushdown` opens up new opportunities
385+
/// for projection pushdown.
386+
pub fn soft_assert_no_more_projection_pushdown(
387+
relation: &MirRelationExpr,
388+
) -> Result<(), crate::TransformError> {
389+
let mut relation = relation.clone();
390+
let arity = relation.arity();
391+
// Gather demanded columns of each of the Ids.
392+
let mut demand_on_ids = BTreeMap::new();
393+
let demand = Demand::default();
394+
demand.action(&mut relation, (0..arity).collect(), &mut demand_on_ids)?;
395+
// Check that MFPs on top of global Gets project at most those columns that Demand thinks
396+
// are demanded.
397+
//
398+
// We use `visit_pre_post`, because we'd like to control how we descend to children: we want
399+
// to jump over an MFP that we find at the current node, in order to avoid observing partial
400+
// MFPs, that are without their Projects.
401+
relation.visit_pre_post(&mut |expr| {
402+
let (mfp, expr) = MapFilterProject::extract_from_expression(expr);
403+
match expr {
404+
MirRelationExpr::Get { id: Id::Global(id), .. } => {
405+
let demand = demand_on_ids.get(&Id::Global(*id)).expect("`Demand` should have an opinion on all ids");
406+
let actual_proj_above_get = mfp.projection.iter().filter(|c| **c < mfp.input_arity).collect_vec();
407+
soft_assert_or_log!(
408+
actual_proj_above_get.iter().all(|c| demand.contains(c)),
409+
"Missed ProjectionPushdown opportunity: demand on {}: {:?}. actual_proj_above_get: {:?}. The whole plan:\n{}\n\nmfp:{},\nexpr:{}",
410+
id, demand, actual_proj_above_get, relation.pretty(), mfp, expr.pretty()
411+
);
412+
// Don't descend to children.
413+
Some(vec![])
414+
},
415+
// If there is an ArrangeBy on top of a global Get, then a projection might have
416+
// been lifted away by JoinImplementation to re-use an index, so skip checking the
417+
// invariant in this case.
418+
MirRelationExpr::ArrangeBy { input, .. } if matches!(**input, MirRelationExpr::Get { id: Id::Global(_), .. }) => {
419+
Some(vec![])
420+
},
421+
// Just continue with the children of the operator that we found below the MFP.
422+
_ => {
423+
Some(expr.children().collect())
424+
},
425+
}
426+
427+
}, &mut |_| {})?;
428+
Ok(())
429+
}
368430
}

0 commit comments

Comments
 (0)