
Commit a4e3691 (parent 3e52376)

feat: add pg_am and pg_namespace table

1 file changed: 160 additions, 0 deletions

datafusion-postgres/src/pg_catalog.rs
@@ -19,6 +19,7 @@ const PG_CATALOG_TABLE_PG_ATTRIBUTE: &str = "pg_attribute";
 const PG_CATALOG_TABLE_PG_NAMESPACE: &str = "pg_namespace";
 const PG_CATALOG_TABLE_PG_PROC: &str = "pg_proc";
 const PG_CATALOG_TABLE_PG_DATABASE: &str = "pg_database";
+const PG_CATALOG_TABLE_PG_AM: &str = "pg_am";

 pub const PG_CATALOG_TABLES: &[&str] = &[
     PG_CATALOG_TABLE_PG_TYPE,
@@ -27,6 +28,7 @@ pub const PG_CATALOG_TABLES: &[&str] = &[
     PG_CATALOG_TABLE_PG_NAMESPACE,
     PG_CATALOG_TABLE_PG_PROC,
     PG_CATALOG_TABLE_PG_DATABASE,
+    PG_CATALOG_TABLE_PG_AM,
 ];

 // Create custom schema provider for pg_catalog
@@ -48,12 +50,19 @@ impl SchemaProvider for PgCatalogSchemaProvider {
     async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
         match name.to_ascii_lowercase().as_str() {
             PG_CATALOG_TABLE_PG_TYPE => Some(self.create_pg_type_table()).transpose(),
+            PG_CATALOG_TABLE_PG_AM => Some(self.create_pg_am_table()).transpose(),
             PG_CATALOG_TABLE_PG_CLASS => {
                 let table = Arc::new(PgClassTable::new(self.catalog_list.clone()));
                 Ok(Some(Arc::new(
                     StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
                 )))
             }
+            PG_CATALOG_TABLE_PG_NAMESPACE => {
+                let table = Arc::new(PgNamespaceTable::new(self.catalog_list.clone()));
+                Ok(Some(Arc::new(
+                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
+                )))
+            }
             _ => Ok(None),
         }
     }
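
Note on the `Some(...).transpose()` idiom in the dispatch above: `Option::transpose` flips an `Option<Result<T, E>>` into the `Result<Option<T>, E>` shape that `table()` must return. A minimal standalone illustration (std only, not code from this commit):

fn main() {
    // Option<Result<T, E>> -> Result<Option<T>, E>
    let hit: Option<Result<i32, String>> = Some(Ok(5));
    assert_eq!(hit.transpose(), Ok(Some(5)));

    // None transposes to Ok(None), matching the `_ => Ok(None)` fallthrough arm.
    let miss: Option<Result<i32, String>> = None;
    assert_eq!(miss.transpose(), Ok(None));
}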
@@ -84,6 +93,41 @@ impl PgCatalogSchemaProvider {

         Ok(Arc::new(provider))
     }
+
+    /// Create a mock empty table for pg_am
+    fn create_pg_am_table(&self) -> Result<Arc<dyn TableProvider>> {
+        // Define the schema for pg_am
+        // This matches PostgreSQL's pg_am table columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("oid", DataType::Int32, false), // Object identifier
+            Field::new("amname", DataType::Utf8, false), // Name of the access method
+            Field::new("amhandler", DataType::Int32, false), // OID of handler function
+            Field::new("amtype", DataType::Utf8, false), // Type of access method (i=index, t=table)
+            Field::new("amstrategies", DataType::Int32, false), // Number of operator strategies
+            Field::new("amsupport", DataType::Int32, false), // Number of support routines
+            Field::new("amcanorder", DataType::Boolean, false), // Does AM support ordered scans?
+            Field::new("amcanorderbyop", DataType::Boolean, false), // Does AM support order by operator result?
+            Field::new("amcanbackward", DataType::Boolean, false), // Does AM support backward scanning?
+            Field::new("amcanunique", DataType::Boolean, false), // Does AM support unique indexes?
+            Field::new("amcanmulticol", DataType::Boolean, false), // Does AM support multi-column indexes?
+            Field::new("amoptionalkey", DataType::Boolean, false), // Can first index column be omitted in search?
+            Field::new("amsearcharray", DataType::Boolean, false), // Does AM support ScalarArrayOpExpr searches?
+            Field::new("amsearchnulls", DataType::Boolean, false), // Does AM support searching for NULL/NOT NULL?
+            Field::new("amstorage", DataType::Boolean, false), // Can storage type differ from column type?
+            Field::new("amclusterable", DataType::Boolean, false), // Can index be clustered on?
+            Field::new("ampredlocks", DataType::Boolean, false), // Does AM manage fine-grained predicate locks?
+            Field::new("amcanparallel", DataType::Boolean, false), // Does AM support parallel scan?
+            Field::new("amcanbeginscan", DataType::Boolean, false), // Does AM support beginning a new scan?
+            Field::new("amcanmarkpos", DataType::Boolean, false), // Does AM support mark/restore positions?
+            Field::new("amcanfetch", DataType::Boolean, false), // Does AM support fetching specific tuples?
+            Field::new("amkeytype", DataType::Int32, false), // Type of data in index
+        ]));
+
+        // Create memory table with schema
+        let provider = MemTable::try_new(schema, vec![])?;
+
+        Ok(Arc::new(provider))
+    }
 }

 #[derive(Debug)]
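
Because `create_pg_am_table` wraps a `MemTable` with no partitions, `pg_am` answers queries with a fully typed but empty result set, which is all most introspection tooling needs. Below is a minimal standalone sketch of that pattern, assuming only the `datafusion` and `tokio` crates; the trimmed two-column schema and the `pg_am` registration here are illustrative, not this crate's actual wiring:

use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Trimmed-down pg_am schema; the real table above defines 22 columns.
    let schema = Arc::new(Schema::new(vec![
        Field::new("oid", DataType::Int32, false),
        Field::new("amname", DataType::Utf8, false),
    ]));

    // No partitions at all -> an always-empty table.
    let table = MemTable::try_new(schema, vec![])?;

    let ctx = SessionContext::new();
    ctx.register_table("pg_am", Arc::new(table))?;

    // Prints only the header: correct columns, zero rows.
    ctx.sql("SELECT oid, amname FROM pg_am").await?.show().await?;
    Ok(())
}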
@@ -291,3 +335,119 @@ impl PartitionStream for PgClassTable {
         ))
     }
 }
+
+#[derive(Debug)]
+struct PgNamespaceTable {
+    schema: SchemaRef,
+    catalog_list: Arc<dyn CatalogProviderList>,
+}
+
+impl PgNamespaceTable {
+    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
+        // Define the schema for pg_namespace
+        // This matches the columns from PostgreSQL's pg_namespace
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("oid", DataType::Int32, false), // Object identifier
+            Field::new("nspname", DataType::Utf8, false), // Name of the namespace (schema)
+            Field::new("nspowner", DataType::Int32, false), // Owner of the namespace
+            Field::new("nspacl", DataType::Utf8, true), // Access privileges
+            Field::new("options", DataType::Utf8, true), // Schema-level options
+        ]));
+
+        Self {
+            schema,
+            catalog_list,
+        }
+    }
+
+    /// Generate record batches based on the current state of the catalog
+    async fn get_data(
+        schema: SchemaRef,
+        catalog_list: Arc<dyn CatalogProviderList>,
+    ) -> Result<RecordBatch> {
+        // Vectors to store column data
+        let mut oids = Vec::new();
+        let mut nspnames = Vec::new();
+        let mut nspowners = Vec::new();
+        let mut nspacls: Vec<Option<String>> = Vec::new();
+        let mut options: Vec<Option<String>> = Vec::new();
+
+        // Start OID counter (should be consistent with the values used in pg_class)
+        let mut next_oid = 10000;
+
+        // Add standard PostgreSQL system schemas
+        // pg_catalog schema (OID 11)
+        oids.push(11);
+        nspnames.push("pg_catalog".to_string());
+        nspowners.push(10); // Default superuser
+        nspacls.push(None);
+        options.push(None);
+
+        // public schema (OID 2200)
+        oids.push(2200);
+        nspnames.push("public".to_string());
+        nspowners.push(10); // Default superuser
+        nspacls.push(None);
+        options.push(None);
+
+        // information_schema (OID 12)
+        oids.push(12);
+        nspnames.push("information_schema".to_string());
+        nspowners.push(10); // Default superuser
+        nspacls.push(None);
+        options.push(None);
+
+        // Now add all schemas from DataFusion catalogs
+        for catalog_name in catalog_list.catalog_names() {
+            if let Some(catalog) = catalog_list.catalog(&catalog_name) {
+                for schema_name in catalog.schema_names() {
+                    // Skip schemas we've already added as system schemas
+                    if schema_name == "pg_catalog"
+                        || schema_name == "public"
+                        || schema_name == "information_schema"
+                    {
+                        continue;
+                    }
+
+                    let schema_oid = next_oid;
+                    next_oid += 1;
+
+                    oids.push(schema_oid);
+                    nspnames.push(schema_name.clone());
+                    nspowners.push(10); // Default owner
+                    nspacls.push(None);
+                    options.push(None);
+                }
+            }
+        }
+
+        // Create Arrow arrays from the collected data
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int32Array::from(oids)),
+            Arc::new(StringArray::from(nspnames)),
+            Arc::new(Int32Array::from(nspowners)),
+            Arc::new(StringArray::from_iter(nspacls.into_iter())),
+            Arc::new(StringArray::from_iter(options.into_iter())),
+        ];
+
+        // Create a full record batch
+        let batch = RecordBatch::try_new(schema.clone(), arrays)?;
+
+        Ok(batch)
+    }
+}
+
+impl PartitionStream for PgNamespaceTable {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        let catalog_list = self.catalog_list.clone();
+        let schema = Arc::clone(&self.schema);
+        Box::pin(RecordBatchStreamAdapter::new(
+            schema.clone(),
+            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
+        ))
+    }
+}
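
`PgNamespaceTable` follows the same shape as the existing `PgClassTable`: a `PartitionStream` whose `execute` lazily rebuilds a `RecordBatch` from the live catalog list on each scan, wrapped in a `StreamingTable` so DataFusion can plan over it. The sketch below shows that wiring in isolation, with hypothetical names (`DemoNamespaces`, `demo_pg_namespace`) and import paths that may shift between DataFusion versions; it assumes the `datafusion`, `futures`, and `tokio` crates:

use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::streaming::StreamingTable;
use datafusion::error::DataFusionError;
use datafusion::execution::TaskContext;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::streaming::PartitionStream;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::prelude::SessionContext;

/// One logical partition; each scan re-runs the async block in `execute`.
#[derive(Debug)]
struct DemoNamespaces {
    schema: SchemaRef,
}

impl PartitionStream for DemoNamespaces {
    fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
        let schema = Arc::clone(&self.schema);
        Box::pin(RecordBatchStreamAdapter::new(
            schema.clone(),
            // The batch is built lazily at scan time, like get_data above.
            futures::stream::once(async move {
                let names = StringArray::from(vec!["pg_catalog", "public"]);
                RecordBatch::try_new(schema, vec![Arc::new(names)])
                    .map_err(DataFusionError::from)
            }),
        ))
    }
}

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "nspname",
        DataType::Utf8,
        false,
    )]));
    let partition = Arc::new(DemoNamespaces {
        schema: schema.clone(),
    });

    // Same wiring as the table() dispatch in this commit.
    let table = StreamingTable::try_new(schema, vec![partition])?;
    let ctx = SessionContext::new();
    ctx.register_table("demo_pg_namespace", Arc::new(table))?;
    ctx.sql("SELECT nspname FROM demo_pg_namespace")
        .await?
        .show()
        .await?;
    Ok(())
}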
