Skip to content

Commit 64eaabd

Browse files
authored
feat: Decorator trait (datafusion-contrib#26)
* new decorator api * exercise decorator in tests
1 parent 5fdd03e commit 64eaabd

File tree

2 files changed

+96
-15
lines changed

2 files changed

+96
-15
lines changed

src/materialized.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,13 @@ pub fn register_listing_table<T: ListingTableLike>() {
8989
/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
9090
/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
9191
pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
92-
TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
92+
TABLE_TYPE_REGISTRY
93+
.cast_to_listing_table(table)
94+
.or_else(|| {
95+
TABLE_TYPE_REGISTRY
96+
.cast_to_decorator(table)
97+
.and_then(|decorator| cast_to_listing_table(decorator.base()))
98+
})
9399
}
94100

95101
/// A hive-partitioned table in object storage that is defined by a user-provided query.
@@ -110,7 +116,24 @@ pub fn register_materialized<T: Materialized>() {
110116
/// Attempt to cast the given TableProvider into a [`Materialized`].
111117
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
112118
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
113-
TABLE_TYPE_REGISTRY.cast_to_materialized(table)
119+
TABLE_TYPE_REGISTRY.cast_to_materialized(table).or_else(|| {
120+
TABLE_TYPE_REGISTRY
121+
.cast_to_decorator(table)
122+
.and_then(|decorator| cast_to_materialized(decorator.base()))
123+
})
124+
}
125+
126+
/// A `TableProvider` that decorates other `TableProvider`s.
127+
/// Sometimes users may implement a `TableProvider` that overrides functionality of a base `TableProvider`.
128+
/// This API allows the decorator to also be recognized as `ListingTableLike` or `Materialized` automatically.
129+
pub trait Decorator: TableProvider + 'static {
130+
/// The underlying `TableProvider` that this decorator wraps.
131+
fn base(&self) -> &dyn TableProvider;
132+
}
133+
134+
/// Register `T` as a [`Decorator`].
135+
pub fn register_decorator<T: Decorator>() {
136+
TABLE_TYPE_REGISTRY.register_decorator::<T>()
114137
}
115138

116139
type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
@@ -123,6 +146,7 @@ type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
123146
struct TableTypeRegistry {
124147
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
125148
materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
149+
decorator_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Decorator>)>,
126150
}
127151

128152
impl Debug for TableTypeRegistry {
@@ -145,6 +169,7 @@ impl Default for TableTypeRegistry {
145169
let new = Self {
146170
listing_table_accessors: DashMap::new(),
147171
materialized_accessors: DashMap::new(),
172+
decorator_accessors: DashMap::new(),
148173
};
149174
new.register_listing_table::<ListingTable>();
150175

@@ -175,6 +200,16 @@ impl TableTypeRegistry {
175200
self.register_listing_table::<T>();
176201
}
177202

203+
fn register_decorator<T: Decorator>(&self) {
204+
self.decorator_accessors.insert(
205+
TypeId::of::<T>(),
206+
(
207+
type_name::<T>(),
208+
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Decorator)),
209+
),
210+
);
211+
}
212+
178213
fn cast_to_listing_table<'a>(
179214
&'a self,
180215
table: &'a dyn TableProvider,
@@ -192,4 +227,10 @@ impl TableTypeRegistry {
192227
.get(&table.as_any().type_id())
193228
.and_then(|r| r.value().1(table.as_any()))
194229
}
230+
231+
fn cast_to_decorator<'a>(&'a self, table: &'a dyn TableProvider) -> Option<&'a dyn Decorator> {
232+
self.decorator_accessors
233+
.get(&table.as_any().type_id())
234+
.and_then(|r| r.value().1(table.as_any()))
235+
}
195236
}

src/materialized/dependencies.rs

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ impl TableFunctionImpl for FileDependenciesUdtf {
107107
let table = util::get_table(self.catalog_list.as_ref(), &table_ref)
108108
.map_err(|e| DataFusionError::Plan(e.to_string()))?;
109109

110-
let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(
111-
"mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized".to_string(),
110+
let mv = cast_to_materialized(table.as_ref()).ok_or(DataFusionError::Plan(format!(
111+
"mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"),
112112
))?;
113113

114114
Ok(Arc::new(ViewTable::try_new(
@@ -846,9 +846,9 @@ mod test {
846846

847847
use crate::materialized::{
848848
dependencies::pushdown_projection_inexact,
849-
register_materialized,
849+
register_decorator, register_materialized,
850850
row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry},
851-
ListingTableLike, Materialized,
851+
Decorator, ListingTableLike, Materialized,
852852
};
853853

854854
use super::{mv_dependencies, stale_files};
@@ -907,10 +907,47 @@ mod test {
907907
}
908908
}
909909

910+
#[derive(Debug)]
911+
struct DecoratorTable {
912+
inner: Arc<dyn TableProvider>,
913+
}
914+
915+
#[async_trait::async_trait]
916+
impl TableProvider for DecoratorTable {
917+
fn as_any(&self) -> &dyn Any {
918+
self
919+
}
920+
921+
fn schema(&self) -> SchemaRef {
922+
self.inner.schema()
923+
}
924+
925+
fn table_type(&self) -> TableType {
926+
self.inner.table_type()
927+
}
928+
929+
async fn scan(
930+
&self,
931+
state: &dyn Session,
932+
projection: Option<&Vec<usize>>,
933+
filters: &[Expr],
934+
limit: Option<usize>,
935+
) -> Result<Arc<dyn ExecutionPlan>> {
936+
self.inner.scan(state, projection, filters, limit).await
937+
}
938+
}
939+
940+
impl Decorator for DecoratorTable {
941+
fn base(&self) -> &dyn TableProvider {
942+
self.inner.as_ref()
943+
}
944+
}
945+
910946
async fn setup() -> Result<SessionContext> {
911947
let _ = env_logger::builder().is_test(true).try_init();
912948

913949
register_materialized::<MockMaterializedView>();
950+
register_decorator::<DecoratorTable>();
914951

915952
let state = SessionStateBuilder::new()
916953
.with_default_features()
@@ -1298,15 +1335,18 @@ mod test {
12981335
context
12991336
.register_table(
13001337
case.table_name,
1301-
Arc::new(MockMaterializedView {
1302-
table_path: case.table_path.clone(),
1303-
partition_columns: case
1304-
.partition_cols
1305-
.iter()
1306-
.map(|s| s.to_string())
1307-
.collect(),
1308-
query: plan,
1309-
file_ext: case.file_extension,
1338+
// Register table with a decorator to exercise this functionality
1339+
Arc::new(DecoratorTable {
1340+
inner: Arc::new(MockMaterializedView {
1341+
table_path: case.table_path.clone(),
1342+
partition_columns: case
1343+
.partition_cols
1344+
.iter()
1345+
.map(|s| s.to_string())
1346+
.collect(),
1347+
query: plan,
1348+
file_ext: case.file_extension,
1349+
}),
13101350
}),
13111351
)
13121352
.expect("couldn't register materialized view");

0 commit comments

Comments
 (0)