|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
| 18 | +/*! |
| 19 | +
|
| 20 | +This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait. |
| 21 | +Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed |
| 22 | +into the [`RowMetadataRegistry`]. However, materialized views themselves must implement `ListingTableLike`, as is |
| 23 | +implied by the type bound `Materialized: ListingTableLike`. |
| 24 | +
|
| 25 | +In a nutshell, the dependency analysis examines the fragment of the materialized view's logical plan corresponding to |
| 26 | +partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions |
| 27 | +of the materialized view and its source tables. This gives rise to two natural phases of the algorithm: |
| 28 | +1. **Inexact Projection Pushdown**: We aggressively prune the logical plan to include only partition columns (or row metadata columns more generally) of the materialized view and its sources. |
| 29 | +   This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order, |
| 30 | +   or even set equality of the original query. |
| 31 | +   * Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'. |
| 32 | +   * This means that the final output may report dependencies that do not exist in the original query; however, we will never miss any dependencies. (A concrete sketch follows this list.) |
| 33 | +2. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources. |
| 34 | +   After step 1, every table scan contains only row metadata columns, so we replace each table scan with an equivalent scan against a [`RowMetadataSource`](super::row_metadata::RowMetadataSource). |
| 35 | +   This operation, too, is neither duplicate- nor order-preserving. Then, the row metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row. |
| 36 | + The output rows are then transformed into object storage paths to generate the final graph. |
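| | + |
| | +As a concrete sketch of the containment PA ⊆ A' from phase 1 (illustrative only, not literal rewriter output; the `trades` table and its `date` partition column here are hypothetical): |
| | + |
| | +```sql |
| | +-- Original query A, defining a view partitioned on `date`: |
| | +SELECT date, AVG(price) AS avg_price FROM trades GROUP BY date; |
| | + |
| | +-- Result of inexact projection pushdown, A': duplicates and ordering are |
| | +-- not preserved, but every `date` appearing in PA also appears in A'. |
| | +SELECT date FROM trades; |
| | +``` |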
| 37 | +
|
| 38 | +The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`]. |
| 39 | + */ |
| 40 | + |
18 | 41 | use datafusion::{ |
19 | 42 | catalog::{CatalogProviderList, TableFunctionImpl}, |
20 | 43 | config::{CatalogOptions, ConfigOptions}, |
@@ -252,8 +275,86 @@ fn get_table_name(args: &[Expr]) -> Result<&String> { |
252 | 275 | } |
253 | 276 | } |
254 | 277 |
|
| 278 | +#[cfg_attr(doc, aquamarine::aquamarine)] |
255 | 279 | /// Returns a logical plan that, when executed, lists expected build targets |
256 | 280 | /// for this materialized view, together with the dependencies for each target. |
| 281 | +/// |
| 282 | +/// See the [module documentation](super) for an overview of the algorithm. |
| 283 | +/// |
| 284 | +/// # Example |
| 285 | +/// |
| 286 | +/// We explain in detail how the dependency analysis works using an example. Consider the following SQL query, which computes daily |
| 287 | +/// close prices of a stock from its trades, together with the settlement price from a daily statistics table: |
| 288 | +/// |
| 289 | +/// ```sql |
| 290 | +/// SELECT |
| 291 | +/// ticker, |
| 292 | +/// LAST_VALUE(trades.price) AS close, |
| 293 | +/// LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, |
| 294 | +/// trades.date AS date |
| 295 | +/// FROM trades |
| 296 | +/// JOIN daily_statistics ON |
| 297 | +/// trades.ticker = daily_statistics.ticker AND |
| 298 | +/// trades.date = daily_statistics.reference_date AND |
| 299 | +/// daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL '2 WEEKS' |
| 300 | +/// GROUP BY ticker, date |
| 301 | +/// ``` |
| 302 | +/// |
| 303 | +/// Assume that both tables are partitioned by `date` only. We want a materialized view partitioned by `date` and stored at `s3://daily_close/`. |
| 304 | +/// This query gives us the following logical plan: |
| 305 | +/// |
| 306 | +/// ```mermaid |
| 307 | +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% |
| 308 | +/// graph TD |
| 309 | +/// A["Projection: <br>ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, <mark>trades.date AS date</mark>"] |
| 310 | +/// A --> B["Aggregate: <br>expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)] <br>groupby=[ticker, <mark>trades.date</mark>]"] |
| 311 | +/// B --> C["Inner Join: <br>trades.ticker = daily_statistics.ticker AND <br>trades.date = daily_statistics.reference_date AND <br><mark>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL '2 WEEKS'</mark>"] |
| 312 | +/// C --> D["TableScan: trades <br>projection=[ticker, price, <mark>date</mark>]"] |
| 313 | +/// C --> E["TableScan: daily_statistics <br>projection=[ticker, settlement_price, reference_date, <mark>date</mark>]"] |
| 314 | +/// ``` |
| 315 | +/// |
| 316 | +/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, and prune all unmarked expressions, resulting in the following plan: |
| 317 | +/// |
| 318 | +/// ```mermaid |
| 319 | +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% |
| 320 | +/// graph TD |
| 321 | +/// A["Projection: trades.date AS date"] |
| 322 | +/// A --> B["Projection: trades.date"] |
| 323 | +/// B --> C["Inner Join: <br>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL '2 WEEKS'"] |
| 324 | +/// C --> D["TableScan: trades (projection=[date])"] |
| 325 | +/// C --> E["TableScan: daily_statistics (projection=[date])"] |
| 326 | +/// ``` |
| 327 | +/// |
| 328 | +/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that |
| 329 | +/// we cannot partition the materialized view on aggregate expressions. |
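| | +/// |
| | +/// Rendered back as SQL, the pruned plan corresponds roughly to the query below (a schematic rendering for intuition, not the literal output of the rewrite): |
| | +/// |
| | +/// ```sql |
| | +/// SELECT trades.date AS date |
| | +/// FROM trades |
| | +/// JOIN daily_statistics |
| | +///     ON daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL '2 WEEKS' |
| | +/// ``` |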
| 330 | +/// |
| 331 | +/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan, |
| 332 | +/// together with the target path constructed from the (static) partition columns. This gives us the following plan: |
| 333 | +/// |
| 334 | +/// ```mermaid |
| 335 | +/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%% |
| 336 | +/// graph TD |
| 337 | +/// A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"] |
| 338 | +/// A --> B["Projection: __meta, trades.date AS date"] |
| 339 | +/// B --> C["Projection: <br>concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"] |
| 340 | +/// C --> D["Inner Join: <br><b>daily_statistics_meta</b>.date BETWEEN <b>trades_meta</b>.date AND <b>trades_meta</b>.date + INTERVAL '2 WEEKS'"] |
| 341 | +/// D --> E["TableScan: <b>trades_meta</b> (projection=[__meta, date])"] |
| 342 | +/// D --> F["TableScan: <b>daily_statistics_meta</b> (projection=[__meta, date])"] |
| 343 | +/// ``` |
| 344 | +/// |
| 345 | +/// Here, `__meta` is a column containing a list of structs, one per source file, holding that file's row metadata. The final query |
| 346 | +/// unnests this column into its component fields. The final output looks roughly like this: |
| 347 | +/// |
| 348 | +/// ```text |
| 349 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ |
| 350 | +/// | target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified | |
| 351 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ |
| 352 | +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | trades | s3://trades/date=2023-01-01/data.01.parquet | 2023-07-11T16:29:26 | |
| 353 | +/// | s3://daily_close/date=2023-01-01/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22 | |
| 354 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | trades | s3://trades/date=2023-01-02/data.01.parquet | 2023-07-11T16:45:44 | |
| 355 | +/// | s3://daily_close/date=2023-01-02/ | datafusion | public | daily_statistics | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10 | |
| 356 | +/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+ |
| 357 | +/// ``` |
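| | +/// |
| | +/// The final unnesting step can be pictured as the schematic query below. The `__meta` struct field names, the `dependencies` input name, and the lateral-unnest syntax are illustrative assumptions, not the exact columns or dialect used internally: |
| | +/// |
| | +/// ```sql |
| | +/// SELECT |
| | +///     target, |
| | +///     m.table_catalog AS source_table_catalog, |
| | +///     m.table_schema  AS source_table_schema, |
| | +///     m.table_name    AS source_table_name, |
| | +///     m.uri           AS source_uri, |
| | +///     m.last_modified AS source_last_modified |
| | +/// FROM dependencies CROSS JOIN UNNEST(__meta) AS m; |
| | +/// ``` |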
257 | 358 | pub fn mv_dependencies_plan( |
258 | 359 | materialized_view: &dyn Materialized, |
259 | 360 | row_metadata_registry: &RowMetadataRegistry, |
|