Commit 14a3518

Incremental view maintenance (#3)
* port MV dependency code
* improved documentation
* fix spelling mistake
* fix typo
* don't forget license header
* readme
* explain what UDTF means
* fix typo in readme
1 parent b7974e8 commit 14a3518

9 files changed, +1795 -93 lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -37,6 +37,7 @@ datafusion-common = "43"
 datafusion-expr = "43"
 datafusion-functions = "43"
 datafusion-functions-aggregate = "43"
+datafusion-optimizer = "43"
 datafusion-physical-expr = "43"
 datafusion-physical-plan = "43"
 datafusion-sql = "43"

README.md

Lines changed: 54 additions & 1 deletion
@@ -1 +1,54 @@
-# datafusion-materialized-views
+# datafusion-materialized-views
+
+An implementation of incremental view maintenance & query rewriting for materialized views in DataFusion.
+
+A **materialized view** is a view whose query has been pre-computed and saved for later use. This can drastically speed up workloads by pre-computing at least a large fragment of a user-provided query. Furthermore, by implementing a _view matching_ algorithm, we can implement an optimizer that rewrites queries to automatically make use of materialized views where possible and beneficial, a concept known as *query rewriting*.
+
+Efficiently maintaining the up-to-dateness of a materialized view is a problem known as *incremental view maintenance*. It is a hard problem in general, but we make some simplifying assumptions:
+
+* Data is stored as Hive-partitioned files in object storage.
+* The smallest unit of data that can be updated is a single file.
+
+This is a typical pattern with DataFusion, as files in object storage usually are immutable (especially if they are Parquet) and can only be replaced, not appended to or modified. However, it does mean that our implementation of incremental view maintenance only works for Hive-partitioned materialized views in object storage. (Future work may generalize this to alternate storage sources, but the requirement of logically partitioned tables remains.) In contrast, the view matching problem does not depend on the underlying physical representation of the tables.
+
+## Example
+
+Here we walk through a hypothetical example of setting up a materialized view, to illustrate
+what this library offers. The core of the incremental view maintenance implementation is a UDTF (User-Defined Table Function),
+called `file_dependencies`, that outputs a build graph for a materialized view. This gives users the information they need to determine
+when partitions of the materialized view need to be recomputed.
+
+```sql
+-- Create a base table
+CREATE EXTERNAL TABLE t1 (column0 TEXT, date DATE)
+STORED AS PARQUET
+PARTITIONED BY (date)
+LOCATION 's3://t1/';
+
+INSERT INTO t1 VALUES
+    ('a', '2021-01-01'),
+    ('b', '2022-02-02'),
+    ('c', '2022-02-03'), -- Two values in the year 2022
+    ('d', '2023-03-03');
+
+-- Pretend we can create materialized views in SQL
+-- The TableProvider implementation will need to implement the Materialized trait.
+CREATE MATERIALIZED VIEW m1 AS SELECT
+    COUNT(*) AS count,
+    date_part('YEAR', date) AS year
+PARTITIONED BY (year)
+LOCATION 's3://m1/';
+
+-- Show the dependency graph for m1 using the file_dependencies UDTF
+SELECT * FROM file_dependencies('m1');
+
++--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |
++--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+| s3://m1/year=2021/ | datafusion           | public              | t1                | s3://t1/date=2021-01-01/data.parquet | 2023-07-11T16:29:26  |
+| s3://m1/year=2022/ | datafusion           | public              | t1                | s3://t1/date=2022-02-02/data.parquet | 2023-07-11T16:45:22  |
+| s3://m1/year=2022/ | datafusion           | public              | t1                | s3://t1/date=2022-02-03/data.parquet | 2023-07-11T16:45:44  |
+| s3://m1/year=2023/ | datafusion           | public              | t1                | s3://t1/date=2023-03-03/data.parquet | 2023-07-11T16:45:44  |
++--------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
+```
+
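
Note on the README example: the `file_dependencies` output is described as the information needed to decide when a partition must be recomputed, but the decision itself is left to the caller. One plausible rule is to rebuild a target when it has never been built or when any of its sources is newer than it. The sketch below works under that assumption only. The `Dependency` struct, the `stale_targets` function, and the integer timestamps are hypothetical and are not part of this commit.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// One row of a dependency graph: a target partition and one source file it reads.
/// Hypothetical type; timestamps are seconds since the Unix epoch for simplicity.
#[derive(Debug, Clone)]
pub struct Dependency {
    pub target: String,
    pub source_uri: String,
    pub source_last_modified: i64,
}

/// Returns the targets that need to be rebuilt under the assumed rule:
/// a target is stale if it has no recorded build time, or if at least one
/// of its sources was modified after the target was last built.
pub fn stale_targets(
    deps: &[Dependency],
    target_last_modified: &BTreeMap<String, i64>,
) -> BTreeSet<String> {
    let mut stale = BTreeSet::new();
    for dep in deps {
        let is_stale = match target_last_modified.get(&dep.target) {
            // Never built: always stale.
            None => true,
            // Built before the source changed: stale.
            Some(&built_at) => dep.source_last_modified > built_at,
        };
        if is_stale {
            stale.insert(dep.target.clone());
        }
    }
    stale
}
```

Applied to rows like those in the README table, replacing `s3://t1/date=2022-02-03/data.parquet` after `s3://m1/year=2022/` was last written would mark only that target as stale, provided the other targets were built after their sources.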

src/lib.rs

Lines changed: 14 additions & 5 deletions
@@ -25,11 +25,20 @@
 /// By analyzing the fragment of the materialized view query pertaining to the partition columns,
 /// we can derive a build graph that relates the files of a materialized view and the files of the tables it depends on.
 ///
-/// A central trait is defined for Hive-partitioned tables, [`ListingTableLike`](materialized::ListingTableLike). Note that
-/// all implementations of [`ListingTableLike`](materialized::ListingTableLike) must be registered using the
-/// [`register_listing_table`](materialized::register_listing_table) function, otherwise the tables may not be detected by
-/// the incremental view maintenance code, including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata)
-/// or [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry).
+/// Two central traits are defined:
+///
+/// * [`ListingTableLike`](materialized::ListingTableLike): a trait that abstracts Hive-partitioned tables in object storage;
+/// * [`Materialized`](materialized::Materialized): a materialized `ListingTableLike` defined by a user-provided query.
+///
+/// Note that all implementations of `ListingTableLike` and `Materialized` must be registered using the
+/// [`register_listing_table`](materialized::register_listing_table) and
+/// [`register_materialized`](materialized::register_materialized) functions respectively,
+/// otherwise the tables may not be detected by the incremental view maintenance code,
+/// including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata),
+/// [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry), or the
+/// [`file_dependencies`](materialized::dependencies::file_dependencies) UDTF.
+///
+/// By default, `ListingTableLike` is implemented for [`ListingTable`](datafusion::datasource::listing::ListingTable).
 pub mod materialized;

 /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
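
Note on the Hive-partitioned layout that `ListingTableLike` abstracts: partition column values are encoded in path segments of the form `column=value`, as in `s3://t1/date=2021-01-01/data.parquet`. The sketch below shows one simple way to read such a value back out of a path using only the standard library. The function `hive_partition_value` is a hypothetical name for illustration and is not the crate's `hive_partition` UDF, whose implementation is not shown in this diff.

```rust
/// Extracts the value of the Hive partition column `column` from an
/// object-store path, or returns `None` if no `column=value` segment exists.
/// Hypothetical helper for illustration only.
pub fn hive_partition_value<'a>(path: &'a str, column: &str) -> Option<&'a str> {
    path.split('/')
        // Keep only segments shaped like `key=value`.
        .filter_map(|segment| segment.split_once('='))
        // Pick the first segment whose key matches the requested column.
        .find(|(key, _)| *key == column)
        .map(|(_, value)| value)
}
```

For instance, calling it with the path `s3://m1/year=2022/` and the column `year` yields `Some("2022")`, while asking for a column that does not appear in the path yields `None`.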

src/materialized.rs

Lines changed: 47 additions & 6 deletions
@@ -16,7 +16,7 @@
 // under the License.

 /// Track dependencies of materialized data in object storage
-mod dependencies;
+pub mod dependencies;

 /// Pluggable metadata sources for incremental view maintenance
 pub mod row_metadata;
@@ -27,6 +27,9 @@ pub mod file_metadata;
 /// A UDF that parses Hive partition elements from object storage paths.
 mod hive_partition;

+/// Some private utility functions
+mod util;
+
 use std::{
     any::{type_name, Any, TypeId},
     fmt::Debug,
@@ -78,7 +81,7 @@ impl ListingTableLike for ListingTable {

 /// Register a [`ListingTableLike`] implementation in this registry.
 /// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
-/// into a [`ListingTableLike`] where possible.
+/// into a `ListingTableLike` where possible.
 pub fn register_listing_table<T: ListingTableLike>() {
     TABLE_TYPE_REGISTRY.register_listing_table::<T>();
 }
@@ -95,15 +98,31 @@ pub trait Materialized: ListingTableLike {
     fn query(&self) -> LogicalPlan;
 }

+/// Register a [`Materialized`] implementation in this registry.
+/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
+/// into a `Materialized` where possible.
+///
+/// Note that this will also register `T` as a [`ListingTableLike`].
+pub fn register_materialized<T: Materialized>() {
+    TABLE_TYPE_REGISTRY.register_materialized::<T>();
+}
+
+/// Attempt to cast the given TableProvider into a [`Materialized`].
+/// If the table's type has not been registered using [`register_materialized`], will return `None`.
+pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
+    TABLE_TYPE_REGISTRY.cast_to_materialized(table)
+}
+
 type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;

-/// A registry for implementations of [`ListingTableLike`], used for downcasting
-/// arbitrary TableProviders into `dyn ListingTableLike` where possible.
+/// A registry for implementations of library-defined traits, used for downcasting
+/// arbitrary TableProviders into `ListingTableLike` and `Materialized` trait objects where possible.
 ///
-/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike`.
-/// By default, [`ListingTable`] is registered.
+/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike` and `Materialized`.
+/// By default, [`ListingTable`] is registered as a `ListingTableLike`.
 struct TableTypeRegistry {
     listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
+    materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
 }

 impl Debug for TableTypeRegistry {
@@ -125,6 +144,7 @@ impl Default for TableTypeRegistry {
     fn default() -> Self {
         let new = Self {
             listing_table_accessors: DashMap::new(),
+            materialized_accessors: DashMap::new(),
         };
         new.register_listing_table::<ListingTable>();

@@ -143,6 +163,18 @@ impl TableTypeRegistry {
         );
     }

+    fn register_materialized<T: Materialized>(&self) {
+        self.materialized_accessors.insert(
+            TypeId::of::<T>(),
+            (
+                type_name::<T>(),
+                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
+            ),
+        );
+
+        self.register_listing_table::<T>();
+    }
+
     fn cast_to_listing_table<'a>(
         &'a self,
         table: &'a dyn TableProvider,
@@ -151,4 +183,13 @@ impl TableTypeRegistry {
             .get(&table.as_any().type_id())
             .and_then(|r| r.value().1(table.as_any()))
     }
+
+    fn cast_to_materialized<'a>(
+        &'a self,
+        table: &'a dyn TableProvider,
+    ) -> Option<&'a dyn Materialized> {
+        self.materialized_accessors
+            .get(&table.as_any().type_id())
+            .and_then(|r| r.value().1(table.as_any()))
+    }
 }
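
Note on the registry pattern: `TableTypeRegistry` keys a downcasting function by `TypeId`, so that a value seen only as `&dyn Any` can be recovered as a trait object if its concrete type was registered. The standalone sketch below reproduces that idea with the standard library only. It is a simplification under stated assumptions: it uses a plain `HashMap` and function pointers instead of `DashMap` and `Arc`'d closures, and the `Named` trait, `Registry`, `MonthlyView`, and `PlainTable` are hypothetical stand-ins rather than types from this crate.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical stand-in for a library-defined trait such as `Materialized`.
pub trait Named {
    fn name(&self) -> String;
}

/// Recovers a `&dyn Named` from a `&dyn Any` if the concrete type is `T`.
fn downcast<T: Named + 'static>(any: &dyn Any) -> Option<&dyn Named> {
    any.downcast_ref::<T>().map(|t| t as &dyn Named)
}

/// Type-erased downcasting function, one per registered concrete type.
type Downcaster = fn(&dyn Any) -> Option<&dyn Named>;

/// Maps the `TypeId` of each registered type to its downcasting function.
#[derive(Default)]
pub struct Registry {
    accessors: HashMap<TypeId, Downcaster>,
}

impl Registry {
    /// Registers `T` so that later casts of `T` values succeed.
    pub fn register<T: Named + 'static>(&mut self) {
        self.accessors.insert(TypeId::of::<T>(), downcast::<T>);
    }

    /// Returns `None` when the concrete type of `value` was never registered.
    pub fn cast<'a>(&self, value: &'a dyn Any) -> Option<&'a dyn Named> {
        self.accessors
            .get(&value.type_id())
            .and_then(|f| f(value))
    }
}

/// Example type that implements the trait and will be registered.
pub struct MonthlyView;

impl Named for MonthlyView {
    fn name(&self) -> String {
        "monthly_view".to_string()
    }
}

/// Example type that is never registered, so casting it fails.
pub struct PlainTable;
```

After `registry.register::<MonthlyView>()`, casting a `MonthlyView` returns the trait object while casting a `PlainTable` returns `None`. This mirrors why the diff's documentation asks implementers to call `register_materialized` before relying on `cast_to_materialized`.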
