
Commit 8350ac4

feat: add pg_views/pg_matviews/pg_tables views
1 parent: 84f3e8a

3 files changed: +170 -234 lines

datafusion-postgres/src/pg_catalog.rs

Lines changed: 37 additions & 12 deletions
@@ -26,6 +26,7 @@ mod pg_database;
 mod pg_get_expr_udf;
 mod pg_namespace;
 mod pg_settings;
+mod pg_tables;
 mod pg_views;
 
 const PG_CATALOG_TABLE_PG_AGGREGATE: &str = "pg_aggregate";
@@ -90,6 +91,9 @@ const PG_CATALOG_TABLE_PG_TABLESPACE: &str = "pg_tablespace";
 const PG_CATALOG_TABLE_PG_TRIGGER: &str = "pg_trigger";
 const PG_CATALOG_TABLE_PG_USER_MAPPING: &str = "pg_user_mapping";
 const PG_CATALOG_VIEW_PG_SETTINGS: &str = "pg_settings";
+const PG_CATALOG_VIEW_PG_VIEWS: &str = "pg_views";
+const PG_CATALOG_VIEW_PG_MATVIEWS: &str = "pg_matviews";
+const PG_CATALOG_VIEW_PG_TABLES: &str = "pg_tables";
 
 /// Determine PostgreSQL table type (relkind) from DataFusion TableProvider
 fn get_table_type(table: &Arc<dyn TableProvider>) -> &'static str {
@@ -185,6 +189,8 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
     PG_CATALOG_TABLE_PG_TRIGGER,
     PG_CATALOG_TABLE_PG_USER_MAPPING,
     PG_CATALOG_VIEW_PG_SETTINGS,
+    PG_CATALOG_VIEW_PG_VIEWS,
+    PG_CATALOG_VIEW_PG_MATVIEWS,
 ];
 
 #[derive(Debug, Hash, Eq, PartialEq, PartialOrd, Ord)]
@@ -316,44 +322,63 @@ impl SchemaProvider for PgCatalogSchemaProvider {
                     self.oid_counter.clone(),
                     self.oid_cache.clone(),
                 ));
-                Ok(Some(Arc::new(
-                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
-                )))
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
             }
             PG_CATALOG_TABLE_PG_CLASS => {
                 let table = Arc::new(pg_class::PgClassTable::new(
                     self.catalog_list.clone(),
                     self.oid_counter.clone(),
                     self.oid_cache.clone(),
                 ));
-                Ok(Some(Arc::new(
-                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
-                )))
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
             }
             PG_CATALOG_TABLE_PG_DATABASE => {
                 let table = Arc::new(pg_database::PgDatabaseTable::new(
                     self.catalog_list.clone(),
                     self.oid_counter.clone(),
                     self.oid_cache.clone(),
                 ));
-                Ok(Some(Arc::new(
-                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
-                )))
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
             }
             PG_CATALOG_TABLE_PG_NAMESPACE => {
                 let table = Arc::new(pg_namespace::PgNamespaceTable::new(
                     self.catalog_list.clone(),
                     self.oid_counter.clone(),
                     self.oid_cache.clone(),
                 ));
-                Ok(Some(Arc::new(
-                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
-                )))
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
+            }
+            PG_CATALOG_VIEW_PG_TABLES => {
+                let table = Arc::new(pg_tables::PgTablesTable::new(self.catalog_list.clone()));
+                Ok(Some(Arc::new(StreamingTable::try_new(
+                    Arc::clone(table.schema()),
+                    vec![table],
+                )?)))
             }
             PG_CATALOG_VIEW_PG_SETTINGS => {
                 let table = pg_settings::PgSettingsView::try_new()?;
                 Ok(Some(Arc::new(table.try_into_memtable()?)))
             }
+            PG_CATALOG_VIEW_PG_VIEWS => {
+                let table = pg_views::PgViewsTable::new();
+                Ok(Some(Arc::new(table.try_into_memtable()?)))
+            }
+            PG_CATALOG_VIEW_PG_MATVIEWS => {
+                let table = pg_views::PgMatviewsTable::new();
+                Ok(Some(Arc::new(table.try_into_memtable()?)))
+            }
 
             _ => Ok(None),
         }
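Each rewritten arm above wraps a PartitionStream in a StreamingTable and now propagates construction errors with `?` instead of panicking through `.unwrap()`. A minimal sketch of how that shared pattern could be factored into one helper (the standalone `streaming_provider` function is hypothetical, not part of this commit; import paths assume a recent DataFusion release):

use std::sync::Arc;

use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::physical_plan::streaming::PartitionStream;

// Hypothetical helper: wrap any PartitionStream-backed catalog table in a
// StreamingTable, propagating construction errors instead of panicking.
fn streaming_provider(
    part: Arc<dyn PartitionStream>,
) -> Result<Option<Arc<dyn TableProvider>>> {
    let table = StreamingTable::try_new(Arc::clone(part.schema()), vec![part])?;
    Ok(Some(Arc::new(table)))
}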
datafusion-postgres/src/pg_catalog/pg_tables.rs (new file)

Lines changed: 102 additions & 0 deletions
@@ -0,0 +1,102 @@
use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::catalog::CatalogProviderList;
use datafusion::error::Result;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;

#[derive(Debug, Clone)]
pub(crate) struct PgTablesTable {
    schema: SchemaRef,
    catalog_list: Arc<dyn CatalogProviderList>,
}

impl PgTablesTable {
    pub(crate) fn new(catalog_list: Arc<dyn CatalogProviderList>) -> PgTablesTable {
        // Define the schema for pg_tables
        // This matches the columns of PostgreSQL's pg_tables system view
        let schema = Arc::new(Schema::new(vec![
            Field::new("schemaname", DataType::Utf8, false),
            Field::new("tablename", DataType::Utf8, false),
            Field::new("tableowner", DataType::Utf8, false),
            Field::new("tablespace", DataType::Utf8, true),
            Field::new("hasindex", DataType::Boolean, false),
            Field::new("hasrules", DataType::Boolean, false),
            Field::new("hastriggers", DataType::Boolean, false),
            Field::new("rowsecurity", DataType::Boolean, false),
        ]));

        Self {
            schema,
            catalog_list,
        }
    }

    /// Generate record batches based on the current state of the catalog
    async fn get_data(this: PgTablesTable) -> Result<RecordBatch> {
        // Vectors to store column data
        let mut schema_names = Vec::new();
        let mut table_names = Vec::new();
        let mut table_owners = Vec::new();
        let mut table_spaces: Vec<Option<String>> = Vec::new();
        let mut has_index = Vec::new();
        let mut has_rules = Vec::new();
        let mut has_triggers = Vec::new();
        let mut row_security = Vec::new();

        // Iterate through all catalogs and schemas
        for catalog_name in this.catalog_list.catalog_names() {
            if let Some(catalog) = this.catalog_list.catalog(&catalog_name) {
                for schema_name in catalog.schema_names() {
                    if let Some(schema) = catalog.schema(&schema_name) {
                        // Now process all tables in this schema
                        for table_name in schema.table_names() {
                            schema_names.push(schema_name.to_string());
                            table_names.push(table_name.to_string());
                            table_owners.push("postgres".to_string());
                            table_spaces.push(None);
                            has_index.push(false);
                            has_rules.push(false);
                            has_triggers.push(false);
                            row_security.push(false);
                        }
                    }
                }
            }
        }

        // Create Arrow arrays from the collected data
        let arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(schema_names)),
            Arc::new(StringArray::from(table_names)),
            Arc::new(StringArray::from(table_owners)),
            Arc::new(StringArray::from(table_spaces)),
            Arc::new(BooleanArray::from(has_index)),
            Arc::new(BooleanArray::from(has_rules)),
            Arc::new(BooleanArray::from(has_triggers)),
            Arc::new(BooleanArray::from(row_security)),
        ];

        // Create a record batch
        let batch = RecordBatch::try_new(this.schema.clone(), arrays)?;

        Ok(batch)
    }
}

impl PartitionStream for PgTablesTable {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let this = self.clone();
        Box::pin(RecordBatchStreamAdapter::new(
            this.schema.clone(),
            futures::stream::once(async move { PgTablesTable::get_data(this).await }),
        ))
    }
}
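Once wrapped and registered, the view answers ordinary SQL. A minimal usage sketch, assuming a recent DataFusion release with a tokio runtime (registering under the bare name "pg_tables" and the `main` scaffolding are illustrative only; in datafusion-postgres the table is exposed through the pg_catalog schema provider shown above):

use std::sync::Arc;

use datafusion::datasource::streaming::StreamingTable;
use datafusion::error::Result;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Reuse the session's catalog list so the view reflects registered tables.
    // PgTablesTable is pub(crate), so this sketch assumes same-crate access.
    let catalog_list = ctx.state().catalog_list().clone();
    let table = Arc::new(PgTablesTable::new(catalog_list));

    // The same wrapping the schema provider performs for PG_CATALOG_VIEW_PG_TABLES.
    let provider = StreamingTable::try_new(Arc::clone(table.schema()), vec![table])?;
    ctx.register_table("pg_tables", Arc::new(provider))?;

    ctx.sql("SELECT schemaname, tablename FROM pg_tables")
        .await?
        .show()
        .await?;
    Ok(())
}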
