Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::{SafeMfpPlan, StaticMapFilterProject};
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
Expand Down Expand Up @@ -1430,6 +1430,8 @@ impl IndexPeek {
let mut literals = peek.literal_constraints.iter().flatten();
let mut current_literal = None;

let static_mfp = StaticMapFilterProject::from(&peek.map_filter_project);

while cursor.key_valid(&storage) {
if has_literal_constraints {
loop {
Expand Down Expand Up @@ -1485,8 +1487,7 @@ impl IndexPeek {
// loop.
datum_vec.extend(current_literal.unwrap().iter());
}
if let Some(result) = peek
.map_filter_project
if let Some(result) = static_mfp
.evaluate_into(&mut borrow, &arena, &mut row_builder)
.map(|row| row.cloned())
.map_err_to_string_with_causes()?
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::ENABLE_COMPUTE_RENDER_FUELED_AS_SPECIFIC_COLLECTION;
use mz_compute_types::plan::{AvailableCollections, LirId};
use mz_dyncfg::ConfigSet;
use mz_expr::{Id, MapFilterProject, MirScalarExpr};
use mz_expr::{Id, MapFilterProject, MirScalarExpr, StaticMirScalarExprs};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow};
use mz_storage_types::controller::CollectionMetadata;
Expand Down Expand Up @@ -905,6 +905,7 @@ where
Arranged<S, RowRowAgent<S::Timestamp, Diff>>,
Collection<S, DataflowError, Diff>,
) {
let key: Vec<_> = key.iter().map(StaticMirScalarExprs::from).collect();
// The following `unary_fallible` implements a `map_fallible`, but produces columnar updates
// for the ok stream. The `map_fallible` cannot be used here because the closure cannot
// return references, which is what we need to push into columnar streams. Instead, we use
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/render/flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

use columnar::Columnar;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use mz_expr::MfpPlan;
use mz_expr::{MapFilterProject, MirScalarExpr, TableFunc};
use mz_expr::{MfpPlan, StaticMirScalarExprs};
use mz_repr::{DatumVec, RowArena, SharedRow};
use mz_repr::{Diff, Row, Timestamp};
use mz_timely_util::operator::StreamExt;
Expand Down Expand Up @@ -38,6 +38,7 @@ where
mfp: MapFilterProject,
input_key: Option<Vec<MirScalarExpr>>,
) -> CollectionBundle<G> {
let exprs: Vec<_> = exprs.iter().map(StaticMirScalarExprs::from).collect();
let until = self.until.clone();
let mfp_plan = mfp.into_plan().expect("MapFilterProject planning failed");
let (ok_collection, err_collection) =
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/render/join/delta_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use differential_dataflow::trace::{BatchReader, Cursor, TraceReader};
use differential_dataflow::{AsCollection, Collection};
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan};
use mz_expr::MirScalarExpr;
use mz_expr::{MirScalarExpr, StaticMirScalarExprs};
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
Expand Down Expand Up @@ -345,6 +345,7 @@ where
for<'a> Tr::Val<'a>: ToDatumIter,
CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static,
{
let prev_key: Vec<_> = prev_key.iter().map(StaticMirScalarExprs::from).collect();
let name = "DeltaJoinKeyPreparation";
type CB<C> = CapacityContainerBuilder<C>;
let (updates, errs) = updates.map_fallible::<CB<_>, CB<_>, _, _, _>(name, {
Expand Down
2 changes: 2 additions & 0 deletions src/compute/src/render/join/linear_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use mz_compute_types::dyncfgs::{ENABLE_MZ_JOIN_CORE, LINEAR_JOIN_YIELDING};
use mz_compute_types::plan::join::JoinClosure;
use mz_compute_types::plan::join::linear_join::{LinearJoinPlan, LinearStagePlan};
use mz_dyncfg::ConfigSet;
use mz_expr::StaticMirScalarExprs;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, Row, RowArena, SharedRow};
use mz_storage_types::errors::DataflowError;
Expand Down Expand Up @@ -358,6 +359,7 @@ where
where
S: Scope<Timestamp = G::Timestamp>,
{
let stream_key: Vec<_> = stream_key.iter().map(StaticMirScalarExprs::from).collect();
// If we have only a streamed collection, we must first form an arrangement.
if let JoinedFlavor::Collection(stream) = joined {
let name = "LinearJoinKeyPreparation";
Expand Down
3 changes: 3 additions & 0 deletions src/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod scalar;

pub mod explain;
pub mod row;
mod static_eval;
pub mod virtual_syntax;
pub mod visit;

Expand All @@ -35,6 +36,7 @@ pub use linear::plan::{MfpPlan, SafeMfpPlan};
pub use linear::util::{join_permutations, permutation_for_arrangement};
pub use linear::{
MapFilterProject, ProtoMapFilterProject, ProtoMfpPlan, ProtoSafeMfpPlan, memoize_expr,
StaticMapFilterProject,
};
pub use relation::func::order_aggregate_datums as order_aggregate_datums_exported_for_benchmarking;
pub use relation::func::{
Expand All @@ -54,6 +56,7 @@ pub use scalar::{
EvalError, FilterCharacteristics, MirScalarExpr, ProtoDomainLimit, ProtoEvalError,
ProtoMirScalarExpr, like_pattern,
};
pub use static_eval::StaticMirScalarExprs;

/// A [`MirRelationExpr`] that claims to have been optimized, e.g., by an
/// `transform::Optimizer`.
Expand Down
113 changes: 111 additions & 2 deletions src/expr/src/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;

use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Datum, Row};
use mz_repr::{Datum, Row, RowArena};
use proptest::prelude::*;
use serde::{Deserialize, Serialize};

use crate::linear::proto_map_filter_project::ProtoPredicate;
use crate::static_eval::StaticMirScalarExprs;
use crate::visit::Visit;
use crate::{MirRelationExpr, MirScalarExpr};
use crate::{EvalError, MirRelationExpr, MirScalarExpr, SafeMfpPlan};

include!(concat!(env!("OUT_DIR"), "/mz_expr.linear.rs"));

Expand Down Expand Up @@ -2101,6 +2102,114 @@ pub mod plan {
}
}

#[derive(Debug)]
pub struct StaticMapFilterProject {
/// A sequence of expressions that should be appended to the row.
///
/// Many of these expressions may not be produced in the output,
/// and may only be present as common subexpressions.
pub expressions: Vec<StaticMirScalarExprs>,
/// Expressions that must evaluate to `Datum::True` for the output
/// row to be produced.
///
/// Each entry is prepended with a column identifier indicating
/// the column *before* which the predicate should first be applied.
/// Most commonly this would be one plus the largest column identifier
/// in the predicate's support, but it could be larger to implement
/// guarded evaluation of predicates.
///
/// This list should be sorted by the first field.
pub predicates: Vec<(usize, StaticMirScalarExprs)>,
/// A sequence of column identifiers whose data form the output row.
pub projection: Vec<usize>,
/// The expected number of input columns.
///
/// This is needed to ensure correct identification of newly formed
/// columns in the output.
pub input_arity: usize,
}

impl From<&SafeMfpPlan> for StaticMapFilterProject {
fn from(value: &SafeMfpPlan) -> Self {
(&value.mfp).into()
}
}

impl From<&MapFilterProject> for StaticMapFilterProject {
fn from(mfp: &MapFilterProject) -> Self {
Self {
expressions: mfp.expressions.iter().map(|e| e.into()).collect(),
predicates: mfp.predicates.iter().map(|(i, e)| (*i, e.into())).collect(),
projection: mfp.projection.clone(),
input_arity: mfp.input_arity,
}
}
}

impl StaticMapFilterProject {
#[inline(always)]
pub fn evaluate_into<'a, 'row>(
&'a self,
datums: &mut Vec<Datum<'a>>,
arena: &'a RowArena,
row_buf: &'row mut Row,
) -> Result<Option<&'row Row>, EvalError> {
let passed_predicates = self.evaluate_inner(datums, arena)?;
if !passed_predicates {
Ok(None)
} else {
row_buf
.packer()
.extend(self.projection.iter().map(|c| datums[*c]));
Ok(Some(row_buf))
}
}

/// A version of `evaluate` which produces an iterator over `Datum`
/// as output.
///
/// This version can be useful when one wants to capture the resulting
/// datums without packing and then unpacking a row.
#[inline(always)]
pub fn evaluate_iter<'b, 'a: 'b>(
&'a self,
datums: &'b mut Vec<Datum<'a>>,
arena: &'a RowArena,
) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
let passed_predicates = self.evaluate_inner(datums, arena)?;
if !passed_predicates {
Ok(None)
} else {
Ok(Some(self.projection.iter().map(move |i| datums[*i])))
}
}

/// Populates `datums` with `self.expressions` and tests `self.predicates`.
///
/// This does not apply `self.projection`, which is up to the calling method.
pub fn evaluate_inner<'b, 'a: 'b>(
&'a self,
datums: &'b mut Vec<Datum<'a>>,
arena: &'a RowArena,
) -> Result<bool, EvalError> {
let mut expression = 0;
for (support, predicate) in self.predicates.iter() {
while self.input_arity + expression < *support {
datums.push(self.expressions[expression].eval(&datums[..], arena)?);
expression += 1;
}
if predicate.eval(&datums[..], arena)? != Datum::True {
return Ok(false);
}
}
while expression < self.expressions.len() {
datums.push(self.expressions[expression].eval(&datums[..], arena)?);
expression += 1;
}
Ok(true)
}
}

#[cfg(test)]
mod tests {
use mz_ore::assert_ok;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2007,7 +2007,7 @@ impl MirScalarExpr {
temp_storage: &'a RowArena,
) -> Result<Datum<'a>, EvalError> {
match self {
MirScalarExpr::Column(index, _name) => Ok(datums[*index].clone()),
MirScalarExpr::Column(index, _name) => Ok(datums[*index]),
MirScalarExpr::Literal(res, _column_type) => match res {
Ok(row) => Ok(row.unpack_first()),
Err(e) => Err(e.clone()),
Expand Down
Loading