|
| 1 | +use std::sync::Arc; |
| 2 | + |
| 3 | +use async_trait::async_trait; |
| 4 | +use datafusion::arrow::array::{ |
| 5 | + ArrayRef, BooleanArray, Float64Array, Int16Array, Int32Array, RecordBatch, StringArray, |
| 6 | +}; |
| 7 | +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; |
| 8 | +use datafusion::catalog::streaming::StreamingTable; |
| 9 | +use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider}; |
| 10 | +use datafusion::datasource::TableProvider; |
| 11 | +use datafusion::error::Result; |
| 12 | +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; |
| 13 | +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; |
| 14 | +use datafusion::physical_plan::streaming::PartitionStream; |
| 15 | + |
// Names of the PostgreSQL system tables emulated under `pg_catalog`.
const PG_CATALOG_TABLE_PG_TYPE: &str = "pg_type";
const PG_CATALOG_TABLE_PG_CLASS: &str = "pg_class";
const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";

/// Every table name advertised by the `pg_catalog` schema provider.
pub const PG_CATALOG_TABLES: &[&str] = &[
    PG_CATALOG_TABLE_PG_TYPE,
    PG_CATALOG_TABLE_PG_CLASS,
    PG_CATALOG_TABLE_PG_ATTRIBUTE,
    PG_CATALOG_TABLE_PG_NAMESPACE,
    PG_CATALOG_TABLE_PG_PROC,
    PG_CATALOG_TABLE_PG_DATABASE,
];
| 31 | + |
| 32 | +// Create custom schema provider for pg_catalog |
| 33 | +#[derive(Debug)] |
| 34 | +pub struct PgCatalogSchemaProvider { |
| 35 | + catalog_list: Arc<dyn CatalogProviderList>, |
| 36 | +} |
| 37 | + |
| 38 | +#[async_trait] |
| 39 | +impl SchemaProvider for PgCatalogSchemaProvider { |
| 40 | + fn as_any(&self) -> &dyn std::any::Any { |
| 41 | + self |
| 42 | + } |
| 43 | + |
| 44 | + fn table_names(&self) -> Vec<String> { |
| 45 | + PG_CATALOG_TABLES.iter().map(ToString::to_string).collect() |
| 46 | + } |
| 47 | + |
| 48 | + async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { |
| 49 | + match name.to_ascii_lowercase().as_str() { |
| 50 | + PG_CATALOG_TABLE_PG_TYPE => Some(self.create_pg_type_table()).transpose(), |
| 51 | + PG_CATALOG_TABLE_PG_CLASS => { |
| 52 | + let table = Arc::new(PgClassTable::new(self.catalog_list.clone())); |
| 53 | + Ok(Some(Arc::new( |
| 54 | + StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(), |
| 55 | + ))) |
| 56 | + } |
| 57 | + _ => Ok(None), |
| 58 | + } |
| 59 | + } |
| 60 | + |
| 61 | + fn table_exist(&self, name: &str) -> bool { |
| 62 | + PG_CATALOG_TABLES.contains(&name.to_ascii_lowercase().as_str()) |
| 63 | + } |
| 64 | +} |
| 65 | + |
| 66 | +impl PgCatalogSchemaProvider { |
| 67 | + pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgCatalogSchemaProvider { |
| 68 | + Self { catalog_list } |
| 69 | + } |
| 70 | + |
| 71 | + /// Create a mock empty table for pg_type |
| 72 | + fn create_pg_type_table(&self) -> Result<Arc<dyn TableProvider>> { |
| 73 | + // Define schema for pg_type |
| 74 | + let schema = Arc::new(Schema::new(vec![ |
| 75 | + Field::new("oid", DataType::Int32, false), |
| 76 | + Field::new("typname", DataType::Utf8, false), |
| 77 | + Field::new("typnamespace", DataType::Int32, false), |
| 78 | + Field::new("typlen", DataType::Int16, false), |
| 79 | + // Add other necessary columns |
| 80 | + ])); |
| 81 | + |
| 82 | + // Create memory table with schema |
| 83 | + let provider = MemTable::try_new(schema, vec![])?; |
| 84 | + |
| 85 | + Ok(Arc::new(provider)) |
| 86 | + } |
| 87 | +} |
| 88 | + |
| 89 | +#[derive(Debug)] |
| 90 | +struct PgClassTable { |
| 91 | + schema: SchemaRef, |
| 92 | + catalog_list: Arc<dyn CatalogProviderList>, |
| 93 | +} |
| 94 | + |
| 95 | +impl PgClassTable { |
| 96 | + fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgClassTable { |
| 97 | + // Define the schema for pg_class |
| 98 | + // This matches key columns from PostgreSQL's pg_class |
| 99 | + let schema = Arc::new(Schema::new(vec![ |
| 100 | + Field::new("oid", DataType::Int32, false), // Object identifier |
| 101 | + Field::new("relname", DataType::Utf8, false), // Name of the table, index, view, etc. |
| 102 | + Field::new("relnamespace", DataType::Int32, false), // OID of the namespace that contains this relation |
| 103 | + Field::new("reltype", DataType::Int32, false), // OID of the data type (composite type) this table describes |
| 104 | + Field::new("reloftype", DataType::Int32, true), // OID of the composite type for typed table, 0 otherwise |
| 105 | + Field::new("relowner", DataType::Int32, false), // Owner of the relation |
| 106 | + Field::new("relam", DataType::Int32, false), // If this is an index, the access method used |
| 107 | + Field::new("relfilenode", DataType::Int32, false), // Name of the on-disk file of this relation |
| 108 | + Field::new("reltablespace", DataType::Int32, false), // Tablespace OID for this relation |
| 109 | + Field::new("relpages", DataType::Int32, false), // Size of the on-disk representation in pages |
| 110 | + Field::new("reltuples", DataType::Float64, false), // Number of tuples |
| 111 | + Field::new("relallvisible", DataType::Int32, false), // Number of all-visible pages |
| 112 | + Field::new("reltoastrelid", DataType::Int32, false), // OID of the TOAST table |
| 113 | + Field::new("relhasindex", DataType::Boolean, false), // True if this is a table and it has (or recently had) any indexes |
| 114 | + Field::new("relisshared", DataType::Boolean, false), // True if this table is shared across all databases |
| 115 | + Field::new("relpersistence", DataType::Utf8, false), // p=permanent table, u=unlogged table, t=temporary table |
| 116 | + Field::new("relkind", DataType::Utf8, false), // r=ordinary table, i=index, S=sequence, v=view, etc. |
| 117 | + Field::new("relnatts", DataType::Int16, false), // Number of user columns |
| 118 | + Field::new("relchecks", DataType::Int16, false), // Number of CHECK constraints |
| 119 | + Field::new("relhasrules", DataType::Boolean, false), // True if table has (or once had) rules |
| 120 | + Field::new("relhastriggers", DataType::Boolean, false), // True if table has (or once had) triggers |
| 121 | + Field::new("relhassubclass", DataType::Boolean, false), // True if table or index has (or once had) any inheritance children |
| 122 | + Field::new("relrowsecurity", DataType::Boolean, false), // True if row security is enabled |
| 123 | + Field::new("relforcerowsecurity", DataType::Boolean, false), // True if row security forced for owners |
| 124 | + Field::new("relispopulated", DataType::Boolean, false), // True if relation is populated (not true for some materialized views) |
| 125 | + Field::new("relreplident", DataType::Utf8, false), // Columns used to form "replica identity" for rows |
| 126 | + Field::new("relispartition", DataType::Boolean, false), // True if table is a partition |
| 127 | + Field::new("relrewrite", DataType::Int32, true), // OID of a rule that rewrites this relation |
| 128 | + Field::new("relfrozenxid", DataType::Int32, false), // All transaction IDs before this have been replaced with a permanent ("frozen") transaction ID |
| 129 | + Field::new("relminmxid", DataType::Int32, false), // All Multixact IDs before this have been replaced with a transaction ID |
| 130 | + ])); |
| 131 | + |
| 132 | + Self { |
| 133 | + schema, |
| 134 | + catalog_list, |
| 135 | + } |
| 136 | + } |
| 137 | + |
| 138 | + /// Generate record batches based on the current state of the catalog |
| 139 | + async fn get_data( |
| 140 | + schema: SchemaRef, |
| 141 | + catalog_list: Arc<dyn CatalogProviderList>, |
| 142 | + ) -> Result<RecordBatch> { |
| 143 | + // Vectors to store column data |
| 144 | + let mut oids = Vec::new(); |
| 145 | + let mut relnames = Vec::new(); |
| 146 | + let mut relnamespaces = Vec::new(); |
| 147 | + let mut reltypes = Vec::new(); |
| 148 | + let mut reloftypes = Vec::new(); |
| 149 | + let mut relowners = Vec::new(); |
| 150 | + let mut relams = Vec::new(); |
| 151 | + let mut relfilenodes = Vec::new(); |
| 152 | + let mut reltablespaces = Vec::new(); |
| 153 | + let mut relpages = Vec::new(); |
| 154 | + let mut reltuples = Vec::new(); |
| 155 | + let mut relallvisibles = Vec::new(); |
| 156 | + let mut reltoastrelids = Vec::new(); |
| 157 | + let mut relhasindexes = Vec::new(); |
| 158 | + let mut relisshareds = Vec::new(); |
| 159 | + let mut relpersistences = Vec::new(); |
| 160 | + let mut relkinds = Vec::new(); |
| 161 | + let mut relnattses = Vec::new(); |
| 162 | + let mut relcheckses = Vec::new(); |
| 163 | + let mut relhasruleses = Vec::new(); |
| 164 | + let mut relhastriggersses = Vec::new(); |
| 165 | + let mut relhassubclasses = Vec::new(); |
| 166 | + let mut relrowsecurities = Vec::new(); |
| 167 | + let mut relforcerowsecurities = Vec::new(); |
| 168 | + let mut relispopulateds = Vec::new(); |
| 169 | + let mut relreplidents = Vec::new(); |
| 170 | + let mut relispartitions = Vec::new(); |
| 171 | + let mut relrewrites = Vec::new(); |
| 172 | + let mut relfrozenxids = Vec::new(); |
| 173 | + let mut relminmxids = Vec::new(); |
| 174 | + |
| 175 | + // Start OID counter (this is simplistic and would need to be more robust in practice) |
| 176 | + let mut next_oid = 10000; |
| 177 | + |
| 178 | + // Iterate through all catalogs and schemas |
| 179 | + for catalog_name in catalog_list.catalog_names() { |
| 180 | + if let Some(catalog) = catalog_list.catalog(&catalog_name) { |
| 181 | + for schema_name in catalog.schema_names() { |
| 182 | + if let Some(schema) = catalog.schema(&schema_name) { |
| 183 | + let schema_oid = next_oid; |
| 184 | + next_oid += 1; |
| 185 | + |
| 186 | + // Add an entry for the schema itself (as a namespace) |
| 187 | + // (In a full implementation, this would go in pg_namespace) |
| 188 | + |
| 189 | + // Now process all tables in this schema |
| 190 | + for table_name in schema.table_names() { |
| 191 | + let table_oid = next_oid; |
| 192 | + next_oid += 1; |
| 193 | + |
| 194 | + if let Some(table) = schema.table(&table_name).await? { |
| 195 | + // TODO: correct table type |
| 196 | + let table_type = "r"; |
| 197 | + |
| 198 | + // Get column count from schema |
| 199 | + let column_count = table.schema().fields().len() as i16; |
| 200 | + |
| 201 | + // Add table entry |
| 202 | + oids.push(table_oid); |
| 203 | + relnames.push(table_name.clone()); |
| 204 | + relnamespaces.push(schema_oid); |
| 205 | + reltypes.push(0); // Simplified: we're not tracking data types |
| 206 | + reloftypes.push(None); |
| 207 | + relowners.push(0); // Simplified: no owner tracking |
| 208 | + relams.push(0); // Default access method |
| 209 | + relfilenodes.push(table_oid); // Use OID as filenode |
| 210 | + reltablespaces.push(0); // Default tablespace |
| 211 | + relpages.push(1); // Default page count |
| 212 | + reltuples.push(0.0); // No row count stats |
| 213 | + relallvisibles.push(0); |
| 214 | + reltoastrelids.push(0); |
| 215 | + relhasindexes.push(false); |
| 216 | + relisshareds.push(false); |
| 217 | + relpersistences.push("p".to_string()); // Permanent |
| 218 | + relkinds.push(table_type.to_string()); |
| 219 | + relnattses.push(column_count); |
| 220 | + relcheckses.push(0); |
| 221 | + relhasruleses.push(false); |
| 222 | + relhastriggersses.push(false); |
| 223 | + relhassubclasses.push(false); |
| 224 | + relrowsecurities.push(false); |
| 225 | + relforcerowsecurities.push(false); |
| 226 | + relispopulateds.push(true); |
| 227 | + relreplidents.push("d".to_string()); // Default |
| 228 | + relispartitions.push(false); |
| 229 | + relrewrites.push(None); |
| 230 | + relfrozenxids.push(0); |
| 231 | + relminmxids.push(0); |
| 232 | + } |
| 233 | + } |
| 234 | + } |
| 235 | + } |
| 236 | + } |
| 237 | + } |
| 238 | + |
| 239 | + // Create Arrow arrays from the collected data |
| 240 | + let arrays: Vec<ArrayRef> = vec![ |
| 241 | + Arc::new(Int32Array::from(oids)), |
| 242 | + Arc::new(StringArray::from(relnames)), |
| 243 | + Arc::new(Int32Array::from(relnamespaces)), |
| 244 | + Arc::new(Int32Array::from(reltypes)), |
| 245 | + Arc::new(Int32Array::from_iter(reloftypes.into_iter())), |
| 246 | + Arc::new(Int32Array::from(relowners)), |
| 247 | + Arc::new(Int32Array::from(relams)), |
| 248 | + Arc::new(Int32Array::from(relfilenodes)), |
| 249 | + Arc::new(Int32Array::from(reltablespaces)), |
| 250 | + Arc::new(Int32Array::from(relpages)), |
| 251 | + Arc::new(Float64Array::from_iter(reltuples.into_iter())), |
| 252 | + Arc::new(Int32Array::from(relallvisibles)), |
| 253 | + Arc::new(Int32Array::from(reltoastrelids)), |
| 254 | + Arc::new(BooleanArray::from(relhasindexes)), |
| 255 | + Arc::new(BooleanArray::from(relisshareds)), |
| 256 | + Arc::new(StringArray::from(relpersistences)), |
| 257 | + Arc::new(StringArray::from(relkinds)), |
| 258 | + Arc::new(Int16Array::from(relnattses)), |
| 259 | + Arc::new(Int16Array::from(relcheckses)), |
| 260 | + Arc::new(BooleanArray::from(relhasruleses)), |
| 261 | + Arc::new(BooleanArray::from(relhastriggersses)), |
| 262 | + Arc::new(BooleanArray::from(relhassubclasses)), |
| 263 | + Arc::new(BooleanArray::from(relrowsecurities)), |
| 264 | + Arc::new(BooleanArray::from(relforcerowsecurities)), |
| 265 | + Arc::new(BooleanArray::from(relispopulateds)), |
| 266 | + Arc::new(StringArray::from(relreplidents)), |
| 267 | + Arc::new(BooleanArray::from(relispartitions)), |
| 268 | + Arc::new(Int32Array::from_iter(relrewrites.into_iter())), |
| 269 | + Arc::new(Int32Array::from(relfrozenxids)), |
| 270 | + Arc::new(Int32Array::from(relminmxids)), |
| 271 | + ]; |
| 272 | + |
| 273 | + // Create a record batch |
| 274 | + let batch = RecordBatch::try_new(schema.clone(), arrays)?; |
| 275 | + |
| 276 | + Ok(batch) |
| 277 | + } |
| 278 | +} |
| 279 | + |
| 280 | +impl PartitionStream for PgClassTable { |
| 281 | + fn schema(&self) -> &SchemaRef { |
| 282 | + &self.schema |
| 283 | + } |
| 284 | + |
| 285 | + fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream { |
| 286 | + let catalog_list = self.catalog_list.clone(); |
| 287 | + let schema = Arc::clone(&self.schema); |
| 288 | + Box::pin(RecordBatchStreamAdapter::new( |
| 289 | + schema.clone(), |
| 290 | + futures::stream::once(async move { Self::get_data(schema, catalog_list).await }), |
| 291 | + )) |
| 292 | + } |
| 293 | +} |
0 commit comments