1 change: 1 addition & 0 deletions Cargo.toml
@@ -28,6 +28,7 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.85.1"

[dependencies]
aquamarine = "0.6.0"
arrow = "56.0.0"
arrow-schema = "56.0.0"
async-trait = "0.1.89"
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -42,6 +42,9 @@
pub mod materialized;

/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
///
/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf),
/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*.
pub mod rewrite;

/// Configuration options for materialized view related features.
1 change: 0 additions & 1 deletion src/materialized.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.

/// Track dependencies of materialized data in object storage
pub mod dependencies;

/// Pluggable metadata sources for incremental view maintenance
101 changes: 101 additions & 0 deletions src/materialized/dependencies.rs
@@ -15,6 +15,29 @@
// specific language governing permissions and limitations
// under the License.

/*!

This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait.
Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed
into the [`RowMetadataRegistry`]. However, materialized views themselves must implement `ListingTableLike`, as
implied by the type bound `Materialized: ListingTableLike`.

In a nutshell, the dependency analysis analyzes the fragment of the materialized view's logical plan that corresponds to
partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions
of the materialized view and those of its source tables. This gives rise to two natural phases of the algorithm:
1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to include only partition columns (or row metadata columns more generally) of the materialized view and its sources.
This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order,
or even set equality of the original query.
    * Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'.
    * This means that the final output may contain dependencies that do not exist in the original query, but it will never miss any dependencies.
2. **Dependency Graph Construction**: Once we have the pruned logical plan, we construct a dependency graph between the physical partitions of the materialized view and its sources.
After step 1, every table scan contains only row metadata columns, so we replace each table scan with an equivalent scan against a [`RowMetadataSource`](super::row_metadata::RowMetadataSource).
This operation likewise preserves neither duplicates nor order. The row metadata is then "pushed up" through the plan to the root, where it is unnested to give a list of source files for each output row.
The output rows are then transformed into object storage paths to generate the final graph.

The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`].
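
As a quick sketch of how the output might be consumed (hypothetical setup: `my_view`, `registry`, and `ctx` are assumed to exist, and any remaining parameters of [`mv_dependencies_plan`] are elided):

```rust,ignore
use datafusion::dataframe::DataFrame;

// Build the logical plan listing each build target and its source files.
let plan = mv_dependencies_plan(&my_view, &registry, /* ... */)?;

// Execute it like any other plan and display the dependency table.
let df = DataFrame::new(ctx.state(), plan);
df.show().await?;
```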
*/

use datafusion::{
catalog::{CatalogProviderList, TableFunctionImpl},
config::{CatalogOptions, ConfigOptions},
@@ -252,8 +275,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> {
}
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Returns a logical plan that, when executed, lists expected build targets
/// for this materialized view, together with the dependencies for each target.
///
/// See the [module documentation](super) for an overview of the algorithm.
///
/// # Example
///
/// We walk through the dependency analysis in detail using an example. Consider the following SQL query, which computes daily
/// close prices of a stock from its trades, together with the settlement price from a daily statistics table:
///
/// ```sql
/// SELECT
/// ticker,
/// LAST_VALUE(trades.price) AS close,
/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price,
/// trades.date AS date
/// FROM trades
/// JOIN daily_statistics ON
/// trades.ticker = daily_statistics.ticker AND
/// trades.date = daily_statistics.reference_date AND
/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS
/// GROUP BY ticker, date
/// ```
///
/// Assume that both tables are partitioned by `date` only, and that we want a materialized view partitioned by `date` and stored at `s3://daily_close/`.
/// This query gives us the following logical plan:
///
/// ```mermaid
/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
/// graph TD
/// A["Projection: <br>ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, <mark>trades.date AS date</mark>"]
/// A --> B["Aggregate: <br>expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)] <br>groupby=[ticker, <mark>trades.date</mark>]"]
/// B --> C["Inner Join: <br>trades.ticker = daily_statistics.ticker AND <br>trades.date = daily_statistics.reference_date AND <br><mark>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS</mark>"]
/// C --> D["TableScan: trades <br>projection=[ticker, price, <mark>date</mark>]"]
/// C --> E["TableScan: daily_statistics <br>projection=[ticker, settlement_price, reference_date, <mark>date</mark>]"]
/// ```
///
/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, pruning all unmarked expressions to obtain the following plan:
///
/// ```mermaid
/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
/// graph TD
/// A["Projection: trades.date AS date"]
/// A --> B["Projection: trades.date"]
/// B --> C["Inner Join: <br>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"]
/// C --> D["TableScan: trades (projection=[date])"]
/// C --> E["TableScan: daily_statistics (projection=[date])"]
/// ```
///
/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that
/// we cannot partition the materialized view on aggregate expressions.
///
/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan,
/// together with the target path constructed from the (static) partition columns. This gives us the following plan:
///
/// ```mermaid
/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
/// graph TD
/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"]
/// A --> B["Projection: __meta, trades.date AS date"]
/// B --> C["Projection: <br>concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"]
/// C --> D["Inner Join: <br><b>daily_statistics_meta</b>.date BETWEEN <b>trades_meta</b>.date AND <b>trades_meta</b>.date + INTERVAL 2 WEEKS"]
/// D --> E["TableScan: <b>trades_meta</b> (projection=[__meta, date])"]
/// D --> F["TableScan: <b>daily_statistics_meta</b> (projection=[__meta, date])"]
/// ```
///
/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final query has this struct column
/// unnested into its components. The final output looks roughly like this:
///
/// ```text
/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |
/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 |
/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 |
/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 |
/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 |
/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
/// ```
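///
/// For intuition, the `target` expression at the root of the final plan might be constructed roughly as follows. This is an illustrative sketch, not the crate's actual code; it uses only stock DataFusion expression builders:
///
/// ```rust,ignore
/// use datafusion::arrow::datatypes::DataType;
/// use datafusion::functions::expr_fn::concat;
/// use datafusion::prelude::{cast, col, lit};
///
/// // Hive-style partition path: "s3://daily_close/date=<date>/"
/// let target = concat(vec![
///     lit("s3://daily_close/date="),
///     cast(col("date"), DataType::Utf8),
///     lit("/"),
/// ])
/// .alias("target");
/// ```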
pub fn mv_dependencies_plan(
materialized_view: &dyn Materialized,
row_metadata_registry: &RowMetadataRegistry,
2 changes: 1 addition & 1 deletion src/materialized/file_metadata.rs
@@ -722,7 +722,7 @@ impl FileMetadataBuilder {
}
}

/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider.
/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider.
#[async_trait]
pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
/// List all files in the store for the given `url` prefix.
2 changes: 0 additions & 2 deletions src/rewrite.rs
@@ -17,8 +17,6 @@

use datafusion::{common::extensions_options, config::ConfigExtension};

/// Implements a query rewriting optimizer, also known as "view exploitation"
/// in some academic sources.
pub mod exploitation;

pub mod normal_form;
28 changes: 28 additions & 0 deletions src/rewrite/exploitation.rs
@@ -15,6 +15,34 @@
// specific language governing permissions and limitations
// under the License.

/*!

This module implements a query rewriting optimizer, also known as "view exploitation"
in some academic sources. The "view matching" subproblem is implemented in the [`SpjNormalForm`] code,
which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views.

The query rewriting process spans both the logical and physical planning phases and can be described as follows:

1. During logical optimization, the [`ViewMatcher`] rule scans all available materialized views
and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms.
If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression
and one or more candidate rewrites using materialized views.
2. During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`]
physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node.
3. DataFusion then runs its usual physical optimization rules, which may add operators such as sorts or repartitions
to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes is important here, as these can affect cost
estimates in the next phase.
4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node.
The [`PruneCandidates`] physical optimizer rule finalizes the choice by replacing each `OneOfExec` node
with its selected best candidate plan.

In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors note
that the database's built-in cost-based optimizer takes care of selecting the best rewrite. DataFusion, however, lacks cost-based optimization,
so we rely on a user-defined cost function to select the best candidate at each `OneOfExec` (see the sketch below). This requires cooperation from the planner
to push relevant information such as projections, sorts, and filters down into the `OneOfExec` nodes.
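
As an illustration, a cost function in this spirit might simply prefer the candidate with the smallest physical plan. The sketch below is a toy heuristic and assumes nothing about this crate's actual cost-function API; a realistic implementation would consult statistics instead:

```rust,ignore
use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;

/// Toy cost metric: the number of nodes in a physical plan.
fn plan_size(plan: &Arc<dyn ExecutionPlan>) -> usize {
    1 + plan.children().into_iter().map(plan_size).sum::<usize>()
}

/// Pick the index of the "cheapest" candidate within a `OneOfExec` node.
fn choose_best(candidates: &[Arc<dyn ExecutionPlan>]) -> Option<usize> {
    (0..candidates.len()).min_by_key(|&i| plan_size(&candidates[i]))
}
```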

*/

use std::collections::HashMap;
use std::{collections::HashSet, sync::Arc};

2 changes: 1 addition & 1 deletion src/rewrite/normal_form.rs
@@ -17,7 +17,7 @@

/*!

This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf),
This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf),
which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query.

The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows: