Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions src/materialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use datafusion::{
catalog::TableProvider,
datasource::listing::{ListingTable, ListingTableUrl},
};
use datafusion_common::DataFusionError;
use datafusion_expr::LogicalPlan;
use itertools::Itertools;

Expand Down Expand Up @@ -110,6 +111,14 @@ pub trait Materialized: ListingTableLike {
fn config(&self) -> MaterializedConfig {
MaterializedConfig::default()
}

/// Which partition columns are 'static'.
/// Static partition columns are those that are used in incremental view maintenance.
/// These should be a prefix of the full set of partition columns returned by [`ListingTableLike::partition_columns`].
/// The rest of the partition columns are 'dynamic' and their values will be generated at runtime during incremental refresh.
fn static_partition_columns(&self) -> Vec<String> {
<Self as ListingTableLike>::partition_columns(self)
}
}

/// Register a [`Materialized`] implementation in this registry.
Expand All @@ -122,13 +131,38 @@ pub fn register_materialized<T: Materialized>() {
}

/// Attempt to cast the given TableProvider into a [`Materialized`].
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| {
TABLE_TYPE_REGISTRY
.cast_to_decorator(table)
.and_then(|decorator| cast_to_materialized(decorator.base()))
})
/// If the table's type has not been registered using [`register_materialized`], will return `Ok(None)`.
///
/// Does a runtime check on some invariants of `Materialized` and returns an error if they are violated.
/// In particular, checks that the static partition columns are a prefix of all partition columns.
pub fn cast_to_materialized(
table: &dyn TableProvider,
) -> Result<Option<&dyn Materialized>, DataFusionError> {
let materialized = match TABLE_TYPE_REGISTRY
.cast_to_materialized(table)
.map(Ok)
.or_else(|| {
TABLE_TYPE_REGISTRY
.cast_to_decorator(table)
.and_then(|decorator| cast_to_materialized(decorator.base()).transpose())
})
.transpose()?
{
None => return Ok(None),
Some(m) => m,
};

let static_partition_cols = materialized.static_partition_columns();
let all_partition_cols = materialized.partition_columns();

if materialized.partition_columns()[..static_partition_cols.len()] != static_partition_cols[..]
{
return Err(DataFusionError::Plan(format!(
"Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})"
)));
}

Ok(Some(materialized))
}

/// A `TableProvider` that decorates other `TableProvider`s.
Expand Down
Loading