diff --git a/Cargo.toml b/Cargo.toml index c30b62e..cec3755 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.85.1" [dependencies] +aquamarine = "0.6.0" arrow = "56.0.0" arrow-schema = "56.0.0" async-trait = "0.1.89" diff --git a/src/lib.rs b/src/lib.rs index 8b26d85..238bbd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,6 +42,9 @@ pub mod materialized; /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views. +/// +/// The implementation is based heavily on [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), +/// *Optimizing Queries Using Materialized Views: A Practical, Scalable Solution*. pub mod rewrite; /// Configuration options for materialized view related features. diff --git a/src/materialized.rs b/src/materialized.rs index 8e093c0..c3cf30e 100644 --- a/src/materialized.rs +++ b/src/materialized.rs @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. -/// Track dependencies of materialized data in object storage pub mod dependencies; /// Pluggable metadata sources for incremental view maintenance diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 9cf8b8a..42ddd35 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -15,6 +15,29 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a dependency analysis algorithm for materialized views, built around the [`ListingTableLike`](super::ListingTableLike) trait. +Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed +into the [`RowMetadataRegistry`]. However, materialized views themselves must implement `ListingTableLike`, as is +implied by the type bound `Materialized: ListingTableLike`. + +In a nutshell, the dependency analysis examines the fragment of the materialized view's logical plan corresponding to +partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions +of the materialized view and its source tables. This gives rise to two natural phases of the algorithm: +1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to include only partition columns (or row metadata columns more generally) of the materialized view and its sources. + This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order, + or even set equality of the original query. + * Formally, let P be the (exact) projection operator, and view each plan's output as a set of rows. If A is the original plan and A' is the result of "inexact" projection pushdown, then PA ⊆ A'. + * This means that the final output may contain dependencies that do not exist in the original query; however, we will never miss any dependencies. +2. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources. + After step 1, every table scan contains only row metadata columns, so we replace each table scan with an equivalent scan of a [`RowMetadataSource`](super::row_metadata::RowMetadataSource). + This operation, too, preserves neither duplicates nor order.
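+   As a rough sketch, the substitution looks like this (the `trades_meta` name is purely illustrative; the actual replacement scan is whatever the table's registered `RowMetadataSource` provides, and `__meta` denotes the injected row metadata column):
+
+   ```text
+   TableScan: trades projection=[date]
+   -- becomes -->
+   TableScan: trades_meta projection=[__meta, date]
+   ```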
Then, additional metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row. + The output rows are then transformed into object storage paths to generate the final graph. + +The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`]. + */ + use datafusion::{ catalog::{CatalogProviderList, TableFunctionImpl}, config::{CatalogOptions, ConfigOptions}, @@ -252,8 +275,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> { } } +#[cfg_attr(doc, aquamarine::aquamarine)] /// Returns a logical plan that, when executed, lists expected build targets /// for this materialized view, together with the dependencies for each target. +/// +/// See the [module documentation](super) for an overview of the algorithm. +/// +/// # Example +/// +/// We explain in detail how the dependency analysis works with an example. Consider the following SQL query, which computes daily +/// close prices of a stock from its trades, together with the settlement price from a daily statistics table: +/// +/// ```sql +/// SELECT +/// ticker, +/// LAST_VALUE(trades.price) AS close, +/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, +/// trades.date AS date +/// FROM trades +/// JOIN daily_statistics ON +/// trades.ticker = daily_statistics.ticker AND +/// trades.date = daily_statistics.reference_date AND +/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS +/// GROUP BY ticker, date +/// ``` +/// +/// Assume that both tables are partitioned by `date` only. We desire a materialized view partitioned by `date` and stored at `s3://daily_close/`. +/// This query gives us the following logical plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection:
ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, trades.date AS date"] +/// A --> B["Aggregate:
expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)]
groupby=[ticker, trades.date]"] +/// B --> C["Inner Join:
trades.ticker = daily_statistics.ticker AND
trades.date = daily_statistics.reference_date AND
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades
projection=[ticker, price, date]"] +/// C --> E["TableScan: daily_statistics
projection=[ticker, settlement_price, reference_date, date]"] +/// ``` +/// +/// The partition-column-derived expressions are exactly those built from `trades.date` and `daily_statistics.date`. We now proceed with **Inexact Projection Pushdown** and prune all other expressions, resulting in the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: trades.date AS date"] +/// A --> B["Projection: trades.date"] +/// B --> C["Inner Join:
daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"] +/// C --> D["TableScan: trades (projection=[date])"] +/// C --> E["TableScan: daily_statistics (projection=[date])"] +/// ``` +/// +/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that +/// we cannot partition the materialized view on aggregate expressions. +/// +/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan, +/// together with the target path constructed from the (static) partition columns. This gives us the following plan: +/// +/// ```mermaid +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% +/// graph TD +/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"] +/// A --> B["Projection: __meta, trades.date AS date"] +/// B --> C["Projection:
concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"] +/// C --> D["Inner Join:
daily_statistics_meta.date BETWEEN trades_meta.date AND trades_meta.date + INTERVAL 2 WEEKS"] +/// D --> E["TableScan: trades_meta (projection=[__meta, date])"] +/// D --> F["TableScan: daily_statistics_meta (projection=[__meta, date])"] +/// ``` +/// +/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final plan unnests this struct column +/// into its components. The final output looks roughly like this: +/// +/// ```text +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 | +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ +/// ``` pub fn mv_dependencies_plan( materialized_view: &dyn Materialized, row_metadata_registry: &RowMetadataRegistry, diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index 85e5838..bca69ea 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -722,7 +722,7 @@ impl FileMetadataBuilder { } } -/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider. +/// Provides [`ObjectMeta`] data to the [`FileMetadata`] table provider. #[async_trait] pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync { /// List all files in the store for the given `url` prefix. diff --git a/src/rewrite.rs b/src/rewrite.rs index 170c88f..da24cc5 100644 --- a/src/rewrite.rs +++ b/src/rewrite.rs @@ -17,8 +17,6 @@ use datafusion::{common::extensions_options, config::ConfigExtension}; -/// Implements a query rewriting optimizer, also known as "view exploitation" -/// in some academic sources. pub mod exploitation; pub mod normal_form; diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index 06145ae..7a95113 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -15,6 +15,34 @@ // specific language governing permissions and limitations // under the License. +/*! + +This module implements a query rewriting optimizer, also known as "view exploitation" +in some academic sources. The "view matching" subproblem is implemented by [`SpjNormalForm`], +which is used by the [`ViewMatcher`] logical optimizer to compare queries with materialized views. + +The query rewriting process spans both the logical and physical planning phases and can be described as follows: + +1. 
During logical optimization, the [`ViewMatcher`] rule scans all available materialized views + and attempts to match them against each sub-expression of the query plan by comparing their SPJ normal forms. + If a match is found, the sub-expression is replaced with a [`OneOf`] node, which contains the original sub-expression + and one or more candidate rewrites using materialized views. +2. During physical planning, the [`ViewExploitationPlanner`] identifies [`OneOf`] nodes and generates a [`OneOfExec`] + physical plan node, which contains all candidate physical plans corresponding to the logical plans in the original [`OneOf`] node. +3. DataFusion is allowed to run its usual physical optimization rules, which may add additional operators such as sorts or repartitions + to the candidate plans. Filter, sort, and projection pushdown into the `OneOfExec` nodes are important, as these can affect cost + estimates in the next phase. +4. Finally, a user-defined cost function is used to choose the "best" candidate within each `OneOfExec` node. + The [`PruneCandidates`] physical optimizer rule is used to finalize the choice by replacing each `OneOfExec` node + with its selected best candidate plan. + +In the [reference paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf) for this implementation, the authors mention +that the database's built-in cost optimizer takes care of selecting the best rewrite. However, DataFusion lacks cost-based optimization. +While we do use a user-defined cost function to select the best candidate at each `OneOfExec`, this requires cooperation from the planner +to push down relevant information such as projections, sorts, and filters into the `OneOfExec` nodes. + +*/ + use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 7efd53d..e09a54b 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -17,7 +17,7 @@ /*! -This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://courses.cs.washington.edu/courses/cse591d/01sp/opt_views.pdf), +This module contains code primarily used for view matching. We implement the view matching algorithm from [this paper](https://dsg.uwaterloo.ca/seminars/notes/larson-paper.pdf), which provides a method for determining when one Select-Project-Join query can be rewritten in terms of another Select-Project-Join query. The implementation is contained in [`SpjNormalForm::rewrite_from`]. The method can be summarized as follows: