Commit ca8f555

feat: add pg_database
1 parent a4e3691 commit ca8f555

File tree

1 file changed (+144, -0 lines)

datafusion-postgres/src/pg_catalog.rs

Lines changed: 144 additions & 0 deletions
@@ -63,6 +63,12 @@ impl SchemaProvider for PgCatalogSchemaProvider {
                     StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
                 )))
             }
+            PG_CATALOG_TABLE_PG_DATABASE => {
+                let table = Arc::new(PgDatabaseTable::new(self.catalog_list.clone()));
+                Ok(Some(Arc::new(
+                    StreamingTable::try_new(Arc::clone(table.schema()), vec![table]).unwrap(),
+                )))
+            }
             _ => Ok(None),
         }
     }
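
A side note on the hunk above (not part of the commit): the new arm teaches the pg_catalog schema provider to answer lookups for pg_database with a StreamingTable backed by the PgDatabaseTable stream defined in the next hunk. A minimal usage sketch, assuming it lives in the same module as PgCatalogSchemaProvider with that module's imports in scope, and that the PG_CATALOG_TABLE_PG_DATABASE constant resolves to the string "pg_database":

// Hypothetical sketch, not part of this diff. Assumes the surrounding module's
// imports and that PG_CATALOG_TABLE_PG_DATABASE == "pg_database".
async fn describe_pg_database(provider: &PgCatalogSchemaProvider) -> datafusion::error::Result<()> {
    // SchemaProvider::table returns Ok(Some(_)) for names the match above handles.
    if let Some(table) = provider.table("pg_database").await? {
        println!("pg_database exposes {} columns", table.schema().fields().len());
    }
    Ok(())
}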
@@ -451,3 +457,141 @@ impl PartitionStream for PgNamespaceTable {
         ))
     }
 }
+
+#[derive(Debug)]
+struct PgDatabaseTable {
+    schema: SchemaRef,
+    catalog_list: Arc<dyn CatalogProviderList>,
+}
+
+impl PgDatabaseTable {
+    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
+        // Define the schema for pg_database
+        // This matches PostgreSQL's pg_database table columns
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("oid", DataType::Int32, false), // Object identifier
+            Field::new("datname", DataType::Utf8, false), // Database name
+            Field::new("datdba", DataType::Int32, false), // Database owner's user ID
+            Field::new("encoding", DataType::Int32, false), // Character encoding
+            Field::new("datcollate", DataType::Utf8, false), // LC_COLLATE for this database
+            Field::new("datctype", DataType::Utf8, false), // LC_CTYPE for this database
+            Field::new("datistemplate", DataType::Boolean, false), // If true, database can be used as a template
+            Field::new("datallowconn", DataType::Boolean, false), // If false, no one can connect to this database
+            Field::new("datconnlimit", DataType::Int32, false), // Max number of concurrent connections (-1=no limit)
+            Field::new("datlastsysoid", DataType::Int32, false), // Last system OID in database
+            Field::new("datfrozenxid", DataType::Int32, false), // Frozen XID for this database
+            Field::new("datminmxid", DataType::Int32, false), // Minimum multixact ID
+            Field::new("dattablespace", DataType::Int32, false), // Default tablespace for this database
+            Field::new("datacl", DataType::Utf8, true), // Access privileges
+        ]));
+
+        Self {
+            schema,
+            catalog_list,
+        }
+    }
+
+    /// Generate record batches based on the current state of the catalog
+    async fn get_data(
+        schema: SchemaRef,
+        catalog_list: Arc<dyn CatalogProviderList>,
+    ) -> Result<RecordBatch> {
+        // Vectors to store column data
+        let mut oids = Vec::new();
+        let mut datnames = Vec::new();
+        let mut datdbas = Vec::new();
+        let mut encodings = Vec::new();
+        let mut datcollates = Vec::new();
+        let mut datctypes = Vec::new();
+        let mut datistemplates = Vec::new();
+        let mut datallowconns = Vec::new();
+        let mut datconnlimits = Vec::new();
+        let mut datlastsysoids = Vec::new();
+        let mut datfrozenxids = Vec::new();
+        let mut datminmxids = Vec::new();
+        let mut dattablespaces = Vec::new();
+        let mut datacles: Vec<Option<String>> = Vec::new();
+
+        // Start OID counter (this is simplistic and would need to be more robust in practice)
+        let mut next_oid = 16384; // Standard PostgreSQL starting OID for user databases
+
+        // Add a record for each catalog (treating catalogs as "databases")
+        for catalog_name in catalog_list.catalog_names() {
+            let oid = next_oid;
+            next_oid += 1;
+
+            oids.push(oid);
+            datnames.push(catalog_name.clone());
+            datdbas.push(10); // Default owner (assuming 10 = postgres user)
+            encodings.push(6); // 6 = UTF8 in PostgreSQL
+            datcollates.push("en_US.UTF-8".to_string()); // Default collation
+            datctypes.push("en_US.UTF-8".to_string()); // Default ctype
+            datistemplates.push(false);
+            datallowconns.push(true);
+            datconnlimits.push(-1); // No connection limit
+            datlastsysoids.push(100000); // Arbitrary last system OID
+            datfrozenxids.push(1); // Simplified transaction ID
+            datminmxids.push(1); // Simplified multixact ID
+            dattablespaces.push(1663); // Default tablespace (1663 = pg_default in PostgreSQL)
+            datacles.push(None); // No specific ACLs
+        }
+
+        // Always include a "postgres" database entry if not already present
+        // (This is for compatibility with tools that expect it)
+        if !datnames.contains(&"postgres".to_string()) {
+            let oid = next_oid;
+
+            oids.push(oid);
+            datnames.push("postgres".to_string());
+            datdbas.push(10);
+            encodings.push(6);
+            datcollates.push("en_US.UTF-8".to_string());
+            datctypes.push("en_US.UTF-8".to_string());
+            datistemplates.push(false);
+            datallowconns.push(true);
+            datconnlimits.push(-1);
+            datlastsysoids.push(100000);
+            datfrozenxids.push(1);
+            datminmxids.push(1);
+            dattablespaces.push(1663);
+            datacles.push(None);
+        }
+
+        // Create Arrow arrays from the collected data
+        let arrays: Vec<ArrayRef> = vec![
+            Arc::new(Int32Array::from(oids)),
+            Arc::new(StringArray::from(datnames)),
+            Arc::new(Int32Array::from(datdbas)),
+            Arc::new(Int32Array::from(encodings)),
+            Arc::new(StringArray::from(datcollates)),
+            Arc::new(StringArray::from(datctypes)),
+            Arc::new(BooleanArray::from(datistemplates)),
+            Arc::new(BooleanArray::from(datallowconns)),
+            Arc::new(Int32Array::from(datconnlimits)),
+            Arc::new(Int32Array::from(datlastsysoids)),
+            Arc::new(Int32Array::from(datfrozenxids)),
+            Arc::new(Int32Array::from(datminmxids)),
+            Arc::new(Int32Array::from(dattablespaces)),
+            Arc::new(StringArray::from_iter(datacles.into_iter())),
+        ];
+
+        // Create a full record batch
+        let full_batch = RecordBatch::try_new(schema.clone(), arrays)?;
+        Ok(full_batch)
+    }
+}
+
+impl PartitionStream for PgDatabaseTable {
+    fn schema(&self) -> &SchemaRef {
+        &self.schema
+    }
+
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        let catalog_list = self.catalog_list.clone();
+        let schema = Arc::clone(&self.schema);
+        Box::pin(RecordBatchStreamAdapter::new(
+            schema.clone(),
+            futures::stream::once(async move { Self::get_data(schema, catalog_list).await }),
+        ))
+    }
+}
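
As a sanity check (also not part of the commit), the batch-building logic can be driven directly without a running server. A minimal test sketch, assuming it sits in a #[cfg(test)] child module of pg_catalog.rs, that tokio is available for async tests, and that the in-memory catalog list type is datafusion::catalog::MemoryCatalogProviderList (it is named differently in older DataFusion releases):

#[cfg(test)]
mod pg_database_tests {
    use super::*;
    // Assumed import path; adjust to match the DataFusion version in use.
    use datafusion::catalog::MemoryCatalogProviderList;

    #[tokio::test]
    async fn pg_database_synthesizes_postgres_row() {
        // No catalogs registered, so only the fallback "postgres" entry is expected.
        let catalog_list: Arc<dyn CatalogProviderList> = Arc::new(MemoryCatalogProviderList::new());
        let table = PgDatabaseTable::new(catalog_list.clone());

        let batch = PgDatabaseTable::get_data(Arc::clone(&table.schema), catalog_list)
            .await
            .expect("building the pg_database batch should succeed");

        assert_eq!(batch.num_columns(), 14);
        assert_eq!(batch.num_rows(), 1);
    }
}

With real catalogs registered, the row count should equal the number of catalogs, plus one extra row when none of them is named postgres.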
