|
14 | 14 | // KIND, either express or implied. See the License for the |
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
| 17 | + |
| 18 | +/// Track dependencies of materialized data in object storage |
| 19 | +mod dependencies; |
| 20 | + |
| 21 | +/// Pluggable metadata sources for incremental view maintenance |
| 22 | +pub mod row_metadata; |
| 23 | + |
| 24 | +/// A virtual table that exposes files in object storage. |
| 25 | +pub mod file_metadata; |
| 26 | + |
| 27 | +/// A UDF that parses Hive partition elements from object storage paths. |
| 28 | +mod hive_partition; |
| 29 | + |
| 30 | +use std::{ |
| 31 | + any::{type_name, Any, TypeId}, |
| 32 | + fmt::Debug, |
| 33 | + sync::{Arc, LazyLock}, |
| 34 | +}; |
| 35 | + |
| 36 | +use dashmap::DashMap; |
| 37 | +use datafusion::{ |
| 38 | + catalog::TableProvider, |
| 39 | + datasource::listing::{ListingTable, ListingTableUrl}, |
| 40 | +}; |
| 41 | +use datafusion_expr::LogicalPlan; |
| 42 | +use itertools::Itertools; |
| 43 | + |
/// Name of the column in which [`RowMetadataSource`](row_metadata::RowMetadataSource)
/// implementations should store row metadata.
pub const META_COLUMN: &str = "__meta";
| 46 | + |
| 47 | +static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default); |
| 48 | + |
| 49 | +/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage. |
| 50 | +pub trait ListingTableLike: TableProvider + 'static { |
| 51 | + /// Object store URLs for this table |
| 52 | + fn table_paths(&self) -> Vec<ListingTableUrl>; |
| 53 | + |
| 54 | + /// Hive partition columns |
| 55 | + fn partition_columns(&self) -> Vec<String>; |
| 56 | + |
| 57 | + /// File extension used by this listing table |
| 58 | + fn file_ext(&self) -> String; |
| 59 | +} |
| 60 | + |
| 61 | +impl ListingTableLike for ListingTable { |
| 62 | + fn table_paths(&self) -> Vec<ListingTableUrl> { |
| 63 | + self.table_paths().clone() |
| 64 | + } |
| 65 | + |
| 66 | + fn partition_columns(&self) -> Vec<String> { |
| 67 | + self.options() |
| 68 | + .table_partition_cols |
| 69 | + .iter() |
| 70 | + .map(|(name, _data_type)| name.clone()) |
| 71 | + .collect_vec() |
| 72 | + } |
| 73 | + |
| 74 | + fn file_ext(&self) -> String { |
| 75 | + self.options().file_extension.clone() |
| 76 | + } |
| 77 | +} |
| 78 | + |
| 79 | +/// Register a [`ListingTableLike`] implementation in this registry. |
| 80 | +/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`] |
| 81 | +/// into a [`ListingTableLike`] where possible. |
| 82 | +pub fn register_listing_table<T: ListingTableLike>() { |
| 83 | + TABLE_TYPE_REGISTRY.register_listing_table::<T>(); |
| 84 | +} |
| 85 | + |
| 86 | +/// Attempt to cast the given TableProvider into a [`ListingTableLike`]. |
| 87 | +/// If the table's type has not been registered using [`register_listing_table`], will return `None`. |
| 88 | +pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> { |
| 89 | + TABLE_TYPE_REGISTRY.cast_to_listing_table(table) |
| 90 | +} |
| 91 | + |
| 92 | +/// A hive-partitioned table in object storage that is defined by a user-provided query. |
| 93 | +pub trait Materialized: ListingTableLike { |
| 94 | + /// The query that defines this materialized view. |
| 95 | + fn query(&self) -> LogicalPlan; |
| 96 | +} |
| 97 | + |
/// A shareable, thread-safe function that tries to view a `&dyn Any` as a `&T`.
///
/// The returned reference borrows from the input, so it lives exactly as long as the
/// `&dyn Any` that was passed in.
type Downcaster<T> = Arc<dyn for<'a> Fn(&'a dyn Any) -> Option<&'a T> + Send + Sync>;
| 99 | + |
| 100 | +/// A registry for implementations of [`ListingTableLike`], used for downcasting |
| 101 | +/// arbitrary TableProviders into `dyn ListingTableLike` where possible. |
| 102 | +/// |
| 103 | +/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike`. |
| 104 | +/// By default, [`ListingTable`] is registered. |
| 105 | +struct TableTypeRegistry { |
| 106 | + listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>, |
| 107 | +} |
| 108 | + |
| 109 | +impl Debug for TableTypeRegistry { |
| 110 | + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
| 111 | + f.debug_struct("TableTypeRegistry") |
| 112 | + .field( |
| 113 | + "listing_table_accessors", |
| 114 | + &self |
| 115 | + .listing_table_accessors |
| 116 | + .iter() |
| 117 | + .map(|r| r.value().0) |
| 118 | + .collect_vec(), |
| 119 | + ) |
| 120 | + .finish() |
| 121 | + } |
| 122 | +} |
| 123 | + |
| 124 | +impl Default for TableTypeRegistry { |
| 125 | + fn default() -> Self { |
| 126 | + let new = Self { |
| 127 | + listing_table_accessors: DashMap::new(), |
| 128 | + }; |
| 129 | + new.register_listing_table::<ListingTable>(); |
| 130 | + |
| 131 | + new |
| 132 | + } |
| 133 | +} |
| 134 | + |
| 135 | +impl TableTypeRegistry { |
| 136 | + fn register_listing_table<T: ListingTableLike>(&self) { |
| 137 | + self.listing_table_accessors.insert( |
| 138 | + TypeId::of::<T>(), |
| 139 | + ( |
| 140 | + type_name::<T>(), |
| 141 | + Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)), |
| 142 | + ), |
| 143 | + ); |
| 144 | + } |
| 145 | + |
| 146 | + fn cast_to_listing_table<'a>( |
| 147 | + &'a self, |
| 148 | + table: &'a dyn TableProvider, |
| 149 | + ) -> Option<&'a dyn ListingTableLike> { |
| 150 | + self.listing_table_accessors |
| 151 | + .get(&table.as_any().type_id()) |
| 152 | + .and_then(|r| r.value().1(table.as_any())) |
| 153 | + } |
| 154 | +} |
0 commit comments