18 changes: 15 additions & 3 deletions README.md
@@ -2,9 +2,9 @@

An implementation of incremental view maintenance & query rewriting for materialized views in DataFusion.

A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as *query rewriting*.
A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as _query rewriting_.

Efficiently maintaining the up-to-dateness of a materialized view is a problem known as *incremental view maintenance*. It is a hard problem in general, but we make some simplifying assumptions:
Efficiently maintaining the up-to-dateness of a materialized view is a problem known as _incremental view maintenance_. It is a hard problem in general, but we make some simplifying assumptions:

* Data is stored as Hive-partitioned files in object storage.
* The smallest unit of data that can be updated is a single file.
@@ -14,7 +14,7 @@ This is a typical pattern with DataFusion, as files in object storage usually ar
## Example

Here we walk through a hypothetical example of setting up a materialized view, to illustrate
what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
called `mv_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
when partitions of the materialized view need to be recomputed.

@@ -52,3 +52,15 @@ SELECT * FROM mv_dependencies('m1');
+--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
```

## More detailed example (with code)

As of now, implementing materialized views takes some manual wiring, as the library is initially focused on providing a minimal kernel of functionality that can be shared across multiple implementations of materialized views. Broadly, the process involves these steps:

* Define a custom `MaterializedListingTable` type that implements `Materialized`
* Register the type globally using the `register_materialized` function
* Initialize the `FileMetadata` component
* Initialize the `RowMetadataRegistry`
* Register the `mv_dependencies` and `stale_files` UDTFs (User Defined Table Functions) in your DataFusion `SessionContext`
* Periodically regenerate directories marked as stale by `stale_files`

A full walkthrough of this process, including a complete implementation, can be found in the integration test at [`tests/materialized_listing_table.rs`](tests/materialized_listing_table.rs).
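
For orientation, here is a rough sketch of how those steps fit together around a DataFusion `SessionContext`. It is not the library's actual API: the library-specific constructors are elided into comments, the argument to `stale_files` is assumed to mirror `mv_dependencies`, and `'m1'` is just the example view name from above. The integration test linked above shows the real signatures.

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

/// Rough sketch only; see `tests/materialized_listing_table.rs` for working code.
async fn set_up_materialized_views(ctx: &SessionContext) -> Result<()> {
    // 1. Define a `MaterializedListingTable` type implementing `Materialized`,
    //    and register it globally (e.g. via `register_materialized`).
    // 2. Initialize `FileMetadata` and a `RowMetadataRegistry`, typically via
    //    `RowMetadataRegistry::new_with_default_source(file_metadata)`.
    // 3. Register the `mv_dependencies` and `stale_files` UDTFs on `ctx` so
    //    they can be queried like ordinary tables.

    // 4. Periodically ask which targets are out of date and rebuild them
    //    (`target` and `is_stale` are the column names produced by `stale_files`):
    let stale = ctx
        .sql("SELECT target, is_stale FROM stale_files('m1')")
        .await?;
    stale.show().await?;
    Ok(())
}
```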
9 changes: 3 additions & 6 deletions src/materialized/dependencies.rs
@@ -200,12 +200,9 @@ impl TableFunctionImpl for StaleFilesUdtf {
col("expected_target").alias("target"),
col("target_last_modified"),
col("sources_last_modified"),
coalesce(vec![
col("target_last_modified"),
lit(ScalarValue::TimestampNanosecond(Some(0), None)),
])
.lt(col("sources_last_modified"))
.alias("is_stale"),
nvl(col("target_last_modified"), lit(0))
.lt(col("sources_last_modified"))
.alias("is_stale"),
])?
.build()?;

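For context on the change above: the previous expression coalesced a missing `target_last_modified` with a zero timestamp before comparing it to `sources_last_modified`; the `nvl` form expresses the same fallback more compactly, so a target that has never been materialized always compares as stale. A minimal standalone sketch of the resulting predicate, assuming `nvl` is reachable through DataFusion's function re-exports at `datafusion::functions::expr_fn` (the import path used in this crate may differ):

```rust
use datafusion::functions::expr_fn::nvl;
use datafusion::prelude::{col, lit, Expr};

/// Staleness predicate: a NULL `target_last_modified` (never built) falls back
/// to 0 and is therefore always older than any source timestamp.
fn is_stale_expr() -> Expr {
    nvl(col("target_last_modified"), lit(0))
        .lt(col("sources_last_modified"))
        .alias("is_stale")
}
```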
13 changes: 13 additions & 0 deletions src/materialized/row_metadata.rs
@@ -30,6 +30,7 @@ use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_CO
#[derive(Default)]
pub struct RowMetadataRegistry {
metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
default_source: Option<Arc<dyn RowMetadataSource>>,
}

impl std::fmt::Debug for RowMetadataRegistry {
@@ -48,6 +49,17 @@ impl std::fmt::Debug for RowMetadataRegistry {
}

impl RowMetadataRegistry {
/// Initializes this `RowMetadataRegistry` with a default `RowMetadataSource`
/// to be used if a table has not been explicitly registered with a specific source.
///
/// Typically the [`FileMetadata`] source should be used as the default.
pub fn new_with_default_source(default_source: Arc<dyn RowMetadataSource>) -> Self {
Self {
default_source: Some(default_source),
..Default::default()
}
}

/// Registers a metadata source for a specific table.
/// Returns the previously registered source for this table, if any
pub fn register_source(
@@ -63,6 +75,7 @@ impl RowMetadataRegistry {
self.metadata_sources
.get(&table.to_string())
.map(|o| Arc::clone(o.value()))
.or_else(|| self.default_source.clone())
.ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
}
}
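A hypothetical usage sketch of the new constructor. The import paths below are guessed from this PR's file layout, and it is assumed that `FileMetadata` implements `RowMetadataSource` (as the doc comment suggests); the real crate layout may differ:

```rust
use std::sync::Arc;

// Hypothetical paths, inferred from src/materialized/{file_metadata,row_metadata}.rs.
use datafusion_materialized_views::materialized::file_metadata::FileMetadata;
use datafusion_materialized_views::materialized::row_metadata::RowMetadataRegistry;

/// Builds a registry that falls back to file-level metadata for any table
/// that has no explicitly registered `RowMetadataSource`.
fn build_registry(file_metadata: Arc<FileMetadata>) -> Arc<RowMetadataRegistry> {
    Arc::new(RowMetadataRegistry::new_with_default_source(file_metadata))
}
```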