Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ authors = ["Matthew Cramerus <[email protected]>"]
license = "Apache-2.0"
description = "Materialized Views & Query Rewriting in DataFusion"
keywords = ["arrow", "arrow-rs", "datafusion"]
rust-version = "1.73"
rust-version = "1.80"

[dependencies]
arrow = "53"
arrow-schema = "53"
async-trait = "0.1"
dashmap = "6"
datafusion = "43"
datafusion-common = "43"
datafusion-expr = "43"
datafusion-functions = "43"
datafusion-functions-aggregate = "43"
datafusion-physical-expr = "43"
datafusion-physical-plan = "43"
datafusion-sql = "43"
futures = "0.3"
itertools = "0.13"
log = "0.4"
object_store = "0.11"

[dev-dependencies]
anyhow = "1.0.95"
env_logger = "0.11.6"
tempfile = "3.14.0"
tokio = "1.42.0"
url = "2.5.4"
10 changes: 9 additions & 1 deletion deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,13 @@
# under the License.

[licenses]
allow = ["Apache-2.0"]
allow = [
"Apache-2.0",
"Apache-2.0 WITH LLVM-exception",
"MIT",
"BSD-2-Clause",
"BSD-3-Clause",
"CC0-1.0",
"Unicode-3.0"
]
version = 2
26 changes: 17 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
// specific language governing permissions and limitations
// under the License.

/// Code for incremental view maintenance.
mod materialized;
#![deny(missing_docs)]

/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
mod rewrite;
//! `datafusion-materialized-views` implements algorithms and functionality for materialized views in DataFusion.

/// Code for incremental view maintenance against Hive-partitioned tables.
///
/// An example of a Hive-partitioned table is the [`ListingTable`](datafusion::datasource::listing::ListingTable).
/// By analyzing the fragment of the materialized view query pertaining to the partition columns,
/// we can derive a build graph that relates the files of a materialized views and the files of the tables it depends on.
///
/// A central trait is defined for Hive-partitioned tables, [`ListingTableLike`](materialized::ListingTableLike). Note that
/// all implementations of [`ListingTableLike`](materialized::ListingTableLike) must be registered using the
/// [`register_listing_table`](materialized::register_listing_table) function, otherwise the tables may not be detected by
/// the incremental view maintenance code, including components such as [`FileMetadata`](materialized::file_metadata::FileMetadata)
/// or [`RowMetadataRegistry`](materialized::row_metadata::RowMetadataRegistry).
pub mod materialized;

#[cfg(test)]
mod test {
#[test]
fn test_it_works() {}
}
/// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
pub mod rewrite;
138 changes: 138 additions & 0 deletions src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,141 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/// Track dependencies of materialized data in object storage
mod dependencies;

/// Pluggable metadata sources for incremental view maintenance
pub mod row_metadata;

/// A virtual table that exposes files in object storage.
pub mod file_metadata;

/// A UDF that parses Hive partition elements from object storage paths.
mod hive_partition;

use std::{
any::{type_name, Any, TypeId},
fmt::Debug,
sync::{Arc, LazyLock},
};

use dashmap::DashMap;
use datafusion::{
catalog::TableProvider,
datasource::listing::{ListingTable, ListingTableUrl},
};
use datafusion_expr::LogicalPlan;
use itertools::Itertools;

/// The identifier of the column that [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should store row metadata in.
pub const META_COLUMN: &str = "__meta";

static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);

/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage.
pub trait ListingTableLike: TableProvider + 'static {
/// Object store URLs for this table
fn table_paths(&self) -> Vec<ListingTableUrl>;

/// Hive partition columns
fn partition_columns(&self) -> Vec<String>;

/// File extension used by this listing table
fn file_ext(&self) -> String;
}

impl ListingTableLike for ListingTable {
fn table_paths(&self) -> Vec<ListingTableUrl> {
self.table_paths().clone()
}

fn partition_columns(&self) -> Vec<String> {
self.options()
.table_partition_cols
.iter()
.map(|(name, _data_type)| name.clone())
.collect_vec()
}

fn file_ext(&self) -> String {
self.options().file_extension.clone()
}
}

/// Register a [`ListingTableLike`] implementation in this registry.
/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
/// into a [`ListingTableLike`] where possible.
pub fn register_listing_table<T: ListingTableLike>() {
TABLE_TYPE_REGISTRY.register_listing_table::<T>();
}

/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
}

/// A hive-partitioned table in object storage that is defined by a user-provided query.
pub trait Materialized: ListingTableLike {
/// The query that defines this materialized view.
fn query(&self) -> LogicalPlan;
}

type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;

/// A registry for implementations of [`ListingTableLike`], used for downcasting
/// arbitrary TableProviders into `dyn ListingTableLike` where possible.
///
/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike`.
/// By default, [`ListingTable`] is registered.
struct TableTypeRegistry {
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
}

impl Debug for TableTypeRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TableTypeRegistry")
.field(
"listing_table_accessors",
&self
.listing_table_accessors
.iter()
.map(|r| r.value().0)
.collect_vec(),
)
.finish()
}
}

impl Default for TableTypeRegistry {
fn default() -> Self {
let new = Self {
listing_table_accessors: DashMap::new(),
};
new.register_listing_table::<ListingTable>();

new
}
}

impl TableTypeRegistry {
fn register_listing_table<T: ListingTableLike>(&self) {
self.listing_table_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
),
);
}

fn cast_to_listing_table<'a>(
&'a self,
table: &'a dyn TableProvider,
) -> Option<&'a dyn ListingTableLike> {
self.listing_table_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
}
16 changes: 16 additions & 0 deletions src/materialized/dependencies.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
Loading
Loading