diff --git a/README.md b/README.md index 350b3bd..3d83008 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ An implementation of incremental view maintenance & query rewriting for materialized views in DataFusion. -A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as *query rewriting*. +A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as _query rewriting_. -Efficiently maintaining the up-to-dateness of a materialized view is a problem known as *incremental view maintenance*. It is a hard problem in general, but we make some simplifying assumptions: +Efficiently maintaining the up-to-dateness of a materialized view is a problem known as _incremental view maintenance_. It is a hard problem in general, but we make some simplifying assumptions: * Data is stored as Hive-partitioned files in object storage. * The smallest unit of data that can be updated is a single file. @@ -14,7 +14,7 @@ This is a typical pattern with DataFusion, as files in object storage usually ar ## Example Here we walk through a hypothetical example of setting up a materialized view, to illustrate -what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function), +what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function), called `mv_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine when partitions of the materialized view need to be recomputed. @@ -52,3 +52,15 @@ SELECT * FROM mv_dependencies('m1'); +--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+ ``` +## More detailed example (with code) + +As of now, actually implementing materialized views is somewhat complicated, as the library is initially focused on providing a minimal kernel of functionality that can be shared across multiple implementations of materialized views. Broadly, the process includes these steps: + +* Define a custom `MaterializedListingTable` type that implements `Materialized` +* Register the type globally using the `register_materialized` global function +* Initialize the `FileMetadata` component +* Initialize the `RowMetadataRegistry` +* Register the `mv_dependencies` and `stale_files` UDTFs (User Defined Table Functions) in your DataFusion `SessionContext` +* Periodically regenerate directories marked as stale by `stale_files` + +A full walkthrough of this process including implementation can be seen in an integration test, under [`tests/materialized_listing_table.rs`](tests/materialized_listing_table.rs). diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index 0b41ec8..62ee198 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -200,12 +200,9 @@ impl TableFunctionImpl for StaleFilesUdtf { col("expected_target").alias("target"), col("target_last_modified"), col("sources_last_modified"), - coalesce(vec![ - col("target_last_modified"), - lit(ScalarValue::TimestampNanosecond(Some(0), None)), - ]) - .lt(col("sources_last_modified")) - .alias("is_stale"), + nvl(col("target_last_modified"), lit(0)) + .lt(col("sources_last_modified")) + .alias("is_stale"), ])? .build()?; diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs index add8df3..328618e 100644 --- a/src/materialized/row_metadata.rs +++ b/src/materialized/row_metadata.rs @@ -30,6 +30,7 @@ use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_CO #[derive(Default)] pub struct RowMetadataRegistry { metadata_sources: DashMap>, + default_source: Option>, } impl std::fmt::Debug for RowMetadataRegistry { @@ -48,6 +49,17 @@ impl std::fmt::Debug for RowMetadataRegistry { } impl RowMetadataRegistry { + /// Initializes this `RowMetadataRegistry` with a default `RowMetadataSource` + /// to be used if a table has not been explicitly registered with a specific source. + /// + /// Typically the [`FileMetadata`] source should be used as the default. + pub fn new_with_default_source(default_source: Arc) -> Self { + Self { + default_source: Some(default_source), + ..Default::default() + } + } + /// Registers a metadata source for a specific table. /// Returns the previously registered source for this table, if any pub fn register_source( @@ -63,6 +75,7 @@ impl RowMetadataRegistry { self.metadata_sources .get(&table.to_string()) .map(|o| Arc::clone(o.value())) + .or_else(|| self.default_source.clone()) .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table))) } } diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs new file mode 100644 index 0000000..fa26a7b --- /dev/null +++ b/tests/materialized_listing_table.rs @@ -0,0 +1,569 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{any::Any, borrow::Cow, collections::HashMap, ops::Deref, sync::Arc}; + +use anyhow::{bail, Context, Result}; +use arrow::{array::StringArray, compute::concat_batches, util::pretty}; +use arrow_schema::{DataType, SchemaRef}; +use datafusion::{ + catalog::{Session, TableProvider}, + datasource::{ + file_format::{parquet::ParquetFormat, FileFormat}, + listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, + }, + execution::{ + object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, + runtime_env::RuntimeEnvBuilder, + }, + prelude::{SessionConfig, SessionContext}, +}; +use datafusion_common::{Constraints, DataFusionError, ParamValues, ScalarValue, Statistics}; +use datafusion_expr::{ + col, dml::InsertOp, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, SortExpr, + TableProviderFilterPushDown, TableType, +}; +use datafusion_materialized_views::materialized::{ + dependencies::{mv_dependencies, stale_files}, + file_metadata::FileMetadata, + register_materialized, + row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry}, + ListingTableLike, Materialized, +}; +use datafusion_physical_plan::{collect, ExecutionPlan}; +use datafusion_sql::TableReference; +use futures::{StreamExt, TryStreamExt}; +use itertools::{Either, Itertools}; +use object_store::local::LocalFileSystem; +use object_store::path::Path as ObjectPath; +use tempfile::TempDir; +use url::Url; + +/// Configuration for a materialized listing table +struct MaterializedListingConfig { + /// Path in object storage where the materialized data will be stored + table_path: ListingTableUrl, + /// The query defining the view to be materialized + query: LogicalPlan, + /// Additional configuration options + options: Option, +} + +struct MaterializedListingOptions { + file_extension: String, + format: Arc, + table_partition_cols: Vec, + collect_stat: bool, + target_partitions: usize, + file_sort_order: Vec>, +} + +/// A materialized [`ListingTable`]. +#[derive(Debug)] +struct MaterializedListingTable { + /// The underlying [`ListingTable`] + inner: ListingTable, + /// The query defining this materialized view + query: LogicalPlan, + /// The Arrow schema of the query + schema: SchemaRef, +} + +impl ListingTableLike for MaterializedListingTable { + fn table_paths(&self) -> Vec { + ::table_paths(&self.inner) + } + + fn partition_columns(&self) -> Vec { + ::partition_columns(&self.inner) + } + + fn file_ext(&self) -> String { + self.inner.file_ext() + } +} + +impl Materialized for MaterializedListingTable { + fn query(&self) -> LogicalPlan { + self.query.clone() + } +} + +struct TestContext { + _dir: TempDir, + ctx: SessionContext, +} + +impl Deref for TestContext { + type Target = SessionContext; + + fn deref(&self) -> &Self::Target { + &self.ctx + } +} + +async fn setup() -> Result { + let _ = env_logger::builder().is_test(true).try_init(); + + // All custom table providers must be registered using the `register_materialized` and/or `register_listing_table` APIs. + register_materialized::(); + + // Override the object store with one rooted in a temporary directory + + let dir = TempDir::new().context("create tempdir")?; + let store = LocalFileSystem::new_with_prefix(&dir) + .map(Arc::new) + .context("create local file system object store")?; + + let registry = Arc::new(DefaultObjectStoreRegistry::new()); + registry + .register_store(&Url::parse("file://").unwrap(), store) + .context("register file system store") + .expect("should replace existing object store at file://"); + + let ctx = SessionContext::new_with_config_rt( + SessionConfig::new(), + RuntimeEnvBuilder::new() + .with_object_store_registry(registry) + .build_arc() + .context("create RuntimeEnv")?, + ); + + // Now register the `mv_dependencies` and `stale_files` UDTFs + // They have `FileMetadata` and `RowMetadataRegistry` as dependencies. + + let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))); + + let row_metadata_registry = Arc::new(RowMetadataRegistry::new_with_default_source(Arc::new( + ObjectStoreRowMetadataSource::new(Arc::clone(&file_metadata)), + ))); + + ctx.register_udtf( + "mv_dependencies", + mv_dependencies( + Arc::clone(ctx.state().catalog_list()), + Arc::clone(&row_metadata_registry), + ctx.state().config_options(), + ), + ); + ctx.register_udtf( + "stale_files", + stale_files( + Arc::clone(ctx.state().catalog_list()), + Arc::clone(&row_metadata_registry), + Arc::clone(&file_metadata) as Arc, + ctx.state().config_options(), + ), + ); + + // Create a table with some data + // Our materialized view will be derived from this table + ctx.sql( + " + CREATE EXTERNAL TABLE t1 (num INTEGER, date TEXT, feed CHAR) + STORED AS CSV + PARTITIONED BY (date, feed) + LOCATION 'file:///t1/' + ", + ) + .await? + .show() + .await?; + + ctx.sql( + "INSERT INTO t1 VALUES + (1, '2023-01-01', 'A'), + (2, '2023-01-02', 'B'), + (3, '2023-01-03', 'C'), + (4, '2024-12-04', 'X'), + (5, '2024-12-05', 'Y'), + (6, '2024-12-06', 'Z') + ", + ) + .await? + .collect() + .await?; + + Ok(TestContext { _dir: dir, ctx }) +} + +#[tokio::test] +async fn test_materialized_listing_table_incremental_maintenance() -> Result<()> { + let ctx = setup().await.context("setup")?; + + let query = ctx + .sql( + " + SELECT + COUNT(*) AS count, + CAST(date_part('YEAR', date) AS INT) AS year + FROM t1 + GROUP BY year + ", + ) + .await? + .into_unoptimized_plan(); + + let mv = Arc::new(MaterializedListingTable::try_new( + MaterializedListingConfig { + table_path: ListingTableUrl::parse("file:///m1/")?, + query, + options: Some(MaterializedListingOptions { + file_extension: ".parquet".to_string(), + format: Arc::::default(), + table_partition_cols: vec!["year".into()], + collect_stat: false, + target_partitions: 1, + file_sort_order: vec![vec![SortExpr { + expr: col("count"), + asc: true, + nulls_first: false, + }]], + }), + }, + )?); + + ctx.register_table("m1", Arc::clone(&mv) as Arc)?; + + assert_eq!( + refresh_materialized_listing_table(&ctx, "m1".into()).await?, + vec![ + // The initial call should refresh all Hive partitions + "file:///m1/year=2023/".to_string(), + "file:///m1/year=2024/".to_string() + ] + ); + assert!(materialized_view_up_to_date(&ctx, &mv, "m1").await?); + + // Insert another row into the source table + ctx.sql( + "INSERT INTO t1 VALUES + (7, '2024-12-07', 'W')", + ) + .await? + .collect() + .await?; + + // Materialized view should be out of date now + assert!(!materialized_view_up_to_date(&ctx, &mv, "m1").await?); + + // Refresh the materialized view and check that it's up to date again + assert_eq!( + refresh_materialized_listing_table(&ctx, "m1".into()).await?, + // Only the Hive partition for 2024 should be refreshed, + // since the row we added was in 2024. + vec!["file:///m1/year=2024/".to_string()] + ); + assert!(materialized_view_up_to_date(&ctx, &mv, "m1").await?); + + Ok(()) +} + +/// Check that the table's materialized data the same rows as its query. +async fn materialized_view_up_to_date( + ctx: &TestContext, + mv: &MaterializedListingTable, + table_name: impl Into, +) -> Result { + let table_name = table_name.into(); + // Using anti-joins, verify A ⊆ B and B ⊆ A (therefore A = B) + for join_type in [JoinType::LeftAnti, JoinType::RightAnti] { + let num_rows_not_matching = ctx + .sql(&format!("SELECT * FROM {table_name}")) + .await? + .join( + ctx.execute_logical_plan(mv.query.clone()).await?, + join_type, + &["count", "year"], + &["count", "year"], + None, + )? + .count() + .await?; + if num_rows_not_matching != 0 { + return Ok(false); + } + } + + Ok(true) +} + +impl MaterializedListingTable { + fn try_new(config: MaterializedListingConfig) -> Result { + let schema = config.query.schema().as_arrow(); + let (partition_indices, file_indices): (Vec, Vec) = schema + .fields() + .iter() + .enumerate() + .partition_map(|(i, field)| { + if config + .options + .as_ref() + .is_some_and(|opts| opts.table_partition_cols.contains(field.name())) + { + Either::Left(i) + } else { + Either::Right(i) + } + }); + + let file_schema = schema.project(&file_indices)?; + + // Rewrite the query to have the partition columns at the end. + // It's convention for Hive-partitioned tables to have the partition columns at the end. + let normalizing_projection = file_indices + .iter() + .copied() + .chain(partition_indices.iter().copied()) + .collect_vec(); + + let normalized_query = LogicalPlanBuilder::new(config.query.clone()) + .select(normalizing_projection)? + .build()?; + let normalized_schema = Arc::new(normalized_query.schema().as_arrow().clone()); + + let table_partition_cols = schema + .project(&partition_indices)? + .fields + .into_iter() + .map(|f| (f.name().clone(), f.data_type().clone())) + .collect_vec(); + + let options = config.options.map(|opts| ListingOptions { + file_extension: opts.file_extension, + format: opts.format, + table_partition_cols, + collect_stat: opts.collect_stat, + target_partitions: opts.target_partitions, + file_sort_order: opts.file_sort_order, + }); + + Ok(MaterializedListingTable { + inner: ListingTable::try_new(ListingTableConfig { + table_paths: vec![config.table_path], + file_schema: Some(Arc::new(file_schema)), + options, + })?, + query: normalized_query, + schema: normalized_schema, + }) + } +} + +/// Attempt to refresh the data in this materialized view. +async fn refresh_materialized_listing_table( + ctx: &TestContext, + table_name: TableReference, +) -> Result> { + let stale_targets = ctx + .sql(&format!( + "SELECT target FROM stale_files('{table_name}') WHERE is_stale ORDER BY target" + )) + .await? + .collect() + .await?; + + let batches = concat_batches(&stale_targets[0].schema(), &stale_targets)?; + let targets = batches + .column(0) + .as_any() + .downcast_ref::() + .unwrap() + .into_iter() + .flatten() + .collect_vec(); + + let table = ctx.table_provider(table_name.clone()).await?; + + for target in &targets { + refresh_mv_target(ctx, table.as_any().downcast_ref().unwrap(), target).await?; + } + + if ctx + .sql(&format!( + "SELECT target FROM stale_files('{table_name}') WHERE is_stale" + )) + .await? + .count() + .await? + != 0 + { + bail!("Expected no stale targets after materialization"); + } + + Ok(targets.iter().map(|s| s.to_string()).collect()) +} + +/// Refresh a particular target directory for a materialized view. +/// +/// More robust implementations should perform this atomically. +/// For the sake of ease, we just delete the data then use the `ExecutionPlan::insert_into` API. +async fn refresh_mv_target( + ctx: &TestContext, + table: &MaterializedListingTable, + target: &str, +) -> Result<()> { + let url = ListingTableUrl::parse(target)?; + let store = ctx.state().runtime_env().object_store(url.object_store())?; + + // Delete all of the data in this directory + store + .delete_stream( + store + .list(Some(url.prefix())) + .map_ok(|meta| meta.location) + .boxed(), + ) + .try_collect::>() + .await?; + + // Insert fresh data + let writer_plan = table + .insert_into( + &ctx.state(), + ctx.execute_logical_plan(table.job_for_target(url)?) + .await? + .create_physical_plan() + .await?, + InsertOp::Append, + ) + .await?; + + pretty::print_batches(&collect(writer_plan, ctx.task_ctx()).await?)?; + + Ok(()) +} + +impl MaterializedListingTable { + /// Returns a query in the form of a logical plan, with placeholder parameters corresponding to the + /// partition columns of this table. + /// Data for a single partition can be generated by executing this query with the partition values as parameters. + fn job_for_partition(&self) -> Result { + use datafusion::prelude::*; + let part_cols = self.partition_columns(); + + LogicalPlanBuilder::new(self.query.clone()) + .filter( + part_cols + .iter() + .enumerate() + .map(|(i, pc)| col(pc).eq(placeholder(format!("${}", i + 1)))) + .fold(lit(true), |a, b| a.and(b)), + )? + .sort(self.inner.options().file_sort_order[0].clone())? + .build() + } + + /// Extracts the partition columns from the target URL and returns a query that can be executed + /// to regenerate the data for this target directory. + fn job_for_target(&self, target: ListingTableUrl) -> Result { + let partition_columns_with_data_types = self.inner.options().table_partition_cols.clone(); + + self.job_for_partition()? + .with_param_values(ParamValues::List(parse_partition_values( + target.prefix(), + &partition_columns_with_data_types, + )?)) + } +} + +#[async_trait::async_trait] +impl TableProvider for MaterializedListingTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + fn constraints(&self) -> Option<&Constraints> { + self.inner.constraints() + } + + fn table_type(&self) -> TableType { + self.inner.table_type() + } + + fn get_table_definition(&self) -> Option<&str> { + self.inner.get_table_definition() + } + + fn get_logical_plan(&self) -> Option> { + // We _could_ return the LogicalPlan here, + // but it will cause this table to be treated like a regular view + // and the materialized results will not be used. + None + } + + fn get_column_default(&self, column: &str) -> Option<&Expr> { + self.inner.get_column_default(column) + } + + async fn scan( + &self, + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> Result, DataFusionError> { + self.inner.scan(state, projection, filters, limit).await + } + + fn supports_filters_pushdown( + &self, + filters: &[&Expr], + ) -> Result, DataFusionError> { + self.inner.supports_filters_pushdown(filters) + } + + fn statistics(&self) -> Option { + self.inner.statistics() + } + + async fn insert_into( + &self, + state: &dyn Session, + input: Arc, + insert_op: InsertOp, + ) -> Result, DataFusionError> { + self.inner.insert_into(state, input, insert_op).await + } +} + +/// Parse partition column values from an object path. +fn parse_partition_values( + path: &ObjectPath, + partition_columns: &[(String, DataType)], +) -> Result, DataFusionError> { + let parts = path.parts().map(|part| part.to_owned()).collect::>(); + + let pairs = parts + .iter() + .filter_map(|part| part.as_ref().split_once('=')) + .collect::>(); + + let partition_values = partition_columns + .iter() + .map(|(column, datatype)| { + let value = pairs.get(column.as_str()).copied().map(String::from); + ScalarValue::Utf8(value).cast_to(datatype) + }) + .collect::, _>>()?; + + Ok(partition_values) +}